Skip to content

Commit

Permalink
0005906: Non transactional initial load setup can cause load events to
Browse files Browse the repository at this point in the history
be missed ini table_reload_status
  • Loading branch information
Philip Marzullo committed Jun 29, 2023
1 parent fc1b003 commit 23744fb
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 50 deletions.
Expand Up @@ -35,7 +35,7 @@ public class AbstractBatch implements Serializable {

public enum Status {
OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), RS(
"Resend"), XX("Unknown");
"Resend"), XX("Unknown"), LS("LoadSetup");

private String description;

Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ExtractRequest implements Serializable {
private static final long serialVersionUID = 1L;

public enum ExtractStatus {
NE, OK
NE, OK, LS
};

private long requestId;
Expand Down
Expand Up @@ -86,4 +86,7 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String
public int cancelExtractRequests(long loadId);

public void releaseMissedExtractRequests();

public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId,
String fromStatus, String toStatus);
}
Expand Up @@ -75,6 +75,15 @@ public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long sta

public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId);

public void updateOutgoingSetupBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long maxBatchId, String fromStatus, String toStatus);

public void updateOutgoingLoadBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long startDataBatchId, long endDataBatchId, String fromStatus, String toStatus);

public void updateOutgoingFinalizeBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long minBatchId, String fromStatus, String toStatus);

public void updateCommonBatchExtractStatistics(OutgoingBatch batch);

public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch);
Expand Down
Expand Up @@ -1725,7 +1725,7 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String
requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ);
}
transaction.prepareAndExecute(getSql("insertExtractRequestSql"),
new Object[] { requestId, engine.getNodeId(), nodeId, queue, ExtractStatus.NE.name(), startBatchId, endBatchId,
new Object[] { requestId, engine.getNodeId(), nodeId, queue, ExtractStatus.LS.name(), startBatchId, endBatchId,
triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter().getRouterId(), loadId,
table, rows, parentRequestId, new Date(), new Date() },
new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT,
Expand All @@ -1735,7 +1735,7 @@ table, rows, parentRequestId, new Date(), new Date() },
request.setRequestId(requestId);
request.setNodeId(nodeId);
request.setQueue(queue);
request.setStatus(ExtractStatus.NE);
request.setStatus(ExtractStatus.LS);
request.setStartBatchId(startBatchId);
request.setEndBatchId(endBatchId);
request.setRouterId(triggerRouter.getRouterId());
Expand Down Expand Up @@ -2063,6 +2063,13 @@ public void removeBatchFromStaging(OutgoingBatch batch) {
}
}

@Override
public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId,
String fromStatus, String toStatus) {
transaction.prepareAndExecute(getSql("updateExtractRequestStatuses"),
toStatus, new Date(), loadId, sourceNodeId, fromStatus);
}

static class FutureExtractStatus {
boolean shouldExtractSkip;
int batchExtractCount;
Expand Down
Expand Up @@ -80,6 +80,9 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,
putSql("selectIncompleteTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is null and node_id = ? order by request_id");

putSql("selectCompletedTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is not null and node_id = ? order by request_id");

putSql("updateExtractRequestStatuses", "update $(extract_request) set status=?, last_update_time=? "
+ "where load_id=? and source_node_id=? and status=?");
}

}

Large diffs are not rendered by default.

Expand Up @@ -120,32 +120,33 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
putSql("updateTableReloadStatusTableCount", "update $(table_reload_status) set table_count = ?, last_update_time = ? where load_id = ?");
putSql("updateTableReloadStatusDataCounts", "update $(table_reload_status) set "
+ " start_data_batch_id = ?, end_data_batch_id = ?, "
+ " data_batch_count = data_batch_count + ?, "
+ " data_batch_count = case when data_batch_count = -1 then 0 else data_batch_count end + ?, "
+ " rows_count = rows_count + ?, "
+ " last_update_time = ? "
+ " where load_id = ?");
putSql("updateTableReloadStatusDataCountsNoParamsInSelect", "update $(table_reload_status) set "
+ " start_data_batch_id = ?, end_data_batch_id = ?, "
+ " data_batch_count = data_batch_count + $(batchCount), "
+ " data_batch_count = case when data_batch_count = -1 then 0 else data_batch_count end + $(batchCount), "
+ " rows_count = rows_count + $(rowCount), "
+ " last_update_time = ? "
+ " where load_id = ?");
putSql("insertTableReloadStatus",
"insert into $(table_reload_status) (load_id, target_node_id, source_node_id, full_load, start_time, last_update_time) values (?, ?, ?, ?, ?, ?)");
"insert into $(table_reload_status) (load_id, target_node_id, source_node_id, full_load, start_time, last_update_time, data_batch_count, setup_batch_count, finalize_batch_count) "
+ "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
putSql("deleteTableReloadStatus", "delete from $(table_reload_status) where load_id = ?");
putSql("updateTableReloadStatusSetupCount", "update $(table_reload_status) set "
+ " setup_batch_count = ?, last_update_time = ? "
+ " where load_id = ?");
putSql("updateTableReloadStatusDataLoaded", "update $(table_reload_status) "
+ " set completed = case when ("
+ " data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
+ " setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
+ " finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+ " (data_batch_count > -1 and data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end)) and "
+ " (setup_batch_count > -1 and setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end)) and "
+ " (finalize_batch_count > -1 and finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end))) "
+ " then 1 else 0 end, "
+ " end_time = case when ("
+ " data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
+ " setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
+ " finalize_batch_loaded <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+ " (data_batch_count > -1 and data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end)) and "
+ " (setup_batch_count > -1 and setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end)) and "
+ " (finalize_batch_count > -1 and finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end))) "
+ " then ? else end_time end, "
+ " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, "
+ " setup_batch_loaded = case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end, "
Expand All @@ -158,14 +159,14 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " where load_id = ? and completed = 0");
putSql("updateTableReloadStatusDataLoadedNoParams", "update $(table_reload_status) "
+ " set completed = case when ("
+ " data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end) and "
+ " setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end) and "
+ " finalize_batch_count <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end)) "
+ " (data_batch_count > -1 and data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end)) and "
+ " (setup_batch_count > -1 and setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end)) and "
+ " (finalize_batch_count > -1 and finalize_batch_count <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end))) "
+ " then 1 else 0 end, "
+ " end_time = case when ("
+ " data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end) and "
+ " setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end) and "
+ " finalize_batch_loaded <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end)) "
+ " (data_batch_count > -1 and data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end)) and "
+ " (setup_batch_count > -1 and setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end)) and "
+ " (finalize_batch_count > -1 and finalize_batch_loaded <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end))) "
+ " then current_timestamp else end_time end, "
+ " data_batch_loaded = case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end, "
+ " setup_batch_loaded = case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end, "
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.mapper.LongMapper;
import org.jumpmind.db.sql.mapper.RowMapper;
import org.jumpmind.db.sql.mapper.StringMapper;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -277,6 +276,42 @@ public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status
symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() });
}

public void updateOutgoingSetupBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long maxBatchId, String fromStatus, String toStatus) {
// update $(outgoing_batch)
// set status=?, last_update_time=?, last_update_hostname=?
// where node_id=? and load_id=? and status=? and batch_id < ?
transaction.prepareAndExecute(getSql("updateOutgoingSetupBatchStatusByStatus"),
new Object[] { toStatus, new Date(), clusterService.getServerId(),
targetNodeId, loadId, fromStatus, maxBatchId },
new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.VARCHAR, Types.NUMERIC, Types.CHAR, Types.NUMERIC });
}

public void updateOutgoingLoadBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long startDataBatchId, long endDataBatchId, String fromStatus, String toStatus) {
// update $(outgoing_batch)
// set status=?, last_update_time=?, last_update_hostname=?
// where node_id=? and load_id=? and status=? and batch_id between ? and ?
transaction.prepareAndExecute(getSql("updateOutgoingLoadBatchStatusByStatus"),
new Object[] { toStatus, new Date(), clusterService.getServerId(),
targetNodeId, loadId, fromStatus, startDataBatchId, endDataBatchId },
new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.VARCHAR, Types.NUMERIC, Types.CHAR, Types.NUMERIC, Types.NUMERIC });
}

public void updateOutgoingFinalizeBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId,
long minBatchId, String fromStatus, String toStatus) {
// update $(outgoing_batch)
// set status=?, last_update_time=?, last_update_hostname=?
// where node_id=? and load_id=? and status=? and batch_id > ?
transaction.prepareAndExecute(getSql("updateOutgoingFinalizeBatchStatusByStatus"),
new Object[] { toStatus, new Date(), clusterService.getServerId(),
targetNodeId, loadId, fromStatus, minBatchId },
new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.VARCHAR, Types.NUMERIC, Types.CHAR, Types.NUMERIC });
}

public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
ISqlTransaction transaction = null;
try {
Expand Down
Expand Up @@ -169,5 +169,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " last_update_hostname, ?, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) ");
putSql("getAllBatchesSql", "select batch_id from $(outgoing_batch)");
putSql("whereInProgressStatusSql", "where status in (?, ?, ?, ?, ?) ");
putSql("updateOutgoingSetupBatchStatusByStatus",
"update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? where node_id=? and load_id=? and status=? and batch_id < ?");
putSql("updateOutgoingLoadBatchStatusByStatus",
"update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? where node_id=? and load_id=? and status=? and batch_id between ? and ?");
putSql("updateOutgoingFinalizeBatchStatusByStatus",
"update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? where node_id=? and load_id=? and status=? and batch_id > ?");
}
}

0 comments on commit 23744fb

Please sign in to comment.