Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class PipeCronEventInjector {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class);

private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1;
private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 30;

private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
throws IOException, WALPipeException {
final TPipeTransferReq req = buildTabletInsertionReq(event);

if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
if (requestCommitIds.isEmpty()
|| !requestCommitIds.get(requestCommitIds.size() - 1).equals(requestCommitId)) {
reqs.add(req);

if (event instanceof EnrichedEvent) {
Expand Down Expand Up @@ -86,4 +87,8 @@ public List<Event> deepcopyEvents() {
public List<Long> deepcopyRequestCommitIds() {
return new ArrayList<>(requestCommitIds);
}

public long getLastCommitId() {
return requestCommitIds.isEmpty() ? -1 : requestCommitIds.get(requestCommitIds.size() - 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,10 @@ private void transferBatchedEventsIfNecessary() throws IOException {
return;
}

final long requestCommitId = commitIdGenerator.incrementAndGet();
// requestCommitId can not be generated by commitIdGenerator because the commit id must
// be bind to a specific InsertTabletEvent or TsFileInsertionEvent, otherwise the commit
// process will be stuck.
final long requestCommitId = tabletBatchBuilder.getLastCommitId();
final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,21 @@ private void extractTabletInsertion(PipeRealtimeEvent event) {
// size of wal buffer), the write operation will be throttled, so we should not extract any
// more tablet events.
// 3. The number of tsfile events in the pending queue has exceeded the limit.
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
event
.getTsFileEpoch()
.migrateState(
this,
state -> {
switch (state) {
case EMPTY:
case USING_TSFILE:
return TsFileEpoch.State.USING_TSFILE;
case USING_TABLET:
case USING_BOTH:
default:
return TsFileEpoch.State.USING_BOTH;
}
});
}

final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
Expand All @@ -97,6 +111,7 @@ private void extractTabletInsertion(PipeRealtimeEvent event) {
break;
case EMPTY:
case USING_TABLET:
case USING_BOTH:
if (!pendingQueue.waitedOffer(event)) {
// this would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
Expand Down Expand Up @@ -127,13 +142,24 @@ private void extractTsFileInsertion(PipeRealtimeEvent event) {
.getTsFileEpoch()
.migrateState(
this,
state ->
state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state);
state -> {
switch (state) {
case EMPTY:
case USING_TSFILE:
return TsFileEpoch.State.USING_TSFILE;
case USING_TABLET:
return TsFileEpoch.State.USING_TABLET;
case USING_BOTH:
default:
return TsFileEpoch.State.USING_BOTH;
}
});

final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
if (!pendingQueue.waitedOffer(event)) {
// this would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
Expand Down Expand Up @@ -265,23 +291,29 @@ private Event supplyTabletInsertion(PipeRealtimeEvent event) {
state ->
(state.equals(TsFileEpoch.State.EMPTY)) ? TsFileEpoch.State.USING_TABLET : state);

if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// if the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. but the data represented by this event
// has been carried by the following tsfile event, so we can just discard this event.
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
LOGGER.warn(
"Discard tablet event {} because it is not reliable anymore. "
+ "Change the state of TsFileEpoch to USING_TSFILE.",
event);
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TSFILE:
// if the state is USING_TSFILE, discard the event and poll the next one.
return null;
}
case EMPTY:
case USING_TABLET:
case USING_BOTH:
default:
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// if the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. but the data represented by this event
// has been carried by the following tsfile event, so we can just discard this event.
event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH);
LOGGER.warn(
"Discard tablet event {} because it is not reliable anymore. "
+ "Change the state of TsFileEpoch to USING_TSFILE.",
event);
return null;
}
}
// if the state is USING_TSFILE, discard the event and poll the next one.
return null;
}

private Event supplyTsFileInsertion(PipeRealtimeEvent event) {
Expand All @@ -299,26 +331,34 @@ private Event supplyTsFileInsertion(PipeRealtimeEvent event) {
return state;
});

if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// if the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. the data has been lost. we simply discard this event
// and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
"TsFile Event %s can not be supplied because "
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
LOGGER.error(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TABLET:
// if the state is USING_TABLET, discard the event and poll the next one.
return null;
}
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
default:
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// if the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. the data has been lost. we simply discard this
// event
// and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
"TsFile Event %s can not be supplied because "
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
LOGGER.error(errorMessage);
PipeAgent.runtime()
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
return null;
}
}
// if the state is USING_TABLET, discard the event and poll the next one.
return null;
}

private Event supplyHeartbeat(PipeRealtimeEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public String toString() {
public enum State {
EMPTY,
USING_TABLET,
USING_TSFILE
USING_TSFILE,
USING_BOTH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent(
return new TsFileEpoch(path);
});

final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
return new PipeRealtimeEvent(
event,
filePath2Epoch.remove(filePath),
epoch,
resource.getDevices().stream()
.collect(Collectors.toMap(device -> device, device -> EMPTY_MEASUREMENT_ARRAY)),
event.getPattern());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class PipeWALResource implements Closeable {

private final AtomicInteger referenceCount;

public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60;
public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
private final AtomicLong lastLogicalPinTime;
private final AtomicBoolean isPhysicallyPinned;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public class CommonConfig {
private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
private int pipeConnectorPendingQueueSize = 16;
// recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
// pipeAsyncConnectorCoreClientNumber
private int pipeConnectorPendingQueueSize = 256;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;

private int pipeAsyncConnectorSelectorNumber = 1;
Expand All @@ -187,7 +189,7 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;

private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 3;
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 1;

/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
Expand Down