diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java index 01c1c43d18c08..02db92b8cbed8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java @@ -735,6 +735,17 @@ public void testExtractorTimeRangeMatch() throws Exception { return; } + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.at1),count(root.db.d3.at1),", + Collections.singleton("3,3,")); + + // flush realtime data - test PipeTsFileInsertionEvent + if (!TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList("flush"))) { + return; + } + TestUtils.assertDataOnEnv( receiverEnv, "select count(*) from root.**", @@ -750,6 +761,17 @@ public void testExtractorTimeRangeMatch() throws Exception { return; } + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.at1),count(root.db.d3.at1),", + Collections.singleton("3,3,")); + + // flush realtime data - test PipeTsFileInsertionEvent + if (!TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList("flush"))) { + return; + } + TestUtils.assertDataOnEnv( receiverEnv, "select count(*) from root.**", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index 13e335648c9b1..ec1c45d782c86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -248,6 +248,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception ((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime()); } return; + } else { + // ignore raw tablet event with zero rows + if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) { + return; + } + } } final int socketIndex = nextSocketIndex(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index a66ce02a63053..ad94523288eaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@ -160,6 +160,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception ((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime()); } return; + } else { + // ignore raw tablet event with zero rows + if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) { + return; + } + } } if (isTabletBatchModeEnabled) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java index 0d4ef07d53927..b2225afe3624e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java @@ -207,6 +207,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception ((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime()); } return; + } else { + // ignore raw tablet event with zero rows + if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) { + return; + } + } } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java index 5e6c3beda4c20..3b2a7e2359265 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java @@ -102,6 +102,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception ((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime()); } return; + } else { + // ignore raw tablet event with zero rows + if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) { + return; + } + } } if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 1deedd1120fae..0212d64e065bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -109,6 +109,7 @@ public ProgressIndex getProgressIndex() { public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) { // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report exceptions. + // Here we ignore parameters `pattern`, `startTime`, and `endTime`. return new PipeHeartbeatEvent( pipeName, pipeTaskMeta, dataRegionId, timePublished, shouldPrintMessage); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 559c638b6f645..68d8049a81f27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -238,7 +238,7 @@ public Tablet convertToTablet() { } } - /////////////////////////// parsePattern /////////////////////////// + /////////////////////////// parsePatternOrTime /////////////////////////// public TabletInsertionEvent parseEventWithPatternOrTime() { return new PipeRawTabletInsertionEvent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index b515b6afc0584..4e30a6b1f68d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -198,10 +198,14 @@ public Tablet convertToTablet() { return dataContainer.convertToTablet(); } - /////////////////////////// parsePattern /////////////////////////// + /////////////////////////// parsePatternOrTime /////////////////////////// public TabletInsertionEvent parseEventWithPatternOrTime() { return new PipeRawTabletInsertionEvent( convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, needToReport); } + + public boolean hasNoNeedParsingAndIsEmpty() { + return !shouldParsePatternOrTime() && tablet.rowSize == 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index c4c3675c7ddff..222e135335aa0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -167,8 +167,8 @@ private void parse(InsertRowNode insertRowNode, String pattern) { } rowCount = rowIndexList.size(); - if (rowCount == 0) { - LOGGER.info( + if (rowCount == 0 && LOGGER.isDebugEnabled()) { + LOGGER.debug( "InsertRowNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", insertRowNode, pattern, @@ -238,8 +238,8 @@ private void parse(InsertTabletNode insertTabletNode, String pattern) { } rowCount = timestampColumn.length; - if (rowCount == 0) { - LOGGER.info( + if (rowCount == 0 && LOGGER.isDebugEnabled()) { + LOGGER.debug( "InsertTabletNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", insertTabletNode, pattern, @@ -315,8 +315,8 @@ private void parse(Tablet tablet, boolean isAligned, String pattern) { } rowCount = tablet.rowSize; - if (rowCount == 0) { - LOGGER.info( + if (rowCount == 0 && LOGGER.isDebugEnabled()) { + LOGGER.debug( "Tablet({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", tablet, pattern, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index f296deb2af32c..473c922f15f2f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -166,14 +166,7 @@ public ProgressIndex getProgressIndex() { public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) { return new PipeTsFileInsertionEvent( - resource, - isLoaded, - isGeneratedByPipe, - pipeName, - pipeTaskMeta, - pattern, - this.startTime, - this.endTime); + resource, isLoaded, isGeneratedByPipe, pipeName, pipeTaskMeta, pattern, startTime, endTime); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 8f171c4243238..917c4cd330810 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -86,8 +86,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private boolean isHistoricalExtractorEnabled = false; - private long historicalDataExtractionStartTime; // Event time - private long historicalDataExtractionEndTime; // Event time + private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time + private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time private long historicalDataExtractionTimeLowerBound; // Arrival time diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java index 94aa97b38831b..f12c24ee718e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java @@ -70,8 +70,8 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { protected String pattern; private boolean isDbNameCoveredByPattern = false; - protected long realtimeDataExtractionStartTime; // Event time - protected long realtimeDataExtractionEndTime; // Event time + protected long realtimeDataExtractionStartTime = Long.MIN_VALUE; // Event time + protected long realtimeDataExtractionEndTime = Long.MAX_VALUE; // Event time private final AtomicBoolean enableSkippingTimeParseByTimePartition = new AtomicBoolean(false); private boolean disableSkippingTimeParse = false; @@ -136,6 +136,13 @@ public void customize(PipeParameters parameters, PipeExtractorRuntimeConfigurati dataRegionId = String.valueOf(environment.getRegionId()); pipeTaskMeta = environment.getPipeTaskMeta(); + // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are + // indexed by the taskID of IoTDBDataRegionExtractor. To avoid PipeRealtimeDataRegionExtractor + // holding a reference to IoTDBDataRegionExtractor, the taskID should be constructed to + // match that of IoTDBDataRegionExtractor. + long creationTime = environment.getCreationTime(); + taskID = pipeName + "_" + dataRegionId + "_" + creationTime; + pattern = parameters.getStringOrDefault( Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY), @@ -179,13 +186,6 @@ public void customize(PipeParameters parameters, PipeExtractorRuntimeConfigurati PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - - // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are - // indexed by the taskID of IoTDBDataRegionExtractor. To avoid PipeRealtimeDataRegionExtractor - // holding a reference to IoTDBDataRegionExtractor, the taskID should be constructed to - // match that of IoTDBDataRegionExtractor. - long creationTime = configuration.getRuntimeEnvironment().getCreationTime(); - taskID = pipeName + "_" + dataRegionId + "_" + creationTime; } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java index c1a7ed97575d9..8e189e9ab5265 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -104,57 +105,68 @@ public void tearDown() { public void testRealtimeExtractProcess() { // set up realtime extractor - try (PipeRealtimeDataRegionLogExtractor extractor1 = new PipeRealtimeDataRegionLogExtractor(); - PipeRealtimeDataRegionHybridExtractor extractor2 = + try (PipeRealtimeDataRegionLogExtractor extractor0 = new PipeRealtimeDataRegionLogExtractor(); + PipeRealtimeDataRegionHybridExtractor extractor1 = new PipeRealtimeDataRegionHybridExtractor(); - PipeRealtimeDataRegionTsFileExtractor extractor3 = + PipeRealtimeDataRegionTsFileExtractor extractor2 = new PipeRealtimeDataRegionTsFileExtractor(); - PipeRealtimeDataRegionHybridExtractor extractor4 = + PipeRealtimeDataRegionHybridExtractor extractor3 = new PipeRealtimeDataRegionHybridExtractor()) { - extractor1.customize( + PipeParameters parameters0 = new PipeParameters( new HashMap() { { put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1); } - }), - new PipeTaskRuntimeConfiguration( - new PipeTaskExtractorRuntimeEnvironment( - "1", 1, Integer.parseInt(dataRegion1), null))); - extractor2.customize( + }); + PipeParameters parameters1 = new PipeParameters( new HashMap() { { put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2); } - }), - new PipeTaskRuntimeConfiguration( - new PipeTaskExtractorRuntimeEnvironment( - "1", 1, Integer.parseInt(dataRegion1), null))); - extractor3.customize( + }); + PipeParameters parameters2 = new PipeParameters( new HashMap() { { put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1); } - }), - new PipeTaskRuntimeConfiguration( - new PipeTaskExtractorRuntimeEnvironment( - "1", 1, Integer.parseInt(dataRegion2), null))); - extractor4.customize( + }); + PipeParameters parameters3 = new PipeParameters( new HashMap() { { put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2); } - }), + }); + + PipeTaskRuntimeConfiguration configuration0 = new PipeTaskRuntimeConfiguration( - new PipeTaskExtractorRuntimeEnvironment( - "1", 1, Integer.parseInt(dataRegion2), null))); + new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion1), null)); + PipeTaskRuntimeConfiguration configuration1 = + new PipeTaskRuntimeConfiguration( + new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion1), null)); + PipeTaskRuntimeConfiguration configuration2 = + new PipeTaskRuntimeConfiguration( + new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion2), null)); + PipeTaskRuntimeConfiguration configuration3 = + new PipeTaskRuntimeConfiguration( + new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion2), null)); + + // Some parameters of extractor are validated and initialized during the validation process. + extractor0.validate(new PipeParameterValidator(parameters0)); + extractor0.customize(parameters0, configuration0); + extractor1.validate(new PipeParameterValidator(parameters1)); + extractor1.customize(parameters1, configuration1); + extractor2.validate(new PipeParameterValidator(parameters2)); + extractor2.customize(parameters2, configuration2); + extractor3.validate(new PipeParameterValidator(parameters3)); + extractor3.customize(parameters3, configuration3); PipeRealtimeDataRegionExtractor[] extractors = - new PipeRealtimeDataRegionExtractor[] {extractor1, extractor2, extractor3, extractor4}; + new PipeRealtimeDataRegionExtractor[] {extractor0, extractor1, extractor2, extractor3}; // start extractor 0, 1 extractors[0].start();