0003785: Improve visibility of full and partial loads

erilong committed Feb 5, 2019
1 parent c42f06c commit f1da057

Showing 10 changed files with 213 additions and 140 deletions.
@@ -79,6 +79,8 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N

public void updateOutgoingBatch(OutgoingBatch batch);

public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId);

public void updateCommonBatchExtractStatistics(OutgoingBatch batch);

public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch);
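
Review note: the two methods added to this interface support flipping a contiguous range of batches for a node in one statement (the implementation and SQL appear later in this commit) and refreshing extract statistics on common batches. A minimal caller sketch for updateOutgoingBatchStatus, assuming an ISqlTemplate-managed transaction; the sqlTemplate and outgoingBatchService variables, the chosen status, and the import paths are illustrative assumptions, not taken from this commit.

    import org.jumpmind.db.sql.ISqlTransaction;
    import org.jumpmind.symmetric.model.OutgoingBatch.Status;

    // Illustration only: mark batches 1001..1050 for node "001" as NE in a single
    // range update instead of reading and rewriting each OutgoingBatch row.
    ISqlTransaction transaction = sqlTemplate.startSqlTransaction();
    try {
        outgoingBatchService.updateOutgoingBatchStatus(transaction, Status.NE, "001", 1001, 1050);
        transaction.commit();
    } catch (RuntimeException e) {
        transaction.rollback();
        throw e;
    } finally {
        transaction.close();
    }
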
@@ -74,7 +74,7 @@ public BatchAckResult ack(final BatchAck batch) {
.findOutgoingBatch(batch.getBatchId(), batch.getNodeId());
Status status = batch.isResend() ? Status.RS : batch.isOk() ? Status.OK : Status.ER;
Status oldStatus = null;
if (outgoingBatch != null) {
if (outgoingBatch != null && outgoingBatch.getStatus() != Status.RQ) {
// Allow an outside system/user to indicate that a batch
// is OK.
if (outgoingBatch.getStatus() != Status.OK &&
@@ -182,7 +182,7 @@ public BatchAckResult ack(final BatchAck batch) {
}
engine.getStatisticManager().removeRouterStatsByBatch(batch.getBatchId());
}
} else {
} else if (outgoingBatch == null) {
log.error("Could not find batch {}-{} to acknowledge as {}", new Object[] {batch.getNodeId(), batch.getBatchId(),
status.name()});
result.setOk(false);
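
Review note: with the new guard, an acknowledgement arriving for a batch that has been reset to request-extract status (RQ) is ignored instead of overwriting the reset, and the "could not find batch" error is now logged only when the batch genuinely does not exist. A condensed restatement of the resulting branching, using only identifiers visible in the hunks above:

    // Condensed sketch of the new ack() branching, not the full method body.
    if (outgoingBatch != null && outgoingBatch.getStatus() != Status.RQ) {
        // normal path: record OK/ER/RS status, statistics, and router stats cleanup
    } else if (outgoingBatch == null) {
        log.error("Could not find batch {}-{} to acknowledge as {}",
                batch.getNodeId(), batch.getBatchId(), status.name());
        result.setOk(false);
    }
    // else: the batch exists but was reset to RQ for re-extraction, so the ack is skipped.
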

Large diffs are not rendered by default.

@@ -34,10 +34,14 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,
putSql("selectNodeIdsForExtractSql", "select node_id, queue from $(extract_request) where status=? and parent_request_id=0 group by node_id, queue");

putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? and parent_request_id=0 order by request_id");


putSql("selectExtractRequestForBatchSql", "select * from $(extract_request) where start_batch_id <= ? and end_batch_id >= ? and node_id = ? and load_id = ?");

putSql("selectExtractChildRequestForNodeSql", "select c.* from $(extract_request) c " +
"inner join $(extract_request) p on p.request_id = c.parent_request_id where p.node_id=? and p.queue=? and p.status=? and p.parent_request_id=0");

putSql("selectExtractChildRequestsByParentSql", "select * from $(extract_request) where parent_request_id = ?");

putSql("countExtractChildRequestMissed",
"select count(*) from $(extract_request) where status = ? and parent_request_id > 0 "
+ "and parent_request_id in (select request_id from sym_extract_request where parent_request_id = 0 and status = ?)");
@@ -57,10 +61,11 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,

putSql("updateExtractRequestTransferred", "update $(extract_request) set last_transferred_batch_id=?, transferred_rows = transferred_rows + ?, transferred_millis = ?"
+ " where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=? and (last_transferred_batch_id is null or last_transferred_batch_id < ?)");

putSql("resetExtractRequestStatus", "update $(extract_request) set status=?, parent_request_id=0, last_update_time= current_timestamp"
+ " where start_batch_id <= ? and end_batch_id >= ? and node_id=?");


putSql("restartExtractRequest", "update $(extract_request) set last_transferred_batch_id = null, transferred_rows = 0, transferred_millis = 0, "
+ "last_loaded_batch_id = null, loaded_rows = 0, loaded_millis = 0, parent_request_id = 0, status = ? "
+ "where request_id = ? and node_id = ?");

putSql("cancelExtractRequests", "update $(extract_request) set status=?, last_update_time=current_timestamp where load_id = ?");

putSql("selectIncompleteTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is null order by request_id desc");
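
Review note: the new entries follow the existing pattern in this map: putSql registers a named statement, $(extract_request) expands to the prefixed table name, and the ? placeholders are bound positionally by the service. A hedged usage sketch for restartExtractRequest; the sqlTemplate field, the surrounding service code, and the ExtractRequest.ExtractStatus.NE status value are assumptions for illustration, not taken from this commit.

    // Illustration only: clear transfer/load progress on one extract request and put it
    // back to a not-extracted status so it is picked up again; binds (status, request_id,
    // node_id) to the three ? placeholders of "restartExtractRequest" in order.
    sqlTemplate.update(getSql("restartExtractRequest"),
            ExtractRequest.ExtractStatus.NE.name(), // assumed status value
            request.getRequestId(),
            request.getNodeId());
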
@@ -671,6 +671,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
}

protected void logOrRethrow(Throwable ex) throws IOException {
// Throwing an exception here means acks are not sent, so only certain exceptions should be rethrown
if (ex instanceof RegistrationRequiredException) {
throw (RegistrationRequiredException) ex;
} else if (ex instanceof ConnectException) {
@@ -689,8 +690,6 @@ protected void logOrRethrow(Throwable ex) throws IOException {
throw (SyncDisabledException) ex;
} else if (ex instanceof HttpException) {
throw (HttpException) ex;
} else if (ex instanceof IOException) {
throw (IOException) ex;
} else if (ex instanceof InvalidRetryException) {
throw (InvalidRetryException) ex;
} else if (ex instanceof StagingLowFreeSpace) {
@@ -114,9 +114,9 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe

@Override
protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List<ExtractRequest> childRequests, final Node sourceNode, final Node targetNode,
List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel) {
List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) {
MultiBatchStagingWriter multiBatchStagingWriter = new MultiBatchStagingWriter(this, request, childRequests, sourceNode.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize(), processInfo) {
batches, channel.getMaxBatchSize(), processInfo, isRestarted) {
@Override
protected IDataWriter buildWriter() {
IStagedResource stagedResource = stagingManager.create(
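
Review note: buildMultiBatchStagingWriter now threads an isRestarted flag through to the MultiBatchStagingWriter constructor, so a re-extracted (restarted) request can be told apart from a first-time extract when staging batches. A minimal construction sketch using only the constructor arguments visible in this commit; the concrete argument values are illustrative.

    // Illustrative construction; the parameter list mirrors the new constructor signature.
    boolean isRestarted = true; // e.g. the extract request was reset and is being re-extracted
    MultiBatchStagingWriter writer = new MultiBatchStagingWriter(dataExtractorService, request,
            childRequests, sourceNode.getNodeId(), stagingManager, batches,
            channel.getMaxBatchSize(), processInfo, isRestarted);
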
@@ -87,17 +87,17 @@ public class MultiBatchStagingWriter implements IDataWriter {
protected ProcessInfo processInfo;

protected long startTime, ts, rowCount, byteCount;

protected boolean cancelled = false;

protected List<ExtractRequest> childRequests;

protected Map<Long, OutgoingBatch> childBatches;

protected long memoryThresholdInBytes;

protected boolean isRestarted;

public MultiBatchStagingWriter(DataExtractorService dataExtractorService, ExtractRequest request, List<ExtractRequest> childRequests, String sourceNodeId,
IStagingManager stagingManager, List<OutgoingBatch> batches, long maxBatchSize, ProcessInfo processInfo) {
IStagingManager stagingManager, List<OutgoingBatch> batches, long maxBatchSize, ProcessInfo processInfo, boolean isRestarted) {
this.dataExtractorService = dataExtractorService;
this.request = request;
this.sourceNodeId = sourceNodeId;
@@ -110,14 +110,15 @@ public MultiBatchStagingWriter(DataExtractorService dataExtractorService, Extrac
this.childRequests = childRequests;
this.memoryThresholdInBytes = this.dataExtractorService.parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
this.childBatches = new HashMap<Long, OutgoingBatch>();
this.isRestarted = isRestarted;
}

@Override
public void open(DataContext context) {
this.context = context;
this.nextBatch();
this.currentDataWriter = buildWriter();
this.currentDataWriter.open(context);
}

protected IDataWriter buildWriter() {
@@ -127,18 +128,14 @@ protected IDataWriter buildWriter() {

@Override
public void close() {
while (!cancelled && batches.size() > 0 && table != null) {
while (!inError && batches.size() > 0 && table != null) {
startNewBatch();
end(this.table);
end(this.batch, false);
}
if (table == null && batch != null) {
log.debug("Batch {} is empty", new Object[] { batch.getNodeBatchId() });

this.currentDataWriter.end(batch, false);
Statistics stats = this.closeCurrentDataWriter();
Statistics stats = closeCurrentDataWriter();
checkSend(stats);
}
}
closeCurrentDataWriter();
}

@@ -150,7 +147,14 @@ private Statistics closeCurrentDataWriter() {
this.outgoingBatch.setExtractMillis(System.currentTimeMillis() - batch.getStartTime().getTime());
this.currentDataWriter.close();
this.currentDataWriter = null;
checkSend(stats);
if (inError) {
IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch);
if (resource != null) {
resource.delete();
}
} else {
checkSend(stats);
}
}
return stats;
}
@@ -196,34 +200,33 @@ public void write(CsvData data) {
startNewBatch();
}
if (System.currentTimeMillis() - ts > 60000) {
long currentRowCount = rowCount + this.currentDataWriter.getStatistics().get(batch).get(DataWriterStatisticConstants.ROWCOUNT);
long currentByteCount = byteCount + this.currentDataWriter.getStatistics().get(batch).get(DataWriterStatisticConstants.BYTECOUNT);
this.dataExtractorService.log.info(
"Extracting table {} request {} for {} seconds, {} batches, {} rows, and {} bytes. Current batch {} of batches {} through {}.",
request.getTableName(), request.getRequestId(), (System.currentTimeMillis() - startTime) / 1000, finishedBatches.size(), rowCount, byteCount,
batch.getBatchId(), request.getStartBatchId(), request.getEndBatchId());
"Extract request {} for table {} extracting for {} seconds, {} batches, {} rows, and {} bytes. Current batch is {} in range {}-{}.",
request.getRequestId(), request.getTableName(), (System.currentTimeMillis() - startTime) / 1000, finishedBatches.size() + 1,
currentRowCount, currentByteCount, batch.getBatchId(), request.getStartBatchId(), request.getEndBatchId());
ts = System.currentTimeMillis();
}
}

public void checkSend(Statistics stats) {
if (this.outgoingBatch.getStatus() != Status.OK) {
IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch);
if (resource != null) {
resource.setState(State.DONE);
}
OutgoingBatch batchFromDatabase = this.dataExtractorService.outgoingBatchService.findOutgoingBatch(outgoingBatch.getBatchId(),
outgoingBatch.getNodeId());
IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch);
if (resource != null) {
resource.setState(State.DONE);
}
OutgoingBatch batchFromDatabase = this.dataExtractorService.outgoingBatchService.findOutgoingBatch(outgoingBatch.getBatchId(),
outgoingBatch.getNodeId());

if (batchFromDatabase.getIgnoreCount() == 0) {
this.outgoingBatch.setStatus(Status.NE);
} else {
cancelled = true;
throw new CancellationException();
}

if (!batchFromDatabase.getStatus().equals(Status.OK) && !batchFromDatabase.getStatus().equals(Status.IG)) {
this.outgoingBatch.setStatus(Status.NE);
checkSendChildRequests(batchFromDatabase, resource, stats);
this.dataExtractorService.outgoingBatchService.updateOutgoingBatch(this.outgoingBatch);
} else {
// The user canceled a batch before it tried to load, so they probably canceled all batches.
log.info("User cancelled batches, so cancelling extract request");
throw new CancellationException();
}

this.dataExtractorService.outgoingBatchService.updateOutgoingBatch(this.outgoingBatch);
}

protected void checkSendChildRequests(OutgoingBatch parentBatch, IStagedResource parentResource, Statistics stats) {
@@ -261,11 +264,13 @@ protected void checkSendChildRequests(OutgoingBatch parentBatch, IStagedResource
OutgoingBatch childBatch = this.dataExtractorService.outgoingBatchService.findOutgoingBatch(childBatchId, childRequest.getNodeId());
childBatch.setExtractStartTime(startExtractTime);
childBatch.setExtractMillis(System.currentTimeMillis() - startExtractTime.getTime());
childBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
childBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT));
childBatch.setByteCount(stats.get(DataWriterStatisticConstants.BYTECOUNT));
if (stats != null) {
childBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
childBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT));
childBatch.setByteCount(stats.get(DataWriterStatisticConstants.BYTECOUNT));
}

if (childBatch.getIgnoreCount() == 0) {
if (!childBatch.getStatus().equals(Status.OK) && !childBatch.getStatus().equals(Status.IG)) {
childBatch.setStatus(Status.NE);
this.dataExtractorService.outgoingBatchService.updateOutgoingBatch(childBatch);
}
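
Review note: taken together, the changes in this writer mean that close() only drains the remaining batches when no error occurred (the broader inError flag replaces cancelled), closeCurrentDataWriter() discards the staged resource instead of queueing it for send when an error occurred, and checkSend() re-reads the batch from the database so a batch the user has already cancelled (OK or IG) aborts the rest of the extract request. A condensed restatement of the new cleanup decision, using identifiers from the hunks above; the first line gathering stats is assumed, since it sits outside the shown hunks.

    // Condensed sketch of the error-versus-send decision when the current writer closes.
    Statistics stats = this.currentDataWriter.getStatistics().get(batch); // assumed
    this.currentDataWriter.close();
    this.currentDataWriter = null;
    if (inError) {
        // Extraction failed: throw away the partially staged batch rather than sending it.
        IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch);
        if (resource != null) {
            resource.delete();
        }
    } else {
        // Extraction succeeded: checkSend() marks the staged data DONE and flips the batch
        // to NE unless it was cancelled (OK/IG) in the meantime.
        checkSend(stats);
    }
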
@@ -251,6 +251,13 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}

public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId) {
transaction.prepareAndExecute(getSql("updateOutgoingBatchStatusSql"),
new Object[] { status.name(), new Date(), clusterService.getServerId(), nodeId, startBatchId, endBatchId },
new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() });
}

public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
ISqlTransaction transaction = null;
try {
@@ -68,7 +68,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=?, "
+ " transform_extract_millis=?, transform_load_millis=? "
+ " where batch_id=? and node_id=? ");


putSql("updateOutgoingBatchStatusSql",
"update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? " +
"where node_id = ? and batch_id between ? and ?");

putSql("updateCommonBatchExtractStatsSql",
"update $(outgoing_batch) set byte_count=?, data_row_count=?, "
+ " data_insert_row_count=?, data_update_row_count=?, data_delete_row_count=?, other_row_count=?, "
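
Review note: the argument array in updateOutgoingBatchStatus (two hunks up) binds positionally to this statement: status, last_update_time, last_update_hostname, then node_id and the two batch ids for the between range. For comparison, this is roughly the per-batch loop that the single range update replaces; the loop below is illustrative only and not code from this commit.

    // Illustrative per-batch equivalent of the new range update.
    for (long batchId = startBatchId; batchId <= endBatchId; batchId++) {
        OutgoingBatch b = findOutgoingBatch(batchId, nodeId); // one select per batch
        if (b != null) {
            b.setStatus(status);
            updateOutgoingBatch(transaction, b);              // one update per batch
        }
    }
    // updateOutgoingBatchStatus() collapses this into a single
    // "update ... where node_id = ? and batch_id between ? and ?" statement.
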
@@ -20,9 +20,10 @@
*/
package org.jumpmind.symmetric.io.data;

import java.util.concurrent.CancellationException;

import org.jumpmind.db.model.Table;
import org.jumpmind.exception.InvalidRetryException;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.writer.IgnoreBatchException;
import org.jumpmind.util.Statistics;
@@ -232,7 +233,7 @@ protected int forEachDataInTable(DataContext context, boolean processTable, Batc
}

if (Thread.currentThread().isInterrupted()) {
throw new IoException("This thread was interrupted");
throw new CancellationException("This thread was interrupted");
}
} while (currentData != null);

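
Review note: throwing java.util.concurrent.CancellationException instead of IoException when the extract thread is interrupted means a cancelled extract no longer surfaces as an I/O failure; together with the checkSend() change above, cancellation can be told apart from a genuine error by callers. A hedged caller-side sketch; the dataProcessor.process(context) entry point and the log field are assumptions for illustration, not code from this commit.

    import java.util.concurrent.CancellationException;

    // Illustration only: a cancelled extract is expected and logged, anything else
    // is still treated as a failure.
    try {
        dataProcessor.process(context);
    } catch (CancellationException e) {
        log.info("Extract was cancelled, stopping without error");
    } catch (RuntimeException e) {
        log.error("Extract failed", e);
        throw e;
    }
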
