Skip to content

Commit

Permalink
0003253: ProcessInfo which is used to gather information about processes
Browse files Browse the repository at this point in the history
can be corrupted on push and pull because of threading in 3.8
  • Loading branch information
chenson42 committed Dec 6, 2017
1 parent 9da5a6f commit 9fa4b8a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
Expand Up @@ -656,6 +656,7 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
}

if (extractBatch.isExtractSkipped) {
transferInfo.setStatus(ProcessStatus.OK);
break;
}

Expand Down
Expand Up @@ -591,16 +591,23 @@ protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo transferIn
transferInfo.setStatus(ProcessStatus.OK);
ProcessInfo loadInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId()
, transferInfo.getQueue(), nodeService.findIdentityNodeId(), PULL_JOB_LOAD));
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
targetNodeId, transport.openReader()), null, listener, "data load") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(loadInfo, sourceNode.getNodeId(),
batch.getChannelId(), batch.getBatchId(),
((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry());
}
};
processor.process(ctx);
try {
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
targetNodeId, transport.openReader()), null, listener, "data load") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(loadInfo, sourceNode.getNodeId(),
batch.getChannelId(), batch.getBatchId(),
((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry());
}
};
processor.process(ctx);
loadInfo.setStatus(ProcessStatus.OK);
} catch (Throwable e) {
loadInfo.setStatus(ProcessStatus.ERROR);
throw e;
}

}
} catch (Throwable ex) {
error = ex;
Expand Down

0 comments on commit 9fa4b8a

Please sign in to comment.