Remove index_realtime and index_realtime_appenderator tasks (#16602)
index_realtime tasks were removed from the documentation in #13107. Even
at that time, they weren't really documented per se, only mentioned. They
existed solely to support Tranquility, an obsolete ingestion method that
predates Druid's migration to the ASF and is no longer maintained.
Tranquility docs were also de-linked from the sidebars and the other doc
pages in #11134. Only a stub remains, so people with links to the page
can see that it's no longer recommended.

index_realtime_appenderator tasks existed in the code base but were never
documented, nor, as far as I am aware, used for any purpose.

This patch removes both task types completely, along with all supporting
code that was otherwise unused. It also updates the Tranquility stub doc
to state more firmly that Tranquility is not compatible. (Previously, the
stub doc said only that it wasn't recommended, pointing out that it was
built against the ancient Druid 0.9.2 release.)

ITUnionQueryTest has been migrated to the new integration tests framework and updated to use Kafka ingestion.

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
clintropolis and gianm committed Jun 25, 2024
1 parent 2131917 commit 37a50e6
Showing 176 changed files with 2,219 additions and 14,273 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/revised-its.yml
@@ -50,7 +50,7 @@ jobs:
       matrix:
         #jdk: [8, 11, 17]
         jdk: [8]
-        it: [HighAvailability, MultiStageQuery, Catalog, BatchIndex, MultiStageQueryWithMM, InputSource, InputFormat, Security]
+        it: [HighAvailability, MultiStageQuery, Catalog, BatchIndex, MultiStageQueryWithMM, InputSource, InputFormat, Security, Query]
         #indexer: [indexer, middleManager]
         indexer: [middleManager]
     uses: ./.github/workflows/reusable-revised-its.yml
3 changes: 1 addition & 2 deletions distribution/bin/tag-missing-milestones.py
@@ -16,10 +16,9 @@
 # limitations under the License.
 
 import os
-import requests
 import subprocess
 import sys
+import requests
 
 
 if len(sys.argv) != 5:
     sys.stderr.write('usage: program <github-username> <previous-release-commit> <new-release-commit> <milestone-number-to-tag>\n')
10 changes: 2 additions & 8 deletions docs/ingestion/tranquility.md
@@ -22,15 +22,9 @@ title: "Tranquility"
   ~ under the License.
   -->
 
-[Tranquility](https://github.com/druid-io/tranquility/) is a separately distributed package for pushing
-streams to Druid in real-time.
-
-Tranquility has not been built against a version of Druid later than Druid 0.9.2
-release. It may still work with the latest Druid servers, but not all features and functionality will be available
-due to limitations of older Druid APIs on the Tranquility side.
+[Tranquility](https://github.com/druid-io/tranquility/) was a separately distributed package for pushing
+streams to Druid in real-time. It is not compatible with recent versions of Druid.
 
 For new projects that require streaming ingestion, we recommend using Druid's native support for
 [Apache Kafka](../ingestion/kafka-ingestion.md) or
 [Amazon Kinesis](../ingestion/kinesis-ingestion.md).
-
-For more details, check out the [Tranquility GitHub page](https://github.com/druid-io/tranquility/).
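
As a reference point for anyone still pushing streams through Tranquility: in current Druid, the equivalent setup is a Kafka ingestion supervisor submitted to the Overlord. The sketch below is illustrative only and not part of this commit; the host names, topic, and schema fields are placeholders, and it assumes Java 15+ for text blocks.

```java
// Illustrative migration sketch (not from this commit): submit a minimal
// Kafka supervisor spec to the Overlord. All host/topic/schema values are
// placeholders; adjust them for a real cluster.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitKafkaSupervisor
{
  public static void main(String[] args) throws Exception
  {
    String spec = """
        {
          "type": "kafka",
          "spec": {
            "dataSchema": {
              "dataSource": "events",
              "timestampSpec": {"column": "ts", "format": "iso"},
              "dimensionsSpec": {"useSchemaDiscovery": true},
              "granularitySpec": {"segmentGranularity": "hour"}
            },
            "ioConfig": {
              "topic": "events",
              "consumerProperties": {"bootstrap.servers": "kafka:9092"}
            }
          }
        }
        """;
    HttpRequest request = HttpRequest
        .newBuilder(URI.create("http://overlord:8090/druid/indexer/v1/supervisor"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(spec))
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```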
@@ -64,7 +64,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -145,7 +144,7 @@ public void setUp()
   }
 
   @Test
-  public void testCheckSegments() throws IOException
+  public void testCheckSegments()
   {
     List<DataSegment> baseSegments = createBaseSegments();
     Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -171,7 +170,7 @@ public void testCheckSegments() throws IOException
   }
 
   @Test
-  public void testSubmitTasksDoesNotFailIfTaskAlreadyExists() throws IOException
+  public void testSubmitTasksDoesNotFailIfTaskAlreadyExists()
   {
     Set<DataSegment> baseSegments = Sets.newHashSet(createBaseSegments());
     Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -193,7 +192,7 @@ public void testSubmitTasksDoesNotFailIfTaskAlreadyExists() throws IOException
   }
 
   @Test
-  public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
+  public void testSubmitTasksFailsIfTaskCannotBeAdded()
   {
     Set<DataSegment> baseSegments = Sets.newHashSet(createBaseSegments());
     Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -219,7 +218,7 @@ public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
   }
 
   @Test
-  public void testCheckSegmentsAndSubmitTasks() throws IOException
+  public void testCheckSegmentsAndSubmitTasks()
   {
     Set<DataSegment> baseSegments = Collections.singleton(createBaseSegments().get(0));
     indexerMetadataStorageCoordinator.commitSegments(baseSegments, null);
@@ -178,13 +178,8 @@ public void testOptimize() throws InterruptedException
             Lists.newArrayList("dim1", "dim2", "dim3", "dim4"),
             1024 * 1024
         );
-        try {
-          metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
-          announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
-        }
-        catch (IOException e) {
-          return false;
-        }
+        metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
+        announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
         return true;
       }
   );
@@ -203,13 +198,8 @@ public void testOptimize() throws InterruptedException
             Lists.newArrayList("dim1", "dim2", "dim3"),
             1024
         );
-        try {
-          metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
-          announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
-        }
-        catch (IOException e) {
-          return false;
-        }
+        metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
+        announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
         return true;
       }
   );
@@ -49,7 +49,6 @@
 import org.junit.Test;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -41,7 +41,6 @@
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import java.io.IOException;
 import java.util.Collection;
@@ -367,7 +367,7 @@ public void testRunAfterDataInserted() throws Exception
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -505,7 +505,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -554,7 +554,7 @@ public void testRunBeforeDataInserted() throws Exception
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -40,7 +40,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -89,9 +89,7 @@
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
@@ -1194,15 +1192,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
     addSomeEvents(1);
 
     // non KafkaIndexTask (don't kill)
-    Task id2 = new RealtimeIndexTask(
+    Task id2 = new NoopTask(
         "id2",
         null,
-        new FireDepartment(
-            dataSchema,
-            new RealtimeIOConfig(null, null),
-            null
-        ),
-        null
+        dataSchema.getDataSource(),
+        100,
+        100,
+        ImmutableMap.of()
     );
 
     List<Task> existingTasks = ImmutableList.of(id2);
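
The swap above works because supervisor housekeeping decides ownership by task type: anything that is not the supervisor's own task type is left untouched, so any non-Kafka task serves as the "mismatched" fixture. A toy illustration of that contract, not Druid's actual implementation (the type strings are assumptions):

```java
// Toy sketch, not Druid code: a supervisor may only reap tasks whose type
// matches its own, so a mismatched task (e.g. a noop task) must survive.
public class OwnershipSketch
{
  static boolean mayKill(String supervisorTaskType, String candidateTaskType)
  {
    return supervisorTaskType.equals(candidateTaskType);
  }

  public static void main(String[] args)
  {
    System.out.println(mayKill("index_kafka", "noop"));        // false: leave it alone
    System.out.println(mayKill("index_kafka", "index_kafka")); // true: eligible for cleanup
  }
}
```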
@@ -37,7 +37,7 @@
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
 import org.apache.druid.indexing.kinesis.KinesisIndexTask;
@@ -84,9 +84,7 @@
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
@@ -940,15 +938,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
     EasyMock.expectLastCall().anyTimes();
 
     // non KinesisIndexTask (don't kill)
-    Task id2 = new RealtimeIndexTask(
+    Task id2 = new NoopTask(
         "id2",
         null,
-        new FireDepartment(
-            dataSchema,
-            new RealtimeIOConfig(null, null),
-            null
-        ),
-        null
+        dataSchema.getDataSource(),
+        100,
+        100,
+        ImmutableMap.of()
    );
 
    List<Task> existingTasks = ImmutableList.of(id2);
@@ -20,15 +20,15 @@
 package org.apache.druid.msq.counters;
 
 import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 
 /**
- * Wrapper around {@link FireDepartmentMetrics} which updates the progress counters while updating its metrics. This
+ * Wrapper around {@link SegmentGenerationMetrics} which updates the progress counters while updating its metrics. This
  * is necessary as the {@link org.apache.druid.segment.realtime.appenderator.BatchAppenderator} used by the
  * {@link SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
  * cannot update the counters used in MSQ reports as it persists and pushes segments to deep storage.
  */
-public class SegmentGeneratorMetricsWrapper extends FireDepartmentMetrics
+public class SegmentGeneratorMetricsWrapper extends SegmentGenerationMetrics
 {
   private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;
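
The pattern here is a plain metrics decorator: subclass the metrics type and override the update hook so each update also ticks the MSQ progress counter. A self-contained sketch of the idea with simplified stand-in classes, not Druid's actual class or method names:

```java
// Simplified stand-ins for the wrapper pattern above; these are not Druid's
// real classes or method names.
class GenerationMetrics
{
  private long rowsOut;

  public void incrementRowOutputCount(long n)
  {
    rowsOut += n; // the base class only tracks its own totals
  }
}

class ProgressCounter
{
  private long rows;

  public synchronized void add(long n)
  {
    rows += n; // shared counter surfaced in MSQ-style reports
  }
}

class MetricsWrapper extends GenerationMetrics
{
  private final ProgressCounter counter;

  MetricsWrapper(ProgressCounter counter)
  {
    this.counter = counter;
  }

  @Override
  public void incrementRowOutputCount(long n)
  {
    super.incrementRowOutputCount(n); // keep the normal metrics flowing
    counter.add(n);                   // and tick the progress counter too
  }
}
```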
@@ -88,7 +88,6 @@
 import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
-
 import java.io.File;
 import java.util.List;
 import java.util.Set;
@@ -19,46 +19,34 @@
 
 package org.apache.druid.indexing.common;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 
 public class TaskRealtimeMetricsMonitorBuilder
 {
   private TaskRealtimeMetricsMonitorBuilder()
   {
   }
 
-  public static RealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment)
-  {
-    return new RealtimeMetricsMonitor(
-        ImmutableList.of(fireDepartment),
-        ImmutableMap.of(
-            DruidMetrics.TASK_ID, new String[]{task.getId()},
-            DruidMetrics.TASK_TYPE, new String[]{task.getType()}
-        )
-    );
-  }
-
   public static TaskRealtimeMetricsMonitor build(
       Task task,
-      FireDepartment fireDepartment,
+      SegmentGenerationMetrics metrics,
       RowIngestionMeters meters
   )
   {
     return new TaskRealtimeMetricsMonitor(
-        fireDepartment,
+        metrics,
         meters,
         ImmutableMap.of(
             DruidMetrics.DATASOURCE, new String[]{task.getDataSource()},
             DruidMetrics.TASK_ID, new String[]{task.getId()},
             DruidMetrics.TASK_TYPE, new String[]{task.getType()},
             DruidMetrics.GROUP_ID, new String[]{task.getGroupId()}
         ),
         task.getContextValue(DruidMetrics.TAGS)
     );
   }
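
With the FireDepartment variant gone, a single build method remains and callers pass the SegmentGenerationMetrics directly. A hedged usage sketch, assuming the Druid indexing-service classpath; it reuses the six-argument NoopTask constructor from the test diffs above, and assumes SegmentGenerationMetrics has a no-arg constructor and that NoopRowIngestionMeters is available as a no-op RowIngestionMeters implementation:

```java
// Hedged usage sketch; assumes a Druid indexing-service classpath and the
// constructors noted above. In practice the task's monitor scheduler, not
// user code, starts and stops the resulting monitor.
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

public class MonitorUsageSketch
{
  public static void main(String[] args)
  {
    NoopTask task = new NoopTask("id1", null, "someDatasource", 0, 0, ImmutableMap.of());
    TaskRealtimeMetricsMonitor monitor = TaskRealtimeMetricsMonitorBuilder.build(
        task,
        new SegmentGenerationMetrics(),
        new NoopRowIngestionMeters()
    );
    System.out.println("built monitor for task " + task.getId());
  }
}
```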

This file was deleted.
