diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java index 14e63405ea..218b88616e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java @@ -70,6 +70,10 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics, failedRowNumber = getLoadRowCount(); failedLineNumber = writerStatistics.get(DataWriterStatisticConstants.LINENUMBER); } + + setLoadInsertRowCount(writerStatistics.get(DataWriterStatisticConstants.INSERTCOUNT)); + setLoadUpdateRowCount(writerStatistics.get(DataWriterStatisticConstants.UPDATECOUNT)); + setLoadDeleteRowCount(writerStatistics.get(DataWriterStatisticConstants.DELETECOUNT)); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 6c825c86ff..bfa713339a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -943,7 +943,9 @@ public LoadIntoDatabaseOnArrivalListener(ProcessInfo processInfo, String sourceN public void start(DataContext ctx, Batch batch) { batchStartsToArriveTimeInMs = System.currentTimeMillis(); processInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING); - processInfo.setDataCount(ctx.getStatistics().get(DataReaderStatistics.DATA_ROW_COUNT)); + if (processInfo != null && ctx != null && ctx.getStatistics() != null) { + processInfo.setDataCount(ctx.getStatistics().get(DataReaderStatistics.DATA_ROW_COUNT)); + } } public void end(final DataContext ctx, final Batch batch, final IStagedResource resource) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 0f395d76eb..c80c4c537c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -256,6 +256,7 @@ public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch batch.getTransformLoadMillis(), batch.getReloadRowCount(), batch.getOtherRowCount(), batch.getDataRowCount(), batch.getDataInsertRowCount(), batch.getDataUpdateRowCount(), batch.getDataDeleteRowCount(), batch.getExtractRowCount(), batch.getExtractInsertRowCount(), batch.getExtractUpdateRowCount(), + batch.getLoadInsertRowCount(), batch.getLoadUpdateRowCount(), batch.getLoadDeleteRowCount(), batch.getExtractDeleteRowCount(), batch.getFailedDataId() }, new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, @@ -263,7 +264,7 @@ public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, - Types.NUMERIC }); + Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC }); } } } @@ -336,15 +337,16 @@ public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch) batch.getTransformExtractMillis(), batch.getTransformLoadMillis(), batch.getReloadRowCount(), batch.getOtherRowCount(), batch.getDataRowCount(), batch.getDataInsertRowCount(), batch.getDataUpdateRowCount(), batch.getDataDeleteRowCount(), batch.getExtractRowCount(), batch.getExtractInsertRowCount(), - batch.getExtractUpdateRowCount(), batch.getExtractDeleteRowCount(), batch.getFailedDataId(), batch.getBatchId(), + batch.getExtractUpdateRowCount(), batch.getExtractDeleteRowCount(), batch.getLoadInsertRowCount(), + batch.getLoadUpdateRowCount(), batch.getLoadDeleteRowCount(), batch.getFailedDataId(), batch.getBatchId(), batch.getNodeId() }, new int[] { Types.CHAR, Types.SMALLINT, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, - Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, - symmetricDialect.getSqlTypeForIds(), Types.VARCHAR }); + Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, + Types.NUMERIC, Types.NUMERIC, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR }); } return count; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java index aa13160ecb..c7ffd880cd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java @@ -71,17 +71,19 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map