Skip to content

Commit

Permalink
0002876: FileSync initial load batches should respect max batch size of
Browse files Browse the repository at this point in the history
the filesync_reload channel - make sure to re-request extraction if
needded.
  • Loading branch information
mmichalek committed Nov 10, 2016
1 parent a39449f commit e20b916
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
Expand Up @@ -63,6 +63,7 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi
public RemoteNodeStatuses queueWork(boolean force);

public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId);


public void resetExtractRequest(OutgoingBatch batch);

}
Expand Up @@ -1235,7 +1235,8 @@ public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeComm
, ExtractRequest.ExtractStatus.NE.name());
}

protected void resetExtractRequest(OutgoingBatch batch) {
@Override
public void resetExtractRequest(OutgoingBatch batch) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
Expand Down
Expand Up @@ -613,8 +613,17 @@ private boolean isWaitForExtractionRequired(OutgoingBatch currentBatch, IStagedR
if (previouslyStagedResource == null
&& channel.isReloadFlag()
&& parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
// if this is a reload that isn't extracted yet, we need to defer to the exact job.
log.info("Batch needs to be extracted by the extact job {}", currentBatch.getNodeBatchId());

if (currentBatch.getStatus() == OutgoingBatch.Status.RQ) {
// if this is a reload that isn't extracted yet, we need to defer to the extract job.
log.info("Batch needs to be extracted by the extact job {}", currentBatch.getNodeBatchId());
} else {
// it's possible there was an error and staging was cleared, so we need to re-request extraction here.
log.info("Batch has status of '{}' but is not extracted. Requesting re-extract for batch: {}",
currentBatch.getStatus(), currentBatch.getNodeBatchId());
engine.getDataExtractorService().resetExtractRequest(currentBatch);
}

return true;
} else {
return false;
Expand Down

0 comments on commit e20b916

Please sign in to comment.