Skip to content

Commit

Permalink
0003239: Observed that two load threads are loading the same batch at
Browse files Browse the repository at this point in the history
the same time during a pull
  • Loading branch information
chenson42 committed Sep 19, 2017
1 parent 3fdf40d commit f0e9cb8
Showing 1 changed file with 6 additions and 2 deletions.
Expand Up @@ -188,6 +188,8 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };
private INodeCommunicationService nodeCommunicationService;

private IClusterService clusterService;

private CustomizableThreadFactory threadPoolFactory;

private Map<String, Semaphore> locks = new HashMap<String, Semaphore>();

Expand Down Expand Up @@ -506,7 +508,6 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str
if (!parameterService.is(ParameterConstants.START_ROUTE_JOB) && parameterService.is(ParameterConstants.ROUTE_ON_EXTRACT)) {
routerService.routeData(true);
}


OutgoingBatches batches = null;
if (queue != null) {
Expand Down Expand Up @@ -586,7 +587,10 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
Node sourceNode = nodeService.findIdentity();
final FutureExtractStatus status = new FutureExtractStatus();
executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeId())));
if (this.threadPoolFactory == null) {
this.threadPoolFactory = new CustomizableThreadFactory(String.format("%s-dataextractor", parameterService.getEngineName().toLowerCase()));
}
executor = Executors.newFixedThreadPool(1, this.threadPoolFactory);
List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

processInfo.setBatchCount(activeBatches.size());
Expand Down

0 comments on commit f0e9cb8

Please sign in to comment.