Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
mmichalek committed Dec 8, 2017
2 parents c42bd3a + 9b1ef94 commit 92eb2f1
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 14 deletions.
Expand Up @@ -158,6 +158,7 @@
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.CustomizableThreadFactory;
import org.jumpmind.util.FormatUtils;
import org.jumpmind.util.FutureImpl;
import org.jumpmind.util.Statistics;

/**
Expand Down Expand Up @@ -598,7 +599,7 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
this.threadPoolFactory = new CustomizableThreadFactory(String.format("%s-dataextractor", parameterService.getEngineName().toLowerCase()));
}

executor = Executors.newFixedThreadPool(1, this.threadPoolFactory);
executor = streamToFileEnabled ? Executors.newFixedThreadPool(1, this.threadPoolFactory) : null;

List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

Expand All @@ -617,10 +618,22 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
if (status.shouldExtractSkip) {
break;
}
futures.add(executor.submit(callable));

if (executor != null) {
futures.add(executor.submit(callable));
} else {
try {
FutureOutgoingBatch batch = callable.call();
futures.add(new FutureImpl<>(batch));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) {
if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS) && executor != null) {
executor.shutdown();
boolean isProcessed = false;
while (!isProcessed) {
Expand Down Expand Up @@ -656,10 +669,11 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
}

if (extractBatch.isExtractSkipped) {
transferInfo.setStatus(ProcessStatus.OK);
break;
}

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT || (currentBatch.isExtractJobFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB))) {
transferInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING);
transferInfo.setCurrentLoadId(currentBatch.getLoadId());
boolean isRetry = extractBatch.isRetry() && extractBatch.getOutgoingBatch().getStatus() != OutgoingBatch.Status.IG;
Expand Down
Expand Up @@ -591,16 +591,23 @@ protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo transferIn
transferInfo.setStatus(ProcessStatus.OK);
ProcessInfo loadInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId()
, transferInfo.getQueue(), nodeService.findIdentityNodeId(), PULL_JOB_LOAD));
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
targetNodeId, transport.openReader()), null, listener, "data load") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(loadInfo, sourceNode.getNodeId(),
batch.getChannelId(), batch.getBatchId(),
((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry());
}
};
processor.process(ctx);
try {
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
targetNodeId, transport.openReader()), null, listener, "data load") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(loadInfo, sourceNode.getNodeId(),
batch.getChannelId(), batch.getBatchId(),
((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry());
}
};
processor.process(ctx);
loadInfo.setStatus(ProcessStatus.OK);
} catch (Throwable e) {
loadInfo.setStatus(ProcessStatus.ERROR);
throw e;
}

}
} catch (Throwable ex) {
error = ex;
Expand Down
44 changes: 44 additions & 0 deletions symmetric-util/src/main/java/org/jumpmind/util/FutureImpl.java
@@ -0,0 +1,44 @@
package org.jumpmind.util;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureImpl<V> implements Future<V> {

private V v;

public FutureImpl() {
}

public FutureImpl(V v) {
this.v = v;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

@Override
public V get() throws InterruptedException, ExecutionException {
return v;
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return v;
}

}

0 comments on commit 92eb2f1

Please sign in to comment.