From e20b916aa5fa57eec898db28795160985c3dc25d Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 10 Nov 2016 15:47:22 -0500 Subject: [PATCH] 0002876: FileSync initial load batches should respect max batch size of the filesync_reload channel - make sure to re-request extraction if needded. --- .../symmetric/service/IDataExtractorService.java | 3 ++- .../service/impl/DataExtractorService.java | 3 ++- .../symmetric/service/impl/FileSyncService.java | 13 +++++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index 203c90366b..2c3f49311c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -63,6 +63,7 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi public RemoteNodeStatuses queueWork(boolean force); public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId); - + + public void resetExtractRequest(OutgoingBatch batch); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 4008cedade..0e7a83cf68 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1235,7 +1235,8 @@ public List getExtractRequestsForNode(NodeCommunication nodeComm , ExtractRequest.ExtractStatus.NE.name()); } - protected void resetExtractRequest(OutgoingBatch batch) { + @Override + public void resetExtractRequest(OutgoingBatch batch) { ISqlTransaction transaction = null; try { transaction = sqlTemplate.startSqlTransaction(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index 03a64e71f6..d88a680b02 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -613,8 +613,17 @@ private boolean isWaitForExtractionRequired(OutgoingBatch currentBatch, IStagedR if (previouslyStagedResource == null && channel.isReloadFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) { - // if this is a reload that isn't extracted yet, we need to defer to the exact job. - log.info("Batch needs to be extracted by the extact job {}", currentBatch.getNodeBatchId()); + + if (currentBatch.getStatus() == OutgoingBatch.Status.RQ) { + // if this is a reload that isn't extracted yet, we need to defer to the extract job. + log.info("Batch needs to be extracted by the extact job {}", currentBatch.getNodeBatchId()); + } else { + // it's possible there was an error and staging was cleared, so we need to re-request extraction here. + log.info("Batch has status of '{}' but is not extracted. Requesting re-extract for batch: {}", + currentBatch.getStatus(), currentBatch.getNodeBatchId()); + engine.getDataExtractorService().resetExtractRequest(currentBatch); + } + return true; } else { return false;