Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {

public static final PipePlanToStatementVisitor PLAN_TO_STATEMENT_VISITOR =
new PipePlanToStatementVisitor();
private static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
public static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
new PipeStatementTSStatusVisitor();
private static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR =
public static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR =
new PipeStatementExceptionVisitor();
private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR =
new PipeStatementPatternParseVisitor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
Expand Down Expand Up @@ -105,15 +106,36 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
.constructStatement());
TSStatus result = statementExecutor.execute(statement);

// Retry once if the write process is rejected
if (result.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
result = statementExecutor.execute(statement);
TSStatus result;
try {
result =
IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement(
statement, statementExecutor.execute(statement));

// Retry max 5 times if the write process is rejected
for (int i = 0;
i < 5
&& result.getCode()
== TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION
.getStatusCode();
i++) {
Thread.sleep(100L * (i + 1));
result =
IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement(
statement, statementExecutor.execute(statement));
}
} catch (final Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
result = statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e);
}

if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| result.getCode()
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public TSStatus visitInsertMultiTablets(

private TSStatus visitInsertBase(
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
|| context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
Expand Down