Skip to content

Commit

Permalink
0002577: Separate thread for transfer and loading or extract
Browse files Browse the repository at this point in the history
fix problem where max bytes reached and it skips extracting
  • Loading branch information
erilong committed Jul 26, 2016
1 parent 190d0d5 commit 5dd7e81
Showing 1 changed file with 5 additions and 2 deletions.
Expand Up @@ -595,9 +595,11 @@ public FutureOutgoingBatch call() throws Exception {
status.shouldExtractSkip = true;
}
} catch (Exception e) {
status.shouldExtractSkip = true;
status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true;
throw e;
}
} else {
outgoingBatch.isExtractSkipped = true;
}
return outgoingBatch;
}
Expand All @@ -615,7 +617,7 @@ public FutureOutgoingBatch call() throws Exception {
FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS);
currentBatch = extractBatch.getOutgoingBatch();

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
if (!extractBatch.isExtractSkipped && (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT)) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, extractBatch.isRetry(),
dataWriter, writer, mode);
Expand Down Expand Up @@ -1935,6 +1937,7 @@ class FutureExtractStatus {
class FutureOutgoingBatch {
OutgoingBatch outgoingBatch;
boolean isRetry;
boolean isExtractSkipped;

public FutureOutgoingBatch(OutgoingBatch outgoingBatch, boolean isRetry) {
this.outgoingBatch = outgoingBatch;
Expand Down

0 comments on commit 5dd7e81

Please sign in to comment.