From c8bf63385e38fe224eb1cacea07c42121dd20d75 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Wed, 23 Nov 2016 13:54:54 -0500 Subject: [PATCH] 0002917: Don't update the status of an outgoing batch to SE until it has been sending for outgoing.batches.update.status.millis --- .../service/impl/DataExtractorService.java | 97 ++++++++++++- .../data/reader/SimpleStagingDataReader.java | 136 ------------------ .../web/NodeConcurrencyInterceptor.java | 1 - 3 files changed, 90 insertions(+), 144 deletions(-) delete mode 100644 symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 30b4701e69..124d8cb7e3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -22,10 +22,12 @@ import static org.apache.commons.lang.StringUtils.isNotBlank; +import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.Writer; +import java.math.BigDecimal; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; @@ -64,6 +66,7 @@ import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; +import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.AbstractSymmetricEngine; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricException; @@ -85,7 +88,6 @@ import org.jumpmind.symmetric.io.data.reader.ExtractDataReader; import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource; import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader; -import org.jumpmind.symmetric.io.data.reader.SimpleStagingDataReader; import org.jumpmind.symmetric.io.data.transform.TransformPoint; import org.jumpmind.symmetric.io.data.transform.TransformTable; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; @@ -988,9 +990,6 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) { if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) { currentBatch.setSentCount(currentBatch.getSentCount() + 1); - if (currentBatch.getStatus() != Status.RS) { - changeBatchStatus(Status.SE, currentBatch, mode); - } long ts = System.currentTimeMillis(); @@ -1024,9 +1023,8 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo Channel channel = configurationService.getChannel(currentBatch.getChannelId()); DataContext ctx = new DataContext(); - SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(), - currentBatch.getNodeId(), isRetry, extractedBatch, writer, ctx, channel.getMaxKBytesPerSecond()); - dataReader.process(); + transferFromStaging(mode, BatchType.EXTRACT, currentBatch, isRetry, extractedBatch, writer, ctx, + channel.getMaxKBytesPerSecond()); } else { IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT, currentBatch.getNodeId(), extractedBatch); @@ -1060,6 +1058,90 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo return currentBatch; } + + protected void transferFromStaging(ExtractMode mode, BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource, + BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec) { + final int MAX_WRITE_LENGTH = 32768; + BufferedReader reader = stagedResource.getReader(); + try { + // Retry means we've sent this batch before, so let's ask to + // retry the batch from the target's staging + if (isRetry) { + String line = null; + while ((line = reader.readLine()) != null) { + if (line.startsWith(CsvConstants.BATCH)) { + writer.write(CsvConstants.RETRY + "," + batch.getBatchId()); + writer.newLine(); + writer.write(CsvConstants.COMMIT + "," + batch.getBatchId()); + writer.newLine(); + break; + } else { + writer.write(line); + writer.newLine(); + } + } + } else { + long totalCharsRead = 0, totalBytesRead = 0; + int numCharsRead = 0, numBytesRead = 0; + long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime; + boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0; + long totalThrottleTime = 0; + int bufferSize = MAX_WRITE_LENGTH; + + if (isThrottled) { + bufferSize = maxKBytesPerSec.multiply(new BigDecimal(1024)).intValue(); + } + char[] buffer = new char[bufferSize]; + + while ((numCharsRead = reader.read(buffer)) != -1) { + writer.write(buffer, 0, numCharsRead); + totalCharsRead += numCharsRead; + + if (Thread.currentThread().isInterrupted()) { + throw new IoException("This thread was interrupted"); + } + + long batchStatusUpdateMillis = parameterService.getLong(ParameterConstants.OUTGOING_BATCH_UPDATE_STATUS_MILLIS); + if (System.currentTimeMillis() - ts > batchStatusUpdateMillis && batch.getStatus() != Status.SE && batch.getStatus() != Status.RS) { + changeBatchStatus(Status.SE, batch, mode); + } + if (System.currentTimeMillis() - ts > 60000) { + log.info( + "Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. " + + "The following stats have been gathered: {}", + new Object[] { batch.getBatchId(), batch.getNodeId(), (System.currentTimeMillis() - startTime) / 1000, + "CHARS=" + totalCharsRead }); + ts = System.currentTimeMillis(); + } + + if (isThrottled) { + numBytesRead += new String(buffer, 0, numCharsRead).getBytes().length; + totalBytesRead += numBytesRead; + if (numBytesRead >= bufferSize) { + long expectedMillis = (long) (((numBytesRead / 1024f) / maxKBytesPerSec.floatValue()) * 1000); + long actualMillis = System.currentTimeMillis() - bts; + if (actualMillis < expectedMillis) { + totalThrottleTime += expectedMillis - actualMillis; + Thread.sleep(expectedMillis - actualMillis); + } + numBytesRead = 0; + bts = System.currentTimeMillis(); + } + } + } + if (log.isDebugEnabled() && totalThrottleTime > 0) { + log.debug("Batch '{}' for node '{}' took {}ms for {} bytes and was throttled for {}ms because limit is set to {} KB/s", + batch.getBatchId(), batch.getNodeId(), (System.currentTimeMillis() - startTime), totalBytesRead, + totalThrottleTime, maxKBytesPerSec); + } + } + } catch (Throwable t) { + throw new RuntimeException(t); + } finally { + stagedResource.close(); + } + } + public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId) { @@ -1953,4 +2035,5 @@ public boolean isRetry() { } } + } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java deleted file mode 100644 index 88567d744e..0000000000 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to JumpMind Inc under one or more contributor - * license agreements. See the NOTICE file distributed - * with this work for additional information regarding - * copyright ownership. JumpMind Inc licenses this file - * to you under the GNU General Public License, version 3.0 (GPLv3) - * (the "License"); you may not use this file except in compliance - * with the License. - * - * You should have received a copy of the GNU General Public License, - * version 3.0 (GPLv3) along with this library; if not, see - * . - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.jumpmind.symmetric.io.data.reader; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.math.BigDecimal; - -import org.jumpmind.exception.IoException; -import org.jumpmind.symmetric.io.data.Batch.BatchType; -import org.jumpmind.symmetric.io.data.CsvConstants; -import org.jumpmind.symmetric.io.data.DataContext; -import org.jumpmind.symmetric.io.stage.IStagedResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SimpleStagingDataReader { - final static int MAX_WRITE_LENGTH = 32768; - - protected final Logger log = LoggerFactory.getLogger(getClass()); - - protected BatchType batchType; - protected long batchId; - protected String targetNodeId; - protected boolean isRetry; - protected IStagedResource stagedResource; - protected BufferedWriter writer; - protected DataContext context; - protected BigDecimal maxKBytesPerSec; - - public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, boolean isRetry, - IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec) { - this.batchType = batchType; - this.batchId = batchId; - this.targetNodeId = targetNodeId; - this.isRetry = isRetry; - this.stagedResource = stagedResource; - this.writer = writer; - this.context = context; - this.maxKBytesPerSec = maxKBytesPerSec; - } - - public void process() { - BufferedReader reader = stagedResource.getReader(); - try { - // Retry means we've sent this batch before, so let's ask to retry the batch from the target's staging - if (isRetry) { - String line = null; - while ((line = reader.readLine()) != null) { - if (line.startsWith(CsvConstants.BATCH)) { - writer.write(CsvConstants.RETRY + "," + batchId); - writer.newLine(); - writer.write(CsvConstants.COMMIT + "," + batchId); - writer.newLine(); - break; - } else { - writer.write(line); - writer.newLine(); - } - } - } else { - long totalCharsRead = 0, totalBytesRead = 0; - int numCharsRead = 0, numBytesRead = 0; - long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime; - boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0; - long totalThrottleTime = 0; - int bufferSize = MAX_WRITE_LENGTH; - - if (isThrottled) { - bufferSize = maxKBytesPerSec.multiply(new BigDecimal(1024)).intValue(); - } - char[] buffer = new char[bufferSize]; - - while ((numCharsRead = reader.read(buffer)) != -1) { - writer.write(buffer, 0, numCharsRead); - totalCharsRead += numCharsRead; - - if (Thread.currentThread().isInterrupted()) { - throw new IoException("This thread was interrupted"); - } - - if (System.currentTimeMillis() - ts > 60000) { - log.info("Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. " + - "The following stats have been gathered: {}", - new Object[] { batchId, targetNodeId, (System.currentTimeMillis() - startTime) / 1000, - "CHARS=" + totalCharsRead }); - ts = System.currentTimeMillis(); - } - - if (isThrottled) { - numBytesRead += new String(buffer, 0, numCharsRead).getBytes().length; - totalBytesRead += numBytesRead; - if (numBytesRead >= bufferSize) { - long expectedMillis = (long) (((numBytesRead / 1024f) / maxKBytesPerSec.floatValue()) * 1000); - long actualMillis = System.currentTimeMillis() - bts; - if (actualMillis < expectedMillis) { - totalThrottleTime += expectedMillis - actualMillis; - Thread.sleep(expectedMillis - actualMillis); - } - numBytesRead = 0; - bts = System.currentTimeMillis(); - } - } - } - if (log.isDebugEnabled() && totalThrottleTime > 0) { - log.debug("Batch '{}' for node '{}' took {}ms for {} bytes and was throttled for {}ms because limit is set to {} KB/s", - batchId, targetNodeId, (System.currentTimeMillis() - startTime), totalBytesRead, totalThrottleTime, - maxKBytesPerSec); - } - } - } catch (Throwable t) { - throw new RuntimeException(t); - } finally { - stagedResource.close(); - } - } - -} diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java index aa2249265f..01b01a8004 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java @@ -31,7 +31,6 @@ import org.jumpmind.symmetric.model.ChannelMap; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.statistic.IStatisticManager; -import org.jumpmind.symmetric.transport.ConcurrentConnectionManager; import org.jumpmind.symmetric.transport.IConcurrentConnectionManager; import org.jumpmind.symmetric.transport.IConcurrentConnectionManager.ReservationType; import org.slf4j.Logger;