Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ public boolean checkBeforeStartPipe(String pipeName) {
return false;
}

if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
final PipeStatus pipeStatus = getPipeStatus(pipeName);
if (pipeStatus == PipeStatus.RUNNING) {
LOGGER.info(
String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
return false;
}
if (pipeStatus == PipeStatus.DROPPED) {
LOGGER.info(
String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
return false;
}

return true;
}
Expand All @@ -96,21 +102,29 @@ public boolean checkBeforeStopPipe(String pipeName) {
return false;
}

if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
final PipeStatus pipeStatus = getPipeStatus(pipeName);
if (pipeStatus == PipeStatus.STOPPED) {
LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
return false;
}
if (pipeStatus == PipeStatus.DROPPED) {
LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
return false;
}

return true;
}

public boolean checkBeforeDropPipe(String pipeName) {
if (isPipeExisted(pipeName)) {
return true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Check before drop pipe {}, pipe exists: {}.",
pipeName,
isPipeExisted(pipeName) ? "true" : "false");
}

LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
return false;
// no matter whether the pipe exists, we allow the drop operation executed on all nodes to
// ensure the consistency
return true;
}

private boolean isPipeExisted(String pipeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st
throws IOException, InterruptedException, ProcedureException {
switch (state) {
case VALIDATE_TASK:
rollbackFromValidateTask(env);
env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
try {
rollbackFromValidateTask(env);
} finally {
env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
}
break;
case CALCULATE_INFO_FOR_TASK:
rollbackFromCalculateInfoForTask(env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeMana
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(region, leader) -> {
consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader));
});
(region, leader) ->
// TODO: make index configurable
consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader)));
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class PipeStaticMeta {

private String pipeName;
private long createTime;
private long creationTime;

private PipeParameters collectorParameters;
private PipeParameters processorParameters;
Expand All @@ -42,12 +42,12 @@ private PipeStaticMeta() {}

public PipeStaticMeta(
String pipeName,
long createTime,
long creationTime,
Map<String, String> collectorAttributes,
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes) {
this.pipeName = pipeName.toUpperCase();
this.createTime = createTime;
this.creationTime = creationTime;
collectorParameters = new PipeParameters(collectorAttributes);
processorParameters = new PipeParameters(processorAttributes);
connectorParameters = new PipeParameters(connectorAttributes);
Expand All @@ -57,8 +57,8 @@ public String getPipeName() {
return pipeName;
}

public long getCreateTime() {
return createTime;
public long getCreationTime() {
return creationTime;
}

public PipeParameters getCollectorParameters() {
Expand All @@ -82,7 +82,7 @@ public ByteBuffer serialize() throws IOException {

public void serialize(DataOutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pipeName, outputStream);
ReadWriteIOUtils.write(createTime, outputStream);
ReadWriteIOUtils.write(creationTime, outputStream);

outputStream.writeInt(collectorParameters.getAttribute().size());
for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
Expand Down Expand Up @@ -110,7 +110,7 @@ public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();

pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);

pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
Expand Down Expand Up @@ -151,7 +151,7 @@ public boolean equals(Object obj) {
}
PipeStaticMeta that = (PipeStaticMeta) obj;
return pipeName.equals(that.pipeName)
&& createTime == that.createTime
&& creationTime == that.creationTime
&& collectorParameters.equals(that.collectorParameters)
&& processorParameters.equals(that.processorParameters)
&& connectorParameters.equals(that.connectorParameters);
Expand All @@ -168,8 +168,8 @@ public String toString() {
+ "pipeName='"
+ pipeName
+ '\''
+ ", createTime="
+ createTime
+ ", creationTime="
+ creationTime
+ ", collectorParameters="
+ collectorParameters.getAttribute()
+ ", processorParameters="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;

/** PipeAgent is the entry point of the pipe module in DatNode. */
/** PipeAgent is the entry point of the pipe module in DataNode. */
public class PipeAgent {

private final PipePluginAgent pipePluginAgent;
Expand Down
Loading