Skip to content

Commit

Permalink
0004661: Initial load foreign key error auto resolve when foreign key
Browse files Browse the repository at this point in the history
missing at source
  • Loading branch information
erilong committed Nov 24, 2020
1 parent f3ea81e commit 9dc830b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch

public void reloadMissingForeignKeyRows(String nodeId, long dataId);

public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, String channelId, boolean sendCorrectionToPeers);
public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, long batchId, long rowNumber, Table table, CsvData data,
String channelId, boolean sendCorrectionToPeers);

public void sendMissingRowsForInitialLoad(String nodeId, long batchId, long rowNumber, String catalogName, String schemaName, String tableName,
String whereSql, String channelId);

public void sendNewerDataToNode(ISqlTransaction transaction, String targetNodeId, String tableName, String pkCsvData,
Date minCreateTime, String winningNodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,15 +649,16 @@ protected IDataWriter chooseDataWriter(Batch batch) {
&& listener.getCurrentBatch() != null && listener.isNewErrorForCurrentBatch()
&& listener.getCurrentBatch().isLoadFlag()
&& listener.getCurrentBatch().getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) {
engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getTable(),
ctx.getData(), Constants.CHANNEL_CONFIG, false);
engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getBatch().getBatchId(),
listener.getCurrentBatch().getFailedLineNumber(), ctx.getTable(), ctx.getData(), Constants.CHANNEL_CONFIG, false);
}
if (parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE)
&& listener.getCurrentBatch() != null && listener.isNewErrorForCurrentBatch()
&& !listener.getCurrentBatch().isLoadFlag()
&& listener.getCurrentBatch().getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) {
engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getTable(),
ctx.getData(), null, parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS));
engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getBatch().getBatchId(),
listener.getCurrentBatch().getFailedLineNumber(), ctx.getTable(), ctx.getData(), null,
parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS));
}
logOrRethrow(ex);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2444,7 +2444,8 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam

}

public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, String channelId, boolean sendCorrectionToPeers) {
public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, long batchId, long rowNumber, Table table, CsvData data, String channelId,
boolean sendCorrectionToPeers) {
try {
IDatabasePlatform platform = engine.getTargetDialect().getPlatform();
Map<String, String> dataMap = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
Expand Down Expand Up @@ -2497,12 +2498,19 @@ public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table,
foreignTableRow.getWhereSql(), localTable.getName(), foreignTableRow.getReferenceColumnName());

for (Node targetNode : targetNodes) {
script.append("engine.getDataService().reloadTableImmediate(\"" + targetNode.getNodeId() + "\", " +
(catalog == null ? catalog : "\"" + catalog + "\"") + ", " +
script.append("try { engine.getDataService().sendMissingRowsForInitialLoad(\"" + targetNode.getNodeId() + "\", " +
batchId + ", " + rowNumber + ", " + (catalog == null ? catalog : "\"" + catalog + "\"") + ", " +
(schema == null ? schema : "\"" + schema + "\"") + ", \"" +
foreignTable.getName().replace("\"", "\\\"") + "\", \"" +
foreignTableRow.getWhereSql().replace("\"", "\\\"") + "\", " +
(channelId == null ? channelId : "\"" + channelId + "\"") + ");\n");

script.append("} catch (Exception e) { engine.getDataService().reloadTableImmediate(\"" + targetNode.getNodeId() + "\", " +
(catalog == null ? catalog : "\"" + catalog + "\"") + ", " +
(schema == null ? schema : "\"" + schema + "\"") + ", \"" +
foreignTable.getName().replace("\"", "\\\"") + "\", \"" +
foreignTableRow.getWhereSql().replace("\"", "\\\"") + "\", " +
(channelId == null ? channelId : "\"" + channelId + "\"") + "); }\n");
}
}
}
Expand All @@ -2514,6 +2522,32 @@ public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table,
log.error("Unknown exception while processing foreign key for node id: " + sourceNodeId, e);
}
}

/**
* Check if source has the foreign key parent rows that the target is reporting as missing.
* If they exist, send a reload batch for each row. Otherwise, send a SQL batch to ignore the row since we can't resolve it.
*
* TODO: should reloadMissingForeignKeyRows() also call this to handle similar case with backlog of changes and a busy system?
*/
public void sendMissingRowsForInitialLoad(String nodeId, long batchId, long rowNumber, String catalogName, String schemaName, String tableName,
String whereSql, String channelId) {
IDatabasePlatform platform = engine.getTargetDialect().getPlatform();
DatabaseInfo info = platform.getDatabaseInfo();
String quote = info.getDelimiterToken() == null || !platform.getDdlBuilder().isDelimitedIdentifierModeOn() ? "" : info.getDelimiterToken();
String fullTableName = Table.getFullyQualifiedTableName(catalogName, schemaName, tableName, quote,
info.getCatalogSeparator(), info.getSchemaSeparator());
// TODO: still a race condition here before push/pull, maybe use createData() instead
long count = platform.getSqlTemplateDirty().queryForLong("select count(*) from " + fullTableName + " where " + whereSql);
if (count > 0) {
reloadTableImmediate(nodeId, catalogName, schemaName, tableName, whereSql, channelId);
} else {
log.info("Resolving initial load batch {}-{} to ignore the row because no rows found for {} where {}", nodeId, batchId,
fullTableName, whereSql);
sendSQL(nodeId, "update " + engine.getParameterService().getTablePrefix() +
"_incoming_error set resolve_ignore = 1 where batch_id = " + batchId + " and node_id = '" + engine.getNodeId() +
"' and failed_row_number = " + rowNumber);
}
}

public void reloadMissingForeignKeyRows(String nodeId, long dataId) {
try {
Expand Down

0 comments on commit 9dc830b

Please sign in to comment.