Remove index_realtime and index_realtime_appenderator tasks #16602

Merged 27 commits from realtime-rampage into master on Jun 25, 2024. The diff below shows changes from the first 21 of the 27 commits.

Commits
60ceeed  Remove index_realtime and index_realtime_appenderator tasks. (gianm, Jan 18, 2024)
2548eeb  Updates. (gianm, Jan 18, 2024)
abfef32  Style. (gianm, Jan 18, 2024)
3396f17  Fixes for static analysis. (gianm, Jan 18, 2024)
c773fcd  Fix test. (gianm, Jan 18, 2024)
7fd4f4f  Updates for static analysis, ITs. (gianm, Jan 18, 2024)
a319f16  Fix test. (gianm, Jan 18, 2024)
676dd4c  Fixes. (gianm, Jan 19, 2024)
7f39083  Remove function. (gianm, Jan 19, 2024)
0e77cd2  Merge branch 'master' into realtime-rampage (gianm, Jan 19, 2024)
1fbb38e  Merge branch 'master' into realtime-rampage (gianm, Jan 20, 2024)
42680fe  IT WIP (gianm, Jan 20, 2024)
cfb4faf  delete (gianm, Jan 20, 2024)
c85f6ff  Merge branch 'master' into realtime-rampage (gianm, Mar 20, 2024)
d7ec2cf  Merge branch 'master' into realtime-rampage (gianm, Mar 20, 2024)
9a90795  Fix style. (gianm, Mar 20, 2024)
2eff3ea  Merge remote-tracking branch 'upstream/master' into realtime-rampage (clintropolis, Jun 14, 2024)
7e6d38b  fix inspection, add union query integration test using new framework (clintropolis, Jun 14, 2024)
8499c4a  style (clintropolis, Jun 14, 2024)
e165efd  style again (clintropolis, Jun 14, 2024)
7d663fd  dependencies (clintropolis, Jun 14, 2024)
b3dceea  adjust (clintropolis, Jun 14, 2024)
f2401cd  revert some unintended changes (clintropolis, Jun 14, 2024)
aaf2812  Merge remote-tracking branch 'upstream/master' into realtime-rampage (clintropolis, Jun 14, 2024)
9f3da17  fix test (clintropolis, Jun 14, 2024)
e06647a  maybe fix tests (clintropolis, Jun 14, 2024)
f3ce50b  missed a spot (clintropolis, Jun 14, 2024)
2 changes: 1 addition & 1 deletion .github/workflows/revised-its.yml
@@ -50,7 +50,7 @@ jobs:
     matrix:
       #jdk: [8, 11, 17]
       jdk: [8]
-      it: [HighAvailability, MultiStageQuery, Catalog, BatchIndex, MultiStageQueryWithMM, InputSource, InputFormat, Security]
+      it: [HighAvailability, MultiStageQuery, Catalog, BatchIndex, MultiStageQueryWithMM, InputSource, InputFormat, Security, Query]
       #indexer: [indexer, middleManager]
       indexer: [middleManager]
     uses: ./.github/workflows/reusable-revised-its.yml
3 changes: 1 addition & 2 deletions distribution/bin/tag-missing-milestones.py
@@ -16,10 +16,9 @@
 # limitations under the License.

 import os
-import requests
 import subprocess
 import sys
+import requests
-

 if len(sys.argv) != 5:
     sys.stderr.write('usage: program <github-username> <previous-release-commit> <new-release-commit> <milestone-number-to-tag>\n')
10 changes: 2 additions & 8 deletions docs/ingestion/tranquility.md
@@ -22,15 +22,9 @@ title: "Tranquility"
   ~ under the License.
   -->

-[Tranquility](https://github.com/druid-io/tranquility/) is a separately distributed package for pushing
-streams to Druid in real-time.
+[Tranquility](https://github.com/druid-io/tranquility/) was a separately distributed package for pushing
+streams to Druid in real-time. It is not compatible with recent versions of Druid.

-Tranquility has not been built against a version of Druid later than Druid 0.9.2
-release. It may still work with the latest Druid servers, but not all features and functionality will be available
-due to limitations of older Druid APIs on the Tranquility side.
-
 For new projects that require streaming ingestion, we recommend using Druid's native support for
 [Apache Kafka](../ingestion/kafka-ingestion.md) or
 [Amazon Kinesis](../ingestion/kinesis-ingestion.md).
-
-For more details, check out the [Tranquility GitHub page](https://github.com/druid-io/tranquility/).
@@ -64,7 +64,6 @@
 import org.junit.Rule;
 import org.junit.Test;

-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -145,7 +144,7 @@ public void setUp()
   }

   @Test
-  public void testCheckSegments() throws IOException
+  public void testCheckSegments()
   {
     List<DataSegment> baseSegments = createBaseSegments();
     Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -171,7 +170,7 @@ public void testCheckSegments() throws IOException
   }

   @Test
-  public void testSubmitTasksDoesNotFailIfTaskAlreadyExists() throws IOException
+  public void testSubmitTasksDoesNotFailIfTaskAlreadyExists()
   {
     Set<DataSegment> baseSegments = Sets.newHashSet(createBaseSegments());
     Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -193,7 +192,7 @@ public void testSubmitTasksDoesNotFailIfTaskAlreadyExists() throws IOException
   }

   @Test
-  public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
+  public void testSubmitTasksFailsIfTaskCannotBeAdded()
   {
     Set<DataSegment> baseSegments = Sets.newHashSet(createBaseSegments());
     Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -219,7 +218,7 @@ public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
   }

   @Test
-  public void testCheckSegmentsAndSubmitTasks() throws IOException
+  public void testCheckSegmentsAndSubmitTasks()
   {
     Set<DataSegment> baseSegments = Collections.singleton(createBaseSegments().get(0));
     indexerMetadataStorageCoordinator.commitSegments(baseSegments, null);
@@ -178,13 +178,8 @@ public void testOptimize() throws InterruptedException
               Lists.newArrayList("dim1", "dim2", "dim3", "dim4"),
               1024 * 1024
           );
-          try {
-            metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
-            announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
-          }
-          catch (IOException e) {
-            return false;
-          }
+          metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
+          announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
           return true;
         }
     );
@@ -203,13 +198,8 @@ public void testOptimize() throws InterruptedException
               Lists.newArrayList("dim1", "dim2", "dim3"),
               1024
           );
-          try {
-            metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
-            announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
-          }
-          catch (IOException e) {
-            return false;
-          }
+          metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
+          announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
           return true;
         }
     );
@@ -49,7 +49,6 @@
 import org.junit.Test;

 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -41,7 +41,6 @@
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import java.io.IOException;
 import java.util.Collection;

@@ -367,7 +367,7 @@ public void testRunAfterDataInserted() throws Exception
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -505,7 +505,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -554,7 +554,7 @@ public void testRunBeforeDataInserted() throws Exception
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -40,7 +40,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -89,9 +89,7 @@
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
@@ -1194,15 +1192,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
     addSomeEvents(1);

     // non KafkaIndexTask (don't kill)
-    Task id2 = new RealtimeIndexTask(
+    Task id2 = new NoopTask(
         "id2",
         null,
-        new FireDepartment(
-            dataSchema,
-            new RealtimeIOConfig(null, null),
-            null
-        ),
-        null
+        dataSchema.getDataSource(),
+        100,
+        100,
+        ImmutableMap.of()
     );

     List<Task> existingTasks = ImmutableList.of(id2);
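The swap works because testDontKillTasksWithMismatchedType only needs some task whose type differs from the supervisor's; with RealtimeIndexTask removed, NoopTask fills that role. A minimal sketch of the construction, with argument roles inferred from the call above (the roles of the two numeric arguments are assumptions, not taken from NoopTask's source). The same swap appears in the Kinesis supervisor test below.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;

class NoopTaskSketch
{
  // Builds a stand-in task that a Kafka/Kinesis supervisor should ignore,
  // mirroring the six-argument call in the diff above.
  static Task unrelatedTask(String dataSource)
  {
    return new NoopTask(
        "id2",              // task id
        null,               // group id: none
        dataSource,         // datasource the task claims
        100,                // assumed: how long the no-op "runs", in millis
        100,                // assumed: delay before the task reports ready, in millis
        ImmutableMap.of()   // empty task context
    );
  }
}

The supervisor under test should skip this task because its type ("noop") does not match the supervisor's own task type.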
@@ -37,7 +37,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
 import org.apache.druid.indexing.kinesis.KinesisIndexTask;
@@ -84,9 +84,7 @@
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
@@ -940,15 +938,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
     EasyMock.expectLastCall().anyTimes();

     // non KinesisIndexTask (don't kill)
-    Task id2 = new RealtimeIndexTask(
+    Task id2 = new NoopTask(
         "id2",
         null,
-        new FireDepartment(
-            dataSchema,
-            new RealtimeIOConfig(null, null),
-            null
-        ),
-        null
+        dataSchema.getDataSource(),
+        100,
+        100,
+        ImmutableMap.of()
     );

     List<Task> existingTasks = ImmutableList.of(id2);
@@ -20,15 +20,15 @@
 package org.apache.druid.msq.counters;

 import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

 /**
- * Wrapper around {@link FireDepartmentMetrics} which updates the progress counters while updating its metrics. This
+ * Wrapper around {@link SegmentGenerationMetrics} which updates the progress counters while updating its metrics. This
  * is necessary as the {@link org.apache.druid.segment.realtime.appenderator.BatchAppenderator} used by the
  * {@link SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
  * cannot update the counters used in MSQ reports as it persists and pushes segments to deep storage.
  */
-public class SegmentGeneratorMetricsWrapper extends FireDepartmentMetrics
+public class SegmentGeneratorMetricsWrapper extends SegmentGenerationMetrics
 {
   private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;

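The javadoc above describes a plain adapter: BatchAppenderator updates the metrics object it is handed, so MSQ subclasses that object and mirrors each update into its own progress counter. A rough sketch of the pattern, assuming incrementRowOutputCount(long) is the hook the appenderator calls and SegmentGenerationProgressCounter.add(long) exists; both method names are assumptions, not confirmed by this diff.

package org.apache.druid.msq.counters;

import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

// Sketch of the wrapper pattern described in the javadoc above.
// The overridden method is hypothetical; the real class overrides
// whichever SegmentGenerationMetrics hook BatchAppenderator invokes.
public class SegmentGeneratorMetricsWrapperSketch extends SegmentGenerationMetrics
{
  private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;

  public SegmentGeneratorMetricsWrapperSketch(SegmentGenerationProgressCounter counter)
  {
    this.segmentGenerationProgressCounter = counter;
  }

  @Override
  public void incrementRowOutputCount(long numRows) // assumed hook
  {
    super.incrementRowOutputCount(numRows);
    // Mirror the update into the MSQ counter so MSQ reports see progress,
    // even though BatchAppenderator knows nothing about MSQ.
    segmentGenerationProgressCounter.add(numRows);
  }
}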
@@ -88,7 +88,6 @@
 import org.mockito.Mockito;

 import javax.annotation.Nullable;
-
 import java.io.File;
 import java.util.List;
 import java.util.Set;
@@ -19,46 +19,34 @@

 package org.apache.druid.indexing.common;

-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

 public class TaskRealtimeMetricsMonitorBuilder
 {
   private TaskRealtimeMetricsMonitorBuilder()
   {
   }

-  public static RealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment)
-  {
-    return new RealtimeMetricsMonitor(
-        ImmutableList.of(fireDepartment),
-        ImmutableMap.of(
-            DruidMetrics.TASK_ID, new String[]{task.getId()},
-            DruidMetrics.TASK_TYPE, new String[]{task.getType()}
-        )
-    );
-  }
-
   public static TaskRealtimeMetricsMonitor build(
       Task task,
-      FireDepartment fireDepartment,
+      SegmentGenerationMetrics metrics,
       RowIngestionMeters meters
   )
   {
     return new TaskRealtimeMetricsMonitor(
-        fireDepartment,
+        metrics,
         meters,
         ImmutableMap.of(
             DruidMetrics.DATASOURCE, new String[]{task.getDataSource()},
             DruidMetrics.TASK_ID, new String[]{task.getId()},
             DruidMetrics.TASK_TYPE, new String[]{task.getType()},
             DruidMetrics.GROUP_ID, new String[]{task.getGroupId()}
         ),
         task.getContextValue(DruidMetrics.TAGS)
     );
   }
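After this change the builder has a single entry point: callers supply the task plus the SegmentGenerationMetrics and RowIngestionMeters they already maintain, and get back a monitor tagged with the task's datasource, id, type, and group id. A hedged usage sketch follows; the no-arg SegmentGenerationMetrics constructor and the surrounding wiring are assumptions, not shown in this diff.

import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

class MonitorWiringSketch
{
  // Illustrative caller of the post-change build() signature above.
  static TaskRealtimeMetricsMonitor monitorFor(Task task, RowIngestionMeters meters)
  {
    SegmentGenerationMetrics metrics = new SegmentGenerationMetrics(); // assumed no-arg ctor
    // The builder attaches datasource/taskId/taskType/groupId dimensions
    // and the task's TAGS context value, per the implementation above.
    return TaskRealtimeMetricsMonitorBuilder.build(task, metrics, meters);
  }
}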

This file was deleted.
