Skip to content

Commit

Permalink
0002681: Only query incoming_error for batch being retried
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 15, 2016
1 parent 6a89f23 commit a972593
Showing 1 changed file with 13 additions and 10 deletions.
Expand Up @@ -516,7 +516,8 @@ protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo processInf
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(processInfo, sourceNode.getNodeId(),
batch.getChannelId(), batch.getBatchId());
batch.getChannelId(), batch.getBatchId(),
((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry());
}
};
processor.process(ctx);
Expand Down Expand Up @@ -573,7 +574,7 @@ protected void logAndRethrow(Node remoteNode, Throwable ex) throws IOException {
}

protected IDataWriter buildDataWriter(ProcessInfo processInfo, String sourceNodeId,
String channelId, long batchId) {
String channelId, long batchId, boolean isRetry) {
TransformTable[] transforms = null;
NodeGroupLink link = null;
List<ResolvedData> resolvedDatas = new ArrayList<ResolvedData>();
Expand Down Expand Up @@ -612,15 +613,16 @@ protected IDataWriter buildDataWriter(ProcessInfo processInfo, String sourceNode
transforms = transformsList != null ? transformsList
.toArray(new TransformTable[transformsList.size()]) : null;

List<IncomingError> incomingErrors = getIncomingErrors(batchId, sourceNodeId);
for (IncomingError incomingError : incomingErrors) {
if (incomingError.isResolveIgnore()
|| StringUtils.isNotBlank(incomingError.getResolveData())) {
resolvedDatas.add(new ResolvedData(incomingError.getFailedRowNumber(),
incomingError.getResolveData(), incomingError.isResolveIgnore()));
if (isRetry) {
List<IncomingError> incomingErrors = getIncomingErrors(batchId, sourceNodeId);
for (IncomingError incomingError : incomingErrors) {
if (incomingError.isResolveIgnore()
|| StringUtils.isNotBlank(incomingError.getResolveData())) {
resolvedDatas.add(new ResolvedData(incomingError.getFailedRowNumber(),
incomingError.getResolveData(), incomingError.isResolveIgnore()));
}
}
}

}

TransformWriter transformWriter = new TransformWriter(platform, TransformPoint.LOAD, null,
Expand Down Expand Up @@ -922,7 +924,8 @@ public IncomingBatch call() throws Exception {
batch.getTargetNodeId(), resource), null, listener, "data load from stage") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), batch.getBatchId());
return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), batch.getBatchId(),
((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry());
}
};
processor.process(ctx);
Expand Down

0 comments on commit a972593

Please sign in to comment.