Skip to content

Commit

Permalink
0003356: transport.max.bytes.to.sync not respected
Browse files Browse the repository at this point in the history
  • Loading branch information
woehrl01 committed Jan 8, 2018
1 parent 8c526fc commit 1b07622
Showing 1 changed file with 18 additions and 0 deletions.
Expand Up @@ -658,6 +658,9 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
}
}

final long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
long totalBytesSend = 0;
boolean logMaxBytesReached = false;
Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator();
for (int i = 0; i < futures.size(); i++) {
Future<FutureOutgoingBatch> future = futures.get(i);
Expand All @@ -684,12 +687,27 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
}

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT || (currentBatch.isExtractJobFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB))) {

if(totalBytesSend > maxBytesToSync) {
if(!logMaxBytesReached) {
logMaxBytesReached = true;
log.info(
"Reached the total byte threshold after {} of {} batches were send for node '{}' (send {} bytes, the max is {}). "
+ "The remaining batches will be send on a subsequent sync.",
new Object[] { i, futures.size(), targetNode.getNodeId(), totalBytesSend, maxBytesToSync });
}
transferInfo.setStatus(ProcessStatus.OK);
break;
}

transferInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING);
transferInfo.setCurrentLoadId(currentBatch.getLoadId());
boolean isRetry = extractBatch.isRetry() && extractBatch.getOutgoingBatch().getStatus() != OutgoingBatch.Status.IG;

currentBatch = sendOutgoingBatch(transferInfo, targetNode, currentBatch, isRetry,
dataWriter, writer, mode);

totalBytesSend += currentBatch.getByteCount();
}

processedBatches.add(currentBatch);
Expand Down

0 comments on commit 1b07622

Please sign in to comment.