diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java index 6058608e19..b43f3e76c6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java @@ -41,6 +41,8 @@ public interface IIncomingBatchService { public int countIncomingBatchesInError(String channelId); public IncomingBatch findIncomingBatch(long batchId, String nodeId); + + public void refreshIncomingBatch(IncomingBatch batch); public List findIncomingBatchErrors(int maxRows); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index a571df666b..13f80eff42 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -919,7 +919,7 @@ public void batchInError(DataContext context, Throwable ex) { /* * Reread batch to make sure it wasn't set to IG or OK */ - currentBatch = engine.getIncomingBatchService().findIncomingBatch(currentBatch.getBatchId(), currentBatch.getNodeId()); + engine.getIncomingBatchService().refreshIncomingBatch(currentBatch); Batch batch = context.getBatch(); if (context.getWriter() != null diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 92572d1bf9..7fdd992c54 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -63,6 +63,12 @@ public IncomingBatchService(IParameterService parameterService, setSqlMap(new IncomingBatchServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } + + public void refreshIncomingBatch(IncomingBatch batch) { + sqlTemplate.queryForObject( + getSql("selectIncomingBatchPrefixSql", "findIncomingBatchSql"), + new IncomingBatchMapper(batch), batch.getBatchId(), batch.getNodeId()); + } public IncomingBatch findIncomingBatch(long batchId, String nodeId) { if (nodeId != null) { @@ -363,8 +369,19 @@ public BatchId mapRow(Row rs) { } class IncomingBatchMapper implements ISqlRowMapper { + + IncomingBatch batchToRefresh = null; + + public IncomingBatchMapper(IncomingBatch batchToRefresh) { + this.batchToRefresh = batchToRefresh; + } + + public IncomingBatchMapper() { + } + + public IncomingBatch mapRow(Row rs) { - IncomingBatch batch = new IncomingBatch(); + IncomingBatch batch = batchToRefresh != null ? batchToRefresh : new IncomingBatch(); batch.setBatchId(rs.getLong("batch_id")); batch.setNodeId(rs.getString("node_id")); batch.setChannelId(rs.getString("channel_id"));