Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -132,7 +130,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);

private @Nullable PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeRealtimeDataRegionExtractor realtimeExtractor;

private DataRegionWatermarkInjector watermarkInjector;
Expand Down Expand Up @@ -297,22 +295,10 @@ public void validate(final PipeParameterValidator validator) throws Exception {

checkInvalidParameters(validator);

if (validator
.getParameters()
.getBooleanOrDefault(SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
|| validator
.getParameters()
.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)) {
// Do not flush or open historical extractor when historical tsFile is disabled
constructHistoricalExtractor();
}
constructHistoricalExtractor();
constructRealtimeExtractor(validator.getParameters());

if (Objects.nonNull(historicalExtractor)) {
historicalExtractor.validate(validator);
}
historicalExtractor.validate(validator);
realtimeExtractor.validate(validator);
}

Expand Down Expand Up @@ -536,9 +522,7 @@ public void customize(

super.customize(parameters, configuration);

if (Objects.nonNull(historicalExtractor)) {
historicalExtractor.customize(parameters, configuration);
}
historicalExtractor.customize(parameters, configuration);
realtimeExtractor.customize(parameters, configuration);

// Set watermark injector
Expand Down Expand Up @@ -582,9 +566,7 @@ public void start() throws Exception {
"Pipe {}@{}: Starting historical extractor {} and realtime extractor {}.",
pipeName,
regionId,
Objects.nonNull(historicalExtractor)
? historicalExtractor.getClass().getSimpleName()
: null,
historicalExtractor.getClass().getSimpleName(),
realtimeExtractor.getClass().getSimpleName());

super.start();
Expand Down Expand Up @@ -619,9 +601,7 @@ public void start() throws Exception {
"Pipe {}@{}: Started historical extractor {} and realtime extractor {} successfully within {} ms.",
pipeName,
regionId,
Objects.nonNull(historicalExtractor)
? historicalExtractor.getClass().getSimpleName()
: null,
historicalExtractor.getClass().getSimpleName(),
realtimeExtractor.getClass().getSimpleName(),
System.currentTimeMillis() - startTime);
return;
Expand All @@ -639,18 +619,14 @@ private void startHistoricalExtractorAndRealtimeExtractor(
// There can still be writing when tsFile events are added. If we start
// realtimeExtractor after the process, then this part of data will be lost.
realtimeExtractor.start();
if (Objects.nonNull(historicalExtractor)) {
historicalExtractor.start();
}
historicalExtractor.start();
} catch (final Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
"Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.",
pipeName,
regionId,
Objects.nonNull(historicalExtractor)
? historicalExtractor.getClass().getSimpleName()
: null,
historicalExtractor.getClass().getSimpleName(),
realtimeExtractor.getClass().getSimpleName(),
e);
}
Expand All @@ -669,7 +645,7 @@ public Event supply() throws Exception {
}

Event event = null;
if (Objects.nonNull(historicalExtractor) && !historicalExtractor.hasConsumedAll()) {
if (!historicalExtractor.hasConsumedAll()) {
event = historicalExtractor.supply();
} else {
if (Objects.nonNull(watermarkInjector)) {
Expand Down Expand Up @@ -699,9 +675,7 @@ public void close() throws Exception {
return;
}

if (Objects.nonNull(historicalExtractor)) {
historicalExtractor.close();
}
historicalExtractor.close();
realtimeExtractor.close();
if (Objects.nonNull(taskID)) {
PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
Expand Down Expand Up @@ -141,7 +140,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor
private boolean isHistoricalExtractorEnabled = false;
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
private long historicalDataExtractionTimeLowerBound; // Arrival time

private boolean sloppyTimeRange; // true to disable time range filter after extraction
private boolean sloppyPattern; // true to disable pattern filter after extraction
Expand Down Expand Up @@ -263,17 +261,14 @@ public void validate(final PipeParameterValidator validator) {

try {
historicalDataExtractionStartTime =
isHistoricalExtractorEnabled
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY)
parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY)
? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY))
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
isHistoricalExtractorEnabled
&& parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)
parameters.hasAnyAttributes(EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)
? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY))
Expand Down Expand Up @@ -342,46 +337,6 @@ public void customize(
}
}

// Enable historical extractor by default
historicalDataExtractionTimeLowerBound =
isHistoricalExtractorEnabled
? Long.MIN_VALUE
// We define the realtime data as the data generated after the creation time
// of the pipe from user's perspective. But we still need to use
// PipeHistoricalDataRegionExtractor to extract the realtime data generated between the
// creation time of the pipe and the time when the pipe starts, because those data
// can not be listened by PipeRealtimeDataRegionExtractor, and should be extracted by
// PipeHistoricalDataRegionExtractor from implementation perspective.
: environment.getCreationTime();

// Only invoke flushDataRegionAllTsFiles() when the pipe runs in the realtime only mode.
// realtime only mode -> (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE)
//
// Ensure that all data in the data region is flushed to disk before extracting data.
// This ensures the generation time of all newly generated TsFiles (realtime data) after the
// invocation of flushDataRegionAllTsFiles() is later than the creationTime of the pipe
// (historicalDataExtractionTimeLowerBound).
//
// Note that: the generation time of the TsFile is the time when the TsFile is created, not
// the time when the data is flushed to the TsFile.
//
// Then we can use the generation time of the TsFile to determine whether the data in the
// TsFile should be extracted by comparing the generation time of the TsFile with the
// historicalDataExtractionTimeLowerBound when starting the pipe in realtime only mode.
//
// If we don't invoke flushDataRegionAllTsFiles() in the realtime only mode, the data generated
// between the creation time of the pipe the time when the pipe starts will be lost.
if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
final long lastFlushedByPipeTime =
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
flushDataRegionAllTsFiles();
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
}
}
}

if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
shouldTransferModFile =
parameters.getBooleanOrDefault(
Expand Down Expand Up @@ -597,8 +552,10 @@ private void extractTsFiles(
.peek(originalResourceList::add)
.filter(
resource ->
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
isHistoricalExtractorEnabled
&&
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
// Some resource is generated by pipe. We ignore them if the pipe should
// not transfer pipe requests.
&& (!resource.isGeneratedByPipe() || isForwardingPipeRequests)
Expand All @@ -608,7 +565,6 @@ private void extractTsFiles(
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
filteredTsFileResources.addAll(sequenceTsFileResources);
Expand All @@ -618,8 +574,10 @@ && mayTsFileResourceOverlappedWithPattern(resource)))
.peek(originalResourceList::add)
.filter(
resource ->
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
isHistoricalExtractorEnabled
&&
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
// Some resource is generated by pipe. We ignore them if the pipe should
// not transfer pipe requests.
&& (!resource.isGeneratedByPipe() || isForwardingPipeRequests)
Expand All @@ -629,7 +587,6 @@ && mayTsFileResourceOverlappedWithPattern(resource)))
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
filteredTsFileResources.addAll(unsequenceTsFileResources);
Expand Down Expand Up @@ -759,25 +716,6 @@ private boolean isTsFileResourceCoveredByTimeRange(final TsFileResource resource
&& historicalDataExtractionEndTime >= resource.getFileEndTime();
}

private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final TsFileResource resource) {
try {
return historicalDataExtractionTimeLowerBound
<= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway"
+ " (historical data extraction time lower bound: {})",
pipeName,
dataRegionId,
resource.getTsFilePath(),
historicalDataExtractionTimeLowerBound,
e);
// If failed to get the generation time of the TsFile, we will extract the data in the TsFile
// anyway.
return true;
}
}

private void extractDeletions(
final DeletionResourceManager deletionResourceManager,
final List<PersistentResource> resourceList) {
Expand Down
Loading