From b00627a71a9cfc4b775046966ad05209845703cc Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Fri, 22 Sep 2017 15:35:30 -0400 Subject: [PATCH] 0003256: Extract stats aren't updated on common outgoing batch rows for nodes that did not do the actual extract --- .../symmetric/service/IOutgoingBatchService.java | 2 ++ .../service/impl/DataExtractorService.java | 14 ++++++++++---- .../service/impl/OutgoingBatchService.java | 11 ++++++++++- .../service/impl/OutgoingBatchServiceSqlMap.java | 6 ++++++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 76406c5316..7a61866c3b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -80,6 +80,8 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N public void updateOutgoingBatch(OutgoingBatch batch); + public void updateCommonBatchExtractStatistics(OutgoingBatch batch); + public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch); public void updateOutgoingBatches(List batches); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index fd998e4e57..e7aa9e5fff 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -637,15 +637,16 @@ protected List extract(final ProcessInfo extractInfo, final Node Future future = futures.get(i); currentBatch = activeBatchIter.next(); transferInfo.setTotalDataCount(currentBatch.getExtractRowCount()); - if (i == futures.size() - 1) { - extractInfo.setStatus(ProcessStatus.OK); - } boolean isProcessed = false; while (!isProcessed) { try { FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS); currentBatch = extractBatch.getOutgoingBatch(); + if (i == futures.size() - 1) { + extractInfo.setStatus(ProcessStatus.OK); + } + if (extractBatch.isExtractSkipped) { break; } @@ -663,7 +664,7 @@ protected List extract(final ProcessInfo extractInfo, final Node if (currentBatch.getStatus() != Status.OK) { currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); - changeBatchStatus(Status.LD, currentBatch, mode); + changeBatchStatus(Status.LD, currentBatch, mode); } transferInfo.setCurrentTableName(currentBatch.getSummary()); @@ -937,6 +938,11 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe statisticManager.incrementDataBytesExtracted(currentBatch.getChannelId(), byteCount); statisticManager.incrementDataExtracted(currentBatch.getChannelId(), stats.get(DataWriterStatisticConstants.ROWCOUNT)); + currentBatch.setByteCount(byteCount); + + if (currentBatch.isCommonFlag()) { + outgoingBatchService.updateCommonBatchExtractStatistics(currentBatch); + } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index f7ebf96002..3549e73c00 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -203,6 +203,16 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) { close(transaction); } } + + public void updateCommonBatchExtractStatistics(OutgoingBatch outgoingBatch) { + sqlTemplate.update(getSql("updateCommonBatchExtractStatsSql"), + new Object[] { outgoingBatch.getByteCount(), outgoingBatch.getDataRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(), + outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getExtractRowCount(), outgoingBatch.getExtractInsertRowCount(), + outgoingBatch.getExtractUpdateRowCount(), outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getBatchId(), + outgoingBatch.getNodeId() }, + new int[] { Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, + Types.NUMERIC, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR }); + } public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) { outgoingBatch.setLastUpdatedTime(new Date()); @@ -231,7 +241,6 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, - symmetricDialect.getSqlTypeForIds(), Types.VARCHAR }); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 872aa4eab2..b96d0ab525 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -66,6 +66,12 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " fallback_insert_count=?, fallback_update_count=?, ignore_row_count=?, missing_delete_count=?, " + " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=? " + " where batch_id=? and node_id=? "); + + putSql("updateCommonBatchExtractStatsSql", + "update $(outgoing_batch) set byte_count=?, data_row_count=?, " + + " data_insert_row_count=?, data_update_row_count=?, data_delete_row_count=?, other_row_count=?, " + + " extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=? " + + " where batch_id=? and node_id != ? "); putSql("findOutgoingBatchSql", "where batch_id=? and node_id=? ");