Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.compatibility.runtimemeta.PipeRuntimeMetaVersion;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class CreatePipePlanV2 extends ConfigPhysicalPlan {

Expand Down Expand Up @@ -61,6 +63,33 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
pipeStaticMeta = PipeStaticMeta.deserialize(buffer);
pipeRuntimeMeta = PipeRuntimeMeta.deserialize(buffer);
pipeRuntimeMeta = PipeRuntimeMetaVersion.deserializeRuntimeMeta(buffer);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
CreatePipePlanV2 that = (CreatePipePlanV2) obj;
return pipeStaticMeta.equals(that.pipeStaticMeta) && pipeRuntimeMeta == that.pipeRuntimeMeta;
}

@Override
public int hashCode() {
return Objects.hash(pipeStaticMeta, pipeRuntimeMeta);
}

@Override
public String toString() {
return "CreatePipePlanV2{"
+ "pipeStaticMeta='"
+ pipeStaticMeta
+ "', pipeRuntimeMeta="
+ pipeRuntimeMeta
+ "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class DropPipePlanV2 extends ConfigPhysicalPlan {

Expand Down Expand Up @@ -54,4 +55,26 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
pipeName = BasicStructureSerDeUtil.readString(buffer);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DropPipePlanV2 that = (DropPipePlanV2) obj;
return pipeName.equals(that.pipeName);
}

@Override
public int hashCode() {
return Objects.hash(pipeName);
}

@Override
public String toString() {
return "DropPipePlanV2{" + "pipeName='" + pipeName + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class SetPipeStatusPlanV2 extends ConfigPhysicalPlan {

Expand Down Expand Up @@ -63,4 +64,26 @@ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
pipeName = ReadWriteIOUtils.readString(buffer);
status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
SetPipeStatusPlanV2 that = (SetPipeStatusPlanV2) obj;
return pipeName.equals(that.pipeName) && status.equals(that.status);
}

@Override
public int hashCode() {
return Objects.hash(pipeName, status);
}

@Override
public String toString() {
return "SetPipeStatusPlanV2{" + "pipeName='" + pipeName + "', status=" + status + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public TShowPipeResp convertToTShowPipeResp() {
.append(e.getMessage())
.append("\n");
}
for (PipeTaskMeta pipeTaskMeta : runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
for (PipeTaskMeta pipeTaskMeta : runtimeMeta.getDataRegionId2TaskMetaMap().values()) {
for (PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
exceptionMessageBuilder
.append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
}

final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapOnConfigNode =
pipeMetaOnConfigNode.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
pipeMetaOnConfigNode.getRuntimeMeta().getDataRegionId2TaskMetaMap();
final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapFromDataNode =
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
pipeMetaFromDataNode.getRuntimeMeta().getDataRegionId2TaskMetaMap();
for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> runtimeMetaOnConfigNode :
pipeTaskMetaMapOnConfigNode.entrySet()) {
if (runtimeMetaOnConfigNode.getValue().getLeaderDataNodeId() != dataNodeId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private TSStatus handleLeaderChangeInternal(PipeHandleLeaderChangePlan plan) {
.forEach(
pipeMeta -> {
final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMap =
pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
pipeMeta.getRuntimeMeta().getDataRegionId2TaskMetaMap();

if (consensusGroupIdToTaskMetaMap.containsKey(dataRegionGroupId)) {
// If the data region leader is -1, it means the data region is
Expand Down Expand Up @@ -443,7 +443,7 @@ private boolean hasExceptionsInternal(String pipeName) {

final AtomicBoolean hasException = new AtomicBoolean(false);
runtimeMeta
.getConsensusGroupId2TaskMetaMap()
.getDataRegionId2TaskMetaMap()
.values()
.forEach(
pipeTaskMeta -> {
Expand Down Expand Up @@ -505,7 +505,7 @@ private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(Str
}

runtimeMeta
.getConsensusGroupId2TaskMetaMap()
.getDataRegionId2TaskMetaMap()
.values()
.forEach(
pipeTaskMeta -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,30 @@ protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
createPipeRequest.getProcessorAttributes(),
createPipeRequest.getConnectorAttributes());

final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMap = new HashMap<>();
final Map<TConsensusGroupId, PipeTaskMeta> dataRegionIdToTaskMetaMap = new HashMap<>();
final Map<TConsensusGroupId, PipeTaskMeta> schemaRegionIdToTaskMetaMap = new HashMap<>();

env.getConfigManager()
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
if (regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
final String databaseName =
env.getConfigManager()
.getPartitionManager()
.getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
// Pipe only collect user's data, filter metric database here.
consensusGroupIdToTaskMetaMap.put(
final String databaseName =
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
TConsensusGroupType type = regionGroupId.getType();
if (type.equals(TConsensusGroupType.DataRegion)) {
dataRegionIdToTaskMetaMap.put(
regionGroupId,
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
} else if (type.equals(TConsensusGroupType.SchemaRegion)) {
schemaRegionIdToTaskMetaMap.put(
regionGroupId,
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
}
}
});
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
pipeRuntimeMeta = new PipeRuntimeMeta(dataRegionIdToTaskMetaMap, schemaRegionIdToTaskMetaMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,13 +1109,17 @@ public void CreatePipePlanV2Test() throws IOException {
extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor");
processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeTaskMeta dataRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
PipeTaskMeta schemaRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> dataRegionPipeTasks = new HashMap<>();
Map<TConsensusGroupId, PipeTaskMeta> schemaRegionPipeTasks = new HashMap<>();
dataRegionPipeTasks.put(new TConsensusGroupId(DataRegion, 1), dataRegionPipeTaskMeta);
schemaRegionPipeTasks.put(new TConsensusGroupId(SchemaRegion, 2), schemaRegionPipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
"testPipe", 121, extractorAttributes, processorAttributes, connectorAttributes);
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
PipeRuntimeMeta pipeRuntimeMeta =
new PipeRuntimeMeta(dataRegionPipeTasks, schemaRegionPipeTasks);
CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
CreatePipePlanV2 createPipePlanV21 =
(CreatePipePlanV2)
Expand Down Expand Up @@ -1229,6 +1233,18 @@ public void pipeHandleMetaChangePlanTest() throws IOException {
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE, 789)); // TODO: replace with IoTConsensus
}
},
new HashMap() {
{
put(
new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 111),
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE, 222)); // TODO: replace with IoTConsensus
put(
new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 333),
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE, 444)); // TODO: replace with IoTConsensus
}
});
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
PipeHandleMetaChangePlan pipeHandleMetaChangePlan1 = new PipeHandleMetaChangePlan(pipeMetaList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;

import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion;

public class PipeTableRespTest {

Expand All @@ -55,13 +56,17 @@ public PipeTableResp constructPipeTableResp() {
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");

PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeTaskMeta dataRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
PipeTaskMeta schemaRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
Map<TConsensusGroupId, PipeTaskMeta> dataRegionPipeTasks = new HashMap<>();
Map<TConsensusGroupId, PipeTaskMeta> schemaRegionPipeTasks = new HashMap<>();
dataRegionPipeTasks.put(new TConsensusGroupId(DataRegion, 1), dataRegionPipeTaskMeta);
schemaRegionPipeTasks.put(new TConsensusGroupId(SchemaRegion, 2), schemaRegionPipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
"testPipe", 121, extractorAttributes, processorAttributes, connectorAttributes);
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
PipeRuntimeMeta pipeRuntimeMeta =
new PipeRuntimeMeta(dataRegionPipeTasks, schemaRegionPipeTasks);
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));

// PipeMeta 2
Expand All @@ -75,13 +80,17 @@ public PipeTableResp constructPipeTableResp() {
connectorAttributes1.put("host", "127.0.0.1");
connectorAttributes1.put("port", "6667");

PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks1 = new HashMap<>();
pipeTasks1.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta1);
PipeTaskMeta dataRegionPipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
PipeTaskMeta schemaRegionPipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
Map<TConsensusGroupId, PipeTaskMeta> dataRegionPipeTasks1 = new HashMap<>();
Map<TConsensusGroupId, PipeTaskMeta> schemaRegionPipeTasks1 = new HashMap<>();
dataRegionPipeTasks1.put(new TConsensusGroupId(DataRegion, 1), dataRegionPipeTaskMeta1);
schemaRegionPipeTasks1.put(new TConsensusGroupId(SchemaRegion, 2), schemaRegionPipeTaskMeta1);
PipeStaticMeta pipeStaticMeta1 =
new PipeStaticMeta(
"testPipe", 121, extractorAttributes1, processorAttributes1, connectorAttributes1);
PipeRuntimeMeta pipeRuntimeMeta1 = new PipeRuntimeMeta(pipeTasks1);
PipeRuntimeMeta pipeRuntimeMeta1 =
new PipeRuntimeMeta(dataRegionPipeTasks1, schemaRegionPipeTasks1);
pipeMetaList.add(new PipeMeta(pipeStaticMeta1, pipeRuntimeMeta1));

// PipeMeta 3
Expand All @@ -95,13 +104,17 @@ public PipeTableResp constructPipeTableResp() {
connectorAttributes2.put("host", "172.30.30.30");
connectorAttributes2.put("port", "6667");

PipeTaskMeta pipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks2 = new HashMap<>();
pipeTasks2.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta2);
PipeTaskMeta dataRegionPipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
PipeTaskMeta schemaRegionPipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
Map<TConsensusGroupId, PipeTaskMeta> dataRegionPipeTasks2 = new HashMap<>();
Map<TConsensusGroupId, PipeTaskMeta> schemaRegionPipeTasks2 = new HashMap<>();
dataRegionPipeTasks2.put(new TConsensusGroupId(DataRegion, 1), dataRegionPipeTaskMeta2);
schemaRegionPipeTasks2.put(new TConsensusGroupId(SchemaRegion, 2), schemaRegionPipeTaskMeta2);
PipeStaticMeta pipeStaticMeta2 =
new PipeStaticMeta(
"testPipe", 121, extractorAttributes2, processorAttributes2, connectorAttributes2);
PipeRuntimeMeta pipeRuntimeMeta2 = new PipeRuntimeMeta(pipeTasks2);
PipeRuntimeMeta pipeRuntimeMeta2 =
new PipeRuntimeMeta(dataRegionPipeTasks2, schemaRegionPipeTasks2);
pipeMetaList.add(new PipeMeta(pipeStaticMeta2, pipeRuntimeMeta2));

return new PipeTableResp(status, pipeMetaList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Map;

import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion;
import static org.apache.iotdb.db.utils.constant.TestConstant.BASE_OUTPUT_PATH;

public class PipeInfoTest {
Expand Down Expand Up @@ -86,13 +87,17 @@ public void testSnapshot() throws TException, IOException {
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");

PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeTaskMeta dataRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
PipeTaskMeta schemaRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
Map<TConsensusGroupId, PipeTaskMeta> dataRegionPipeTasks = new HashMap<>();
Map<TConsensusGroupId, PipeTaskMeta> schemaRegionPipeTasks = new HashMap<>();
dataRegionPipeTasks.put(new TConsensusGroupId(DataRegion, 1), dataRegionPipeTaskMeta);
schemaRegionPipeTasks.put(new TConsensusGroupId(SchemaRegion, 2), schemaRegionPipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
pipeName, 121, extractorAttributes, processorAttributes, connectorAttributes);
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
PipeRuntimeMeta pipeRuntimeMeta =
new PipeRuntimeMeta(dataRegionPipeTasks, schemaRegionPipeTasks);
CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
pipeInfo.getPipeTaskInfo().createPipe(createPipePlanV2);

Expand Down Expand Up @@ -121,13 +126,17 @@ public void testManagement() {
extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor");
processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeTaskMeta dataRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
PipeTaskMeta schemaRegionPipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
Map<TConsensusGroupId, PipeTaskMeta> dataRegionPipeTasks = new HashMap<>();
Map<TConsensusGroupId, PipeTaskMeta> schemaRegionPipeTasks = new HashMap<>();
dataRegionPipeTasks.put(new TConsensusGroupId(DataRegion, 1), dataRegionPipeTaskMeta);
schemaRegionPipeTasks.put(new TConsensusGroupId(SchemaRegion, 2), schemaRegionPipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
pipeName, 121, extractorAttributes, processorAttributes, connectorAttributes);
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
PipeRuntimeMeta pipeRuntimeMeta =
new PipeRuntimeMeta(dataRegionPipeTasks, schemaRegionPipeTasks);
CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
pipeInfo.getPipeTaskInfo().createPipe(createPipePlanV2);

Expand Down
Loading