From 71b410f65430de75b7d5437ece22b5c7d0f4f426 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 16:03:29 +0800 Subject: [PATCH 01/14] partial completion --- .../runtime/PipeHeartbeatParser.java | 39 +++++++-- .../persistence/pipe/PipeTaskInfo.java | 13 +++ .../agent/task/PipeDataNodeTaskAgent.java | 83 ++++++++++++++---- .../common/terminate/PipeTerminateEvent.java | 87 +++++++++++++++++++ ...peHistoricalDataRegionTsFileExtractor.java | 42 ++++++--- .../iotdb/db/pipe/task/PipeDataNodeTask.java | 19 ++-- .../constant/PipeExtractorConstant.java | 6 ++ .../commons/pipe/task/meta/PipeMeta.java | 38 +++++--- .../pipe/task/meta/PipeTemporaryMeta.java | 36 ++++++++ .../src/main/thrift/datanode.thrift | 2 + 10 files changed, 312 insertions(+), 53 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java index 3bda79f0dd59..d995dd67c5b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; @@ -40,6 +41,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -66,7 +69,9 @@ public class PipeHeartbeatParser { } public synchronized void parseHeartbeat( - int nodeId, @NotNull List pipeMetaByteBufferListFromAgent) { + int nodeId, + @NotNull List pipeMetaByteBufferListFromAgent, + final List pipeCompletedListFromAgent) { final long heartbeatCount = ++heartbeatCounter; final AtomicBoolean canSubmitHandleMetaChangeProcedure = new AtomicBoolean(false); @@ -110,7 +115,10 @@ public synchronized void parseHeartbeat( try { if (!pipeMetaByteBufferListFromAgent.isEmpty()) { parseHeartbeatAndSaveMetaChangeLocally( - pipeTaskInfo, nodeId, pipeMetaByteBufferListFromAgent); + pipeTaskInfo, + nodeId, + pipeMetaByteBufferListFromAgent, + pipeCompletedListFromAgent); } if (canSubmitHandleMetaChangeProcedure.get() @@ -134,14 +142,35 @@ public synchronized void parseHeartbeat( private void parseHeartbeatAndSaveMetaChangeLocally( final AtomicReference pipeTaskInfo, final int nodeId, - @NotNull final List pipeMetaByteBufferListFromAgent) { + @NotNull final List pipeMetaByteBufferListFromAgent, + final List pipeCompletedListFromAgent) { final Map pipeMetaMapFromAgent = new HashMap<>(); - for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromAgent) { - final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer); + final Map pipeCompletedMapFromAgent = new HashMap<>(); + for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { + final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i)); pipeMetaMapFromAgent.put(pipeMeta.getStaticMeta(), pipeMeta); + pipeCompletedMapFromAgent.put( + pipeMeta.getStaticMeta(), + Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i)); } for (final PipeMeta pipeMetaFromCoordinator : pipeTaskInfo.get().getPipeMetaList()) { + // Remove completed pipes + final Boolean pipeCompletedFromAgent = + pipeCompletedMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta()); + if (Objects.nonNull(pipeCompletedFromAgent)) { + final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); + temporaryMeta.putDataNodeCompletion(nodeId, pipeCompletedFromAgent); + + final Set dataNodeIds = + configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); + dataNodeIds.retainAll(temporaryMeta.getDataNodeId2CompletedMap().keySet()); + if (dataNodeIds.isEmpty()) { + pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName()); + continue; + } + } + final PipeMeta pipeMetaFromAgent = pipeMetaMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta()); if (pipeMetaFromAgent == null) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index d41493762d2a..7417f1e736cd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -758,6 +758,19 @@ private void handleSuccessfulRestartInternal() { }); } + public void removePipeMeta(final String pipeName) { + acquireWriteLock(); + try { + removePipeMetaInternal(pipeName); + } finally { + releaseWriteLock(); + } + } + + private void removePipeMetaInternal(final String pipeName) { + pipeMetaKeeper.removePipeMeta(pipeName); + } + /////////////////////////////// Snapshot /////////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 327fe6190d6f..0190d5ebb90e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -88,7 +88,7 @@ protected boolean isShutdown() { } @Override - protected Map buildPipeTasks(PipeMeta pipeMetaFromConfigNode) + protected Map buildPipeTasks(final PipeMeta pipeMetaFromConfigNode) throws IllegalPathException { return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build(); } @@ -97,7 +97,9 @@ protected Map buildPipeTasks(PipeMeta pipeMetaFromConfigNode) @Override protected void createPipeTask( - int consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) + final int consensusGroupId, + final PipeStaticMeta pipeStaticMeta, + final PipeTaskMeta pipeTaskMeta) throws IllegalPathException { if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) { final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters(); @@ -131,7 +133,7 @@ protected void createPipeTask( @Override public List handlePipeMetaChangesInternal( - List pipeMetaListFromCoordinator) { + final List pipeMetaListFromCoordinator) { // Do nothing if the node is removing or removed if (isShutdown()) { return Collections.emptyList(); @@ -144,7 +146,7 @@ public List handlePipeMetaChangesInternal( final Set validSchemaRegionIds = clearSchemaRegionListeningQueueIfNecessary(pipeMetaListFromCoordinator); closeSchemaRegionListeningQueueIfNecessary(validSchemaRegionIds, exceptionMessages); - } catch (Exception e) { + } catch (final Exception e) { throw new PipeException("Failed to clear/close schema region listening queue.", e); } @@ -152,7 +154,7 @@ public List handlePipeMetaChangesInternal( } private Set clearSchemaRegionListeningQueueIfNecessary( - List pipeMetaListFromCoordinator) throws IllegalPathException { + final List pipeMetaListFromCoordinator) throws IllegalPathException { final Map schemaRegionId2ListeningQueueNewFirstIndex = new HashMap<>(); // Check each pipe @@ -198,8 +200,8 @@ private Set clearSchemaRegionListeningQueueIfNecessary( } private void closeSchemaRegionListeningQueueIfNecessary( - Set validSchemaRegionIds, - List exceptionMessages) { + final Set validSchemaRegionIds, + final List exceptionMessages) { if (!exceptionMessages.isEmpty()) { return; } @@ -215,7 +217,7 @@ private void closeSchemaRegionListeningQueueIfNecessary( .write( schemaRegionId, new PipeOperateSchemaQueueNode(new PlanNodeId(""), false)); - } catch (ConsensusException e) { + } catch (final ConsensusException e) { throw new PipeException( "Failed to close listening queue for SchemaRegion " + schemaRegionId, e); } @@ -230,7 +232,7 @@ public void stopAllPipesWithCriticalException() { ///////////////////////// Heartbeat ///////////////////////// - public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException { + public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode if (!tryReadLockWithTimeOut(10)) { @@ -243,13 +245,19 @@ public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException { } } - private void collectPipeMetaListInternal(TDataNodeHeartbeatResp resp) throws TException { + private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) throws TException { // Do nothing if data node is removing or removed, or request does not need pipe meta list if (PipeAgent.runtime().isShutdown()) { return; } + final Set dataRegionIds = + StorageEngine.getInstance().getAllDataRegionIds().stream() + .map(DataRegionId::getId) + .collect(Collectors.toSet()); + final List pipeMetaBinaryList = new ArrayList<>(); + final List pipeCompletedList = new ArrayList<>(); try { final Optional logger = PipeResourceManager.log() @@ -260,25 +268,40 @@ private void collectPipeMetaListInternal(TDataNodeHeartbeatResp resp) throws TEx pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); + final Map pipeTaskMap = + pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); + pipeCompletedList.add( + pipeTaskMap == null + || pipeTaskMap.entrySet().stream() + .filter(entry -> dataRegionIds.contains(entry.getKey())) + .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted())); + logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); - } catch (IOException e) { + } catch (final IOException e) { throw new TException(e); } resp.setPipeMetaList(pipeMetaBinaryList); + resp.setPipeCompletedList(pipeCompletedList); } @Override - protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) - throws TException { + protected void collectPipeMetaListInternal( + final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { // Do nothing if data node is removing or removed, or request does not need pipe meta list if (PipeAgent.runtime().isShutdown()) { return; } LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId); + final Set dataRegionIds = + StorageEngine.getInstance().getAllDataRegionIds().stream() + .map(DataRegionId::getId) + .collect(Collectors.toSet()); + final List pipeMetaBinaryList = new ArrayList<>(); + final List pipeCompletedList = new ArrayList<>(); try { final Optional logger = PipeResourceManager.log() @@ -289,14 +312,22 @@ protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeat pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); + final Map pipeTaskMap = + pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); + pipeCompletedList.add( + pipeTaskMap == null + || pipeTaskMap.entrySet().stream() + .filter(entry -> dataRegionIds.contains(entry.getKey())) + .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted())); + logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); - } catch (IOException e) { + } catch (final IOException e) { throw new TException(e); } resp.setPipeMetaList(pipeMetaBinaryList); - + resp.setPipeCompletedList(pipeCompletedList); PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); } @@ -351,7 +382,7 @@ private boolean mayWalSizeReachThrottleThreshold() { > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); } - private void restartStuckPipe(PipeMeta pipeMeta) { + private void restartStuckPipe(final PipeMeta pipeMeta) { LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta()); final long startTime = System.currentTimeMillis(); changePipeStatusBeforeRestart(pipeMeta.getStaticMeta().getPipeName()); @@ -362,7 +393,7 @@ private void restartStuckPipe(PipeMeta pipeMeta) { System.currentTimeMillis() - startTime); } - private void changePipeStatusBeforeRestart(String pipeName) { + private void changePipeStatusBeforeRestart(final String pipeName) { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); final Map pipeTasks = pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); final Set taskRegionIds = new HashSet<>(pipeTasks.keySet()); @@ -408,9 +439,25 @@ private void changePipeStatusBeforeRestart(String pipeName) { pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); } + ///////////////////////// Terminate Logic ///////////////////////// + + public void markCompleted(final String pipeName, final int regionId) { + acquireWriteLock(); + try { + if (pipeMetaKeeper.containsPipeMeta(pipeName)) { + ((PipeDataNodeTask) + pipeTaskManager.getPipeTask( + pipeMetaKeeper.getPipeMeta(pipeName).getStaticMeta(), regionId)) + .markCompleted(); + } + } finally { + releaseWriteLock(); + } + } + ///////////////////////// Utils ///////////////////////// - public Set getPipeTaskRegionIdSet(String pipeName, long creationTime) { + public Set getPipeTaskRegionIdSet(final String pipeName, final long creationTime) { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime ? Collections.emptySet() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java new file mode 100644 index 000000000000..11da516ee679 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.terminate; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.task.PipeDataNodeTask; + +/** + * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe, + * that is, when the historical {@link PipeTsFileInsertionEvent}s are all processed, this will be + * reported next and mark the {@link PipeDataNodeTask} as completed. WARNING: This event shall never + * be discarded. + */ +public class PipeTerminateEvent extends EnrichedEvent { + private final int dataRegionId; + + public PipeTerminateEvent( + final String pipeName, final PipeTaskMeta pipeTaskMeta, final int dataRegionId) { + super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE); + this.dataRegionId = dataRegionId; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { + return true; + } + + @Override + public ProgressIndex getProgressIndex() { + return MinimumProgressIndex.INSTANCE; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + final String pipeName, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pattern, + final long startTime, + final long endTime) { + // Should record PipeTaskMeta, for the terminateEvent shall report progress to + // notify the pipeTask it's completed. + return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId); + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + return true; + } + + @Override + public void reportProgress() { + PipeAgent.task().markCompleted(pipeName, dataRegionId); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 3d1528fe6599..3b5a05b2d6e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.resource.PipeResourceManager; @@ -64,6 +65,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; @@ -72,6 +74,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; @@ -104,11 +107,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private boolean shouldExtractInsertion; private boolean shouldTransferModFile; // Whether to transfer mods + private boolean shouldTerminatePipeOnAllConsumed; + private boolean isTerminateSignalSent = false; private Queue pendingQueue; @Override - public void validate(PipeParameterValidator validator) { + public void validate(final PipeParameterValidator validator) { final PipeParameters parameters = validator.getParameters(); if (parameters.hasAnyAttributes( @@ -139,7 +144,7 @@ public void validate(PipeParameterValidator validator) { EXTRACTOR_END_TIME_KEY)); } return; - } catch (Exception e) { + } catch (final Exception e) { // compatible with the current validation framework throw new PipeParameterNotValidException(e.getMessage()); } @@ -191,14 +196,20 @@ public void validate(PipeParameterValidator validator) { || // Should extract deletion DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters) .getRight()); - } catch (Exception e) { + shouldTerminatePipeOnAllConsumed = + parameters.getBooleanOrDefault( + Arrays.asList( + SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, EXTRACTOR_HISTORY_ENABLE_KEY), + EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); + } catch (final Exception e) { // Compatible with the current validation framework throw new PipeParameterNotValidException(e.getMessage()); } } @Override - public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) + public void customize( + final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration) throws IllegalPathException { shouldExtractInsertion = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters).getLeft(); @@ -394,7 +405,7 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) // Will unpin it after the PipeTsFileInsertionEvent is created and pinned. try { PipeResourceManager.tsfile().pinTsFileResource(resource, shouldTransferModFile); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn("Pipe: failed to pin TsFileResource {}", resource.getTsFilePath()); } }); @@ -426,7 +437,7 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) } } - private boolean mayTsFileContainUnprocessedData(TsFileResource resource) { + private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) { if (startIndex instanceof TimeWindowStateProgressIndex) { // The resource is closed thus the TsFileResource#getFileEndTime() is safe to use return ((TimeWindowStateProgressIndex) startIndex).getMinTime() <= resource.getFileEndTime(); @@ -446,21 +457,21 @@ private boolean mayTsFileContainUnprocessedData(TsFileResource resource) { return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()); } - private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) { + private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) { return !(resource.getFileEndTime() < historicalDataExtractionStartTime || historicalDataExtractionEndTime < resource.getFileStartTime()); } - private boolean isTsFileResourceCoveredByTimeRange(TsFileResource resource) { + private boolean isTsFileResourceCoveredByTimeRange(final TsFileResource resource) { return historicalDataExtractionStartTime <= resource.getFileStartTime() && historicalDataExtractionEndTime >= resource.getFileEndTime(); } - private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource resource) { + private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final TsFileResource resource) { try { return historicalDataExtractionTimeLowerBound <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime(); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway" + " (historical data extraction time lower bound: {})", @@ -483,7 +494,8 @@ public synchronized Event supply() { final TsFileResource resource = pendingQueue.poll(); if (resource == null) { - return null; + isTerminateSignalSent = true; + return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId); } final PipeTsFileInsertionEvent event = @@ -508,7 +520,7 @@ public synchronized Event supply() { event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()); try { PipeResourceManager.tsfile().unpinTsFileResource(resource); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", pipeName, @@ -520,7 +532,9 @@ public synchronized Event supply() { } public synchronized boolean hasConsumedAll() { - return Objects.isNull(pendingQueue) || pendingQueue.isEmpty(); + return Objects.nonNull(pendingQueue) + && pendingQueue.isEmpty() + && (!shouldTerminatePipeOnAllConsumed || isTerminateSignalSent); } @Override @@ -535,7 +549,7 @@ public synchronized void close() { resource -> { try { PipeResourceManager.tsfile().unpinTsFileResource(resource); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}", pipeName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java index c5b58c6e9b8e..d0b93fcdb3ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java @@ -31,17 +31,18 @@ public class PipeDataNodeTask implements PipeTask { private final String pipeName; private final int regionId; + private volatile boolean isCompleted = false; private final PipeTaskStage extractorStage; private final PipeTaskStage processorStage; private final PipeTaskStage connectorStage; public PipeDataNodeTask( - String pipeName, - int regionId, - PipeTaskStage extractorStage, - PipeTaskStage processorStage, - PipeTaskStage connectorStage) { + final String pipeName, + final int regionId, + final PipeTaskStage extractorStage, + final PipeTaskStage processorStage, + final PipeTaskStage connectorStage) { this.pipeName = pipeName; this.regionId = regionId; @@ -98,6 +99,14 @@ public void stop() { System.currentTimeMillis() - startTime); } + public boolean isCompleted() { + return isCompleted; + } + + public void markCompleted() { + this.isCompleted = true; + } + public int getRegionId() { return regionId; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 05ed098976f9..6a60d77cd0e9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -61,6 +61,12 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false; + public static final String EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY = + "extractor.history.terminate-pipe-on-all-consumed"; + public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY = + "source.history.terminate-pipe-on-all-consumed"; + public static final boolean EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE = + false; public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable"; public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java index dbbc8fdd961d..e30fcf11269b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java @@ -33,9 +33,13 @@ public class PipeMeta { private final PipeStaticMeta staticMeta; private final PipeRuntimeMeta runtimeMeta; - public PipeMeta(PipeStaticMeta staticMeta, PipeRuntimeMeta runtimeMeta) { + // This is temporary information of pipe and will not be serialized. + private final PipeTemporaryMeta temporaryMeta; + + public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta runtimeMeta) { this.staticMeta = staticMeta; this.runtimeMeta = runtimeMeta; + this.temporaryMeta = new PipeTemporaryMeta(); } public PipeStaticMeta getStaticMeta() { @@ -46,25 +50,29 @@ public PipeRuntimeMeta getRuntimeMeta() { return runtimeMeta; } + public PipeTemporaryMeta getTemporaryMeta() { + return temporaryMeta; + } + public ByteBuffer serialize() throws IOException { - PublicBAOS byteArrayOutputStream = new PublicBAOS(); - DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); serialize(outputStream); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } - public void serialize(OutputStream outputStream) throws IOException { + public void serialize(final OutputStream outputStream) throws IOException { staticMeta.serialize(outputStream); runtimeMeta.serialize(outputStream); } - public static PipeMeta deserialize(FileInputStream fileInputStream) throws IOException { + public static PipeMeta deserialize(final FileInputStream fileInputStream) throws IOException { final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(fileInputStream); final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(fileInputStream); return new PipeMeta(staticMeta, runtimeMeta); } - public static PipeMeta deserialize(ByteBuffer byteBuffer) { + public static PipeMeta deserialize(final ByteBuffer byteBuffer) { final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer); final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(byteBuffer); return new PipeMeta(staticMeta, runtimeMeta); @@ -104,25 +112,33 @@ public String coreReportMessage() { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } - PipeMeta pipeMeta = (PipeMeta) o; + final PipeMeta pipeMeta = (PipeMeta) o; return Objects.equals(staticMeta, pipeMeta.staticMeta) - && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta); + && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta) + && Objects.equals(temporaryMeta, pipeMeta.temporaryMeta); } @Override public int hashCode() { - return Objects.hash(staticMeta, runtimeMeta); + return Objects.hash(staticMeta, runtimeMeta, temporaryMeta); } @Override public String toString() { - return "PipeMeta{" + "staticMeta=" + staticMeta + ", runtimeMeta=" + runtimeMeta + '}'; + return "PipeMeta{" + + "staticMeta=" + + staticMeta + + ", runtimeMeta=" + + runtimeMeta + + ", temporaryMeta=" + + temporaryMeta + + '}'; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java new file mode 100644 index 000000000000..bb216f63f673 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.task.meta; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class PipeTemporaryMeta { + private final ConcurrentMap dataNodeId2CompletedMap = new ConcurrentHashMap<>(); + + public void putDataNodeCompletion(final int dataNodeId, final boolean completion) { + dataNodeId2CompletedMap.put(dataNodeId, completion); + } + + public Map getDataNodeId2CompletedMap() { + return dataNodeId2CompletedMap; + } +} diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index f5d6839ce6c3..06d0fd8d0e75 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -294,6 +294,7 @@ struct TDataNodeHeartbeatResp { 11: optional string activateStatus 12: optional set confirmedConfigNodeEndPoints 13: optional map consensusLogicalTimeMap + 14: optional list pipeCompletedList } struct TPipeHeartbeatReq { @@ -302,6 +303,7 @@ struct TPipeHeartbeatReq { struct TPipeHeartbeatResp { 1: required list pipeMetaList + 2: optional list pipeCompletedList } enum TSchemaLimitLevel{ From 715621c5f174f33270ffa6d5386c8cd0fbef89ae Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 16:43:04 +0800 Subject: [PATCH 02/14] completion --- .../heartbeat/DataNodeHeartbeatHandler.java | 3 ++- .../coordinator/runtime/PipeHeartbeatParser.java | 16 ++++++++-------- .../runtime/PipeHeartbeatScheduler.java | 14 ++++++++++---- .../runtime/PipeRuntimeCoordinator.java | 7 +++++-- .../pipe/task/meta/PipeTemporaryMeta.java | 12 ++++++------ 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 49981d72834b..5829fd07a56e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -130,7 +130,8 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { regionDisk.putAll(heartbeatResp.getRegionDisk()); } if (heartbeatResp.getPipeMetaList() != null) { - pipeRuntimeCoordinator.parseHeartbeat(nodeId, heartbeatResp.getPipeMetaList()); + pipeRuntimeCoordinator.parseHeartbeat( + nodeId, heartbeatResp.getPipeMetaList(), heartbeatResp.getPipeCompletedList()); } if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) { loadManager diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java index d995dd67c5b3..48f95f6cf9fa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java @@ -58,7 +58,7 @@ public class PipeHeartbeatParser { private final AtomicBoolean needWriteConsensusOnConfigNodes; private final AtomicBoolean needPushPipeMetaToDataNodes; - PipeHeartbeatParser(ConfigManager configManager) { + PipeHeartbeatParser(final ConfigManager configManager) { this.configManager = configManager; heartbeatCounter = 0; @@ -69,9 +69,9 @@ public class PipeHeartbeatParser { } public synchronized void parseHeartbeat( - int nodeId, - @NotNull List pipeMetaByteBufferListFromAgent, - final List pipeCompletedListFromAgent) { + final int nodeId, + @NotNull final List pipeMetaByteBufferListFromAgent, + /* @Nullable */ final List pipeCompletedListFromAgent) { final long heartbeatCount = ++heartbeatCounter; final AtomicBoolean canSubmitHandleMetaChangeProcedure = new AtomicBoolean(false); @@ -143,7 +143,7 @@ private void parseHeartbeatAndSaveMetaChangeLocally( final AtomicReference pipeTaskInfo, final int nodeId, @NotNull final List pipeMetaByteBufferListFromAgent, - final List pipeCompletedListFromAgent) { + /* @Nullable */ final List pipeCompletedListFromAgent) { final Map pipeMetaMapFromAgent = new HashMap<>(); final Map pipeCompletedMapFromAgent = new HashMap<>(); for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { @@ -158,13 +158,13 @@ private void parseHeartbeatAndSaveMetaChangeLocally( // Remove completed pipes final Boolean pipeCompletedFromAgent = pipeCompletedMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta()); - if (Objects.nonNull(pipeCompletedFromAgent)) { + if (Objects.nonNull(pipeCompletedFromAgent) && Boolean.TRUE.equals(pipeCompletedFromAgent)) { final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); - temporaryMeta.putDataNodeCompletion(nodeId, pipeCompletedFromAgent); + temporaryMeta.putDataNodeCompletion(nodeId); final Set dataNodeIds = configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); - dataNodeIds.retainAll(temporaryMeta.getDataNodeId2CompletedMap().keySet()); + dataNodeIds.retainAll(temporaryMeta.getCompletedDataNode()); if (dataNodeIds.isEmpty()) { pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName()); continue; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java index 65217818e7d0..bc846569c4ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java @@ -109,7 +109,8 @@ private synchronized void heartbeat() { .getResponseMap() .forEach( (dataNodeId, resp) -> - pipeHeartbeatParser.parseHeartbeat(dataNodeId, resp.getPipeMetaList())); + pipeHeartbeatParser.parseHeartbeat( + dataNodeId, resp.getPipeMetaList(), resp.getPipeCompletedList())); // config node heartbeat try { @@ -117,7 +118,8 @@ private synchronized void heartbeat() { PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp); pipeHeartbeatParser.parseHeartbeat( ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - configNodeResp.getPipeMetaList()); + configNodeResp.getPipeMetaList(), + null); } catch (Exception e) { LOGGER.warn("Failed to collect pipe meta list from config node task agent", e); } @@ -131,7 +133,11 @@ public synchronized void stop() { } } - public void parseHeartbeat(int dataNodeId, List pipeMetaByteBufferListFromDataNode) { - pipeHeartbeatParser.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode); + public void parseHeartbeat( + final int dataNodeId, + final List pipeMetaByteBufferListFromDataNode, + final List pipeCompletedListFromAgent) { + pipeHeartbeatParser.parseHeartbeat( + dataNodeId, pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java index 8d9644eae5fe..31ea2f6b3c7d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java @@ -103,7 +103,10 @@ public void stopPipeHeartbeat() { } public void parseHeartbeat( - int dataNodeId, @NotNull List pipeMetaByteBufferListFromDataNode) { - pipeHeartbeatScheduler.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode); + int dataNodeId, + @NotNull List pipeMetaByteBufferListFromDataNode, /* @Nullable */ + final List pipeCompletedListFromAgent) { + pipeHeartbeatScheduler.parseHeartbeat( + dataNodeId, pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java index bb216f63f673..b2b70f02edb2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java @@ -19,18 +19,18 @@ package org.apache.iotdb.commons.pipe.task.meta; -import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class PipeTemporaryMeta { - private final ConcurrentMap dataNodeId2CompletedMap = new ConcurrentHashMap<>(); + private final ConcurrentMap completedDataNode = new ConcurrentHashMap<>(); - public void putDataNodeCompletion(final int dataNodeId, final boolean completion) { - dataNodeId2CompletedMap.put(dataNodeId, completion); + public void putDataNodeCompletion(final int dataNodeId) { + completedDataNode.put(dataNodeId, dataNodeId); } - public Map getDataNodeId2CompletedMap() { - return dataNodeId2CompletedMap; + public Set getCompletedDataNode() { + return completedDataNode.keySet(); } } From 0db377c52b1e1e71a087ffb22b7f30f8341b1d81 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 16:55:14 +0800 Subject: [PATCH 03/14] Update PipeDataNodeTaskAgent.java --- .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 0190d5ebb90e..e9de49a26d02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -445,10 +445,13 @@ public void markCompleted(final String pipeName, final int regionId) { acquireWriteLock(); try { if (pipeMetaKeeper.containsPipeMeta(pipeName)) { - ((PipeDataNodeTask) + final PipeDataNodeTask pipeDataNodeTask = + ((PipeDataNodeTask) pipeTaskManager.getPipeTask( - pipeMetaKeeper.getPipeMeta(pipeName).getStaticMeta(), regionId)) - .markCompleted(); + pipeMetaKeeper.getPipeMeta(pipeName).getStaticMeta(), regionId)); + if (Objects.nonNull(pipeDataNodeTask)) { + pipeDataNodeTask.markCompleted(); + } } } finally { releaseWriteLock(); From 9c75a8ee973c4ec4ecfa1ba6a60fd3960c66c195 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 17:02:21 +0800 Subject: [PATCH 04/14] style --- .../runtime/PipeHeartbeatScheduler.java | 4 +- .../runtime/PipeRuntimeCoordinator.java | 14 ++-- .../persistence/pipe/PipeTaskInfo.java | 76 ++++++++++--------- 3 files changed, 48 insertions(+), 46 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java index bc846569c4ed..95ad7240df0f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java @@ -61,7 +61,7 @@ public class PipeHeartbeatScheduler { private Future heartbeatFuture; - PipeHeartbeatScheduler(ConfigManager configManager) { + PipeHeartbeatScheduler(final ConfigManager configManager) { this.configManager = configManager; this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager); } @@ -120,7 +120,7 @@ private synchronized void heartbeat() { ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), configNodeResp.getPipeMetaList(), null); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn("Failed to collect pipe meta list from config node task agent", e); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java index 31ea2f6b3c7d..07c92c2a2e3b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java @@ -45,7 +45,7 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { private final PipeMetaSyncer pipeMetaSyncer; private final PipeHeartbeatScheduler pipeHeartbeatScheduler; - public PipeRuntimeCoordinator(ConfigManager configManager) { + public PipeRuntimeCoordinator(final ConfigManager configManager) { if (procedureSubmitterHolder.get() == null) { synchronized (PipeRuntimeCoordinator.class) { if (procedureSubmitterHolder.get() == null) { @@ -71,18 +71,18 @@ public synchronized void onConfigRegionGroupLeaderChanged() { } @Override - public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { + public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) { // Do nothing } @Override - public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event) { + public void onRegionGroupStatisticsChanged(final RegionGroupStatisticsChangeEvent event) { // Do nothing } @Override public synchronized void onConsensusGroupStatisticsChanged( - ConsensusGroupStatisticsChangeEvent event) { + final ConsensusGroupStatisticsChangeEvent event) { pipeLeaderChangeHandler.onConsensusGroupStatisticsChanged(event); } @@ -103,9 +103,9 @@ public void stopPipeHeartbeat() { } public void parseHeartbeat( - int dataNodeId, - @NotNull List pipeMetaByteBufferListFromDataNode, /* @Nullable */ - final List pipeCompletedListFromAgent) { + final int dataNodeId, + @NotNull final List pipeMetaByteBufferListFromDataNode, + /* @Nullable */ final List pipeCompletedListFromAgent) { pipeHeartbeatScheduler.parseHeartbeat( dataNodeId, pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 7417f1e736cd..201f2f049eda 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -143,7 +143,7 @@ public boolean canSkipNextSync() { /////////////////////////////// Validator /////////////////////////////// - public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws PipeException { + public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) throws PipeException { acquireReadLock(); try { checkBeforeCreatePipeInternal(createPipeRequest); @@ -166,7 +166,7 @@ private void checkBeforeCreatePipeInternal(TCreatePipeReq createPipeRequest) throw new PipeException(exceptionMessage); } - public void checkAndUpdateRequestBeforeAlterPipe(TAlterPipeReq alterPipeRequest) + public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq alterPipeRequest) throws PipeException { acquireReadLock(); try { @@ -176,7 +176,7 @@ public void checkAndUpdateRequestBeforeAlterPipe(TAlterPipeReq alterPipeRequest) } } - private void checkAndUpdateRequestBeforeAlterPipeInternal(TAlterPipeReq alterPipeRequest) + private void checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq alterPipeRequest) throws PipeException { if (!isPipeExisted(alterPipeRequest.getPipeName())) { final String exceptionMessage = @@ -229,7 +229,7 @@ private void checkAndUpdateRequestBeforeAlterPipeInternal(TAlterPipeReq alterPip } } - public void checkBeforeStartPipe(String pipeName) throws PipeException { + public void checkBeforeStartPipe(final String pipeName) throws PipeException { acquireReadLock(); try { checkBeforeStartPipeInternal(pipeName); @@ -238,7 +238,7 @@ public void checkBeforeStartPipe(String pipeName) throws PipeException { } } - private void checkBeforeStartPipeInternal(String pipeName) throws PipeException { + private void checkBeforeStartPipeInternal(final String pipeName) throws PipeException { if (!isPipeExisted(pipeName)) { final String exceptionMessage = String.format("Failed to start pipe %s, the pipe does not exist", pipeName); @@ -255,7 +255,7 @@ private void checkBeforeStartPipeInternal(String pipeName) throws PipeException } } - public void checkBeforeStopPipe(String pipeName) throws PipeException { + public void checkBeforeStopPipe(final String pipeName) throws PipeException { acquireReadLock(); try { checkBeforeStopPipeInternal(pipeName); @@ -264,7 +264,7 @@ public void checkBeforeStopPipe(String pipeName) throws PipeException { } } - private void checkBeforeStopPipeInternal(String pipeName) throws PipeException { + private void checkBeforeStopPipeInternal(final String pipeName) throws PipeException { if (!isPipeExisted(pipeName)) { final String exceptionMessage = String.format("Failed to stop pipe %s, the pipe does not exist", pipeName); @@ -281,7 +281,7 @@ private void checkBeforeStopPipeInternal(String pipeName) throws PipeException { } } - public void checkBeforeDropPipe(String pipeName) { + public void checkBeforeDropPipe(final String pipeName) { acquireReadLock(); try { checkBeforeDropPipeInternal(pipeName); @@ -290,7 +290,7 @@ public void checkBeforeDropPipe(String pipeName) { } } - private void checkBeforeDropPipeInternal(String pipeName) { + private void checkBeforeDropPipeInternal(final String pipeName) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( "Check before drop pipe {}, pipe exists: {}.", pipeName, isPipeExisted(pipeName)); @@ -300,7 +300,7 @@ private void checkBeforeDropPipeInternal(String pipeName) { // DO NOTHING HERE! } - public boolean isPipeExisted(String pipeName) { + public boolean isPipeExisted(final String pipeName) { acquireReadLock(); try { return pipeMetaKeeper.containsPipeMeta(pipeName); @@ -309,7 +309,7 @@ public boolean isPipeExisted(String pipeName) { } } - private PipeStatus getPipeStatus(String pipeName) { + private PipeStatus getPipeStatus(final String pipeName) { acquireReadLock(); try { return pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().get(); @@ -318,7 +318,7 @@ private PipeStatus getPipeStatus(String pipeName) { } } - public boolean isPipeRunning(String pipeName) { + public boolean isPipeRunning(final String pipeName) { acquireReadLock(); try { return pipeMetaKeeper.containsPipeMeta(pipeName) @@ -328,7 +328,7 @@ public boolean isPipeRunning(String pipeName) { } } - public boolean isPipeStoppedByUser(String pipeName) { + public boolean isPipeStoppedByUser(final String pipeName) { acquireReadLock(); try { return pipeMetaKeeper.containsPipeMeta(pipeName) @@ -341,7 +341,7 @@ public boolean isPipeStoppedByUser(String pipeName) { /////////////////////////////// Pipe Task Management /////////////////////////////// - public TSStatus createPipe(CreatePipePlanV2 plan) { + public TSStatus createPipe(final CreatePipePlanV2 plan) { acquireWriteLock(); try { pipeMetaKeeper.addPipeMeta( @@ -353,7 +353,7 @@ public TSStatus createPipe(CreatePipePlanV2 plan) { } } - public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plan) { + public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plan) { acquireWriteLock(); try { if (plan.getSubPlans() == null || plan.getSubPlans().isEmpty()) { @@ -364,7 +364,7 @@ public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plan) { // We use sub-status to record the status of each subPlan status.setSubStatus(new ArrayList<>()); - for (ConfigPhysicalPlan subPlan : plan.getSubPlans()) { + for (final ConfigPhysicalPlan subPlan : plan.getSubPlans()) { try { if (subPlan instanceof CreatePipePlanV2) { createPipe((CreatePipePlanV2) subPlan); @@ -379,7 +379,7 @@ public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plan) { String.format("Unsupported subPlan type: %s", subPlan.getClass().getName())); } status.getSubStatus().add(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); - } catch (Exception e) { + } catch (final Exception e) { // If one of the subPlan fails, we stop operating the rest of the pipes LOGGER.error("Failed to operate pipe", e); status.setCode(TSStatusCode.PIPE_ERROR.getStatusCode()); @@ -399,7 +399,7 @@ public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plan) { } } - public TSStatus alterPipe(AlterPipePlanV2 plan) { + public TSStatus alterPipe(final AlterPipePlanV2 plan) { acquireWriteLock(); try { pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName()); @@ -412,7 +412,7 @@ public TSStatus alterPipe(AlterPipePlanV2 plan) { } } - public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) { + public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) { acquireWriteLock(); try { pipeMetaKeeper @@ -426,7 +426,7 @@ public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) { } } - public TSStatus dropPipe(DropPipePlanV2 plan) { + public TSStatus dropPipe(final DropPipePlanV2 plan) { acquireWriteLock(); try { pipeMetaKeeper.removePipeMeta(plan.getPipeName()); @@ -457,7 +457,7 @@ public Iterable getPipeMetaList() { } } - public PipeMeta getPipeMetaByPipeName(String pipeName) { + public PipeMeta getPipeMetaByPipeName(final String pipeName) { acquireReadLock(); try { return pipeMetaKeeper.getPipeMetaByPipeName(pipeName); @@ -478,7 +478,7 @@ public boolean isEmpty() { /////////////////////////////// Pipe Runtime Management /////////////////////////////// /** Handle the region leader change event and update the pipe task meta accordingly. */ - public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) { + public TSStatus handleLeaderChange(final PipeHandleLeaderChangePlan plan) { acquireWriteLock(); try { return handleLeaderChangeInternal(plan); @@ -487,7 +487,7 @@ public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) { } } - private TSStatus handleLeaderChangeInternal(PipeHandleLeaderChangePlan plan) { + private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan plan) { plan.getConsensusGroupId2NewLeaderIdMap() .forEach( (consensusGroupId, newLeader) -> @@ -532,7 +532,7 @@ private TSStatus handleLeaderChangeInternal(PipeHandleLeaderChangePlan plan) { * @param plan The plan containing all the {@link PipeMeta}s from leader {@link ConfigNode} * @return {@link TSStatusCode#SUCCESS_STATUS} */ - public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) { + public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) { acquireWriteLock(); try { return handleMetaChangesInternal(plan); @@ -541,7 +541,7 @@ public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) { } } - private TSStatus handleMetaChangesInternal(PipeHandleMetaChangePlan plan) { + private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan plan) { LOGGER.info("Handling pipe meta changes ..."); pipeMetaKeeper.clear(); @@ -556,7 +556,7 @@ private TSStatus handleMetaChangesInternal(PipeHandleMetaChangePlan plan) { return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } - public boolean isStoppedByRuntimeException(String pipeName) { + public boolean isStoppedByRuntimeException(final String pipeName) { acquireReadLock(); try { return isStoppedByRuntimeExceptionInternal(pipeName); @@ -565,7 +565,7 @@ public boolean isStoppedByRuntimeException(String pipeName) { } } - private boolean isStoppedByRuntimeExceptionInternal(String pipeName) { + private boolean isStoppedByRuntimeExceptionInternal(final String pipeName) { return pipeMetaKeeper.containsPipeMeta(pipeName) && pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getIsStoppedByRuntimeException(); } @@ -578,7 +578,7 @@ private boolean isStoppedByRuntimeExceptionInternal(String pipeName) { * * @param pipeName The name of the pipe to be clear exception */ - public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(String pipeName) { + public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final String pipeName) { acquireWriteLock(); try { clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName); @@ -587,7 +587,8 @@ public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(String pipeN } } - private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) { + private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + final String pipeName) { if (!pipeMetaKeeper.containsPipeMeta(pipeName)) { return; } @@ -616,7 +617,7 @@ private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(Str }); } - public void setIsStoppedByRuntimeExceptionToFalse(String pipeName) { + public void setIsStoppedByRuntimeExceptionToFalse(final String pipeName) { acquireWriteLock(); try { setIsStoppedByRuntimeExceptionToFalseInternal(pipeName); @@ -625,7 +626,7 @@ public void setIsStoppedByRuntimeExceptionToFalse(String pipeName) { } } - private void setIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) { + private void setIsStoppedByRuntimeExceptionToFalseInternal(final String pipeName) { if (!pipeMetaKeeper.containsPipeMeta(pipeName)) { return; } @@ -641,9 +642,10 @@ private void setIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) { * messages will then be updated to all the nodes through {@link PipeHandleMetaChangeProcedure}. * * @param respMap The responseMap after pushing pipe meta - * @return {@link true} if there are exceptions encountered + * @return {@code true} if there are exceptions encountered */ - public boolean recordDataNodePushPipeMetaExceptions(Map respMap) { + public boolean recordDataNodePushPipeMetaExceptions( + final Map respMap) { acquireWriteLock(); try { return recordDataNodePushPipeMetaExceptionsInternal(respMap); @@ -653,7 +655,7 @@ public boolean recordDataNodePushPipeMetaExceptions(Map respMap) { + final Map respMap) { boolean hasException = false; for (final Map.Entry respEntry : respMap.entrySet()) { @@ -774,7 +776,7 @@ private void removePipeMetaInternal(final String pipeName) { /////////////////////////////// Snapshot /////////////////////////////// @Override - public boolean processTakeSnapshot(File snapshotDir) throws IOException { + public boolean processTakeSnapshot(final File snapshotDir) throws IOException { acquireReadLock(); try { final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME); @@ -796,7 +798,7 @@ public boolean processTakeSnapshot(File snapshotDir) throws IOException { } @Override - public void processLoadSnapshot(File snapshotDir) throws IOException { + public void processLoadSnapshot(final File snapshotDir) throws IOException { acquireWriteLock(); try { final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME); @@ -823,7 +825,7 @@ public int hashCode() { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) { return true; } From f099bf294f16f4f8e4f323c41d93c145b7a28988 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 18:59:41 +0800 Subject: [PATCH 05/14] bug fix --- .../runtime/PipeHeartbeatParser.java | 4 +++- .../common/terminate/PipeTerminateEvent.java | 7 ++++++ ...peHistoricalDataRegionTsFileExtractor.java | 10 ++++++-- .../pipe/task/meta/PipeTemporaryMeta.java | 23 +++++++++++++++++++ 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java index 48f95f6cf9fa..7246fb0a9885 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java @@ -164,9 +164,11 @@ private void parseHeartbeatAndSaveMetaChangeLocally( final Set dataNodeIds = configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); - dataNodeIds.retainAll(temporaryMeta.getCompletedDataNode()); + dataNodeIds.removeAll(temporaryMeta.getCompletedDataNode()); if (dataNodeIds.isEmpty()) { pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName()); + needWriteConsensusOnConfigNodes.set(true); + needPushPipeMetaToDataNodes.set(true); continue; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java index 11da516ee679..c1252fe668f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -84,4 +84,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() { public void reportProgress() { PipeAgent.task().markCompleted(pipeName, dataRegionId); } + + @Override + public String toString() { + return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId) + + " - " + + super.toString(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 3b5a05b2d6e2..c9994599ffbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -66,6 +66,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; @@ -199,7 +200,8 @@ public void validate(final PipeParameterValidator validator) { shouldTerminatePipeOnAllConsumed = parameters.getBooleanOrDefault( Arrays.asList( - SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, EXTRACTOR_HISTORY_ENABLE_KEY), + SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, + EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY), EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); } catch (final Exception e) { // Compatible with the current validation framework @@ -495,7 +497,11 @@ public synchronized Event supply() { final TsFileResource resource = pendingQueue.poll(); if (resource == null) { isTerminateSignalSent = true; - return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId); + final PipeTerminateEvent terminateEvent = + new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId); + terminateEvent.increaseReferenceCount( + PipeHistoricalDataRegionTsFileExtractor.class.getName()); + return terminateEvent; } final PipeTsFileInsertionEvent event = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java index b2b70f02edb2..3a3b4e533ca1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.task.meta; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -33,4 +34,26 @@ public void putDataNodeCompletion(final int dataNodeId) { public Set getCompletedDataNode() { return completedDataNode.keySet(); } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PipeTemporaryMeta pipeTemporaryMeta = (PipeTemporaryMeta) o; + return Objects.equals(completedDataNode, pipeTemporaryMeta.completedDataNode); + } + + @Override + public int hashCode() { + return Objects.hash(completedDataNode); + } + + @Override + public String toString() { + return "PipeMeta{" + "completedDataNode=" + completedDataNode + '}'; + } } From e5e94c38daf6d435162d204dca634c60d16412b2 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 19:38:18 +0800 Subject: [PATCH 06/14] bug fix --- .../agent/task/PipeDataNodeTaskAgent.java | 51 ++++++++++++++++--- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index e9de49a26d02..fd830fd84a4b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -64,6 +64,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -74,6 +75,10 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; + public class PipeDataNodeTaskAgent extends PipeTaskAgent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class); @@ -268,18 +273,35 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); + final Map pipeTaskMap = pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); - pipeCompletedList.add( + final boolean isAllDataRegionCompleted = pipeTaskMap == null || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) - .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted())); + .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); + // If the "source.history.terminate-pipe-on-all-consumed" is false or the pipe does + // not include data transfer, we should not terminate the pipe. + final boolean includeDataAndNeedDrop = + DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( + pipeMeta.getStaticMeta().getExtractorParameters()) + .getLeft() + && pipeMeta + .getStaticMeta() + .getExtractorParameters() + .getBooleanOrDefault( + Arrays.asList( + SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, + EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY), + EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); + + pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop); logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); - } catch (final IOException e) { + } catch (final IOException | IllegalPathException e) { throw new TException(e); } resp.setPipeMetaList(pipeMetaBinaryList); @@ -312,18 +334,35 @@ protected void collectPipeMetaListInternal( pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); + final Map pipeTaskMap = pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()); - pipeCompletedList.add( + final boolean isAllDataRegionCompleted = pipeTaskMap == null || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) - .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted())); + .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); + // If the "source.history.terminate-pipe-on-all-consumed" is false or the pipe does + // not include data transfer, we should not terminate the pipe. + final boolean includeDataAndNeedDrop = + DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( + pipeMeta.getStaticMeta().getExtractorParameters()) + .getLeft() + && pipeMeta + .getStaticMeta() + .getExtractorParameters() + .getBooleanOrDefault( + Arrays.asList( + SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, + EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY), + EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); + + pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop); logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); - } catch (final IOException e) { + } catch (final IOException | IllegalPathException e) { throw new TException(e); } resp.setPipeMetaList(pipeMetaBinaryList); From 7267491bb5d37a7da803c5b974a29a90734b9118 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 20:25:41 +0800 Subject: [PATCH 07/14] Added IT --- .../it/autocreate/IoTDBPipeAutoDropIT.java | 80 +++++++++++++++++++ .../agent/task/PipeDataNodeTaskAgent.java | 14 +++- 2 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java new file mode 100644 index 000000000000..b223a245d0a7 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.autocreate; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT { + @Test + public void testAutoDropInHistoricalTransfer() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", "true"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "show pipes", + "ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,", + Collections.emptySet()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index fd830fd84a4b..0b45099ed445 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -298,7 +298,12 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop); - logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); + logger.ifPresent( + l -> + l.info( + "Reporting pipe meta: {}, isCompleted: {}", + pipeMeta.coreReportMessage(), + includeDataAndNeedDrop)); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); } catch (final IOException | IllegalPathException e) { @@ -359,7 +364,12 @@ protected void collectPipeMetaListInternal( pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop); - logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); + logger.ifPresent( + l -> + l.info( + "Reporting pipe meta: {}, isCompleted: {}", + pipeMeta.coreReportMessage(), + includeDataAndNeedDrop)); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); } catch (final IOException | IllegalPathException e) { From 3c2e093ce8bc7ed4f60bdd79f110eb0905cace09 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 May 2024 22:48:20 +0800 Subject: [PATCH 08/14] IT fix --- .../apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java | 7 +++---- .../common/schema/PipeSchemaRegionWritePlanEvent.java | 4 ++-- .../PipeHistoricalDataRegionTsFileExtractor.java | 8 +++++--- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java index 0b1f0f63cc2a..2cc5c5c066aa 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java @@ -180,7 +180,7 @@ public void testPureDeleteInclusion() throws Exception { senderEnv, Arrays.asList( "create timeseries root.ln.wf01.wt01.status with datatype=BOOLEAN,encoding=PLAIN", - "insert into root.ln.wf01.wt01(time, status) values(0, 1)", + "insert into root.ln.wf01.wt01(time, status) values(0, true)", "flush"))) { return; } @@ -191,7 +191,7 @@ public void testPureDeleteInclusion() throws Exception { receiverEnv, Arrays.asList( "create timeseries root.ln.wf01.wt01.status1 with datatype=BOOLEAN,encoding=PLAIN", - "insert into root.ln.wf01.wt01(time, status1) values(0, 1)", + "insert into root.ln.wf01.wt01(time, status1) values(0, true)", "flush"))) { return; } @@ -206,8 +206,7 @@ public void testPureDeleteInclusion() throws Exception { receiverEnv, "select * from root.**", "Time,root.ln.wf01.wt01.status1,", - Collections.emptySet(), - 10); + Collections.emptySet()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java index 6e62fe80610f..6e0b5cccf866 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java @@ -88,14 +88,14 @@ public void deserializeFromByteBuffer(final ByteBuffer buffer) { @Override public String toString() { - return String.format("PipeConfigRegionWritePlanEvent{planNode=%s}", planNode) + return String.format("PipeSchemaRegionWritePlanEvent{planNode=%s}", planNode) + " - " + super.toString(); } @Override public String coreReportMessage() { - return String.format("PipeConfigRegionWritePlanEvent{planNode=%s}", planNode) + return String.format("PipeSchemaRegionWritePlanEvent{planNode=%s}", planNode) + " - " + super.coreReportMessage(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index c9994599ffbc..147b62338ab7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -538,9 +538,11 @@ public synchronized Event supply() { } public synchronized boolean hasConsumedAll() { - return Objects.nonNull(pendingQueue) - && pendingQueue.isEmpty() - && (!shouldTerminatePipeOnAllConsumed || isTerminateSignalSent); + // If the pendingQueue is null when the function is called, it + // implies that the extractor only extracts deletion thus the + // Historical event has nothing to consume + return Objects.isNull(pendingQueue) + || pendingQueue.isEmpty() && (!shouldTerminatePipeOnAllConsumed || isTerminateSignalSent); } @Override From 204fe0ff2b5a0fc1cc9c6271a37773e7fc15fa57 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 14 May 2024 18:25:30 +0800 Subject: [PATCH 09/14] Refactor --- .../runtime/NodePipeHeartbeat.java | 52 +++++++++++++++++++ .../runtime/PipeHeartbeatParser.java | 45 +++++----------- .../runtime/PipeHeartbeatScheduler.java | 22 +++----- .../runtime/PipeRuntimeCoordinator.java | 3 +- .../runtime/SinglePipeHeartbeat.java | 40 ++++++++++++++ 5 files changed, 114 insertions(+), 48 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java new file mode 100644 index 000000000000..959fddb3f550 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime; + +import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; + +import javax.validation.constraints.NotNull; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +class NodePipeHeartbeat { + private final Map pipeHeartbeatMap = new HashMap<>(); + + NodePipeHeartbeat( + @NotNull final List pipeMetaByteBufferListFromAgent, + /* @Nullable */ final List pipeCompletedListFromAgent) { + for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { + final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i)); + pipeHeartbeatMap.put( + pipeMeta.getStaticMeta(), + new SinglePipeHeartbeat( + pipeMeta, + Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i))); + } + } + + Map getPipeHeartbeatMap() { + return pipeHeartbeatMap; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java index 7246fb0a9885..c1302d38bb85 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java @@ -35,13 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.validation.constraints.NotNull; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -68,10 +62,7 @@ public class PipeHeartbeatParser { needPushPipeMetaToDataNodes = new AtomicBoolean(false); } - public synchronized void parseHeartbeat( - final int nodeId, - @NotNull final List pipeMetaByteBufferListFromAgent, - /* @Nullable */ final List pipeCompletedListFromAgent) { + synchronized void parseHeartbeat(final int nodeId, final NodePipeHeartbeat nodePipeHeartbeat) { final long heartbeatCount = ++heartbeatCounter; final AtomicBoolean canSubmitHandleMetaChangeProcedure = new AtomicBoolean(false); @@ -92,7 +83,7 @@ public synchronized void parseHeartbeat( } } - if (pipeMetaByteBufferListFromAgent.isEmpty() + if (nodePipeHeartbeat.getPipeHeartbeatMap().isEmpty() && !(canSubmitHandleMetaChangeProcedure.get() && (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get()))) { return; @@ -113,12 +104,8 @@ public synchronized void parseHeartbeat( } try { - if (!pipeMetaByteBufferListFromAgent.isEmpty()) { - parseHeartbeatAndSaveMetaChangeLocally( - pipeTaskInfo, - nodeId, - pipeMetaByteBufferListFromAgent, - pipeCompletedListFromAgent); + if (!nodePipeHeartbeat.getPipeHeartbeatMap().isEmpty()) { + parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId, nodePipeHeartbeat); } if (canSubmitHandleMetaChangeProcedure.get() @@ -142,23 +129,16 @@ public synchronized void parseHeartbeat( private void parseHeartbeatAndSaveMetaChangeLocally( final AtomicReference pipeTaskInfo, final int nodeId, - @NotNull final List pipeMetaByteBufferListFromAgent, - /* @Nullable */ final List pipeCompletedListFromAgent) { - final Map pipeMetaMapFromAgent = new HashMap<>(); - final Map pipeCompletedMapFromAgent = new HashMap<>(); - for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { - final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i)); - pipeMetaMapFromAgent.put(pipeMeta.getStaticMeta(), pipeMeta); - pipeCompletedMapFromAgent.put( - pipeMeta.getStaticMeta(), - Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i)); - } + final NodePipeHeartbeat nodePipeHeartbeat) { + final Map pipeHeartbeatMap = + nodePipeHeartbeat.getPipeHeartbeatMap(); for (final PipeMeta pipeMetaFromCoordinator : pipeTaskInfo.get().getPipeMetaList()) { + final SinglePipeHeartbeat singlePipeHeartbeat = + pipeHeartbeatMap.get(pipeMetaFromCoordinator.getStaticMeta()); // Remove completed pipes - final Boolean pipeCompletedFromAgent = - pipeCompletedMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta()); - if (Objects.nonNull(pipeCompletedFromAgent) && Boolean.TRUE.equals(pipeCompletedFromAgent)) { + final boolean pipeCompletedFromAgent = singlePipeHeartbeat.isCompleted(); + if (Boolean.TRUE.equals(pipeCompletedFromAgent)) { final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); temporaryMeta.putDataNodeCompletion(nodeId); @@ -173,8 +153,7 @@ private void parseHeartbeatAndSaveMetaChangeLocally( } } - final PipeMeta pipeMetaFromAgent = - pipeMetaMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta()); + final PipeMeta pipeMetaFromAgent = singlePipeHeartbeat.getPipeMeta(); if (pipeMetaFromAgent == null) { LOGGER.info( "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, " diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java index 95ad7240df0f..aa8f797e21f5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java @@ -36,8 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.List; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -66,7 +64,7 @@ public class PipeHeartbeatScheduler { this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager); } - public synchronized void start() { + synchronized void start() { if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture == null) { heartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay( @@ -90,7 +88,7 @@ private synchronized void heartbeat() { return; } - // data node heartbeat + // Data node heartbeat final Map dataNodeLocationMap = configManager.getNodeManager().getRegisteredDataNodeLocations(); final TPipeHeartbeatReq request = new TPipeHeartbeatReq(System.currentTimeMillis()); @@ -110,7 +108,8 @@ private synchronized void heartbeat() { .forEach( (dataNodeId, resp) -> pipeHeartbeatParser.parseHeartbeat( - dataNodeId, resp.getPipeMetaList(), resp.getPipeCompletedList())); + dataNodeId, + new NodePipeHeartbeat(resp.getPipeMetaList(), resp.getPipeCompletedList()))); // config node heartbeat try { @@ -118,14 +117,13 @@ private synchronized void heartbeat() { PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp); pipeHeartbeatParser.parseHeartbeat( ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - configNodeResp.getPipeMetaList(), - null); + new NodePipeHeartbeat(configNodeResp.getPipeMetaList(), null)); } catch (final Exception e) { LOGGER.warn("Failed to collect pipe meta list from config node task agent", e); } } - public synchronized void stop() { + synchronized void stop() { if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) { heartbeatFuture.cancel(false); heartbeatFuture = null; @@ -133,11 +131,7 @@ public synchronized void stop() { } } - public void parseHeartbeat( - final int dataNodeId, - final List pipeMetaByteBufferListFromDataNode, - final List pipeCompletedListFromAgent) { - pipeHeartbeatParser.parseHeartbeat( - dataNodeId, pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent); + void parseHeartbeat(final int dataNodeId, final NodePipeHeartbeat nodePipeHeartbeat) { + pipeHeartbeatParser.parseHeartbeat(dataNodeId, nodePipeHeartbeat); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java index 07c92c2a2e3b..8c544e16f354 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java @@ -107,6 +107,7 @@ public void parseHeartbeat( @NotNull final List pipeMetaByteBufferListFromDataNode, /* @Nullable */ final List pipeCompletedListFromAgent) { pipeHeartbeatScheduler.parseHeartbeat( - dataNodeId, pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent); + dataNodeId, + new NodePipeHeartbeat(pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java new file mode 100644 index 000000000000..837307a9e594 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime; + +import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; + +class SinglePipeHeartbeat { + private final PipeMeta pipeMeta; + private final boolean isCompleted; + + SinglePipeHeartbeat(final PipeMeta pipeMeta, final boolean isCompleted) { + this.pipeMeta = pipeMeta; + this.isCompleted = isCompleted; + } + + PipeMeta getPipeMeta() { + return pipeMeta; + } + + boolean isCompleted() { + return isCompleted; + } +} From f6f4c94563aebc99f2c904fb32622474d636dc5e Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 22 May 2024 19:58:07 +0800 Subject: [PATCH 10/14] Update PipeDataNodeTask.java --- .../iotdb/db/pipe/task/PipeDataNodeTask.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java index d0b93fcdb3ac..502bdec2c7cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java @@ -31,12 +31,13 @@ public class PipeDataNodeTask implements PipeTask { private final String pipeName; private final int regionId; - private volatile boolean isCompleted = false; private final PipeTaskStage extractorStage; private final PipeTaskStage processorStage; private final PipeTaskStage connectorStage; + private volatile boolean isCompleted = false; + public PipeDataNodeTask( final String pipeName, final int regionId, @@ -99,14 +100,6 @@ public void stop() { System.currentTimeMillis() - startTime); } - public boolean isCompleted() { - return isCompleted; - } - - public void markCompleted() { - this.isCompleted = true; - } - public int getRegionId() { return regionId; } @@ -115,6 +108,14 @@ public String getPipeName() { return pipeName; } + public boolean isCompleted() { + return isCompleted; + } + + public void markCompleted() { + this.isCompleted = true; + } + @Override public String toString() { return pipeName + "@" + regionId; From 597edfd9422afdab62ecd7dc613f48c7c8ad2da5 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 22 May 2024 20:10:36 +0800 Subject: [PATCH 11/14] refactor --- .../runtime/PipeHeartbeatParser.java | 4 ++-- .../pipe/task/meta/PipeTemporaryMeta.java | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java index c1302d38bb85..20be59ecf673 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java @@ -140,11 +140,11 @@ private void parseHeartbeatAndSaveMetaChangeLocally( final boolean pipeCompletedFromAgent = singlePipeHeartbeat.isCompleted(); if (Boolean.TRUE.equals(pipeCompletedFromAgent)) { final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); - temporaryMeta.putDataNodeCompletion(nodeId); + temporaryMeta.markDataNodeCompleted(nodeId); final Set dataNodeIds = configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); - dataNodeIds.removeAll(temporaryMeta.getCompletedDataNode()); + dataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds()); if (dataNodeIds.isEmpty()) { pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName()); needWriteConsensusOnConfigNodes.set(true); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java index 3a3b4e533ca1..6da2be8e81f0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java @@ -25,14 +25,15 @@ import java.util.concurrent.ConcurrentMap; public class PipeTemporaryMeta { - private final ConcurrentMap completedDataNode = new ConcurrentHashMap<>(); - public void putDataNodeCompletion(final int dataNodeId) { - completedDataNode.put(dataNodeId, dataNodeId); + private final ConcurrentMap completedDataNodeIds = new ConcurrentHashMap<>(); + + public void markDataNodeCompleted(final int dataNodeId) { + completedDataNodeIds.put(dataNodeId, dataNodeId); } - public Set getCompletedDataNode() { - return completedDataNode.keySet(); + public Set getCompletedDataNodeIds() { + return completedDataNodeIds.keySet(); } @Override @@ -43,17 +44,17 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final PipeTemporaryMeta pipeTemporaryMeta = (PipeTemporaryMeta) o; - return Objects.equals(completedDataNode, pipeTemporaryMeta.completedDataNode); + final PipeTemporaryMeta that = (PipeTemporaryMeta) o; + return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds); } @Override public int hashCode() { - return Objects.hash(completedDataNode); + return Objects.hash(completedDataNodeIds); } @Override public String toString() { - return "PipeMeta{" + "completedDataNode=" + completedDataNode + '}'; + return "PipeTemporaryMeta{" + "completedDataNodeIds=" + completedDataNodeIds + '}'; } } From 4fdbed06d25f1a38f97c0d0a63d582cafbe28453 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 22 May 2024 20:18:49 +0800 Subject: [PATCH 12/14] Update PipeHistoricalDataRegionTsFileExtractor.java --- .../PipeHistoricalDataRegionTsFileExtractor.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 147b62338ab7..880d3f9f7f26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -97,18 +97,16 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private boolean isDbNameCoveredByPattern = false; private boolean isHistoricalExtractorEnabled = false; - private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time - private long historicalDataExtractionTimeLowerBound; // Arrival time private boolean sloppyTimeRange; // true to disable time range filter after extraction private boolean shouldExtractInsertion; - private boolean shouldTransferModFile; // Whether to transfer mods - private boolean shouldTerminatePipeOnAllConsumed; + + private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed; private boolean isTerminateSignalSent = false; private Queue pendingQueue; @@ -197,7 +195,8 @@ public void validate(final PipeParameterValidator validator) { || // Should extract deletion DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters) .getRight()); - shouldTerminatePipeOnAllConsumed = + + shouldTerminatePipeOnAllHistoricalEventsConsumed = parameters.getBooleanOrDefault( Arrays.asList( SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, @@ -542,7 +541,8 @@ public synchronized boolean hasConsumedAll() { // implies that the extractor only extracts deletion thus the // Historical event has nothing to consume return Objects.isNull(pendingQueue) - || pendingQueue.isEmpty() && (!shouldTerminatePipeOnAllConsumed || isTerminateSignalSent); + || pendingQueue.isEmpty() + && (!shouldTerminatePipeOnAllHistoricalEventsConsumed || isTerminateSignalSent); } @Override From 1279c0fe3451c5059cf2a6d534602e076796cb51 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 22 May 2024 21:19:25 +0800 Subject: [PATCH 13/14] refactor --- .../runtime/PipeRuntimeCoordinator.java | 4 +- .../runtime/SinglePipeHeartbeat.java | 40 ---------------- .../PipeHeartbeat.java} | 29 ++++++++---- .../{ => heartbeat}/PipeHeartbeatParser.java | 46 +++++++++---------- .../PipeHeartbeatScheduler.java | 16 +++---- 5 files changed, 51 insertions(+), 84 deletions(-) delete mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java rename iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/{NodePipeHeartbeat.java => heartbeat/PipeHeartbeat.java} (68%) rename iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/{ => heartbeat}/PipeHeartbeatParser.java (92%) rename iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/{ => heartbeat}/PipeHeartbeatScheduler.java (91%) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java index 8c544e16f354..97f893f4eaeb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java @@ -26,6 +26,8 @@ import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent; +import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat; +import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler; import javax.validation.constraints.NotNull; @@ -108,6 +110,6 @@ public void parseHeartbeat( /* @Nullable */ final List pipeCompletedListFromAgent) { pipeHeartbeatScheduler.parseHeartbeat( dataNodeId, - new NodePipeHeartbeat(pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent)); + new PipeHeartbeat(pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java deleted file mode 100644 index 837307a9e594..000000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/SinglePipeHeartbeat.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime; - -import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; - -class SinglePipeHeartbeat { - private final PipeMeta pipeMeta; - private final boolean isCompleted; - - SinglePipeHeartbeat(final PipeMeta pipeMeta, final boolean isCompleted) { - this.pipeMeta = pipeMeta; - this.isCompleted = isCompleted; - } - - PipeMeta getPipeMeta() { - return pipeMeta; - } - - boolean isCompleted() { - return isCompleted; - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java similarity index 68% rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java index 959fddb3f550..203ba96ed44d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/NodePipeHeartbeat.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime; +package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; @@ -30,23 +30,32 @@ import java.util.Map; import java.util.Objects; -class NodePipeHeartbeat { - private final Map pipeHeartbeatMap = new HashMap<>(); +public class PipeHeartbeat { - NodePipeHeartbeat( + private final Map pipeMetaMap = new HashMap<>(); + private final Map isCompletedMap = new HashMap<>(); + + public PipeHeartbeat( @NotNull final List pipeMetaByteBufferListFromAgent, /* @Nullable */ final List pipeCompletedListFromAgent) { for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i)); - pipeHeartbeatMap.put( + pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta); + isCompletedMap.put( pipeMeta.getStaticMeta(), - new SinglePipeHeartbeat( - pipeMeta, - Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i))); + Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i)); } } - Map getPipeHeartbeatMap() { - return pipeHeartbeatMap; + public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) { + return pipeMetaMap.get(pipeStaticMeta); + } + + public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) { + return isCompletedMap.get(pipeStaticMeta); + } + + public boolean isEmpty() { + return pipeMetaMap.isEmpty(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java similarity index 92% rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 20be59ecf673..333f172afb4d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -17,14 +17,13 @@ * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime; +package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta; -import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta; @@ -62,7 +61,7 @@ public class PipeHeartbeatParser { needPushPipeMetaToDataNodes = new AtomicBoolean(false); } - synchronized void parseHeartbeat(final int nodeId, final NodePipeHeartbeat nodePipeHeartbeat) { + synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat pipeHeartbeat) { final long heartbeatCount = ++heartbeatCounter; final AtomicBoolean canSubmitHandleMetaChangeProcedure = new AtomicBoolean(false); @@ -83,7 +82,7 @@ synchronized void parseHeartbeat(final int nodeId, final NodePipeHeartbeat nodeP } } - if (nodePipeHeartbeat.getPipeHeartbeatMap().isEmpty() + if (pipeHeartbeat.isEmpty() && !(canSubmitHandleMetaChangeProcedure.get() && (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get()))) { return; @@ -104,8 +103,8 @@ synchronized void parseHeartbeat(final int nodeId, final NodePipeHeartbeat nodeP } try { - if (!nodePipeHeartbeat.getPipeHeartbeatMap().isEmpty()) { - parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId, nodePipeHeartbeat); + if (!pipeHeartbeat.isEmpty()) { + parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId, pipeHeartbeat); } if (canSubmitHandleMetaChangeProcedure.get() @@ -129,23 +128,29 @@ synchronized void parseHeartbeat(final int nodeId, final NodePipeHeartbeat nodeP private void parseHeartbeatAndSaveMetaChangeLocally( final AtomicReference pipeTaskInfo, final int nodeId, - final NodePipeHeartbeat nodePipeHeartbeat) { - final Map pipeHeartbeatMap = - nodePipeHeartbeat.getPipeHeartbeatMap(); - + final PipeHeartbeat pipeHeartbeat) { for (final PipeMeta pipeMetaFromCoordinator : pipeTaskInfo.get().getPipeMetaList()) { - final SinglePipeHeartbeat singlePipeHeartbeat = - pipeHeartbeatMap.get(pipeMetaFromCoordinator.getStaticMeta()); + final PipeMeta pipeMetaFromAgent = + pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta()); + if (pipeMetaFromAgent == null) { + LOGGER.info( + "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, " + + "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}", + pipeMetaFromCoordinator); + continue; + } + // Remove completed pipes - final boolean pipeCompletedFromAgent = singlePipeHeartbeat.isCompleted(); + final Boolean pipeCompletedFromAgent = + pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta()); if (Boolean.TRUE.equals(pipeCompletedFromAgent)) { final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); temporaryMeta.markDataNodeCompleted(nodeId); - final Set dataNodeIds = + final Set uncompletedDataNodeIds = configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); - dataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds()); - if (dataNodeIds.isEmpty()) { + uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds()); + if (uncompletedDataNodeIds.isEmpty()) { pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName()); needWriteConsensusOnConfigNodes.set(true); needPushPipeMetaToDataNodes.set(true); @@ -153,15 +158,6 @@ private void parseHeartbeatAndSaveMetaChangeLocally( } } - final PipeMeta pipeMetaFromAgent = singlePipeHeartbeat.getPipeMeta(); - if (pipeMetaFromAgent == null) { - LOGGER.info( - "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, " - + "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}", - pipeMetaFromCoordinator); - continue; - } - final Map pipeTaskMetaMapFromCoordinator = pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap(); final Map pipeTaskMetaMapFromAgent = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java similarity index 91% rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java index aa8f797e21f5..462b6a017a8f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime; +package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; @@ -59,12 +59,12 @@ public class PipeHeartbeatScheduler { private Future heartbeatFuture; - PipeHeartbeatScheduler(final ConfigManager configManager) { + public PipeHeartbeatScheduler(final ConfigManager configManager) { this.configManager = configManager; this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager); } - synchronized void start() { + public synchronized void start() { if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture == null) { heartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay( @@ -109,7 +109,7 @@ private synchronized void heartbeat() { (dataNodeId, resp) -> pipeHeartbeatParser.parseHeartbeat( dataNodeId, - new NodePipeHeartbeat(resp.getPipeMetaList(), resp.getPipeCompletedList()))); + new PipeHeartbeat(resp.getPipeMetaList(), resp.getPipeCompletedList()))); // config node heartbeat try { @@ -117,13 +117,13 @@ private synchronized void heartbeat() { PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp); pipeHeartbeatParser.parseHeartbeat( ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - new NodePipeHeartbeat(configNodeResp.getPipeMetaList(), null)); + new PipeHeartbeat(configNodeResp.getPipeMetaList(), null)); } catch (final Exception e) { LOGGER.warn("Failed to collect pipe meta list from config node task agent", e); } } - synchronized void stop() { + public synchronized void stop() { if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) { heartbeatFuture.cancel(false); heartbeatFuture = null; @@ -131,7 +131,7 @@ synchronized void stop() { } } - void parseHeartbeat(final int dataNodeId, final NodePipeHeartbeat nodePipeHeartbeat) { - pipeHeartbeatParser.parseHeartbeat(dataNodeId, nodePipeHeartbeat); + public void parseHeartbeat(final int dataNodeId, final PipeHeartbeat pipeHeartbeat) { + pipeHeartbeatParser.parseHeartbeat(dataNodeId, pipeHeartbeat); } } From 7b4fbfed06bd0974f19a8e196af24f9f1e301ef6 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 22 May 2024 21:24:14 +0800 Subject: [PATCH 14/14] Update PipeHeartbeatParser.java --- .../coordinator/runtime/heartbeat/PipeHeartbeatParser.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 333f172afb4d..68ea140dda98 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -141,10 +141,11 @@ private void parseHeartbeatAndSaveMetaChangeLocally( } // Remove completed pipes - final Boolean pipeCompletedFromAgent = + final Boolean isPipeCompletedFromAgent = pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta()); - if (Boolean.TRUE.equals(pipeCompletedFromAgent)) { + if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) { final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta(); + temporaryMeta.markDataNodeCompleted(nodeId); final Set uncompletedDataNodeIds =