Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand All @@ -42,7 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Optional;

public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
Expand Down Expand Up @@ -83,7 +78,7 @@ public boolean isNeedListenToInsertNode() {
private void extractTabletInsertion(final PipeRealtimeEvent event) {
TsFileEpoch.State state;

if (canNotUseTabletAnyMore(event)) {
if (canNotUseTabletAnymore(event)) {
event.getTsFileEpoch().migrateState(this, curState -> TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexKeeper.getInstance()
.registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource());
Expand Down Expand Up @@ -163,7 +158,7 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
return TsFileEpoch.State.USING_TSFILE;
case USING_BOTH:
default:
return canNotUseTabletAnyMore(event)
return canNotUseTabletAnymore(event)
? TsFileEpoch.State.USING_TSFILE
: TsFileEpoch.State.USING_BOTH;
}
Expand All @@ -172,9 +167,10 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TABLET:
// Though the data in tsfile event has been extracted in tablet mode, we still need to
// extract the tsfile event to help to determine isTsFileEventCountInQueueExceededLimit().
// The extracted tsfile event will be discarded in supplyTsFileInsertion.
// If the state is USING_TABLET, discard the event
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
return;
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
Expand Down Expand Up @@ -203,17 +199,9 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
}
}

private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) {
// In the following 4 cases, we should not extract this tablet event. all the data
// represented by the tablet event should be carried by the following tsfile event:
// the write operation will be throttled, so we should not extract any more tablet events.
// 1. The shallow memory usage of the insert node has reached the dangerous threshold.
// 2. Deprecated logics (unused by default)
return mayInsertNodeMemoryReachDangerousThreshold(event)
|| canNotUseTabletAnymoreDeprecated(event);
}

private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEvent event) {
// If the insertNode's memory has reached the dangerous threshold, we should not extract any
// tablets.
private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
Expand All @@ -225,7 +213,7 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
final PipeDataNodeRemainingEventAndTimeOperator operator =
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
LOGGER.info(
"Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}",
"Pipe task {}@{} canNotUseTabletAnyMore for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
Expand All @@ -238,83 +226,6 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
return mayInsertNodeMemoryReachDangerousThreshold;
}

/**
* These judgements are deprecated, and are only reserved for manual operation and compatibility.
*/
@Deprecated
private boolean canNotUseTabletAnymoreDeprecated(final PipeRealtimeEvent event) {
// In the following 5 cases, we should not extract any more tablet events. all the data
// represented by the tablet events should be carried by the following tsfile event:
// 1. The number of historical tsFile events to transfer has exceeded the limit.
// 2. The number of realtime tsfile events to transfer has exceeded the limit.
// 3. The number of linked tsFiles has reached the dangerous threshold.
return isHistoricalTsFileEventCountExceededLimit(event)
|| isRealtimeTsFileEventCountExceededLimit(event)
|| mayTsFileLinkedCountReachDangerousThreshold(event);
}

private boolean isHistoricalTsFileEventCountExceededLimit(final PipeRealtimeEvent event) {
if (PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()
== Integer.MAX_VALUE) {
return false;
}
final IoTDBDataRegionExtractor extractor =
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
final boolean isHistoricalTsFileEventCountExceededLimit =
Objects.nonNull(extractor)
&& extractor.getHistoricalTsFileInsertionEventCount()
>= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
if (isHistoricalTsFileEventCountExceededLimit && event.mayExtractorUseTablets(this)) {
LOGGER.info(
"Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1) for tsFile {}: The number of historical tsFile events {} has exceeded the limit {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
extractor.getHistoricalTsFileInsertionEventCount(),
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion());
}
return isHistoricalTsFileEventCountExceededLimit;
}

private boolean isRealtimeTsFileEventCountExceededLimit(final PipeRealtimeEvent event) {
if (PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()
== Integer.MAX_VALUE) {
return false;
}
final boolean isRealtimeTsFileEventCountExceededLimit =
pendingQueue.getTsFileInsertionEventCount()
>= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
if (isRealtimeTsFileEventCountExceededLimit && event.mayExtractorUseTablets(this)) {
LOGGER.info(
"Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2) for tsFile {}: The number of realtime tsFile events {} has exceeded the limit {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
pendingQueue.getTsFileInsertionEventCount(),
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
}
return isRealtimeTsFileEventCountExceededLimit;
}

private boolean mayTsFileLinkedCountReachDangerousThreshold(final PipeRealtimeEvent event) {
if (PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount() == Long.MAX_VALUE) {
return false;
}
final boolean mayTsFileLinkedCountReachDangerousThreshold =
PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName)
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
if (mayTsFileLinkedCountReachDangerousThreshold && event.mayExtractorUseTablets(this)) {
LOGGER.info(
"Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3) for tsFile {}: The number of linked tsFiles {} has reached the dangerous threshold {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName),
PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount());
}
return mayTsFileLinkedCountReachDangerousThreshold;
}

@Override
public Event supply() {
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
Expand Down Expand Up @@ -356,103 +267,40 @@ public Event supply() {
}

private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
event
.getTsFileEpoch()
.migrateState(
this,
state -> {
switch (state) {
case EMPTY:
return canNotUseTabletAnyMore(event)
? TsFileEpoch.State.USING_TSFILE
: TsFileEpoch.State.USING_TABLET;
case USING_TSFILE:
return canNotUseTabletAnyMore(event)
? TsFileEpoch.State.USING_TSFILE
: TsFileEpoch.State.USING_BOTH;
case USING_TABLET:
case USING_BOTH:
default:
return state;
}
});

final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
if (state == TsFileEpoch.State.USING_TSFILE) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
.registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource());
}

switch (state) {
case USING_TSFILE:
// If the state is USING_TSFILE, discard the event and poll the next one.
return null;
case EMPTY:
case USING_TABLET:
case USING_BOTH:
default:
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// If the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. but the data represented by this event
// has been carried by the following tsfile event, so we can just discard this event.
event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH);
LOGGER.warn(
"Discard tablet event {} because it is not reliable anymore. "
+ "Change the state of TsFileEpoch to USING_TSFILE.",
event);
return null;
}
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// If the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. but the data represented by this event
// has been carried by the following tsfile event, so we can just discard this event.
event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH);
LOGGER.warn(
"Discard tablet event {} because it is not reliable anymore. "
+ "Change the state of TsFileEpoch to USING_BOTH.",
event);
return null;
}
}

private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
event
.getTsFileEpoch()
.migrateState(
this,
state -> {
// This would not happen, but just in case.
if (state.equals(TsFileEpoch.State.EMPTY)) {
LOGGER.error(
String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", event));
return TsFileEpoch.State.USING_TSFILE;
}
return state;
});

final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TABLET:
// If the state is USING_TABLET, discard the event and poll the next one.
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
return null;
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
default:
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// If the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. the data has been lost. we simply discard this
// event
// and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
"TsFile Event %s can not be supplied because "
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
return null;
}
if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
return event.getEvent();
} else {
// If the event's reference count can not be increased, it means the data represented by
// this event is not reliable anymore. the data has been lost. we simply discard this
// event and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
"TsFile Event %s can not be supplied because "
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,6 @@ public class CommonConfig {

private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB

private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; // Deprecated
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated
private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated

private double pipeMetaReportMaxLogNumPerRound = 0.1;
private int pipeMetaReportMaxLogIntervalRounds = 360;
private int pipeTsFilePinMaxLogNumPerRound = 10;
Expand Down Expand Up @@ -1511,51 +1507,6 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}

public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
return pipeMaxAllowedHistoricalTsFilePerDataRegion;
}

public void setPipeMaxAllowedHistoricalTsFilePerDataRegion(
int pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
if (this.pipeMaxAllowedHistoricalTsFilePerDataRegion
== pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
return;
}
this.pipeMaxAllowedHistoricalTsFilePerDataRegion =
pipeMaxAllowedPendingTsFileEpochPerDataRegion;
logger.info(
"pipeMaxAllowedHistoricalTsFilePerDataRegion is set to {}",
pipeMaxAllowedPendingTsFileEpochPerDataRegion);
}

public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
}

public void setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
int pipeExtractorPendingQueueTsfileLimit) {
if (this.pipeMaxAllowedPendingTsFileEpochPerDataRegion
== pipeExtractorPendingQueueTsfileLimit) {
return;
}
this.pipeMaxAllowedPendingTsFileEpochPerDataRegion = pipeExtractorPendingQueueTsfileLimit;
logger.info(
"pipeMaxAllowedPendingTsFileEpochPerDataRegion is set to {}.",
pipeMaxAllowedPendingTsFileEpochPerDataRegion);
}

public long getPipeMaxAllowedLinkedTsFileCount() {
return pipeMaxAllowedLinkedTsFileCount;
}

public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) {
if (this.pipeMaxAllowedLinkedTsFileCount == pipeMaxAllowedLinkedTsFileCount) {
return;
}
this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
logger.info("pipeMaxAllowedLinkedTsFileCount is set to {}", pipeMaxAllowedLinkedTsFileCount);
}

public double getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
Expand Down
Loading
Loading