From f0e9cb8bcc671d518f49286f91eaf543c99ec947 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Tue, 19 Sep 2017 14:51:50 -0400 Subject: [PATCH] 0003239: Observed that two load threads are loading the same batch at the same time during a pull --- .../symmetric/service/impl/DataExtractorService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index ab80b71f8d..e5ad42f0c9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -188,6 +188,8 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY }; private INodeCommunicationService nodeCommunicationService; private IClusterService clusterService; + + private CustomizableThreadFactory threadPoolFactory; private Map locks = new HashMap(); @@ -506,7 +508,6 @@ public List extract(ProcessInfo processInfo, Node targetNode, Str if (!parameterService.is(ParameterConstants.START_ROUTE_JOB) && parameterService.is(ParameterConstants.ROUTE_ON_EXTRACT)) { routerService.routeData(true); } - OutgoingBatches batches = null; if (queue != null) { @@ -586,7 +587,10 @@ protected List extract(final ProcessInfo processInfo, final Node long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); Node sourceNode = nodeService.findIdentity(); final FutureExtractStatus status = new FutureExtractStatus(); - executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeId()))); + if (this.threadPoolFactory == null) { + this.threadPoolFactory = new CustomizableThreadFactory(String.format("%s-dataextractor", parameterService.getEngineName().toLowerCase())); + } + executor = Executors.newFixedThreadPool(1, this.threadPoolFactory); List> futures = new ArrayList>(); processInfo.setBatchCount(activeBatches.size());