Skip to content

Commit

Permalink
0003406: Improvements for Timeouts related to slow databases
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Feb 6, 2018
1 parent 1fc4bf1 commit d0cede2
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 18 deletions.
Expand Up @@ -233,7 +233,7 @@ public void process() throws IOException {
if (System.currentTimeMillis() - ts > 60000) {
log.info(
"Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}",
new Object[] { (batch != null ? batch.getBatchId() : 0), (batch != null ? batch.getTargetNodeId() : ""),
new Object[] { (batch != null ? batch.getBatchId() : "?"), (batch != null ? batch.getTargetNodeId() : "?"),
(System.currentTimeMillis() - startTime) / 1000,
"LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) });
ts = System.currentTimeMillis();
Expand Down
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -528,22 +529,7 @@ public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, Str
routerService.routeData(true);
}

OutgoingBatches batches = null;
if (queue != null) {
NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(),
targetNode.getNodeGroupId(), false).getDataEventAction();
ProcessType processType = extractInfo.getKey().getProcessType();
NodeGroupLinkAction action = null;

if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) {
action = NodeGroupLinkAction.P;
} else if (processType.equals(ProcessType.PULL_HANDLER_EXTRACT)) {
action = NodeGroupLinkAction.W;
}
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false);
} else {
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false);
}
OutgoingBatches batches = loadPendingBatches(extractInfo, targetNode, queue, transport);

if (batches.containsBatches()) {

Expand All @@ -565,6 +551,60 @@ public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, Str
return Collections.emptyList();

}

protected OutgoingBatches loadPendingBatches(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) {

BufferedWriter writer = transport.getWriter();

Callable<OutgoingBatches> getOutgoingBatches = () -> {
OutgoingBatches batches = null;
if (queue != null) {
NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(),
targetNode.getNodeGroupId(), false).getDataEventAction();
ProcessType processType = extractInfo.getKey().getProcessType();
NodeGroupLinkAction action = null;

if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) {
action = NodeGroupLinkAction.P;
} else if (processType.equals(ProcessType.PULL_HANDLER_EXTRACT)) {
action = NodeGroupLinkAction.W;
}
// TODO the pull slow down here.
// here we have a InternalOutgoingTransport, with an open Writer to the pull connection (if this is a Pull).
// When pushing, this is an HttpOutgoingTransport, with an unopened writer.
// at this point the transport can a) give the writer without creating it. b) sendKeepalive
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false);
} else {
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false);
}
return batches;
};

if (writer != null) {
final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
Node sourceNode = nodeService.findIdentity();
FutureTask<OutgoingBatches> getOutgoingBatchesTask = new FutureTask<OutgoingBatches>(getOutgoingBatches);
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.execute(getOutgoingBatchesTask);

while (true) {
try {
return getOutgoingBatchesTask.get(keepAliveMillis, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
writeKeepAliveAck(writer, sourceNode, streamToFileEnabled);
} catch (Exception ex) {
throw new SymmetricException("Failed to execute getOutgoingBatchesTask ", ex);
}
}
} else {
try {
return getOutgoingBatches.call();
} catch (Exception ex) {
throw new SymmetricException("Failed to execute getOutgoingBatchesTask ", ex);
}
}
}

/**
* This method will extract an outgoing batch, but will not update the outgoing batch status
Expand Down
Expand Up @@ -30,7 +30,9 @@

public interface IOutgoingTransport {

public BufferedWriter openWriter();
public BufferedWriter openWriter();

public BufferedWriter getWriter();

public OutputStream openStream();

Expand Down
Expand Up @@ -79,6 +79,11 @@ public BufferedWriter openWriter() {
}
return writer;
}

@Override
public BufferedWriter getWriter() {
return writer;
}

@Override
public OutputStream openStream() {
Expand Down
Expand Up @@ -275,6 +275,11 @@ public BufferedWriter openWriter() {
throw new IoException(ex);
}
}

@Override
public BufferedWriter getWriter() {
return writer;
}

/**
* @throws {@link ConnectionRejectedException}
Expand Down
Expand Up @@ -73,6 +73,11 @@ public OutputStream openStream() {
public BufferedWriter openWriter() {
return writer;
}

@Override
public BufferedWriter getWriter() {
return writer;
}

public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, String queue, Node targetNode) {
return map;
Expand Down
Expand Up @@ -73,6 +73,11 @@ public boolean isOpen() {
public BufferedWriter openWriter() {
return writer;
}

@Override
public BufferedWriter getWriter() {
return writer;
}

public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, String queue, Node targetNode) {
return configurationService.getSuspendIgnoreChannelLists();
Expand Down
12 changes: 12 additions & 0 deletions symmetric-jdbc/src/main/java/org/jumpmind/driver/Driver.java
Expand Up @@ -33,6 +33,16 @@
import org.jumpmind.properties.TypedProperties;
import org.slf4j.MDC;

/**
* Simple configuration (note the jdbc:symds prefix):
*
* db.url=jdbc:symds:h2:file:demo-corp;LOCK_TIMEOUT=60000;AUTO_SERVER=TRUE
*
* In the your engine.properties file, you can configure interceptors:
* org.jumpmind.driver.PreparedStatementWrapper.interceptor=org.jumpmind.driver.StatementDelayInterceptor
* OR
* org.jumpmind.driver.PreparedStatementWrapper.interceptor=org.jumpmind.driver.RandomErrorInterceptor
*/
public class Driver implements java.sql.Driver {

private static final String DRIVER_PREFIX = "jdbc:symds:";
Expand Down Expand Up @@ -68,6 +78,8 @@ public Connection connect(String url, Properties info) throws SQLException {
TypedProperties engineProperties = null;
if (engineName != null) {
engineProperties = allEngineProperties.get(engineName);
} else {
System.out.println("Unknown engine...");
}

ConnectionWrapper connectionWrapper = new ConnectionWrapper(connection);
Expand Down
Expand Up @@ -57,6 +57,11 @@ public BufferedWriter openWriter() {
bWriter = new BufferedWriter(writer);
return bWriter;
}

@Override
public BufferedWriter getWriter() {
return bWriter;
}

public boolean isOpen() {
return true;
Expand Down

0 comments on commit d0cede2

Please sign in to comment.