Skip to content

Commit

Permalink
0002833: Batch extraction stuck in loop
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 29, 2016
1 parent 1d367f0 commit 29214a4
Showing 1 changed file with 50 additions and 9 deletions.
Expand Up @@ -579,7 +579,7 @@ public FutureOutgoingBatch call() throws Exception {
*/
log.info("Batch {} is marked as ready but it has been deleted. Rescheduling it for extraction",
extractBatch.getNodeBatchId());
if (changeBatchStatus(Status.RQ, extractBatch, mode)) {
if (mode != ExtractMode.EXTRACT_ONLY) {
resetExtractRequest(extractBatch);
}
status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true;
Expand Down Expand Up @@ -1213,8 +1213,28 @@ public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeComm
}

protected void resetExtractRequest(OutgoingBatch batch) {
sqlTemplate.update(getSql("resetExtractRequestStatus"), ExtractStatus.NE.name(),
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
batch.setStatus(Status.RQ);
outgoingBatchService.updateOutgoingBatch(transaction, batch);

transaction.prepareAndExecute(getSql("resetExtractRequestStatus"), ExtractStatus.NE.name(),
batch.getBatchId(), batch.getBatchId(), batch.getNodeId());
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
}

public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue,
Expand Down Expand Up @@ -1289,7 +1309,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
resource.delete();
}
extractOutgoingBatch(processInfo, targetNode,
new MultiBatchStagingWriter(identity.getNodeId(), stagingManager,
new MultiBatchStagingWriter(request, identity.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize(), processInfo), firstBatch, false,
false, ExtractMode.FOR_SYM_CLIENT);

Expand Down Expand Up @@ -1341,7 +1361,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
request.getEndBatchId() });
}
transaction.commit();

log.info("Done extracting {} batches for request {}", (request.getEndBatchId() - request.getStartBatchId()) + 1, request.getRequestId());
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
Expand Down Expand Up @@ -1387,6 +1407,8 @@ public ExtractRequest mapRow(Row row) {

public class MultiBatchStagingWriter implements IDataWriter {

ExtractRequest request;

long maxBatchSize;

StagingDataWriter currentDataWriter;
Expand All @@ -1410,15 +1432,19 @@ public class MultiBatchStagingWriter implements IDataWriter {
boolean inError = false;

ProcessInfo processInfo;

long startTime, ts, rowCount, byteCount;

public MultiBatchStagingWriter(String sourceNodeId, IStagingManager stagingManager,
public MultiBatchStagingWriter(ExtractRequest request, String sourceNodeId, IStagingManager stagingManager,
List<OutgoingBatch> batches, long maxBatchSize, ProcessInfo processInfo) {
this.request = request;
this.sourceNodeId = sourceNodeId;
this.maxBatchSize = maxBatchSize;
this.stagingManager = stagingManager;
this.batches = new ArrayList<OutgoingBatch>(batches);
this.finishedBatches = new ArrayList<OutgoingBatch>(batches.size());
this.processInfo = processInfo;
this.startTime = this.ts = System.currentTimeMillis();
}

public void open(DataContext context) {
Expand All @@ -1433,9 +1459,14 @@ public void open(DataContext context) {
}

public void close() {
if (this.currentDataWriter != null) {
this.currentDataWriter.close();
}
while (batches.size() > 0) {
startNewBatch();
end(this.table);
end(this.batch, false);
}
if (this.currentDataWriter != null) {
this.currentDataWriter.close();
}
}

public Map<Batch, Statistics> getStatistics() {
Expand Down Expand Up @@ -1476,7 +1507,12 @@ public void write(CsvData data) {
checkSend();
startNewBatch();
}

if (System.currentTimeMillis() - ts > 60000) {
log.info("Request {} has been processing for {} seconds. BATCHES={}, ROWS={}, BYTES={}, RANGE={}-{}, CURRENT={}",
request.getRequestId(), (System.currentTimeMillis() - startTime) / 1000, finishedBatches.size(), rowCount, byteCount,
request.getStartBatchId(), request.getEndBatchId(), batch.getBatchId());
ts = System.currentTimeMillis();
}
}

public void checkSend() {
Expand Down Expand Up @@ -1525,10 +1561,15 @@ public void end(Batch batch, boolean inError) {
protected void nextBatch() {
if (this.outgoingBatch != null) {
this.finishedBatches.add(outgoingBatch);
rowCount += this.outgoingBatch.getDataEventCount();
byteCount += this.outgoingBatch.getByteCount();
}
this.outgoingBatch = this.batches.remove(0);
this.outgoingBatch.setDataEventCount(0);
this.outgoingBatch.setInsertEventCount(0);
if (this.finishedBatches.size() > 0) {
this.outgoingBatch.setExtractCount(this.outgoingBatch.getExtractCount() + 1);
}

/*
* Update the last update time so the batch
Expand Down

0 comments on commit 29214a4

Please sign in to comment.