Realtime cleanup and renaming
Changes:
* `FireHydrant` is now `PartialSegment`. This name much more clearly describes what the class does, and with all of the other firefighting terminology removed, it no longer even fit a theme.
* `Sink` is now `AppendableSegment`. This name also much more clearly describes what the class does; an `AppendableSegment` is composed of `PartialSegment`s, per the `FireHydrant` rename above.
* Additionally, `SinkQuerySegmentWalker` -> `AppendableSegmentQuerySegmentWalker` and `SinkQueryRunner` -> `AppendableSegmentQueryRunner`.
* Removed `Firehose`. `IngestSegmentFirehose` was used only by the Hadoop indexing `DruidRecordReader`, so it has been moved into `DruidRecordReader` as an internal class named `SegmentReader`.
* Removed `FirehoseFactory` and its remaining implementations; they were no longer used after apache#16602.
* Moved classes from `org.apache.druid.segment.realtime.sink` and `org.apache.druid.segment.realtime.firehose` up to `org.apache.druid.segment.realtime`; a migration sketch for downstream code follows below.
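For downstream code and extensions compiled against the old package names, this is an import-level migration; the classes themselves behave the same. A minimal hypothetical sketch of the change (the `MyChatEndpoint` class and its service name are invented for illustration and are not part of this commit):

```java
// Hypothetical extension code. Before this commit, ChatHandler and
// ChatHandlerProvider lived in org.apache.druid.segment.realtime.firehose;
// after it, they live one package up. Only the imports need to change.
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandlerProvider;

public class MyChatEndpoint implements ChatHandler
{
  public MyChatEndpoint(ChatHandlerProvider chatHandlerProvider)
  {
    // The registration API itself is unchanged by this commit; only the package moved.
    chatHandlerProvider.register("my-chat-endpoint", this);
  }
}
```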
clintropolis committed Jun 25, 2024
1 parent 37a50e6 commit 4b99c73
Showing 152 changed files with 1,597 additions and 4,347 deletions.
docs/api-reference/tasks-api.md (6 changes: 3 additions & 3 deletions)

@@ -1042,12 +1042,12 @@ Host: http://ROUTER_IP:ROUTER_PORT
 2023-07-03T22:11:17,902 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-dcanhmig-1, groupId=kafka-supervisor-dcanhmig] Unsubscribed all topics or patterns and assigned partitions
 2023-07-03T22:11:17,912 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted rows[0] and (estimated) bytes[0]
 2023-07-03T22:11:17,916 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed in-memory data with commit metadata [AppenderatorDriverMetadata{segments={}, lastSegmentIds={}, callerMetadata={nextPartitions=SeekableStreamEndSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}}}}] for segments:
-2023-07-03T22:11:17,917 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], sinks: [0], total fireHydrants (across sinks): [0], persisted fireHydrants (across sinks): [0]
+2023-07-03T22:11:17,917 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], appendable segments: [0], total partial segments (across appendable segments): [0], persisted partial segments (across appendable segments): [0]
 2023-07-03T22:11:17,919 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing [0] segments in background
 2023-07-03T22:11:17,921 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted rows[0] and (estimated) bytes[0]
 2023-07-03T22:11:17,924 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed in-memory data with commit metadata [AppenderatorDriverMetadata{segments={}, lastSegmentIds={}, callerMetadata={nextPartitions=SeekableStreamStartSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}, exclusivePartitions=[]}, publishPartitions=SeekableStreamEndSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}}}}] for segments:
-2023-07-03T22:11:17,924 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], sinks: [0], total fireHydrants (across sinks): [0], persisted fireHydrants (across sinks): [0]
-2023-07-03T22:11:17,925 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-merge] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Preparing to push (stats): processed rows: [0], sinks: [0], fireHydrants (across sinks): [0]
+2023-07-03T22:11:17,924 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], appendable segments: [0], total partial segments (across appendable segments): [0], persisted partial segments (across appendable segments): [0]
+2023-07-03T22:11:17,925 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-merge] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Preparing to push (stats): processed rows: [0], appendable segments: [0], partial segments (across appendable segments): [0]
 2023-07-03T22:11:17,925 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-appenderator-merge] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Push complete...
 2023-07-03T22:11:17,929 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-publish] org.apache.druid.indexing.seekablestream.SequenceMetadata - With empty segment set, start offsets [SeekableStreamStartSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}, exclusivePartitions=[]}] and end offsets [SeekableStreamEndSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}}] are the same, skipping metadata commit.
 2023-07-03T22:11:17,930 INFO [[index_kafka_social_media_0e905aa31037879_nommnaeg]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Published [0] segments with commit metadata [{nextPartitions=SeekableStreamStartSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}, exclusivePartitions=[]}, publishPartitions=SeekableStreamEndSequenceNumbers{stream='social_media', partitionSequenceNumberMap={0=230985}}}]
docs/configuration/index.md (1 change: 0 additions & 1 deletion)

@@ -395,7 +395,6 @@ Metric monitoring is an essential part of Druid operations. The following monito
 |`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|
 |`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup.|
 |`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|
-|`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
 |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
 |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
 |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
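Because `org.apache.druid.server.metrics.EventReceiverFirehoseMonitor` is removed by this commit, any `druid.monitoring.monitors` list that still names it must drop that entry. A sketch of a monitor list that remains valid after this change (the particular monitors chosen here are illustrative, not a recommendation):

```properties
# runtime.properties -- illustrative selection of monitors that still exist
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.CgroupMemoryMonitor", "org.apache.druid.server.metrics.QueryCountStatsMonitor"]
```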
docs/operations/metrics.md (2 changes: 1 addition & 1 deletion)

@@ -252,7 +252,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 |`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the configuration. Generally a few minutes at most.|
 |`ingest/merge/cpu`|CPU time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the configuration. Generally a few minutes at most.|
 |`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
-|`ingest/sink/count`|Number of sinks not handed off.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3|
+|`ingest/sink/count`|Number of appendable segments not handed off.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3|
 |`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event.|
 |`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
 |`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |

@@ -79,7 +79,6 @@ public static Task getTask()
     null
 ),
 new IndexTask.IndexIOConfig(
-    null,
     new LocalInputSource(new File("lol"), "rofl"),
     new NoopInputFormat(),
     true,

@@ -19,15 +19,13 @@

 package org.apache.druid.k8s.overlord.taskadapter;

-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -83,9 +81,6 @@ public void setup()
 {
   TestUtils utils = new TestUtils();
   jsonMapper = utils.getTestObjectMapper();
-  for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
-    jsonMapper.registerModule(jacksonModule);
-  }
   jsonMapper.registerSubtypes(
       new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
       new NamedType(IndexTask.IndexTuningConfig.class, "index")

@@ -19,7 +19,6 @@

 package org.apache.druid.k8s.overlord.taskadapter;

-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.base.Joiner;
@@ -44,7 +43,6 @@
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -101,9 +99,6 @@ public K8sTaskAdapterTest()
 {
   TestUtils utils = new TestUtils();
   jsonMapper = utils.getTestObjectMapper();
-  for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
-    jsonMapper.registerModule(jacksonModule);
-  }
   jsonMapper.registerSubtypes(
       new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
       new NamedType(IndexTask.IndexTuningConfig.class, "index")

@@ -19,7 +19,6 @@

 package org.apache.druid.k8s.overlord.taskadapter;

-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.collect.ImmutableList;
@@ -28,7 +27,6 @@
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -67,9 +65,6 @@ public void setup()
 {
   TestUtils utils = new TestUtils();
   jsonMapper = utils.getTestObjectMapper();
-  for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
-    jsonMapper.registerModule(jacksonModule);
-  }
   jsonMapper.registerSubtypes(
       new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
       new NamedType(IndexTask.IndexTuningConfig.class, "index")

@@ -19,14 +19,12 @@

 package org.apache.druid.k8s.overlord.taskadapter;

-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -66,9 +64,6 @@ public void setup()
 {
   TestUtils utils = new TestUtils();
   jsonMapper = utils.getTestObjectMapper();
-  for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
-    jsonMapper.registerModule(jacksonModule);
-  }
   jsonMapper.registerSubtypes(
       new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
       new NamedType(IndexTask.IndexTuningConfig.class, "index")

@@ -47,7 +47,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;

@@ -40,8 +40,8 @@
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;

@@ -52,7 +52,7 @@
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;

@@ -34,8 +34,8 @@
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;

@@ -53,7 +53,7 @@
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandler;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;


@@ -22,7 +22,7 @@
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.indexing.IndexerResourcePermissionMapper;
 import org.apache.druid.msq.rpc.ControllerResource;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandler;
 import org.apache.druid.server.security.AuthorizerMapper;

 public class ControllerChatHandler extends ControllerResource implements ChatHandler

@@ -33,8 +33,8 @@
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.segment.realtime.firehose.ChatHandlers;
+import org.apache.druid.segment.realtime.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandlers;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.utils.CloseableUtils;


@@ -44,7 +44,7 @@
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
 import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;

(Remaining changed files not shown.)