Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void onComplete(TSyncLogEntriesRes response) {
public static boolean needRetry(int statusCode) {
return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
|| statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
|| statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()
|| statusCode == TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode();
|| statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
Expand Down Expand Up @@ -61,6 +62,9 @@ public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node, e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (WriteProcessException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
Expand All @@ -75,6 +79,9 @@ public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion dataRegion)
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node, e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (WriteProcessException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
Expand All @@ -85,7 +92,18 @@ public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion dataRegion)
node.getTimes()[0],
node.getMeasurements(),
e.getFailingStatus());
return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
// for each error
TSStatus firstStatus = null;
for (TSStatus status : e.getFailingStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
firstStatus = status;
}
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return status;
}
}
return firstStatus;
}
}

Expand All @@ -109,6 +127,10 @@ public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
insertRowNode.getTime(),
insertRowNode.getMeasurements(),
failedEntry.getValue());
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (failedEntry.getValue().getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return failedEntry.getValue();
}
}
return firstStatus;
}
Expand All @@ -134,6 +156,10 @@ public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, DataRegion
insertTabletNode.getTimes()[0],
insertTabletNode.getMeasurements(),
failedEntry.getValue());
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (failedEntry.getValue().getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return failedEntry.getValue();
}
}
return firstStatus;
}
Expand All @@ -145,6 +171,9 @@ public TSStatus visitInsertRowsOfOneDevice(
try {
dataRegion.insert(node);
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (WriteProcessException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
Expand All @@ -162,6 +191,10 @@ public TSStatus visitInsertRowsOfOneDevice(
insertRowNode.getTime(),
insertRowNode.getMeasurements(),
failedEntry.getValue());
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (failedEntry.getValue().getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return failedEntry.getValue();
}
}
return firstStatus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ public static boolean needRetry(int statusCode) {
// To fix the atomicity problem, we only need to add retry for system reject.
// In other cases, such as readonly, we can return directly because there are retries at the
// consensus layer.
return statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()
|| statusCode == TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode();
return statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3056,10 +3056,10 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i);
try {
insertTablet(insertTabletNode);
} catch (WriteProcessException | BatchProcessException e) {
} catch (WriteProcessException e) {
insertMultiTabletsNode
.getResults()
.put(i, new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode()));
.put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ private void updateAlignedMemCost(

private void updateMemoryInfo(
long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement)
throws WriteProcessException {
throws WriteProcessRejectException {
memTableIncrement += textDataIncrement;
dataRegionInfo.addStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
Expand Down