Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public List<PipeMeta> getPipeMetaList() {
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
protected void serializeImpl(final DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());

stream.writeInt(pipeMetaList.size());
Expand All @@ -58,16 +58,16 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
PipeMeta pipeMeta = PipeMeta.deserialize(buffer);
PipeMeta pipeMeta = PipeMeta.deserialize4Coordinator(buffer);
pipeMetaList.add(pipeMeta);
}
}

@Override
public boolean equals(Object obj) {
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
Expand Down Expand Up @@ -177,7 +177,8 @@ public TShowPipeResp convertToTShowPipeResp() {
staticMeta.getProcessorParameters().toString(),
staticMeta.getConnectorParameters().toString(),
exceptionMessageBuilder.toString());
final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta();
final PipeTemporaryMetaInCoordinator temporaryMeta =
(PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta();
final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta);

showPipeInfo.setRemainingEventCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
final PipeMeta pipeMetaFromCoordinator) {
try {
return PipeConfigNodeAgent.runtime().isLeaderReady()
? super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy())
? super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy4TaskAgent())
: null;
} catch (final Exception e) {
return new TPushPipeMetaRespExceptionMessage(
Expand Down Expand Up @@ -152,7 +152,7 @@ protected List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
.map(
pipeMeta -> {
try {
return pipeMeta.deepCopy();
return pipeMeta.deepCopy4TaskAgent();
} catch (Exception e) {
throw new PipeException("failed to deep copy pipeMeta", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public PipeHeartbeat(
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
final PipeMeta pipeMeta =
PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i));
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
isCompletedMap.put(
pipeMeta.getStaticMeta(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
Expand Down Expand Up @@ -144,7 +144,8 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
continue;
}

final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta();
final PipeTemporaryMetaInCoordinator temporaryMeta =
(PipeTemporaryMetaInCoordinator) pipeMetaFromCoordinator.getTemporaryMeta();

// Remove completed pipes
final Boolean isPipeCompletedFromAgent = pipeHeartbeat.isCompleted(staticMeta);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void bindTo(final AbstractMetricService metricService) {
PipeConfigRegionExtractorMetrics.getInstance().bindTo(metricService);
PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
PipeTemporaryMetaInCoordinatorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
}

Expand All @@ -53,7 +53,7 @@ public void unbindFrom(final AbstractMetricService metricService) {
PipeConfigRegionExtractorMetrics.getInstance().unbindFrom(metricService);
PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
PipeTemporaryMetaInCoordinatorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.metrics.AbstractMetricService;
Expand All @@ -39,17 +40,19 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* The {@link PipeTemporaryMetaMetrics} is to calculate the pipe-statistics from the {@link
* PipeTemporaryMeta}. The class is lock-free and can only read from the thread-safe variables from
* the {@link PipeTemporaryMeta}.
* The {@link PipeTemporaryMetaInCoordinatorMetrics} is to calculate the pipe-statistics from the
* {@link PipeTemporaryMeta}. The class is lock-free and can only read from the thread-safe
* variables from the {@link PipeTemporaryMeta}.
*/
public class PipeTemporaryMetaMetrics implements IMetricSet {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTemporaryMetaMetrics.class);
public class PipeTemporaryMetaInCoordinatorMetrics implements IMetricSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTemporaryMetaInCoordinatorMetrics.class);

@SuppressWarnings("java:S3077")
private volatile AbstractMetricService metricService;

private final Map<String, PipeTemporaryMeta> pipeTemporaryMetaMap = new ConcurrentHashMap<>();
private final Map<String, PipeTemporaryMetaInCoordinator> pipeTemporaryMetaMap =
new ConcurrentHashMap<>();

//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////

Expand All @@ -64,13 +67,13 @@ private void createMetrics(final String pipeID) {
}

private void createAutoGauge(final String pipeID) {
final PipeTemporaryMeta pipeTemporaryMeta = pipeTemporaryMetaMap.get(pipeID);
final PipeTemporaryMetaInCoordinator pipeTemporaryMeta = pipeTemporaryMetaMap.get(pipeID);
final String[] pipeNameAndCreationTime = pipeID.split("_");
metricService.createAutoGauge(
Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
MetricLevel.IMPORTANT,
pipeTemporaryMeta,
PipeTemporaryMeta::getGlobalRemainingEvents,
PipeTemporaryMetaInCoordinator::getGlobalRemainingEvents,
Tag.NAME.toString(),
pipeNameAndCreationTime[0],
Tag.CREATION_TIME.toString(),
Expand All @@ -79,7 +82,7 @@ private void createAutoGauge(final String pipeID) {
Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
MetricLevel.IMPORTANT,
pipeTemporaryMeta,
PipeTemporaryMeta::getGlobalRemainingTime,
PipeTemporaryMetaInCoordinator::getGlobalRemainingTime,
Tag.NAME.toString(),
pipeNameAndCreationTime[0],
Tag.CREATION_TIME.toString(),
Expand Down Expand Up @@ -123,7 +126,8 @@ private void removeAutoGauge(final String pipeID) {
public void register(final PipeMeta pipeMeta) {
final String taskID =
pipeMeta.getStaticMeta().getPipeName() + "_" + pipeMeta.getStaticMeta().getCreationTime();
pipeTemporaryMetaMap.putIfAbsent(taskID, pipeMeta.getTemporaryMeta());
pipeTemporaryMetaMap.putIfAbsent(
taskID, (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta());
if (Objects.nonNull(metricService)) {
createMetrics(taskID);
}
Expand Down Expand Up @@ -163,14 +167,15 @@ public void handleTemporaryMetaChanges(final Iterable<PipeMeta> pipeMetaList) {

private static class PipeTemporaryMetaMetricsHolder {

private static final PipeTemporaryMetaMetrics INSTANCE = new PipeTemporaryMetaMetrics();
private static final PipeTemporaryMetaInCoordinatorMetrics INSTANCE =
new PipeTemporaryMetaInCoordinatorMetrics();

private PipeTemporaryMetaMetricsHolder() {
// Empty constructor
}
}

public static PipeTemporaryMetaMetrics getInstance() {
public static PipeTemporaryMetaInCoordinatorMetrics getInstance() {
return PipeTemporaryMetaMetricsHolder.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaMetrics;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaInCoordinatorMetrics;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -94,7 +94,7 @@ public TSStatus createPipe(final CreatePipePlanV2 plan) {
throw new PipeException("Failed to increase listener reference", e);
}
});
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
Expand All @@ -114,7 +114,7 @@ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {

PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
Expand Down Expand Up @@ -143,7 +143,7 @@ public TSStatus dropPipe(final DropPipePlanV2 plan) {
throw new PipeException("Failed to decrease listener reference", e);
}
});
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
Expand Down Expand Up @@ -181,7 +181,7 @@ public TSStatus alterPipe(final AlterPipePlanV2 plan) {
throw new PipeException("Failed to decrease listener reference", e);
}
});
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
Expand All @@ -206,7 +206,7 @@ public TSStatus alterPipe(final AlterPipePlanV2 plan) {
public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plans) {
try {
final TSStatus status = pipeTaskInfo.operateMultiplePipes(plans);
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return status;
} catch (final Exception e) {
Expand All @@ -225,7 +225,7 @@ public TSStatus handleLeaderChange(final PipeHandleLeaderChangePlan plan) {
pipeMetaListFromCoordinator.add(pipeMeta);
}
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
Expand All @@ -245,7 +245,7 @@ public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) {
pipeTaskInfo.getPipeMetaByPipeName(pipeMeta.getStaticMeta().getPipeName()));
}
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
PipeTemporaryMetaMetrics.getInstance()
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
Expand Down Expand Up @@ -602,7 +603,8 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla
.get(consensusGroupId.getId())
.setLeaderNodeId(newLeader);
// New region leader may contain un-transferred events
pipeMeta.getTemporaryMeta().markDataNodeUncompleted(newLeader);
((PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta())
.markDataNodeUncompleted(newLeader);
} else {
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public static synchronized void launchPipeTaskAgent() {
getAllPipeInfoResp.getAllPipeInfo().stream()
.map(
byteBuffer -> {
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer);
LOGGER.info(
"Pulled pipe meta from config node: {}, recovering ...", pipeMeta);
return pipeMeta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,15 +530,25 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
}

// Only restart the stream mode pipes for releasing memTables.
if (extractors.get(0).isStreamMode()
&& extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
&& (mayMemTablePinnedCountReachDangerousThreshold()
|| mayWalSizeReachThrottleThreshold())) {
// Extractors of this pipe may be stuck and is pinning too many MemTables.
LOGGER.warn(
"Pipe {} needs to restart because too many memTables are pinned.",
pipeMeta.getStaticMeta());
stuckPipes.add(pipeMeta);
if (extractors.get(0).isStreamMode()) {
if (extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
&& (mayMemTablePinnedCountReachDangerousThreshold()
|| mayWalSizeReachThrottleThreshold())) {
// Extractors of this pipe may be stuck and is pinning too many MemTables.
LOGGER.warn(
"Pipe {} needs to restart because too many memTables are pinned.",
pipeMeta.getStaticMeta());
stuckPipes.add(pipeMeta);
} else if (getFloatingMemoryUsageInByte(pipeName)
>= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
/ pipeMetaKeeper.getPipeMetaCount()) {
// Extractors of this pipe may have too many insert nodes
LOGGER.warn(
"Pipe {} needs to restart because too many insertNodes are extracted.",
pipeMeta.getStaticMeta());
stuckPipes.add(pipeMeta);
}
}
}

Expand Down Expand Up @@ -584,7 +594,7 @@ private void restartStuckPipe(final PipeMeta pipeMeta) {
acquireWriteLock();
try {
final long startTime = System.currentTimeMillis();
final PipeMeta originalPipeMeta = pipeMeta.deepCopy();
final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
handleSinglePipeMetaChanges(originalPipeMeta);
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public void updateLeaderCache(final String deviceId, final TEndPoint endPoint) {
private void logOnClientException(
final AsyncPipeDataTransferServiceClient client, final Exception e) {
if (client == null) {
LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
} else {
LOGGER.warn(
String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(), client.getPort()), e);
Expand Down
Loading