Skip to content

Commit

Permalink
002910: Node Communication Type FILE_EXTRACT is too long
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Nov 21, 2016
1 parent 474d0b5 commit f858f0f
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
Expand Up @@ -28,7 +28,7 @@ public class NodeCommunication implements Serializable {
private static final long serialVersionUID = 1L;

public enum CommunicationType {
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_EXTRACT;
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT;

public static boolean isPullType(CommunicationType communicationType) {
return communicationType == PULL || communicationType == CommunicationType.FILE_PULL || communicationType == OFFLN_PULL;
Expand Down
Expand Up @@ -89,7 +89,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
@Override
protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, final Node sourceNode, final Node targetNode,
List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel) {
MultiBatchStagingWriter multiBatchStatingWriter = new MultiBatchStagingWriter(this, request, sourceNode.getNodeId(), stagingManager,
MultiBatchStagingWriter multiBatchStagingWriter = new MultiBatchStagingWriter(this, request, sourceNode.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize(), processInfo) {
@Override
protected IDataWriter buildWriter(long memoryThresholdInBytes) {
Expand All @@ -111,16 +111,18 @@ public void close() {
return fileSyncWriter;
}
};
return multiBatchStatingWriter;
return multiBatchStagingWriter;
}

@Override
protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) {
final NodeCommunication.CommunicationType TYPE = NodeCommunication.CommunicationType.FILE_EXTRACT;
int availableThreads = nodeCommunicationService.getAvailableThreads(TYPE);
NodeCommunication lock = nodeCommunicationService.find(nodeId, queue, TYPE);
if (availableThreads > 0) {
nodeCommunicationService.execute(lock, statuses, this);
if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) {
final NodeCommunication.CommunicationType TYPE = NodeCommunication.CommunicationType.FILE_XTRCT;
int availableThreads = nodeCommunicationService.getAvailableThreads(TYPE);
NodeCommunication lock = nodeCommunicationService.find(nodeId, queue, TYPE);
if (availableThreads > 0) {
nodeCommunicationService.execute(lock, statuses, this);
}
}
}

Expand Down
Expand Up @@ -401,7 +401,7 @@ protected ThreadPoolExecutor getExecutor(final CommunicationType communicationTy
case FILE_PUSH:
threadCountParameter = ParameterConstants.FILE_PUSH_THREAD_COUNT_PER_SERVER;
break;
case FILE_EXTRACT:
case FILE_XTRCT:
case EXTRACT:
threadCountParameter = ParameterConstants.INITIAL_LOAD_EXTRACT_THREAD_COUNT_PER_SERVER;
break;
Expand Down Expand Up @@ -463,7 +463,7 @@ protected Date getLockTimeoutDate(CommunicationType communicationType) {
case FILE_PUSH:
parameter = ParameterConstants.FILE_PUSH_LOCK_TIMEOUT_MS;
break;
case FILE_EXTRACT:
case FILE_XTRCT:
case EXTRACT:
parameter = ParameterConstants.INITIAL_LOAD_EXTRACT_TIMEOUT_MS;
break;
Expand Down

0 comments on commit f858f0f

Please sign in to comment.