From a9725930164bd5307f878010683b9adc6255bc80 Mon Sep 17 00:00:00 2001 From: elong Date: Fri, 15 Jul 2016 19:09:00 -0400 Subject: [PATCH] 0002681: Only query incoming_error for batch being retried --- .../service/impl/DataLoaderService.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 3ab2699502..c3007fe1ec 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -516,7 +516,8 @@ protected List loadDataFromTransport(final ProcessInfo processInf @Override protected IDataWriter chooseDataWriter(Batch batch) { return buildDataWriter(processInfo, sourceNode.getNodeId(), - batch.getChannelId(), batch.getBatchId()); + batch.getChannelId(), batch.getBatchId(), + ((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry()); } }; processor.process(ctx); @@ -573,7 +574,7 @@ protected void logAndRethrow(Node remoteNode, Throwable ex) throws IOException { } protected IDataWriter buildDataWriter(ProcessInfo processInfo, String sourceNodeId, - String channelId, long batchId) { + String channelId, long batchId, boolean isRetry) { TransformTable[] transforms = null; NodeGroupLink link = null; List resolvedDatas = new ArrayList(); @@ -612,15 +613,16 @@ protected IDataWriter buildDataWriter(ProcessInfo processInfo, String sourceNode transforms = transformsList != null ? transformsList .toArray(new TransformTable[transformsList.size()]) : null; - List incomingErrors = getIncomingErrors(batchId, sourceNodeId); - for (IncomingError incomingError : incomingErrors) { - if (incomingError.isResolveIgnore() - || StringUtils.isNotBlank(incomingError.getResolveData())) { - resolvedDatas.add(new ResolvedData(incomingError.getFailedRowNumber(), - incomingError.getResolveData(), incomingError.isResolveIgnore())); + if (isRetry) { + List incomingErrors = getIncomingErrors(batchId, sourceNodeId); + for (IncomingError incomingError : incomingErrors) { + if (incomingError.isResolveIgnore() + || StringUtils.isNotBlank(incomingError.getResolveData())) { + resolvedDatas.add(new ResolvedData(incomingError.getFailedRowNumber(), + incomingError.getResolveData(), incomingError.isResolveIgnore())); + } } } - } TransformWriter transformWriter = new TransformWriter(platform, TransformPoint.LOAD, null, @@ -922,7 +924,8 @@ public IncomingBatch call() throws Exception { batch.getTargetNodeId(), resource), null, listener, "data load from stage") { @Override protected IDataWriter chooseDataWriter(Batch batch) { - return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), batch.getBatchId()); + return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), batch.getBatchId(), + ((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry()); } }; processor.process(ctx);