diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 7445741ce35a8..5a510da047526 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -118,9 +118,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { public static final PipePlanToStatementVisitor PLAN_TO_STATEMENT_VISITOR = new PipePlanToStatementVisitor(); - private static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR = + public static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR = new PipeStatementTSStatusVisitor(); - private static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR = + public static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR = new PipeStatementExceptionVisitor(); private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR = new PipeStatementPatternParseVisitor(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java index e6d0a64a79efc..db8db28b15ec0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; +import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement; import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -105,15 +106,36 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { PipeTransferTabletRawReq.toTPipeTransferRawReq( tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) .constructStatement()); - TSStatus result = statementExecutor.execute(statement); - // Retry once if the write process is rejected - if (result.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { - result = statementExecutor.execute(statement); + TSStatus result; + try { + result = + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement( + statement, statementExecutor.execute(statement)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION + .getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement( + statement, statementExecutor.execute(statement)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e); } if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) { + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() + == TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { return Optional.empty(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 3e3992343fd9e..0874857c3f751 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -97,7 +97,8 @@ public TSStatus visitInsertMultiTablets( private TSStatus visitInsertBase( final InsertBaseStatement insertBaseStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) { + if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() + || context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage(context.getMessage());