Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,17 @@ public void testExtractorTimeRangeMatch() throws Exception {
return;
}

TestUtils.assertDataOnEnv(
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.at1),count(root.db.d3.at1),",
Collections.singleton("3,3,"));

// flush realtime data - test PipeTsFileInsertionEvent
if (!TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList("flush"))) {
return;
}

TestUtils.assertDataOnEnv(
receiverEnv,
"select count(*) from root.**",
Expand All @@ -750,6 +761,17 @@ public void testExtractorTimeRangeMatch() throws Exception {
return;
}

TestUtils.assertDataOnEnv(
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.at1),count(root.db.d3.at1),",
Collections.singleton("3,3,"));

// flush realtime data - test PipeTsFileInsertionEvent
if (!TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList("flush"))) {
return;
}

TestUtils.assertDataOnEnv(
receiverEnv,
"select count(*) from root.**",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
}
return;
} else {
// ignore raw tablet event with zero rows
if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) {
return;
}
}
}

final int socketIndex = nextSocketIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
}
return;
} else {
// ignore raw tablet event with zero rows
if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) {
return;
}
}
}

if (isTabletBatchModeEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
}
return;
} else {
// ignore raw tablet event with zero rows
if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) {
return;
}
}
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
}
return;
} else {
// ignore raw tablet event with zero rows
if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
if (((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) {
return;
}
}
}

if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public ProgressIndex getProgressIndex() {
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) {
// Should record PipeTaskMeta, for sometimes HeartbeatEvents should report exceptions.
// Here we ignore parameters `pattern`, `startTime`, and `endTime`.
return new PipeHeartbeatEvent(
pipeName, pipeTaskMeta, dataRegionId, timePublished, shouldPrintMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public Tablet convertToTablet() {
}
}

/////////////////////////// parsePattern ///////////////////////////
/////////////////////////// parsePatternOrTime ///////////////////////////

public TabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,14 @@ public Tablet convertToTablet() {
return dataContainer.convertToTablet();
}

/////////////////////////// parsePattern ///////////////////////////
/////////////////////////// parsePatternOrTime ///////////////////////////

public TabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, needToReport);
}

public boolean hasNoNeedParsingAndIsEmpty() {
return !shouldParsePatternOrTime() && tablet.rowSize == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ private void parse(InsertRowNode insertRowNode, String pattern) {
}

rowCount = rowIndexList.size();
if (rowCount == 0) {
LOGGER.info(
if (rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"InsertRowNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
insertRowNode,
pattern,
Expand Down Expand Up @@ -238,8 +238,8 @@ private void parse(InsertTabletNode insertTabletNode, String pattern) {
}

rowCount = timestampColumn.length;
if (rowCount == 0) {
LOGGER.info(
if (rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"InsertTabletNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
insertTabletNode,
pattern,
Expand Down Expand Up @@ -315,8 +315,8 @@ private void parse(Tablet tablet, boolean isAligned, String pattern) {
}

rowCount = tablet.rowSize;
if (rowCount == 0) {
LOGGER.info(
if (rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Tablet({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.",
tablet,
pattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,7 @@ public ProgressIndex getProgressIndex() {
public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long startTime, long endTime) {
return new PipeTsFileInsertionEvent(
resource,
isLoaded,
isGeneratedByPipe,
pipeName,
pipeTaskMeta,
pattern,
this.startTime,
this.endTime);
resource, isLoaded, isGeneratedByPipe, pipeName, pipeTaskMeta, pattern, startTime, endTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa

private boolean isHistoricalExtractorEnabled = false;

private long historicalDataExtractionStartTime; // Event time
private long historicalDataExtractionEndTime; // Event time
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time

private long historicalDataExtractionTimeLowerBound; // Arrival time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
protected String pattern;
private boolean isDbNameCoveredByPattern = false;

protected long realtimeDataExtractionStartTime; // Event time
protected long realtimeDataExtractionEndTime; // Event time
protected long realtimeDataExtractionStartTime = Long.MIN_VALUE; // Event time
protected long realtimeDataExtractionEndTime = Long.MAX_VALUE; // Event time

private final AtomicBoolean enableSkippingTimeParseByTimePartition = new AtomicBoolean(false);
private boolean disableSkippingTimeParse = false;
Expand Down Expand Up @@ -136,6 +136,13 @@ public void customize(PipeParameters parameters, PipeExtractorRuntimeConfigurati
dataRegionId = String.valueOf(environment.getRegionId());
pipeTaskMeta = environment.getPipeTaskMeta();

// Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are
// indexed by the taskID of IoTDBDataRegionExtractor. To avoid PipeRealtimeDataRegionExtractor
// holding a reference to IoTDBDataRegionExtractor, the taskID should be constructed to
// match that of IoTDBDataRegionExtractor.
long creationTime = environment.getCreationTime();
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;

pattern =
parameters.getStringOrDefault(
Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
Expand Down Expand Up @@ -179,13 +186,6 @@ public void customize(PipeParameters parameters, PipeExtractorRuntimeConfigurati
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);

// Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are
// indexed by the taskID of IoTDBDataRegionExtractor. To avoid PipeRealtimeDataRegionExtractor
// holding a reference to IoTDBDataRegionExtractor, the taskID should be constructed to
// match that of IoTDBDataRegionExtractor.
long creationTime = configuration.getRuntimeEnvironment().getCreationTime();
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down Expand Up @@ -104,57 +105,68 @@ public void tearDown() {
public void testRealtimeExtractProcess() {
// set up realtime extractor

try (PipeRealtimeDataRegionLogExtractor extractor1 = new PipeRealtimeDataRegionLogExtractor();
PipeRealtimeDataRegionHybridExtractor extractor2 =
try (PipeRealtimeDataRegionLogExtractor extractor0 = new PipeRealtimeDataRegionLogExtractor();
PipeRealtimeDataRegionHybridExtractor extractor1 =
new PipeRealtimeDataRegionHybridExtractor();
PipeRealtimeDataRegionTsFileExtractor extractor3 =
PipeRealtimeDataRegionTsFileExtractor extractor2 =
new PipeRealtimeDataRegionTsFileExtractor();
PipeRealtimeDataRegionHybridExtractor extractor4 =
PipeRealtimeDataRegionHybridExtractor extractor3 =
new PipeRealtimeDataRegionHybridExtractor()) {

extractor1.customize(
PipeParameters parameters0 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1);
}
}),
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment(
"1", 1, Integer.parseInt(dataRegion1), null)));
extractor2.customize(
});
PipeParameters parameters1 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2);
}
}),
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment(
"1", 1, Integer.parseInt(dataRegion1), null)));
extractor3.customize(
});
PipeParameters parameters2 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1);
}
}),
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment(
"1", 1, Integer.parseInt(dataRegion2), null)));
extractor4.customize(
});
PipeParameters parameters3 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2);
}
}),
});

PipeTaskRuntimeConfiguration configuration0 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment(
"1", 1, Integer.parseInt(dataRegion2), null)));
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion1), null));
PipeTaskRuntimeConfiguration configuration1 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion1), null));
PipeTaskRuntimeConfiguration configuration2 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion2), null));
PipeTaskRuntimeConfiguration configuration3 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion2), null));

// Some parameters of extractor are validated and initialized during the validation process.
extractor0.validate(new PipeParameterValidator(parameters0));
extractor0.customize(parameters0, configuration0);
extractor1.validate(new PipeParameterValidator(parameters1));
extractor1.customize(parameters1, configuration1);
extractor2.validate(new PipeParameterValidator(parameters2));
extractor2.customize(parameters2, configuration2);
extractor3.validate(new PipeParameterValidator(parameters3));
extractor3.customize(parameters3, configuration3);

PipeRealtimeDataRegionExtractor[] extractors =
new PipeRealtimeDataRegionExtractor[] {extractor1, extractor2, extractor3, extractor4};
new PipeRealtimeDataRegionExtractor[] {extractor0, extractor1, extractor2, extractor3};

// start extractor 0, 1
extractors[0].start();
Expand Down