Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0003239: Observed that two load threads are loading the same batch at
the same time during a pull
  • Loading branch information
chenson42 committed Sep 19, 2017
1 parent a81d340 commit 043eb8f
Showing 1 changed file with 6 additions and 1 deletion.
Expand Up @@ -187,6 +187,8 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };
private IClusterService clusterService;

private Map<String, Semaphore> locks = new HashMap<String, Semaphore>();

private CustomizableThreadFactory threadPoolFactory;

public DataExtractorService(ISymmetricEngine engine) {
super(engine.getParameterService(), engine.getSymmetricDialect());
Expand Down Expand Up @@ -583,7 +585,10 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
Node sourceNode = nodeService.findIdentity();
final FutureExtractStatus status = new FutureExtractStatus();
executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeId())));
if (this.threadPoolFactory == null) {
this.threadPoolFactory = new CustomizableThreadFactory(String.format("%s-dataextractor", parameterService.getEngineName().toLowerCase()));
}
executor = Executors.newFixedThreadPool(1, threadPoolFactory);
List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

processInfo.setBatchCount(activeBatches.size());
Expand Down

0 comments on commit 043eb8f

Please sign in to comment.