Skip to content
Permalink
Browse files

0003847: Enhance fallback to handle duplicate key and foreign key errors

  • Loading branch information...
erilong committed Jan 4, 2019
1 parent 4a350c3 commit cba2df0b36c37c5569a9aeb56427fda19df32539
Showing with 1,317 additions and 256 deletions.
  1. +3 −184 symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
  2. +16 −0 symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseMetaDataWrapper.java
  3. +11 −0 symmetric-db/src/main/java/org/jumpmind/db/platform/IDdlReader.java
  4. +20 −8 symmetric-db/src/main/java/org/jumpmind/db/platform/cassandra/CassandraDdlReader.java
  5. +20 −0 symmetric-db/src/main/java/org/jumpmind/db/platform/kafka/KafkaDdlReader.java
  6. +20 −0 symmetric-db/src/main/java/org/jumpmind/db/platform/sqlite/SqliteDdlReader.java
  7. +4 −0 symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractJavaDriverSqlTemplate.java
  8. +9 −0 symmetric-db/src/main/java/org/jumpmind/db/sql/AbstractSqlTemplate.java
  9. +4 −0 symmetric-db/src/main/java/org/jumpmind/db/sql/ISqlTemplate.java
  10. +9 −0 symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
  11. +120 −0 symmetric-db/src/main/java/org/jumpmind/db/util/TableRow.java
  12. +52 −9 ...o/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java
  13. +45 −34 symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java
  14. +220 −1 ...io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java
  15. +14 −6 symmetric-io/src/test/java/org/jumpmind/symmetric/io/AbstractWriterTest.java
  16. +323 −0 symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterConflictTest.java
  17. +28 −0 symmetric-io/src/test/resources/testDatabaseWriter.xml
  18. +274 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/AbstractJdbcDdlReader.java
  19. +15 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/db2/Db2JdbcSqlTemplate.java
  20. +4 −1 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/derby/DerbyJdbcSqlTemplate.java
  21. +2 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/firebird/FirebirdJdbcSqlTemplate.java
  22. +3 −1 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/h2/H2JdbcSqlTemplate.java
  23. +3 −1 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/hsqldb2/HsqlDb2JdbcSqlTemplate.java
  24. +2 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/informix/InformixJdbcSqlTemplate.java
  25. +6 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/mssql/MsSqlJdbcSqlTemplate.java
  26. +2 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/mysql/MySqlJdbcSqlTemplate.java
  27. +1 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/nuodb/NuoDbJdbcSqlTemplate.java
  28. +2 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleJdbcSqlTemplate.java
  29. +3 −1 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlJdbcSqlTemplate.java
  30. +2 −0 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/sybase/SybaseJdbcSqlTemplate.java
  31. +3 −9 symmetric-jdbc/src/main/java/org/jumpmind/db/platform/tibero/TiberoJdbcSqlTemplate.java
  32. +77 −1 symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTemplate.java
@@ -41,20 +41,16 @@
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.ForeignKey;
import org.jumpmind.db.model.Reference;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseInfo;
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.DmlStatement.DmlType;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.db.util.TableRow;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
@@ -1990,12 +1986,7 @@ public void reloadMissingForeignKeyRowsReverse(String sourceNodeId, Table table,
log.info("Could not find table " + table.getFullyQualifiedTableName());
}
tableRows.add(new TableRow(localTable, row, null, null, null));
List<TableRow> foreignTableRows;
try {
foreignTableRows = getForeignTableRows(tableRows, new HashSet<TableRow>());
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
List<TableRow> foreignTableRows = platform.getDdlReader().getImportedForeignTableRows(tableRows, new HashSet<TableRow>());

if (foreignTableRows.isEmpty()) {
log.info("Could not determine foreign table rows to fix foreign key violation for "
@@ -2065,12 +2056,7 @@ public void reloadMissingForeignKeyRows(String nodeId, long dataId) {
Row row = new Row(dataMap.size());
row.putAll(dataMap);
tableRows.add(new TableRow(table, row, null, null, null));
List<TableRow> foreignTableRows;
try {
foreignTableRows = getForeignTableRows(tableRows, new HashSet<TableRow>());
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
List<TableRow> foreignTableRows = platform.getDdlReader().getImportedForeignTableRows(tableRows, new HashSet<TableRow>());

if (foreignTableRows.isEmpty()) {
log.info("Could not determine foreign table rows to fix foreign key violation for "
@@ -2106,90 +2092,6 @@ public void reloadMissingForeignKeyRows(String nodeId, long dataId) {
}
}

protected List<TableRow> getForeignTableRows(List<TableRow> tableRows, Set<TableRow> visited) throws CloneNotSupportedException {
List<TableRow> fkDepList = new ArrayList<TableRow>();
for (TableRow tableRow : tableRows) {
if (!visited.contains(tableRow)) {
visited.add(tableRow);
for (ForeignKey fk : tableRow.getTable().getForeignKeys()) {
Table table = platform.getTableFromCache(fk.getForeignTableName(), false);
if (table == null) {
table = fk.getForeignTable();
if (table == null) {
table = platform.getTableFromCache(tableRow.getTable().getCatalog(), tableRow.getTable().getSchema(),
fk.getForeignTableName(), false);
}
}
if (table != null) {
Table foreignTable = (Table) table.clone();
for (Column column : foreignTable.getColumns()) {
column.setPrimaryKey(false);
}
Row whereRow = new Row(fk.getReferenceCount());
String referenceColumnName = null;
boolean[] nullValues = new boolean[fk.getReferenceCount()];
int index = 0;
for (Reference ref : fk.getReferences()) {
Column foreignColumn = foreignTable.findColumn(ref.getForeignColumnName());
Object value = tableRow.getRow().get(ref.getLocalColumnName());
nullValues[index++] = value == null;
referenceColumnName = ref.getLocalColumnName();
whereRow.put(foreignColumn.getName(), value);
foreignColumn.setPrimaryKey(true);
}

boolean allNullValues = true;
for (boolean b : nullValues) {
if (!b) {
allNullValues = false;
break;
}
}

if (!allNullValues) {
DmlStatement whereSt = platform.createDmlStatement(DmlType.WHERE, foreignTable.getCatalog(),
foreignTable.getSchema(), foreignTable.getName(), foreignTable.getPrimaryKeyColumns(),
foreignTable.getColumns(), nullValues, null);
String whereSql = whereSt.buildDynamicSql(symmetricDialect.getBinaryEncoding(), whereRow, false, true,
foreignTable.getPrimaryKeyColumns()).substring(6);
String delimiter = platform.getDatabaseInfo().getSqlCommandDelimiter();
if (delimiter != null && delimiter.length() > 0) {
whereSql = whereSql.substring(0, whereSql.length() - delimiter.length());
}

Row foreignRow = new Row(foreignTable.getColumnCount());
if (foreignTable.getForeignKeyCount() > 0) {
DmlStatement selectSt = platform.createDmlStatement(DmlType.SELECT, foreignTable, null);
Object[] keys = whereRow.toArray(foreignTable.getPrimaryKeyColumnNames());
Map<String, Object> values = sqlTemplate.queryForMap(selectSt.getSql(), keys);
if (values == null) {
log.warn(
"Unable to reload rows for missing foreign key data for table '{}', parent data not found. Using sql='{}' with keys '{}'",
table.getName(), selectSt.getSql(), keys);
} else {
foreignRow.putAll(values);
}
}

TableRow foreignTableRow = new TableRow(foreignTable, foreignRow, whereSql, referenceColumnName, fk.getName());
fkDepList.add(foreignTableRow);
log.debug("Add foreign table reference '{}' whereSql='{}'", foreignTable.getName(), whereSql);
} else {
log.debug("The foreign table reference was null for {}", foreignTable.getName());
}
} else {
log.debug("Foreign table '{}' not found for foreign key '{}'", fk.getForeignTableName(), fk.getName());
}
if (fkDepList.size() > 0) {
fkDepList.addAll(getForeignTableRows(fkDepList, visited));
}
}
}
}

return fkDepList;
}

/**
* Because we can't add a trigger on the _node table, we are artificially
* generating heartbeat events.
@@ -2658,89 +2560,6 @@ public boolean fixLastDataGap() {
}
return fixed;
}

class TableRow {
Table table;
Row row;
String whereSql;
String referenceColumnName;
String fkName;
String fkColumnValues = null;

public TableRow(Table table, Row row, String whereSql, String referenceColumnName, String fkName) {
this.table = table;
this.row = row;
this.whereSql = whereSql;
this.referenceColumnName = referenceColumnName;
this.fkName = fkName;
}

protected String getFkColumnValues() {
if (fkColumnValues == null) {
StringBuilder builder = new StringBuilder();
ForeignKey[] keys = table.getForeignKeys();
for (ForeignKey foreignKey : keys) {
if (foreignKey.getName().equals(fkName)) {
Reference[] refs = foreignKey.getReferences();
for (Reference ref : refs) {
Object value = row.get(ref.getLocalColumnName());
if (value != null) {
builder.append("\"").append(value).append("\",");
} else {
builder.append("null,");
}
}
}
}
fkColumnValues = builder.toString();
}
return fkColumnValues;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((table == null) ? 0 : table.hashCode());
result = prime * result + ((whereSql == null) ? 0 : whereSql.hashCode());
result = prime * result + ((getFkColumnValues() == null) ? 0 : getFkColumnValues().hashCode());
return result;
}

@Override
public boolean equals(Object o) {
if (o instanceof TableRow) {
TableRow tr = (TableRow) o;
return tr.table.equals(table) && tr.whereSql.equals(whereSql)
&& tr.getFkColumnValues().equals(getFkColumnValues().toString());
}
return false;
}

@Override
public String toString() {
return table.getFullyQualifiedTableName() + ":" + whereSql + ":" + getFkColumnValues();
}

public Table getTable() {
return table;
}

public Row getRow() {
return row;
}

public String getWhereSql() {
return whereSql;
}
public String getReferenceColumnName() {
return referenceColumnName;
}
public String getFkName() {
return fkName;
}

}

public class DataMapper implements ISqlRowMapper<Data> {
public Data mapRow(Row row) {
@@ -183,6 +183,22 @@ public ResultSet getForeignKeys(String tableNamePattern) throws SQLException {
return getMetaData().getImportedKeys(getCatalog(), getSchemaPattern(), tableNamePattern);
}

/**
* Convenience method to return the foreign keys that reference this table using the
* configured catalog and schema pattern.
*
* @param tableNamePattern
* The pattern identifying for which tables to return info
* @return The foreign key meta data
* @throws SQLException
* If an error occurred retrieving the meta data
* @see DatabaseMetaData#getImportedKeys(java.lang.String, java.lang.String,
* java.lang.String)
*/
public ResultSet getExportedKeys(String tableNamePattern) throws SQLException {
return getMetaData().getExportedKeys(getCatalog(), getSchemaPattern(), tableNamePattern);
}

/**
* Convenience method to return the index meta data using the configured
* catalog and schema pattern.
@@ -21,11 +21,16 @@
package org.jumpmind.db.platform;


import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.ForeignKey;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.model.Trigger;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.TableRow;

public interface IDdlReader {

@@ -47,4 +52,10 @@

public Trigger getTriggerFor(Table table, String name);

public Collection<ForeignKey> getExportedKeys(Table table);

public List<TableRow> getExportedForeignTableRows(ISqlTransaction transaction, List<TableRow> tableRows, Set<TableRow> visited);

public List<TableRow> getImportedForeignTableRows(List<TableRow> tableRows, Set<TableRow> visited);

}
@@ -1,13 +1,18 @@
package org.jumpmind.db.platform.cassandra;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.ForeignKey;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.model.Trigger;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.IDdlReader;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.TableRow;

public class CassandraDdlReader implements IDdlReader {
protected CassandraPlatform platform;
@@ -18,7 +23,6 @@ public CassandraDdlReader(IDatabasePlatform platform) {

@Override
public Database readTables(String catalog, String schema, String[] tableTypes) {
// TODO Auto-generated method stub
return null;
}

@@ -31,44 +35,52 @@ public Table readTable(String catalog, String schema, String tableName) {

@Override
public List<String> getTableTypes() {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getCatalogNames() {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getSchemaNames(String catalog) {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getTableNames(String catalog, String schema, String[] tableTypes) {
// TODO Auto-generated method stub
return null;
}

@Override
public List<String> getColumnNames(String catalog, String schema, String tableName) {
// TODO Auto-generated method stub
return null;
}

@Override
public List<Trigger> getTriggers(String catalog, String schema, String tableName) {
// TODO Auto-generated method stub
return null;
}

@Override
public Trigger getTriggerFor(Table table, String name) {
// TODO Auto-generated method stub
return null;
}

@Override
public Collection<ForeignKey> getExportedKeys(Table table) {
return null;
}

@Override
public List<TableRow> getExportedForeignTableRows(ISqlTransaction transaction, List<TableRow> tableRows, Set<TableRow> visited) {
return null;
}

@Override
public List<TableRow> getImportedForeignTableRows(List<TableRow> tableRows, Set<TableRow> visited) {
return null;
}

}
Oops, something went wrong.

0 comments on commit cba2df0

Please sign in to comment.
You can’t perform that action at this time.