From d0cede258f01f66b579c498a13cd48c16a891c14 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Tue, 6 Feb 2018 16:20:56 -0500 Subject: [PATCH] 0003406: Improvements for Timeouts related to slow databases --- .../io/stage/SimpleStagingDataWriter.java | 2 +- .../service/impl/DataExtractorService.java | 72 ++++++++++++++----- .../transport/IOutgoingTransport.java | 4 +- .../transport/file/FileOutgoingTransport.java | 5 ++ .../transport/http/HttpOutgoingTransport.java | 5 ++ .../internal/InternalOutgoingTransport.java | 5 ++ ...InternalOutgoingWithResponseTransport.java | 5 ++ .../main/java/org/jumpmind/driver/Driver.java | 12 ++++ .../transport/mock/MockOutgoingTransport.java | 5 ++ 9 files changed, 97 insertions(+), 18 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java index 9669f4ce31..bda279cdb8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java @@ -233,7 +233,7 @@ public void process() throws IOException { if (System.currentTimeMillis() - ts > 60000) { log.info( "Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}", - new Object[] { (batch != null ? batch.getBatchId() : 0), (batch != null ? batch.getTargetNodeId() : ""), + new Object[] { (batch != null ? batch.getBatchId() : "?"), (batch != null ? batch.getTargetNodeId() : "?"), (System.currentTimeMillis() - startTime) / 1000, "LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) }); ts = System.currentTimeMillis(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index e4bf6c69d8..af2a0028df 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -528,22 +529,7 @@ public List extract(ProcessInfo extractInfo, Node targetNode, Str routerService.routeData(true); } - OutgoingBatches batches = null; - if (queue != null) { - NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(), - targetNode.getNodeGroupId(), false).getDataEventAction(); - ProcessType processType = extractInfo.getKey().getProcessType(); - NodeGroupLinkAction action = null; - - if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) { - action = NodeGroupLinkAction.P; - } else if (processType.equals(ProcessType.PULL_HANDLER_EXTRACT)) { - action = NodeGroupLinkAction.W; - } - batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false); - } else { - batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false); - } + OutgoingBatches batches = loadPendingBatches(extractInfo, targetNode, queue, transport); if (batches.containsBatches()) { @@ -565,6 +551,60 @@ public List extract(ProcessInfo extractInfo, Node targetNode, Str return Collections.emptyList(); } + + protected OutgoingBatches loadPendingBatches(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) { + + BufferedWriter writer = transport.getWriter(); + + Callable getOutgoingBatches = () -> { + OutgoingBatches batches = null; + if (queue != null) { + NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(), + targetNode.getNodeGroupId(), false).getDataEventAction(); + ProcessType processType = extractInfo.getKey().getProcessType(); + NodeGroupLinkAction action = null; + + if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) { + action = NodeGroupLinkAction.P; + } else if (processType.equals(ProcessType.PULL_HANDLER_EXTRACT)) { + action = NodeGroupLinkAction.W; + } + // TODO the pull slow down here. + // here we have a InternalOutgoingTransport, with an open Writer to the pull connection (if this is a Pull). + // When pushing, this is an HttpOutgoingTransport, with an unopened writer. + // at this point the transport can a) give the writer without creating it. b) sendKeepalive + batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false); + } else { + batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false); + } + return batches; + }; + + if (writer != null) { + final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED); + long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); + Node sourceNode = nodeService.findIdentity(); + FutureTask getOutgoingBatchesTask = new FutureTask(getOutgoingBatches); + ExecutorService executor = Executors.newFixedThreadPool(1); + executor.execute(getOutgoingBatchesTask); + + while (true) { + try { + return getOutgoingBatchesTask.get(keepAliveMillis, TimeUnit.MILLISECONDS); + } catch (TimeoutException ex) { + writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); + } catch (Exception ex) { + throw new SymmetricException("Failed to execute getOutgoingBatchesTask ", ex); + } + } + } else { + try { + return getOutgoingBatches.call(); + } catch (Exception ex) { + throw new SymmetricException("Failed to execute getOutgoingBatchesTask ", ex); + } + } + } /** * This method will extract an outgoing batch, but will not update the outgoing batch status diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/IOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/IOutgoingTransport.java index 3243126d6e..405837952d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/IOutgoingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/IOutgoingTransport.java @@ -30,7 +30,9 @@ public interface IOutgoingTransport { - public BufferedWriter openWriter(); + public BufferedWriter openWriter(); + + public BufferedWriter getWriter(); public OutputStream openStream(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java index 067c002ebf..eacc40eff1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java @@ -79,6 +79,11 @@ public BufferedWriter openWriter() { } return writer; } + + @Override + public BufferedWriter getWriter() { + return writer; + } @Override public OutputStream openStream() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java index aa0c4f9d41..e1b117c165 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java @@ -275,6 +275,11 @@ public BufferedWriter openWriter() { throw new IoException(ex); } } + + @Override + public BufferedWriter getWriter() { + return writer; + } /** * @throws {@link ConnectionRejectedException} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingTransport.java index 148ff3b4b0..964c59a046 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingTransport.java @@ -73,6 +73,11 @@ public OutputStream openStream() { public BufferedWriter openWriter() { return writer; } + + @Override + public BufferedWriter getWriter() { + return writer; + } public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, String queue, Node targetNode) { return map; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingWithResponseTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingWithResponseTransport.java index ed1d27ee2e..9def00e6fd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingWithResponseTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalOutgoingWithResponseTransport.java @@ -73,6 +73,11 @@ public boolean isOpen() { public BufferedWriter openWriter() { return writer; } + + @Override + public BufferedWriter getWriter() { + return writer; + } public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, String queue, Node targetNode) { return configurationService.getSuspendIgnoreChannelLists(); diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/driver/Driver.java b/symmetric-jdbc/src/main/java/org/jumpmind/driver/Driver.java index 93c32deb72..51c6683649 100644 --- a/symmetric-jdbc/src/main/java/org/jumpmind/driver/Driver.java +++ b/symmetric-jdbc/src/main/java/org/jumpmind/driver/Driver.java @@ -33,6 +33,16 @@ import org.jumpmind.properties.TypedProperties; import org.slf4j.MDC; +/** + * Simple configuration (note the jdbc:symds prefix): + * + * db.url=jdbc:symds:h2:file:demo-corp;LOCK_TIMEOUT=60000;AUTO_SERVER=TRUE + * + * In the your engine.properties file, you can configure interceptors: + * org.jumpmind.driver.PreparedStatementWrapper.interceptor=org.jumpmind.driver.StatementDelayInterceptor + * OR + * org.jumpmind.driver.PreparedStatementWrapper.interceptor=org.jumpmind.driver.RandomErrorInterceptor + */ public class Driver implements java.sql.Driver { private static final String DRIVER_PREFIX = "jdbc:symds:"; @@ -68,6 +78,8 @@ public Connection connect(String url, Properties info) throws SQLException { TypedProperties engineProperties = null; if (engineName != null) { engineProperties = allEngineProperties.get(engineName); + } else { + System.out.println("Unknown engine..."); } ConnectionWrapper connectionWrapper = new ConnectionWrapper(connection); diff --git a/symmetric-server/src/test/java/org/jumpmind/symmetric/transport/mock/MockOutgoingTransport.java b/symmetric-server/src/test/java/org/jumpmind/symmetric/transport/mock/MockOutgoingTransport.java index f64602a61e..8a55748176 100644 --- a/symmetric-server/src/test/java/org/jumpmind/symmetric/transport/mock/MockOutgoingTransport.java +++ b/symmetric-server/src/test/java/org/jumpmind/symmetric/transport/mock/MockOutgoingTransport.java @@ -57,6 +57,11 @@ public BufferedWriter openWriter() { bWriter = new BufferedWriter(writer); return bWriter; } + + @Override + public BufferedWriter getWriter() { + return bWriter; + } public boolean isOpen() { return true;