diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index d4f14252dc..f88837ecf4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -128,6 +128,7 @@ private ParameterConstants() { public final static String INITIAL_LOAD_EXTRACT_JOB_START = "start.initial.load.extract.job"; public final static String INITIAL_LOAD_SCHEMA_DUMP_COMMAND = "initial.load.schema.dump.command"; public final static String INITIAL_LOAD_SCHEMA_LOAD_COMMAND = "initial.load.schema.load.command"; + public final static String INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED = "initial.load.extract.and.send.when.staged"; public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults"; public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index d07b036798..9880186e4b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1146,8 +1146,15 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status if (!areBatchesOk) { for (OutgoingBatch outgoingBatch : batches) { - outgoingBatch.setStatus(Status.NE); - outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch); + if (parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED, false)) { + if (outgoingBatch.getStatus() == Status.RQ) { + outgoingBatch.setStatus(Status.NE); + outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch); + } + } else { + outgoingBatch.setStatus(Status.NE); + outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch); + } } } else { log.info("Batches already had an OK status for request {}, batches {} to {}. Not updating the status to NE", new Object[] { request.getRequestId(), request.getStartBatchId(), @@ -1272,12 +1279,39 @@ public void write(CsvData data) { Statistics stats = this.currentDataWriter.getStatistics().get(batch); this.outgoingBatch.setByteCount(stats.get(DataWriterStatisticConstants.BYTECOUNT)); this.outgoingBatch.setExtractMillis(System.currentTimeMillis() - batch.getStartTime().getTime()); - this.currentDataWriter.close(); + this.currentDataWriter.close(); + checkSend(); startNewBatch(); } } + public void checkSend() { + if (parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED, false)) { + this.outgoingBatch.setStatus(Status.NE); + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + outgoingBatchService.updateOutgoingBatch(transaction, this.outgoingBatch); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + if (transaction != null) { + transaction.close(); + } + } + } + } + public void end(Table table) { if (this.currentDataWriter != null) { this.currentDataWriter.end(table); diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 084de2d04b..c97d3232de 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -509,6 +509,14 @@ initial.load.reverse.first=true # Type: boolean initial.load.use.extract.job.enabled=false +# Indicate that the initial load batches will be sent as soon +# as they are staged. Used in combination with initial.load.use.extract.job.enabled=true +# +# DatabaseOverridable: true +# Tags: load +# Type: boolean +initial.load.extract.and.send.when.staged=false + # The number of threads available for concurrent extracts of initial load batches. # # DatabaseOverridable: true