From 1b07622bdbe54d064ee6ba4b0e89623473996e00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Mon, 8 Jan 2018 09:47:33 +0100 Subject: [PATCH] 0003356: transport.max.bytes.to.sync not respected --- .../service/impl/DataExtractorService.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 7fd7fdfa54..4264a0c6de 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -658,6 +658,9 @@ protected List extract(final ProcessInfo extractInfo, final Node } } + final long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC); + long totalBytesSend = 0; + boolean logMaxBytesReached = false; Iterator activeBatchIter = activeBatches.iterator(); for (int i = 0; i < futures.size(); i++) { Future future = futures.get(i); @@ -684,12 +687,27 @@ protected List extract(final ProcessInfo extractInfo, final Node } if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT || (currentBatch.isExtractJobFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB))) { + + if(totalBytesSend > maxBytesToSync) { + if(!logMaxBytesReached) { + logMaxBytesReached = true; + log.info( + "Reached the total byte threshold after {} of {} batches were send for node '{}' (send {} bytes, the max is {}). " + + "The remaining batches will be send on a subsequent sync.", + new Object[] { i, futures.size(), targetNode.getNodeId(), totalBytesSend, maxBytesToSync }); + } + transferInfo.setStatus(ProcessStatus.OK); + break; + } + transferInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING); transferInfo.setCurrentLoadId(currentBatch.getLoadId()); boolean isRetry = extractBatch.isRetry() && extractBatch.getOutgoingBatch().getStatus() != OutgoingBatch.Status.IG; currentBatch = sendOutgoingBatch(transferInfo, targetNode, currentBatch, isRetry, dataWriter, writer, mode); + + totalBytesSend += currentBatch.getByteCount(); } processedBatches.add(currentBatch);