Skip to content

Commit

Permalink
0004187: Initial load queue and router service blocking each other
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 6, 2019
1 parent f0252ab commit f1de1c3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Expand Up @@ -187,16 +187,17 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
List<TableReloadRequest> loadsToProcess = engine.getDataService().getTableReloadRequestToProcess(source.getNodeId());
if (loadsToProcess.size() > 0) {
processInfo.setStatus(ProcessInfo.ProcessStatus.CREATING);
log.info("Found " + loadsToProcess.size() + " table reload requests to process.");

int maxLoadCount = parameterService.getInt(ParameterConstants.INITIAL_LOAD_EXTRACT_THREAD_COUNT_PER_SERVER, 20);
int activeLoadCount = engine.getDataService().getActiveTableReloadStatus().size();
String maxLoadsReachedMessage = "Max initial/partial loads of {} are already active";
if (activeLoadCount >= maxLoadCount) {
log.info(maxLoadsReachedMessage, activeLoadCount);
log.debug(maxLoadsReachedMessage, activeLoadCount);
return;
}

log.info("Found " + loadsToProcess.size() + " table reload requests to process.");

boolean useExtractJob = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB, true);
boolean streamToFile = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED, false);
Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
Expand Down Expand Up @@ -225,7 +226,7 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
triggerRouters, extractRequests);

if (++activeLoadCount >= maxLoadCount) {
log.info(maxLoadsReachedMessage, activeLoadCount);
log.debug(maxLoadsReachedMessage, activeLoadCount);
return;
}
} else {
Expand Down Expand Up @@ -275,7 +276,7 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
triggerRouters, extractRequests);

if (++activeLoadCount >= maxLoadCount) {
log.info(maxLoadsReachedMessage, activeLoadCount);
log.debug(maxLoadsReachedMessage, activeLoadCount);
return;
}
}
Expand Down
Expand Up @@ -258,7 +258,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo

long batchId = outgoingBatch.getBatchId();
if (batchId <= 0) {
batchId = sequenceService.nextVal(transaction, Constants.SEQUENCE_OUTGOING_BATCH);
batchId = sequenceService.nextVal(Constants.SEQUENCE_OUTGOING_BATCH);
}
transaction.prepareAndExecute(getSql("insertOutgoingBatchSql"), batchId, outgoingBatch.getNodeId(), outgoingBatch.getChannelId(),
outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0,
Expand Down

0 comments on commit f1de1c3

Please sign in to comment.