Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Added completion signal to historical events & allow all data regions' completion signal to drop the pipe #12490

Merged
merged 17 commits into from
May 23, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.autocreate;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
@Test
public void testAutoDropInHistoricalTransfer() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", "true");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.assertDataEventuallyOnEnv(
senderEnv,
"show pipes",
"ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,",
Collections.emptySet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testPureDeleteInclusion() throws Exception {
senderEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf01.wt01(time, status) values(0, 1)",
"insert into root.ln.wf01.wt01(time, status) values(0, true)",
"flush"))) {
return;
}
Expand All @@ -240,7 +240,7 @@ public void testPureDeleteInclusion() throws Exception {
receiverEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status1 with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf01.wt01(time, status1) values(0, 1)",
"insert into root.ln.wf01.wt01(time, status1) values(0, true)",
"flush"))) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(nodeId, heartbeatResp.getPipeMetaList());
pipeRuntimeCoordinator.parseHeartbeat(
nodeId, heartbeatResp.getPipeMetaList(), heartbeatResp.getPipeCompletedList());
}
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;

import javax.validation.constraints.NotNull;

Expand All @@ -45,7 +47,7 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
private final PipeMetaSyncer pipeMetaSyncer;
private final PipeHeartbeatScheduler pipeHeartbeatScheduler;

public PipeRuntimeCoordinator(ConfigManager configManager) {
public PipeRuntimeCoordinator(final ConfigManager configManager) {
if (procedureSubmitterHolder.get() == null) {
synchronized (PipeRuntimeCoordinator.class) {
if (procedureSubmitterHolder.get() == null) {
Expand All @@ -71,18 +73,18 @@ public synchronized void onConfigRegionGroupLeaderChanged() {
}

@Override
public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
// Do nothing
}

@Override
public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event) {
public void onRegionGroupStatisticsChanged(final RegionGroupStatisticsChangeEvent event) {
// Do nothing
}

@Override
public synchronized void onConsensusGroupStatisticsChanged(
ConsensusGroupStatisticsChangeEvent event) {
final ConsensusGroupStatisticsChangeEvent event) {
pipeLeaderChangeHandler.onConsensusGroupStatisticsChanged(event);
}

Expand All @@ -103,7 +105,11 @@ public void stopPipeHeartbeat() {
}

public void parseHeartbeat(
int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
pipeHeartbeatScheduler.parseHeartbeat(dataNodeId, pipeMetaByteBufferListFromDataNode);
final int dataNodeId,
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
pipeHeartbeatScheduler.parseHeartbeat(
dataNodeId,
new PipeHeartbeat(pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;

import javax.validation.constraints.NotNull;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PipeHeartbeat {

private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();

public PipeHeartbeat(
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
isCompletedMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i));
}
}

public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) {
return pipeMetaMap.get(pipeStaticMeta);
}

public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) {
return isCompletedMap.get(pipeStaticMeta);
}

public boolean isEmpty() {
return pipeMetaMap.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,25 @@
* under the License.
*/

package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -55,7 +51,7 @@ public class PipeHeartbeatParser {
private final AtomicBoolean needWriteConsensusOnConfigNodes;
private final AtomicBoolean needPushPipeMetaToDataNodes;

PipeHeartbeatParser(ConfigManager configManager) {
PipeHeartbeatParser(final ConfigManager configManager) {
this.configManager = configManager;

heartbeatCounter = 0;
Expand All @@ -65,8 +61,7 @@ public class PipeHeartbeatParser {
needPushPipeMetaToDataNodes = new AtomicBoolean(false);
}

public synchronized void parseHeartbeat(
int nodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat pipeHeartbeat) {
final long heartbeatCount = ++heartbeatCounter;

final AtomicBoolean canSubmitHandleMetaChangeProcedure = new AtomicBoolean(false);
Expand All @@ -87,7 +82,7 @@ public synchronized void parseHeartbeat(
}
}

if (pipeMetaByteBufferListFromAgent.isEmpty()
if (pipeHeartbeat.isEmpty()
&& !(canSubmitHandleMetaChangeProcedure.get()
&& (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get()))) {
return;
Expand All @@ -108,9 +103,8 @@ public synchronized void parseHeartbeat(
}

try {
if (!pipeMetaByteBufferListFromAgent.isEmpty()) {
parseHeartbeatAndSaveMetaChangeLocally(
pipeTaskInfo, nodeId, pipeMetaByteBufferListFromAgent);
if (!pipeHeartbeat.isEmpty()) {
parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId, pipeHeartbeat);
}

if (canSubmitHandleMetaChangeProcedure.get()
Expand All @@ -134,16 +128,10 @@ public synchronized void parseHeartbeat(
private void parseHeartbeatAndSaveMetaChangeLocally(
final AtomicReference<PipeTaskInfo> pipeTaskInfo,
final int nodeId,
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromAgent = new HashMap<>();
for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromAgent) {
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
pipeMetaMapFromAgent.put(pipeMeta.getStaticMeta(), pipeMeta);
}

final PipeHeartbeat pipeHeartbeat) {
for (final PipeMeta pipeMetaFromCoordinator : pipeTaskInfo.get().getPipeMetaList()) {
final PipeMeta pipeMetaFromAgent =
pipeMetaMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta());
pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta());
if (pipeMetaFromAgent == null) {
LOGGER.info(
"PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
Expand All @@ -152,6 +140,25 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
continue;
}

// Remove completed pipes
final Boolean isPipeCompletedFromAgent =
pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta());
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
final PipeTemporaryMeta temporaryMeta = pipeMetaFromCoordinator.getTemporaryMeta();

temporaryMeta.markDataNodeCompleted(nodeId);

final Set<Integer> uncompletedDataNodeIds =
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName());
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(true);
continue;
}
}

final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
Expand Down
Loading
Loading