From 39d85e6d02ea7460888477b22fa05a6abf1cb234 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Wed, 13 Sep 2017 19:46:23 -0400 Subject: [PATCH] 0003142: Sync Columns Between Incoming and Outgoing Batch Use the extract_row_count to seend the process info object with the correct number of total rows that will be loaded. --- .../jumpmind/symmetric/model/ProcessInfo.java | 4 +++- .../service/impl/DataLoaderService.java | 18 +++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java index a155b4297f..37a8cf419c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java @@ -170,7 +170,9 @@ public void setBatchCount(long batchCount) { public void incrementCurrentDataCount() { this.currentDataCount++; - this.totalDataCount++; + if (totalDataCount < currentDataCount) { + totalDataCount = currentDataCount; + } } public void incrementBatchCount() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 7b986e0a5d..f13c4c6aea 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -526,7 +526,7 @@ public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean for */ protected List loadDataFromTransport(final ProcessInfo processInfo, final Node sourceNode, IIncomingTransport transport, OutputStream out) throws IOException { - final ManageIncomingBatchListener listener = new ManageIncomingBatchListener(); + final ManageIncomingBatchListener listener = new ManageIncomingBatchListener(processInfo); final DataContext ctx = new DataContext(); Throwable error = null; try { @@ -1050,6 +1050,12 @@ class ManageIncomingBatchListener implements IDataProcessorListener { protected List batchesProcessed = new ArrayList(); protected IncomingBatch currentBatch; + + protected ProcessInfo processInfo; + + public ManageIncomingBatchListener(ProcessInfo processInfo) { + this.processInfo = processInfo; + } public void beforeBatchEnd(DataContext context) { enableSyncTriggers(context); @@ -1074,6 +1080,12 @@ public boolean beforeBatchStarted(DataContext context) { } IncomingBatch incomingBatch = new IncomingBatch(batch); this.batchesProcessed.add(incomingBatch); + + if (context.getStatistics() != null) { + incomingBatch.mergeInjectedBatchStatistics(context.getStatistics()); + processInfo.setTotalDataCount(incomingBatch.getExtractRowCount()); + } + if (incomingBatchService.acquireIncomingBatch(incomingBatch)) { this.currentBatch = incomingBatch; context.put("currentBatch", this.currentBatch); @@ -1101,10 +1113,6 @@ public void batchSuccessful(DataContext context) { statisticManager.incrementDataBytesLoaded(this.currentBatch.getChannelId(), this.currentBatch.getByteCount()); Status oldStatus = this.currentBatch.getStatus(); - - if (context.getStatistics() != null) { - currentBatch.mergeInjectedBatchStatistics(context.getStatistics()); - } try { this.currentBatch.setStatus(Status.OK);