Commit f2c79f5

0003924: Initial load support for bulk loaders when a restart of SymmetricDS occurs when partial data has been loaded

jumpmind-josh committed Apr 24, 2019
1 parent 9af303a

Showing 4 changed files with 39 additions and 2 deletions.
@@ -63,6 +63,9 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N
 
     public OutgoingBatches getOutgoingBatchByLoad(long loadId);
 
+    public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long startBatchId,
+            long endBatchId, String tableName);
+
     public int cancelLoadBatches(long loadId);
 
     public OutgoingBatches getOutgoingBatchRange(String nodeId, Date startDate, Date endDate, String... channels);
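
For context, a hypothetical caller of the new interface method could look like the sketch below. The engine wiring, load id, batch range, and table name are illustrative assumptions, not values from this commit.

    import org.jumpmind.symmetric.model.OutgoingBatch;
    import org.jumpmind.symmetric.model.OutgoingBatches;
    import org.jumpmind.symmetric.service.IOutgoingBatchService;

    public class SetupBatchLookupSketch {
        // Fetch the "setup" batches (the ones created before the first data batch
        // of a load for a table) and print their ids and statuses.
        static void printSetupBatches(IOutgoingBatchService outgoingBatchService) {
            long loadId = 42;                // hypothetical load id
            long firstDataBatchId = 1052;    // hypothetical first data batch of the load
            OutgoingBatches setup = outgoingBatchService.getOutgoingBatchByLoadRangeAndTable(
                    loadId, 1, firstDataBatchId - 1, "item");
            for (OutgoingBatch batch : setup.getBatches()) {
                System.out.println(batch.getBatchId() + " -> " + batch.getStatus());
            }
        }
    }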
@@ -2205,12 +2205,35 @@ protected void restartExtractRequest(List<OutgoingBatch> batches, ExtractRequest
         }
 
         // clear the incoming batch table for the batches at the target node, so the batches won't be skipped
+        String symIncomingBatch = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_INCOMING_BATCH);
+        String nodeIdentityId = nodeService.findIdentityNodeId();
+
         for (ExtractRequest extractRequest : allRequests) {
-            String symIncomingBatch = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_INCOMING_BATCH);
-            String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeService.findIdentityNodeId() +
+            String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeIdentityId +
                     "' and batch_id between " + extractRequest.getStartBatchId() + " and " + extractRequest.getEndBatchId();
             dataService.sendSQL(extractRequest.getNodeId(), sql);
         }
 
+        for (ExtractRequest extractRequest : allRequests) {
+            TableReloadStatus reloadStatus = dataService.getTableReloadStatusByLoadId(extractRequest.getLoadId());
+            OutgoingBatches setupBatches = outgoingBatchService.getOutgoingBatchByLoadRangeAndTable(extractRequest.getLoadId(), 1,
+                    reloadStatus.getStartDataBatchId() - 1, extractRequest.getTableName().toLowerCase());
+
+            // clear the incoming batch table for all batches at the target node that were used to set up this load for a specific table (delete, truncate, etc.)
+            for (OutgoingBatch batch : setupBatches.getBatches()) {
+                String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeIdentityId +
+                        "' and batch_id = " + batch.getBatchId();
+                dataService.sendSQL(batch.getNodeId(), sql);
+
+                // set the status of these batches back to new so they are resent
+                batch.setStatus(Status.NE);
+                outgoingBatchService.updateOutgoingBatch(batch);
+            }
+        }
     }
 
     public void releaseMissedExtractRequests() {
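
To make the cleanup concrete, here is a sketch of the two delete statements as they are rendered and shipped to the target node through dataService.sendSQL. The "sym" table prefix, node id, and batch ids below are made-up example values, not commit data.

    public class CleanupSqlSketch {
        public static void main(String[] args) {
            String symIncomingBatch = "sym_incoming_batch"; // default "sym" prefix assumed
            String nodeIdentityId = "corp";                 // identity of the node sending the SQL
            long startBatchId = 1052, endBatchId = 1060;    // range of one extract request
            long setupBatchId = 1047;                       // one setup batch (delete/truncate)

            // Range cleanup sent once per extract request:
            String rangeSql = "delete from " + symIncomingBatch + " where node_id = '" + nodeIdentityId
                    + "' and batch_id between " + startBatchId + " and " + endBatchId;

            // Per-batch cleanup sent for each setup batch of the table:
            String setupSql = "delete from " + symIncomingBatch + " where node_id = '" + nodeIdentityId
                    + "' and batch_id = " + setupBatchId;

            System.out.println(rangeSql);
            System.out.println(setupSql);
        }
    }

Resetting the setup batches to Status.NE ("new") afterward is what causes them to be re-extracted and resent, as the in-code comment notes.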
@@ -555,6 +555,14 @@ public OutgoingBatches getOutgoingBatchByLoad(long loadId) {
                 new OutgoingBatchMapper(true), loadId));
         return batches;
     }
 
+    public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long startBatchId,
+            long endBatchId, String tableName) {
+        OutgoingBatches batches = new OutgoingBatches();
+        batches.setBatches(sqlTemplate.query(getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchLoadByBatchRangeByTableNameSql"),
+                new OutgoingBatchMapper(true), loadId, startBatchId, endBatchId, tableName));
+        return batches;
+    }
+
     public OutgoingBatches getOutgoingBatchErrors(int maxRows) {
         OutgoingBatches batches = new OutgoingBatches();
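
The four bind values map, in order, onto the placeholders of the SQL fragment added in the next file. A sketch of that mapping, where the values are illustrative and outgoingBatchService stands in for an instance of this class:

    outgoingBatchService.getOutgoingBatchByLoadRangeAndTable(
            42L,      // load_id = ?
            1L,       // batch_id between ? ...
            1051L,    // ... and ?  (startDataBatchId - 1 in restartExtractRequest)
            "item");  // summary = ?  (table name, lower-cased by the caller)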
@@ -99,6 +99,9 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
         putSql("selectOutgoingBatchLoadSql",
                 "where load_id = ? order by batch_id ");
 
+        putSql("selectOutgoingBatchLoadByBatchRangeByTableNameSql",
+                "where load_id = ? and batch_id between ? and ? and summary = ? order by batch_id ");
+
         putSql("selectOutgoingBatchTimeRangeSql",
                 "where node_id=? and channel_id=? and create_time >= ? and create_time <= ? ");
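
Putting the last two hunks together: getSql concatenates the shared selectOutgoingBatchPrefixSql fragment, which this commit does not touch, with the new where clause. The composed statement therefore has roughly the shape below; the column list and the default "sym" prefix are assumptions here, not part of the diff.

    // Rough shape of the composed query (select list elided):
    String effectiveSql =
            "select /* outgoing batch columns */ from sym_outgoing_batch"
            + " where load_id = ? and batch_id between ? and ? and summary = ?"
            + " order by batch_id";

The table name is matched against the summary column, which, as this query implies, holds the table name for these load batches; that is presumably why restartExtractRequest lower-cases the name before binding it.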
