From fc3b407969f4aa1a092b083ce97405e0a9bdfd85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Wed, 30 Aug 2023 23:33:02 +0800 Subject: [PATCH] Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (cherry picked from commit 35736cc678211ec1f1e1f51ae6b855789302ad9d) --- .../connector/protocol/airgap/IoTDBAirGapConnector.java | 4 ++-- .../protocol/thrift/async/IoTDBThriftAsyncConnector.java | 4 ++-- .../protocol/thrift/sync/IoTDBThriftSyncConnector.java | 4 ++-- .../org/apache/iotdb/db/pipe/event/EnrichedEvent.java | 9 +++++---- .../event/common/tablet/PipeRawTabletInsertionEvent.java | 2 +- .../event/common/tsfile/PipeTsFileInsertionEvent.java | 3 +++ 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index 854205588f6db..0637cda4a4fea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -186,7 +186,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception return; } - if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { transfer( ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern()); @@ -226,7 +226,7 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception return; } - if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index d983da389bda5..cb2c68ab6806c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@ -150,7 +150,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception return; } - if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { transfer( ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern()); @@ -291,7 +291,7 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception return; } - if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java index f75bc7c612a16..dfecd2170b354 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java @@ -185,7 +185,7 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception return; } - if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { transfer( ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern()); @@ -231,7 +231,7 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception return; } - if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 5c2651e584735..269d21f6a5d73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -39,13 +39,14 @@ public abstract class EnrichedEvent implements Event { protected final PipeTaskMeta pipeTaskMeta; private final String pattern; - private final boolean isPatternParsed; + protected boolean isPatternAndTimeParsed; protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) { referenceCount = new AtomicInteger(0); this.pipeTaskMeta = pipeTaskMeta; this.pattern = pattern; - isPatternParsed = getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE); + isPatternAndTimeParsed = + getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE); } /** @@ -130,8 +131,8 @@ public final String getPattern() { return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern; } - public boolean shouldParsePattern() { - return !isPatternParsed; + public boolean shouldParsePatternOrTime() { + return !isPatternAndTimeParsed; } public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 5f30d3f934ec6..bd36d7eba5660 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -140,7 +140,7 @@ public boolean isAligned() { } public Tablet convertToTablet() { - if (!shouldParsePattern()) { + if (!shouldParsePatternOrTime()) { return tablet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d1d1adf1d4eb0..4c7d7f6fb0e56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -69,6 +69,9 @@ public PipeTsFileInsertionEvent( this.startTime = startTime; this.endTime = endTime; + if (hasTimeFilter()) { + this.isPatternAndTimeParsed = false; + } this.resource = resource; tsFile = resource.getTsFile();