Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0003165: Common batch issue when nodes are pushed to or pulled from
concurrently
  • Loading branch information
chenson42 committed Jun 22, 2017
1 parent e542859 commit a9650b8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
Expand Up @@ -574,7 +574,7 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
Node sourceNode = nodeService.findIdentity();
final FutureExtractStatus status = new FutureExtractStatus();
executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeGroupId())));
executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeId())));
List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

processInfo.setBatchCount(activeBatches.size());
Expand Down Expand Up @@ -1016,19 +1016,21 @@ protected IStagedResource getStagedResource(OutgoingBatch currentBatch) {

protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch, boolean acquireReference) {
IStagedResource previouslyExtracted = getStagedResource(currentBatch);
if (previouslyExtracted != null && previouslyExtracted.exists()
&& previouslyExtracted.getState() != State.CREATE) {
if (log.isDebugEnabled()) {
log.debug("We have already extracted batch {}. Using the existing extraction: {}",
currentBatch.getBatchId(), previouslyExtracted);
}
if (acquireReference) {
previouslyExtracted.reference();
if (previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE) {
synchronized (DataExtractorService.this) {
if (previouslyExtracted.exists()) {
if (log.isDebugEnabled()) {
log.debug("We have already extracted batch {}. Using the existing extraction: {}", currentBatch.getBatchId(),
previouslyExtracted);
}
if (acquireReference) {
previouslyExtracted.reference();
}
return true;
}
}
return true;
} else {
return false;
}
return false;
}

protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) {
Expand Down Expand Up @@ -1199,7 +1201,11 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
stagedResource.close();
stagedResource.dereference();
if (!stagedResource.isFileResource() && !stagedResource.isInUse()) {
stagedResource.delete();
synchronized(DataExtractorService.this) {
if (!stagedResource.isFileResource() && !stagedResource.isInUse()) {
stagedResource.delete();
}
}
}
}
}
Expand Down
Expand Up @@ -100,11 +100,13 @@ protected static String toPath(File directory, File file) {
@Override
public void reference() {
references++;
log.debug("Increased reference to {} for {} by {}", references, path, Thread.currentThread().getName());
}

@Override
public void dereference() {
references--;
log.debug("Decreased reference to {} for {} by {}", references, path, Thread.currentThread().getName());
}

public boolean isInUse() {
Expand Down Expand Up @@ -329,10 +331,6 @@ public void refreshLastUpdateTime() {

public boolean delete() {
close();
return deleteInternal();
}

private boolean deleteInternal() {
boolean deleted = false;
if (file != null && file.exists()) {
FileUtils.deleteQuietly(file);
Expand All @@ -347,7 +345,10 @@ private boolean deleteInternal() {
if (deleted) {
stagingManager.resourcePaths.remove(path);
stagingManager.inUse.remove(path);
}
if (log.isDebugEnabled() && path.contains("outgoing")) {
log.debug("Deleted staging resource {}", path);
}
}
return deleted;
}

Expand Down

0 comments on commit a9650b8

Please sign in to comment.