diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java index e8098e4759..c4d64b811f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java @@ -28,7 +28,7 @@ public class NodeCommunication implements Serializable { private static final long serialVersionUID = 1L; public enum CommunicationType { - PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_EXTRACT; + PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT; public static boolean isPullType(CommunicationType communicationType) { return communicationType == PULL || communicationType == CommunicationType.FILE_PULL || communicationType == OFFLN_PULL; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java index 7284b35533..dcdf03ef3a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java @@ -89,7 +89,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe @Override protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, final Node sourceNode, final Node targetNode, List batches, ProcessInfo processInfo, Channel channel) { - MultiBatchStagingWriter multiBatchStatingWriter = new MultiBatchStagingWriter(this, request, sourceNode.getNodeId(), stagingManager, + MultiBatchStagingWriter multiBatchStagingWriter = new MultiBatchStagingWriter(this, request, sourceNode.getNodeId(), stagingManager, batches, channel.getMaxBatchSize(), processInfo) { @Override protected IDataWriter buildWriter(long memoryThresholdInBytes) { @@ -111,16 +111,18 @@ public void close() { return fileSyncWriter; } }; - return multiBatchStatingWriter; + return multiBatchStagingWriter; } @Override protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) { - final NodeCommunication.CommunicationType TYPE = NodeCommunication.CommunicationType.FILE_EXTRACT; - int availableThreads = nodeCommunicationService.getAvailableThreads(TYPE); - NodeCommunication lock = nodeCommunicationService.find(nodeId, queue, TYPE); - if (availableThreads > 0) { - nodeCommunicationService.execute(lock, statuses, this); + if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) { + final NodeCommunication.CommunicationType TYPE = NodeCommunication.CommunicationType.FILE_XTRCT; + int availableThreads = nodeCommunicationService.getAvailableThreads(TYPE); + NodeCommunication lock = nodeCommunicationService.find(nodeId, queue, TYPE); + if (availableThreads > 0) { + nodeCommunicationService.execute(lock, statuses, this); + } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java index 38d74a1d8e..db4327ad1f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java @@ -401,7 +401,7 @@ protected ThreadPoolExecutor getExecutor(final CommunicationType communicationTy case FILE_PUSH: threadCountParameter = ParameterConstants.FILE_PUSH_THREAD_COUNT_PER_SERVER; break; - case FILE_EXTRACT: + case FILE_XTRCT: case EXTRACT: threadCountParameter = ParameterConstants.INITIAL_LOAD_EXTRACT_THREAD_COUNT_PER_SERVER; break; @@ -463,7 +463,7 @@ protected Date getLockTimeoutDate(CommunicationType communicationType) { case FILE_PUSH: parameter = ParameterConstants.FILE_PUSH_LOCK_TIMEOUT_MS; break; - case FILE_EXTRACT: + case FILE_XTRCT: case EXTRACT: parameter = ParameterConstants.INITIAL_LOAD_EXTRACT_TIMEOUT_MS; break;