-
Notifications
You must be signed in to change notification settings - Fork 965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IOTDB-2933] detect sender exit and set pipestatus=STOP #5557
Conversation
Cpaulyz
commented
Apr 15, 2022
- Use thrift to detect sender exit
- Add concurrent control in receiverService
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some advices, please check
logger.info("create Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime); | ||
createDir(pipeName, remoteIp, createTime); | ||
receiverManager.createPipe(pipeName, remoteIp, createTime); | ||
PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipeName and remoteIp is not enough for judge, consider following situation:
- user create a pipe at sender, but receiver returen ACK for CREATE SyncRequest error, but the PipeInfo remains in receiver.
- user try again with the same pipeName and remoteIp, at this time, it works normally, but due to this line, a new pipe will not be created but keep the pipe created last with wront createTime.
logger.info("start Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime); | ||
receiverManager.startPipe(pipeName, remoteIp); | ||
collector.startPipe(pipeName, remoteIp, createTime); | ||
PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same reason as what happens in createPipe.
logger.info("stop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime); | ||
receiverManager.stopPipe(pipeName, remoteIp); | ||
collector.stopPipe(pipeName, remoteIp, createTime); | ||
PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same reason as what happens in createPipe.
SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime)); | ||
File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime)); | ||
FileUtils.deleteDirectory(dir); | ||
PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same reason as what happens in createPipe.
server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* remotes/upstream/master: [IOTDB-2996] Fix wildcard import in test folders (apache#5652) [IOTDB-2955] Design and implement ClientManager for thrift client's pooling management (apache#5595) [IOTDB-2945] Reconstruct ConfigNode manage layer and persistence layer (apache#5627) Fix import wildcard violation [IOTDB-2933] detect sender exit and set pipestatus=STOP (apache#5557) [IOTDB-2989] Expression Serialize & Deserialize (apache#5649) [IOTDB-2930]Fix concurrent UnPin bug & Improve template implementation (apache#5647) [IOTDB-2984] RatisConsensus Recovery Logic (apache#5648) [IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (apache#5637) Add config example for new cluster (apache#5624) [IOTDB-2982] Recover tsfile after datanode restart (apache#5643) Fix TimeJoinNode clone and serde bug (apache#5644) Refactor attributes in PlanNode (apache#5616) # Conflicts: # server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java # server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java # server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java