Skip to content
Permalink
Browse files

0003886: Initial load foreign key error auto resolve

  • Loading branch information...
elong
elong committed Mar 7, 2019
1 parent 8c5984a commit 323e00b5719c97956e069ef971b2d84cd43f2780
@@ -124,6 +124,7 @@ private ParameterConstants() {
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION = "auto.resolve.foreign.key.violation";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE = "auto.resolve.foreign.key.violation.reverse";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS = "auto.resolve.foreign.key.violation.reverse.peers";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_RELOAD = "auto.resolve.foreign.key.violation.reverse.reload";
public final static String AUTO_INSERT_REG_SVR_IF_NOT_FOUND = "auto.insert.registration.svr.if.not.found";
public final static String AUTO_SYNC_CONFIGURATION = "auto.sync.configuration";
public final static String AUTO_SYNC_CONFIGURATION_ON_INCOMING = "auto.sync.configuration.on.incoming";
@@ -75,9 +75,12 @@

public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName, String overrideInitialLoadSelect);

public String reloadTableImmediate(String nodeId, String catalogName, String schemaName, String tableName,
String overrideInitialLoadSelect, String overrideChannelId);

public void reloadMissingForeignKeyRows(String nodeId, long dataId);

public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, boolean sendCorrectionToPeers);
public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, String channelId, boolean sendCorrectionToPeers);

/**
* Sends a SQL command to the remote node for execution by creating a SQL event that is synced like other data
@@ -114,20 +114,24 @@ public BatchAckResult ack(final BatchAck batch) {

boolean isNewError = false;
if (!batch.isOk() && batch.getErrorLine() != 0) {
String sql = getSql("selectDataIdSql");
if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) {
sql = getSql("selectDataIdByCreateTimeSql");
} else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) {
sql += getSql("orderByDataId");
}

List<Number> ids = sqlTemplateDirty.query(sql, new NumberMapper(), outgoingBatch.getBatchId());
if (ids.size() >= batch.getErrorLine()) {
long failedDataId = ids.get((int) batch.getErrorLine() - 1).longValue();
if (outgoingBatch.getFailedDataId() == 0 || outgoingBatch.getFailedDataId() != failedDataId) {
isNewError = true;
if (outgoingBatch.isLoadFlag()) {
isNewError = outgoingBatch.getSentCount() == 1;
} else {
String sql = getSql("selectDataIdSql");
if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) {
sql = getSql("selectDataIdByCreateTimeSql");
} else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) {
sql += getSql("orderByDataId");
}

List<Number> ids = sqlTemplateDirty.query(sql, new NumberMapper(), outgoingBatch.getBatchId());
if (ids.size() >= batch.getErrorLine()) {
long failedDataId = ids.get((int) batch.getErrorLine() - 1).longValue();
if (outgoingBatch.getFailedDataId() == 0 || outgoingBatch.getFailedDataId() != failedDataId) {
isNewError = true;
}
outgoingBatch.setFailedDataId(failedDataId);
}
outgoingBatch.setFailedDataId(failedDataId);
}
}

@@ -136,13 +140,14 @@ public BatchAckResult ack(final BatchAck batch) {
if (isNewError) {
engine.getStatisticManager().incrementDataLoadedOutgoingErrors(outgoingBatch.getChannelId(), 1);
}
if (isNewError && outgoingBatch.getSqlCode() == ErrorConstants.FK_VIOLATION_CODE
&& parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION)) {
Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId());
if (channel != null && !channel.isReloadFlag()) {
if (isNewError && outgoingBatch.getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) {
if (!outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION)) {
engine.getDataService().reloadMissingForeignKeyRows(outgoingBatch.getNodeId(), outgoingBatch.getFailedDataId());
suppressLogError = true;
}
if (outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_RELOAD)) {
suppressLogError = true;
}
}
if (outgoingBatch.getSqlCode() == ErrorConstants.PROTOCOL_VIOLATION_CODE
&& ErrorConstants.PROTOCOL_VIOLATION_STATE.equals(outgoingBatch.getSqlState())) {
@@ -653,11 +653,19 @@ protected IDataWriter chooseDataWriter(Batch batch) {
}
} catch (Throwable ex) {
error = ex;
if (parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_RELOAD)
&& listener.getCurrentBatch() != null && listener.isNewErrorForCurrentBatch()
&& listener.getCurrentBatch().isLoadFlag()
&& listener.getCurrentBatch().getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) {
engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getTable(),
ctx.getData(), Constants.CHANNEL_CONFIG, false);
}
if (parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE)
&& listener.getCurrentBatch() != null && listener.isNewErrorForCurrentBatch()
&& !listener.getCurrentBatch().isLoadFlag()
&& listener.getCurrentBatch().getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) {
engine.getDataService().reloadMissingForeignKeyRowsReverse(sourceNode.getNodeId(), ctx.getTable(),
ctx.getData(), parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS));
ctx.getData(), null, parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS));
}
logOrRethrow(ex);
} finally {
@@ -672,11 +672,20 @@ public long insertReloadEventImmediate(ISqlTransaction transaction, Node targetN
TriggerRouter triggerRouter, TriggerHistory triggerHistory,
String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy,
Status status, String channelId, long estimatedBatchRowCount) {
return insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
overrideInitialLoadSelect, isLoad, loadId, createBy,
status, channelId, estimatedBatchRowCount, true);
if (triggerHistory == null) {
triggerHistory = lookupTriggerHistory(triggerRouter.getTrigger());
}

Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.RELOAD,
overrideInitialLoadSelect != null ? overrideInitialLoadSelect : triggerRouter
.getInitialLoadSelect(), null, triggerHistory, channelId,
null, null);
data.setNodeList(targetNode.getNodeId());
return insertDataAndDataEventAndOutgoingBatch(transaction, data,
targetNode.getNodeId(), triggerRouter.getRouter().getRouterId(), isLoad,
loadId, createBy, status, channelId, estimatedBatchRowCount);
}

protected long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory,
String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy,
@@ -2233,15 +2242,17 @@ public String reloadTable(String nodeId, String catalogName, String schemaName,
@Override
public String reloadTable(String nodeId, String catalogName, String schemaName,
String tableName, String overrideInitialLoadSelect) {
return reloadTable(nodeId, catalogName, schemaName, tableName, null, false);
return reloadTable(nodeId, catalogName, schemaName, tableName, null, null, false);
}

protected String reloadTableImmediate(String nodeId, String catalogName, String schemaName, String tableName, String overrideInitialLoadSelect) {
return reloadTable(nodeId, catalogName, schemaName, tableName, overrideInitialLoadSelect, true);
@Override
public String reloadTableImmediate(String nodeId, String catalogName, String schemaName, String tableName,
String overrideInitialLoadSelect, String overrideChannelId) {
return reloadTable(nodeId, catalogName, schemaName, tableName, overrideInitialLoadSelect, overrideChannelId, true);
}

protected String reloadTable(String nodeId, String catalogName, String schemaName,
String tableName, String overrideInitialLoadSelect, boolean isImmediate) {
String tableName, String overrideInitialLoadSelect, String overrideChannelId, boolean isImmediate) {
Node sourceNode = engine.getNodeService().findIdentity();
Node targetNode = engine.getNodeService().findNode(nodeId);
if (targetNode == null) {
@@ -2265,8 +2276,11 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam
if (triggerRouters != null && triggerRouters.size() > 0) {
for (TriggerRouter triggerRouter : triggerRouters) {
eventCount++;
String channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine
String channelId = overrideChannelId;
if (channelId == null) {
channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine
.getConfigurationService().getChannels(false));
}

if (isImmediate) {
insertReloadEventImmediate(transaction, targetNode, triggerRouter, triggerHistory,
@@ -2306,7 +2320,7 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam

}

public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, boolean sendCorrectionToPeers) {
public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table, CsvData data, String channelId, boolean sendCorrectionToPeers) {
try {
Map<String, String> dataMap = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
List<TableRow> tableRows = new ArrayList<TableRow>();
@@ -2356,13 +2370,14 @@ public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table,
+ "to correct table '{}' for column '{}'",
sourceNodeId, catalog, schema, foreignTable.getName(), foreignTableRow.getFkName(),
foreignTableRow.getWhereSql(), localTable.getName(), foreignTableRow.getReferenceColumnName());

for (Node targetNode : targetNodes) {
script.append("engine.getDataService().reloadTable(\"" + targetNode.getNodeId() + "\", " +
((schema == null) ? schema : "\"" + schema + "\"") + ", " +
((catalog == null) ? catalog : "\"" + catalog + "\"") + ", \"" +
script.append("engine.getDataService().reloadTableImmediate(\"" + targetNode.getNodeId() + "\", " +
(catalog == null ? catalog : "\"" + catalog + "\"") + ", " +
(schema == null ? schema : "\"" + schema + "\"") + ", \"" +
foreignTable.getName().replace("\"", "\\\"") + "\", \"" +
foreignTableRow.getWhereSql().replace("\"", "\\\"") + "\");\n");
foreignTableRow.getWhereSql().replace("\"", "\\\"") + "\", " +
(channelId == null ? channelId : "\"" + channelId + "\"") + ");\n");
}
}
}
@@ -2415,7 +2430,7 @@ public void reloadMissingForeignKeyRows(String nodeId, long dataId) {
+ "to correct dataId '{}' table '{}' for column '{}'",
nodeId, catalog, schema, foreignTable.getName(), foreignTableRow.getFkName(), foreignTableRow.getWhereSql(),
dataId, data.getTableName(), foreignTableRow.getReferenceColumnName());
reloadTableImmediate(nodeId, catalog, schema, foreignTable.getName(), foreignTableRow.getWhereSql());
reloadTableImmediate(nodeId, catalog, schema, foreignTable.getName(), foreignTableRow.getWhereSql(), null);
}
}
}
@@ -784,6 +784,16 @@ auto.resolve.foreign.key.violation.reverse=false
# Type: boolean
auto.resolve.foreign.key.violation.reverse.peers=false

# If this is true, when a reload batch receives a foreign key violation,
# the missing data will be automatically sent to resolve it.
# The resolution is done at the target node by sending a script
# that requests reload batches.
#
# DatabaseOverridable: true
# Tags: load
# Type: boolean
auto.resolve.foreign.key.violation.reverse.reload=true

# Indicate whether the process of inserting data, data_events and outgoing_batches for
# a reload is transactional. The only reason this might be marked as false is to reduce
# possible contention while multiple nodes connect for reloads at the same time.

0 comments on commit 323e00b

Please sign in to comment.
You can’t perform that action at this time.