Skip to content

Commit

Permalink
0003010: Batches get stranded in ready state in staging on a cluster
Browse files Browse the repository at this point in the history
because the ack that moves them to done occurs on a different server
  • Loading branch information
chenson42 committed Mar 10, 2017
1 parent 79b8111 commit 28dbfdc
Show file tree
Hide file tree
Showing 9 changed files with 11 additions and 25 deletions.
Expand Up @@ -259,9 +259,9 @@ public void finish() {
} catch (IOException e) {
throw new IoException(e);
} finally {
if (stagedResource != null) {
if (stagedResource != null) {
stagedResource.setState(IStagedResource.State.DONE);
stagedResource.close();
stagedResource.setState(IStagedResource.State.READY);
}
}
}
Expand Down
Expand Up @@ -131,15 +131,8 @@ public BatchAckResult ack(final BatchAck batch) {
}
} else if (status == Status.RS) {
log.info("The outgoing batch {} received resend request", outgoingBatch.getNodeBatchId());
} else if (!outgoingBatch.isCommonFlag()) {
IStagedResource stagingResource = stagingManager.find(
Constants.STAGING_CATEGORY_OUTGOING, outgoingBatch.getNodeId(),
outgoingBatch.getBatchId());
if (stagingResource != null) {
stagingResource.setState(State.DONE);
}
}

outgoingBatchService.updateOutgoingBatch(outgoingBatch);
if (status == Status.OK) {
Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId());
Expand Down
Expand Up @@ -1017,7 +1017,7 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
extractedBatch.close();
targetResource.close();
}
targetResource.setState(State.READY);
targetResource.setState(State.DONE);
isRetry = true;
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Expand Up @@ -472,8 +472,6 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
}

IStagingManager stagingManager = engine.getStagingManager();
long memoryThresholdInBytes = parameterService
.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
long maxBytesToSync = parameterService
.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);

Expand Down Expand Up @@ -752,7 +750,7 @@ public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {

protected IStagedResource getStagedResource(OutgoingBatch currentBatch) {
IStagedResource stagedResource = engine.getStagingManager().find(getStagingPathComponents(currentBatch));
if (stagedResource != null && stagedResource.getState() == State.READY) {
if (stagedResource != null && stagedResource.getState() == State.DONE) {
return stagedResource;
} else {
return null;
Expand Down
Expand Up @@ -141,7 +141,7 @@ public void process() throws IOException {
if (writer != null) {
writeLine(line);
resource.close();
resource.setState(State.READY);
resource.setState(State.DONE);
writer = null;
}
batchTableLines.clear();
Expand Down
Expand Up @@ -94,7 +94,7 @@ protected IStagedResource getStagedResource(Batch batch) {
protected void endBatch(Batch batch) {
IStagedResource resource = getStagedResource(batch);
resource.close();
resource.setState(State.READY);
resource.setState(State.DONE);
flushNodeId = true;
processedTables.clear();
table = null;
Expand Down
Expand Up @@ -29,7 +29,7 @@
public interface IStagedResource {

public enum State {
CREATE, READY, DONE;
CREATE, DONE;

public String getExtensionName() {
return name().toLowerCase();
Expand Down
Expand Up @@ -75,9 +75,7 @@ public StagedResource(File directory, String path, StagingManager stagingManager
this.stagingManager = stagingManager;
lastUpdateTime = System.currentTimeMillis();

if (buildFile(State.READY).exists()) {
this.state = State.READY;
} else if (buildFile(State.DONE).exists()){
if (buildFile(State.DONE).exists()){
this.state = State.DONE;
} else {
this.state = State.CREATE;
Expand Down
Expand Up @@ -64,7 +64,7 @@ public Set<String> getResourceReferences() {
protected void refreshResourceList() {
synchronized (StagingManager.class) {
Collection<File> files = FileUtils.listFiles(this.directory,
new String[] { State.CREATE.getExtensionName(), State.READY.getExtensionName(),
new String[] { State.CREATE.getExtensionName(), State.DONE.getExtensionName(),
State.DONE.getExtensionName() }, true);
for (File file : files) {
try {
Expand Down Expand Up @@ -100,10 +100,7 @@ public long clean(long ttlInMs) {
if (resource != null) {
boolean resourceIsOld = (System.currentTimeMillis() - resource
.getLastUpdateTime()) > ttlInMs;
if ((resource.getState() == State.DONE ||
(resource.getState() == State.READY && resource.getPath().contains("/common/")) ||
(resource.getState() == State.READY && ttlInMs == 0))
&& resourceIsOld) {
if (resource.getState() == State.DONE && resourceIsOld) {
if (!resource.isInUse()) {
boolean file = resource.isFileResource();
long size = resource.getSize();
Expand Down

0 comments on commit 28dbfdc

Please sign in to comment.