Skip to content

Commit

Permalink
0002603: Retry batches from staging instead of sending them again
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 31, 2016
1 parent 5dca565 commit b96aa9d
Showing 1 changed file with 31 additions and 12 deletions.
Expand Up @@ -530,7 +530,7 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
Node sourceNode = nodeService.findIdentity();
final FutureExtractStatus status = new FutureExtractStatus();
ExecutorService executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory());
List<Future<OutgoingBatch>> futures = new ArrayList<Future<OutgoingBatch>>();
List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

for (int i = 0; i < activeBatches.size(); i++) {
currentBatch = activeBatches.get(i);
Expand Down Expand Up @@ -570,13 +570,13 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
} else {
processInfo.setStatus(ProcessInfo.Status.EXTRACTING);
final OutgoingBatch extractBatch = currentBatch;
Callable<OutgoingBatch> callable = new Callable<OutgoingBatch>() {
public OutgoingBatch call() throws Exception {
OutgoingBatch outgoingBatch = extractBatch;
Callable<FutureOutgoingBatch> callable = new Callable<FutureOutgoingBatch>() {
public FutureOutgoingBatch call() throws Exception {
FutureOutgoingBatch outgoingBatch = new FutureOutgoingBatch(extractBatch, false);
if (!status.shouldExtractSkip) {
try {
outgoingBatch = extractOutgoingBatch(processInfo, targetNode, dataWriter, outgoingBatch,
streamToFileEnabled, true, mode);
outgoingBatch = new FutureOutgoingBatch(extractOutgoingBatch(processInfo, targetNode,
dataWriter, extractBatch, streamToFileEnabled, true, mode), isRetry(extractBatch, targetNode));
status.batchExtractCount++;
status.byteExtractCount += extractBatch.getByteCount();

Expand All @@ -599,16 +599,18 @@ public OutgoingBatch call() throws Exception {
}

Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator();
for (Future<OutgoingBatch> future : futures) {
for (Future<FutureOutgoingBatch> future : futures) {
currentBatch = activeBatchIter.next();
boolean isSent = false;
while (!isSent) {
try {
currentBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS);
FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS);
currentBatch = extractBatch.getOutgoingBatch();

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, dataWriter, writer, mode);
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, extractBatch.isRetry(),
dataWriter, writer, mode);
}

processedBatches.add(currentBatch);
Expand Down Expand Up @@ -894,7 +896,7 @@ protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) {
}

protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode,
OutgoingBatch currentBatch, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
currentBatch.setSentCount(currentBatch.getSentCount() + 1);
if (currentBatch.getStatus() != Status.RS) {
Expand All @@ -909,8 +911,7 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
Channel channel = configurationService.getChannel(currentBatch.getChannelId());
DataContext ctx = new DataContext();
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(),
currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx,
channel.getMaxKBytesPerSecond());
currentBatch.getNodeId(), isRetry, extractedBatch, writer, ctx, channel.getMaxKBytesPerSecond());
dataReader.process();
} else {
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
Expand Down Expand Up @@ -1901,4 +1902,22 @@ class FutureExtractStatus {
int byteExtractCount;
}

class FutureOutgoingBatch {
OutgoingBatch outgoingBatch;
boolean isRetry;

public FutureOutgoingBatch(OutgoingBatch outgoingBatch, boolean isRetry) {
this.outgoingBatch = outgoingBatch;
this.isRetry = isRetry;
}

public OutgoingBatch getOutgoingBatch() {
return outgoingBatch;
}

public boolean isRetry() {
return isRetry;
}
}

}

0 comments on commit b96aa9d

Please sign in to comment.