Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeDataNodeTask implements PipeTask {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTask.class);

protected final String pipeName;
protected final TConsensusGroupId regionId;

Expand All @@ -48,30 +53,50 @@ public PipeDataNodeTask(

@Override
public void create() {
final long startTime = System.currentTimeMillis();
extractorStage.create();
processorStage.create();
connectorStage.create();
LOGGER.info(
"Create pipe DN task {} successfully within {} ms",
this,
System.currentTimeMillis() - startTime);
}

@Override
public void drop() {
final long startTime = System.currentTimeMillis();
extractorStage.drop();
processorStage.drop();
connectorStage.drop();
LOGGER.info(
"Drop pipe DN task {} successfully within {} ms",
this,
System.currentTimeMillis() - startTime);
}

@Override
public void start() {
final long startTime = System.currentTimeMillis();
extractorStage.start();
processorStage.start();
connectorStage.start();
LOGGER.info(
"Start pipe DN task {} successfully within {} ms",
this,
System.currentTimeMillis() - startTime);
}

@Override
public void stop() {
final long startTime = System.currentTimeMillis();
extractorStage.stop();
processorStage.stop();
connectorStage.stop();
LOGGER.info(
"Stop pipe DN task {} successfully within {} ms",
this,
System.currentTimeMillis() - startTime);
}

public TConsensusGroupId getRegionId() {
Expand All @@ -81,4 +106,9 @@ public TConsensusGroupId getRegionId() {
public String getPipeName() {
return pipeName;
}

@Override
public String toString() {
return pipeName + "@" + regionId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,17 @@ private boolean createPipe(PipeMeta pipeMetaFromCoordinator) {
dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
}

// Create pipe tasks and trigger create() method for each pipe task
// Create pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks = buildPipeTasks(pipeMetaFromCoordinator);
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.create();
}

// Trigger create() method for each pipe task by parallel stream
final long startTime = System.currentTimeMillis();
pipeTasks.values().parallelStream().forEach(PipeTask::create);
LOGGER.info(
"Create all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);

pipeTaskManager.addPipeTasks(pipeMetaFromCoordinator.getStaticMeta(), pipeTasks);

// No matter the pipe status from coordinator is RUNNING or STOPPED, we always set the status
Expand Down Expand Up @@ -444,7 +450,7 @@ private void dropPipe(String pipeName, long creationTime) {
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);

// Drop pipe tasks and trigger drop() method for each pipe task
// Drop pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
Expand All @@ -455,9 +461,14 @@ private void dropPipe(String pipeName, long creationTime) {
creationTime);
return;
}
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.drop();
}

// Trigger drop() method for each pipe task by parallel stream
final long startTime = System.currentTimeMillis();
pipeTasks.values().parallelStream().forEach(PipeTask::drop);
LOGGER.info(
"Drop all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);

// Remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
Expand All @@ -475,17 +486,22 @@ private void dropPipe(String pipeName) {
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);

// Drop pipe tasks and trigger drop() method for each pipe task
// Drop pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
LOGGER.info(
"Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
return;
}
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.drop();
}

// Trigger drop() method for each pipe task by parallel stream
final long startTime = System.currentTimeMillis();
pipeTasks.values().parallelStream().forEach(PipeTask::drop);
LOGGER.info(
"Drop all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);

// Remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
Expand All @@ -498,7 +514,7 @@ private void startPipe(String pipeName, long creationTime) {
return;
}

// Trigger start() method for each pipe task
// Get pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
Expand All @@ -509,9 +525,14 @@ private void startPipe(String pipeName, long creationTime) {
creationTime);
return;
}
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.start();
}

// Trigger start() method for each pipe task by parallel stream
final long startTime = System.currentTimeMillis();
pipeTasks.values().parallelStream().forEach(PipeTask::start);
LOGGER.info(
"Start all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);

// Set pipe meta status to RUNNING
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
Expand All @@ -530,7 +551,7 @@ protected void stopPipe(String pipeName, long creationTime) {
return;
}

// Trigger stop() method for each pipe task
// Get pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
Expand All @@ -541,9 +562,14 @@ protected void stopPipe(String pipeName, long creationTime) {
creationTime);
return;
}
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.stop();
}

// Trigger stop() method for each pipe task by parallel stream
final long startTime = System.currentTimeMillis();
pipeTasks.values().parallelStream().forEach(PipeTask::stop);
LOGGER.info(
"Stop all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);

// Set pipe meta status to STOPPED
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
Expand Down