Skip to content

Commit

Permalink
Pipe: Add completion signal to historical events & allow all data regions' completion signal to drop the pipe (#12490)
Browse files Browse the repository at this point in the history

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
Caideyipi and SteveYurongSu committed May 23, 2024
1 parent 9a29af7 commit 40934dd
Show file tree
Hide file tree
Showing 16 changed files with 593 additions and 116 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.autocreate;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {

  /**
   * Verifies that a pipe configured with {@code
   * extractor.history.terminate-pipe-on-all-consumed=true} drops itself once all historical data
   * has been transferred: after creation and start, "show pipes" eventually returns no rows.
   */
  @Test
  public void testAutoDropInHistoricalTransfer() throws Exception {
    final DataNodeWrapper receiver = receiverEnv.getDataNodeWrapper(0);
    final String receiverHost = receiver.getIp();
    final int receiverRpcPort = receiver.getPort();

    try (final SyncConfigNodeIServiceClient configNodeClient =
        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

      // Insert one data point before the pipe exists so it is picked up as historical data.
      if (!TestUtils.tryExecuteNonQueryWithRetry(
          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
        return;
      }

      final Map<String, String> extractorAttributes = new HashMap<>();
      extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", "true");

      final Map<String, String> processorAttributes = new HashMap<>();

      final Map<String, String> connectorAttributes = new HashMap<>();
      connectorAttributes.put("connector", "iotdb-thrift-connector");
      connectorAttributes.put("connector.batch.enable", "false");
      connectorAttributes.put("connector.ip", receiverHost);
      connectorAttributes.put("connector.port", Integer.toString(receiverRpcPort));

      final TCreatePipeReq createRequest =
          new TCreatePipeReq("p1", connectorAttributes)
              .setExtractorAttributes(extractorAttributes)
              .setProcessorAttributes(processorAttributes);
      final TSStatus createStatus = configNodeClient.createPipe(createRequest);
      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), createStatus.getCode());

      final TSStatus startStatus = configNodeClient.startPipe("p1");
      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), startStatus.getCode());

      // The pipe should remove itself once history is fully consumed, leaving no pipes listed.
      TestUtils.assertDataEventuallyOnEnv(
          senderEnv,
          "show pipes",
          "ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,",
          Collections.emptySet());
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testPureDeleteInclusion() throws Exception {
senderEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf01.wt01(time, status) values(0, 1)",
"insert into root.ln.wf01.wt01(time, status) values(0, true)",
"flush"))) {
return;
}
Expand All @@ -240,7 +240,7 @@ public void testPureDeleteInclusion() throws Exception {
receiverEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status1 with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf01.wt01(time, status1) values(0, 1)",
"insert into root.ln.wf01.wt01(time, status1) values(0, true)",
"flush"))) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(nodeId, heartbeatResp.getPipeMetaList());
pipeRuntimeCoordinator.parseHeartbeat(
nodeId, heartbeatResp.getPipeMetaList(), heartbeatResp.getPipeCompletedList());
}
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;

import javax.validation.constraints.NotNull;

Expand All @@ -45,7 +47,7 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
private final PipeMetaSyncer pipeMetaSyncer;
private final PipeHeartbeatScheduler pipeHeartbeatScheduler;

public PipeRuntimeCoordinator(ConfigManager configManager) {
public PipeRuntimeCoordinator(final ConfigManager configManager) {
if (procedureSubmitterHolder.get() == null) {
synchronized (PipeRuntimeCoordinator.class) {
if (procedureSubmitterHolder.get() == null) {
Expand All @@ -71,18 +73,18 @@ public synchronized void onConfigRegionGroupLeaderChanged() {
}

@Override
public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
// Do nothing
}

@Override
public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event) {
public void onRegionGroupStatisticsChanged(final RegionGroupStatisticsChangeEvent event) {
// Do nothing
}

@Override
public synchronized void onConsensusGroupStatisticsChanged(
ConsensusGroupStatisticsChangeEvent event) {
final ConsensusGroupStatisticsChangeEvent event) {
pipeLeaderChangeHandler.onConsensusGroupStatisticsChanged(event);
}

Expand All @@ -103,7 +105,11 @@ public void stopPipeHeartbeat() {
}

public void parseHeartbeat(
int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
pipeHeartbeatScheduler.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode);
final int dataNodeId,
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
pipeHeartbeatScheduler.parseHeartbeat(
dataNodeId,
new PipeHeartbeat(pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;

import javax.validation.constraints.NotNull;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Immutable snapshot of the pipe-related payload carried by one DataNode heartbeat: the reported
 * {@link PipeMeta}s and, optionally, a per-pipe "historical data fully consumed" completion flag.
 */
public class PipeHeartbeat {

  private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
  private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();

  /**
   * @param pipeMetaByteBufferListFromAgent serialized {@link PipeMeta}s reported by the agent
   * @param pipeCompletedListFromAgent completion flags index-aligned with the meta list; may be
   *     {@code null} (e.g. a heartbeat from an older agent that does not report completion)
   */
  public PipeHeartbeat(
      @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
    for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
      final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
      pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
      // Guard against a completion list that is absent, shorter than the meta list, or contains
      // null elements: avoid IndexOutOfBoundsException and NPE from unboxing; any missing entry
      // defaults to "not completed".
      isCompletedMap.put(
          pipeMeta.getStaticMeta(),
          Objects.nonNull(pipeCompletedListFromAgent)
              && i < pipeCompletedListFromAgent.size()
              && Boolean.TRUE.equals(pipeCompletedListFromAgent.get(i)));
    }
  }

  /** Returns the reported {@link PipeMeta} for the given static meta, or {@code null} if absent. */
  public PipeMeta getPipeMeta(final PipeStaticMeta pipeStaticMeta) {
    return pipeMetaMap.get(pipeStaticMeta);
  }

  /**
   * Returns whether the agent reported the pipe as completed, or {@code null} if the pipe was not
   * present in this heartbeat at all.
   */
  public Boolean isCompleted(final PipeStaticMeta pipeStaticMeta) {
    return isCompletedMap.get(pipeStaticMeta);
  }

  /** Returns {@code true} iff this heartbeat carried no pipe metas. */
  public boolean isEmpty() {
    return pipeMetaMap.isEmpty();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,25 @@
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -55,7 +51,7 @@ public class PipeHeartbeatParser {
private final AtomicBoolean needWriteConsensusOnConfigNodes;
private final AtomicBoolean needPushPipeMetaToDataNodes;

PipeHeartbeatParser(ConfigManager configManager) {
PipeHeartbeatParser(final ConfigManager configManager) {
this.configManager = configManager;

heartbeatCounter = 0;
Expand All @@ -65,8 +61,7 @@ public class PipeHeartbeatParser {
needPushPipeMetaToDataNodes = new AtomicBoolean(false);
}

public synchronized void parseHeartbeat(
int nodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat pipeHeartbeat) {
final long heartbeatCount = ++heartbeatCounter;

final AtomicBoolean canSubmitHandleMetaChangeProcedure = new AtomicBoolean(false);
Expand All @@ -87,7 +82,7 @@ public synchronized void parseHeartbeat(
}
}

if (pipeMetaByteBufferListFromAgent.isEmpty()
if (pipeHeartbeat.isEmpty()
&& !(canSubmitHandleMetaChangeProcedure.get()
&& (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get()))) {
return;
Expand All @@ -108,9 +103,8 @@ public synchronized void parseHeartbeat(
}

try {
if (!pipeMetaByteBufferListFromAgent.isEmpty()) {
parseHeartbeatAndSaveMetaChangeLocally(
pipeTaskInfo, nodeId, pipeMetaByteBufferListFromAgent);
if (!pipeHeartbeat.isEmpty()) {
parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId, pipeHeartbeat);
}

if (canSubmitHandleMetaChangeProcedure.get()
Expand All @@ -134,16 +128,10 @@ public synchronized void parseHeartbeat(
private void parseHeartbeatAndSaveMetaChangeLocally(
final AtomicReference<PipeTaskInfo> pipeTaskInfo,
final int nodeId,
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromAgent = new HashMap<>();
for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromAgent) {
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
pipeMetaMapFromAgent.put(pipeMeta.getStaticMeta(), pipeMeta);
}

final PipeHeartbeat pipeHeartbeat) {
for (final PipeMeta pipeMetaFromCoordinator : pipeTaskInfo.get().getPipeMetaList()) {
final PipeMeta pipeMetaFromAgent =
pipeMetaMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta());
pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta());
if (pipeMetaFromAgent == null) {
LOGGER.info(
"PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
Expand All @@ -152,6 +140,25 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
continue;
}

// Remove completed pipes
final Boolean isPipeCompletedFromAgent =
pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta());
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta();

temporaryMeta.markDataNodeCompleted(nodeId);

final Set<Integer> uncompletedDataNodeIds =
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName());
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(true);
continue;
}
}

final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
Expand Down
Loading

0 comments on commit 40934dd

Please sign in to comment.