Skip to content

Commit

Permalink
0002859: Auto resolve foreign key violation by sending missing rows
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 13, 2016
1 parent 25ee05c commit ed571bd
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 6 deletions.
Expand Up @@ -27,5 +27,8 @@ private ErrorConstants() {

public static final String CONFLICT_STATE = "CONFLICT";
public static final int CONFLICT_CODE = -999;

public static final String FK_VIOLATION_STATE = "FK";
public static final int FK_VIOLATION_CODE = -900;

}
Expand Up @@ -105,6 +105,7 @@ private ParameterConstants() {
public final static String AUTO_REGISTER_ENABLED = "auto.registration";
public final static String AUTO_RELOAD_ENABLED = "auto.reload";
public final static String AUTO_RELOAD_REVERSE_ENABLED = "auto.reload.reverse";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION = "auto.resolve.foreign.key.violation";
public final static String AUTO_INSERT_REG_SVR_IF_NOT_FOUND = "auto.insert.registration.svr.if.not.found";
public final static String AUTO_SYNC_CONFIGURATION = "auto.sync.configuration";
public final static String AUTO_SYNC_CONFIGURATION_ON_INCOMING = "auto.sync.configuration.on.incoming";
Expand Down
Expand Up @@ -56,6 +56,8 @@ public interface IDataService {

public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName, String overrideInitialLoadSelect);

public void sendMissingForeignKeyRows(String nodeId, long dataId);

/**
* Sends a SQL command to the remote node for execution by creating a SQL event that is synced like other data
*
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.io.stage.IStagingManager;
Expand Down Expand Up @@ -116,6 +118,10 @@ public BatchAckResult ack(final BatchAck batch) {
if (routerStats != null) {
log.info("Router stats for batch " + outgoingBatch.getBatchId() + ": " + routerStats.toString());
}
if (outgoingBatch.getSqlCode() == ErrorConstants.FK_VIOLATION_CODE
&& parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION)) {
engine.getDataService().sendMissingForeignKeyRows(outgoingBatch.getNodeId(), outgoingBatch.getFailedDataId());
}
} else if (status == Status.RS) {
log.info("The outgoing batch {} received resend request", outgoingBatch.getNodeBatchId());
} else if (!outgoingBatch.isCommonFlag()) {
Expand Down
Expand Up @@ -1110,6 +1110,10 @@ public void batchInError(DataContext context, Throwable ex) {
this.currentBatch.setSqlState(se.getSQLState());
this.currentBatch.setSqlCode(se.getErrorCode());
this.currentBatch.setSqlMessage(se.getMessage());
if (sqlTemplate.isForeignKeyViolation(se)) {
this.currentBatch.setSqlState(ErrorConstants.FK_VIOLATION_STATE);
this.currentBatch.setSqlCode(ErrorConstants.FK_VIOLATION_CODE);
}
} else {
this.currentBatch.setSqlMessage(ex.getMessage());
}
Expand Down
Expand Up @@ -36,8 +36,13 @@
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.ForeignKey;
import org.jumpmind.db.model.Reference;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseInfo;
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.DmlStatement.DmlType;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
Expand Down Expand Up @@ -1311,11 +1316,6 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction,
return insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, isLoad, loadId, createBy, status, null);
}

/**
* @param status
* TODO
* @return The inserted batch id
*/
protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long dataId,
String channelId, String nodeId, DataEventType eventType, String routerId,
boolean isLoad, long loadId, String createBy, Status status, String tableName) {
Expand Down Expand Up @@ -1515,6 +1515,39 @@ public String reloadTable(String nodeId, String catalogName, String schemaName,

}

public void sendMissingForeignKeyRows(String nodeId, long dataId) {
Data data = findData(dataId);
Table table = platform.getTableFromCache(data.getTableName(), false);
Map<String, String> dataMap = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);

for (ForeignKey fk : table.getForeignKeys()) {
Table foreignTable = platform.getTableFromCache(fk.getForeignTableName(), false);
ArrayList<Column> foreignColumnList = new ArrayList<Column>();
Row foreignRow = new Row(3);

for (Reference ref : fk.getReferences()) {
String foreignColumnName = ref.getForeignColumnName();
foreignColumnList.add(foreignTable.findColumn(foreignColumnName));
foreignRow.put(foreignColumnName, dataMap.get(ref.getLocalColumnName()));
}

DmlStatement st = platform.createDmlStatement(DmlType.WHERE, foreignTable, null);
Column[] foreignColumns = foreignColumnList.toArray(new Column[foreignColumnList.size()]);
String sql = st.buildDynamicSql(symmetricDialect.getBinaryEncoding(), foreignRow, false, true, foreignColumns).substring(6);

String catalog = foreignTable.getCatalog();
String schema = foreignTable.getSchema();
if (StringUtils.equals(platform.getDefaultCatalog(), catalog)) {
catalog = null;
}
if (StringUtils.equals(platform.getDefaultSchema(), schema)) {
schema = null;
}

reloadTable(nodeId, catalog, schema, foreignTable.getName(), sql);
}
}

/**
* Because we can't add a trigger on the _node table, we are artificially
* generating heartbeat events.
Expand Down Expand Up @@ -1873,6 +1906,10 @@ public List<Data> listData(long batchId, String nodeId, long startDataId, String
new int[] { symmetricDialect.getSqlTypeForIds(), Types.VARCHAR, symmetricDialect.getSqlTypeForIds()});
}

public Data findData(long dataId) {
return sqlTemplateDirty.queryForObject(getSql("selectData"), dataMapper, dataId);
}

public Data mapData(Row row) {
return dataMapper.mapRow(row);
}
Expand Down
Expand Up @@ -67,6 +67,11 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? and o.node_id = ? ");

putSql("selectData",
"select data_id, table_name, event_type, row_data, pk_data, old_data, " +
"create_time, trigger_hist_id, channel_id, transaction_id, source_node_id, external_data, node_list, null as router_id " +
"from $(data) where data_id = ?");

putSql("selectMaxDataEventDataIdSql", ""
+ "select max(data_id) from $(data_event) ");

Expand Down
Expand Up @@ -640,6 +640,14 @@ auto.reload=false
# Type: boolean
auto.reload.reverse=false

# If this is true, when a batch receives a foreign key violation,
# the missing data will be automatically sent to resolve it.
#
# DatabaseOverridable: true
# Tags: load
# Type: boolean
auto.resolve.foreign.key.violation=true

# If this is true, then nodes will communicate with other nodes using a thread per node per channel versus a thread per node.
#
# DatabaseOverridable: true
Expand Down
11 changes: 10 additions & 1 deletion symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java
Expand Up @@ -50,7 +50,7 @@ public class DmlStatement {
protected static final Logger log = LoggerFactory.getLogger(DmlStatement.class);

public enum DmlType {
INSERT, UPDATE, DELETE, UPSERT, COUNT, FROM, SELECT, SELECT_ALL, UNKNOWN
INSERT, UPDATE, DELETE, UPSERT, COUNT, FROM, WHERE, SELECT, SELECT_ALL, UNKNOWN
};

protected DmlType dmlType;
Expand Down Expand Up @@ -136,6 +136,9 @@ protected void init(DmlType type, String catalogName, String schemaName, String
} else if (type == DmlType.FROM) {
this.sql = buildFromSql(Table.getFullyQualifiedTableName(catalogName, schemaName,
tableName, quote, databaseInfo.getCatalogSeparator(), databaseInfo.getSchemaSeparator()), keysColumns);
} else if (type == DmlType.WHERE) {
this.sql = buildWhereSql(Table.getFullyQualifiedTableName(catalogName, schemaName,
tableName, quote, databaseInfo.getCatalogSeparator(), databaseInfo.getSchemaSeparator()), keysColumns);
} else if (type == DmlType.SELECT) {
this.sql = buildSelectSql(Table.getFullyQualifiedTableName(catalogName, schemaName,
tableName, quote, databaseInfo.getCatalogSeparator(), databaseInfo.getSchemaSeparator()), keysColumns, columns);
Expand Down Expand Up @@ -233,6 +236,12 @@ protected String buildFromSql(String tableName, Column[] keyColumns) {
return sql.toString();
}

protected String buildWhereSql(String tableName, Column[] keyColumns) {
StringBuilder sql = new StringBuilder("where ");
appendColumnsEquals(sql, keyColumns, nullKeyValues, " and ");
return sql.toString();
}

protected String buildCountSql(String tableName, Column[] keyColumns) {
StringBuilder sql = new StringBuilder("select count(*) from ").append(tableName);
if (keyColumns != null && keyColumns.length > 0) {
Expand Down
Expand Up @@ -33,6 +33,7 @@ public H2JdbcSqlTemplate(DataSource dataSource, SqlTemplateSettings settings,
SymmetricLobHandler lobHandler, DatabaseInfo databaseInfo) {
super(dataSource, settings, lobHandler, databaseInfo);
primaryKeyViolationSqlStates = new String[] {"23001", "23505"};
foreignKeyViolationCodes = new int[] { 23506 };
}

@Override
Expand Down

0 comments on commit ed571bd

Please sign in to comment.