Skip to content

Commit

Permalink
0004687: Postgres conflict when unique index uses primary key index
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 11, 2020
1 parent 6429d08 commit 3c01948
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 48 deletions.
Expand Up @@ -64,15 +64,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
}

if (isWinner) {
if (writer.getContext().getLastError() != null) {
performChainedFallbackForInsert(writer, data, conflict);
} else {
try {
performFallbackToUpdate(writer, data, conflict, true);
} catch (ConflictException e) {
performChainedFallbackForInsert(writer, data, conflict);
}
}
performChainedFallbackForInsert(writer, data, conflict);
} else if (!conflict.isResolveRowOnly()) {
throw new IgnoreBatchException();
}
Expand Down Expand Up @@ -108,16 +100,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
}

if (isWinner) {
if (writer.getContext().getLastError() != null) {
performChainedFallbackForUpdate(writer, data, conflict, true);
} else {
try {
// original update was 0 rows, so we'll try to update without conflict detection
performFallbackToUpdate(writer, data, conflict, false);
} catch (ConflictException e) {
performChainedFallbackForUpdate(writer, data, conflict, true);
}
}
performChainedFallbackForUpdate(writer, data, conflict, true);
} else if (!conflict.isResolveRowOnly()) {
throw new IgnoreBatchException();
}
Expand Down Expand Up @@ -201,7 +184,17 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
}

protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, CsvData data, Conflict conflict) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) {
boolean isFallback = false;
if (writer.getContext().getLastError() == null) {
try {
isFallback = true;
performFallbackToUpdate(writer, data, conflict, true);
return;
} catch (ConflictException e) {
}
}

if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), isFallback)) {
// unique index violation, we remove blocking rows, and try again
try {
performFallbackToInsert(writer, data, conflict, true);
Expand All @@ -214,7 +207,7 @@ protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, Cs
// standard fallback to update when insert gets primary key violation
performFallbackToUpdate(writer, data, conflict, true);
} catch (ConflictException e) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), true)) {
// unique index violation, we remove blocking rows, and try again
performFallbackToUpdate(writer, data, conflict, true);
} else {
Expand All @@ -225,8 +218,19 @@ protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, Cs
}

protected void performChainedFallbackForUpdate(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, boolean overrideToUsePkData) {
boolean isFallback = false;
if (writer.getContext().getLastError() == null) {
try {
// original update was 0 rows, so we'll try to update without conflict detection
isFallback = true;
performFallbackToUpdate(writer, data, conflict, false);
return;
} catch (ConflictException e) {
}
}

if (conflict.getDetectType() == DetectConflict.USE_PK_DATA || overrideToUsePkData) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), isFallback)) {
try {
// unique index violation, we remove blocking rows, and try again
performFallbackToUpdate(writer, data, conflict, true);
Expand All @@ -243,7 +247,7 @@ protected void performChainedFallbackForUpdate(AbstractDatabaseWriter writer, Cs
// standard fallback to insert when update gets zero rows
performFallbackToInsert(writer, withoutOldData, conflict, true);
} catch (ConflictException e) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) {
if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), true)) {
// unique index violation, we remove blocking rows, and try again
performFallbackToInsert(writer, withoutOldData, conflict, true);
} else {
Expand Down Expand Up @@ -382,7 +386,7 @@ protected void performFallbackToInsert(AbstractDatabaseWriter writer, CsvData cs
}
}

abstract protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData csvData, Conflict conflict, Throwable ex);
abstract protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData csvData, Conflict conflict, Throwable ex, boolean isFallback);

abstract protected boolean checkForForeignKeyChildExistsViolation(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, Throwable ex);

Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.IIndex;
import org.jumpmind.db.model.IndexColumn;
Expand Down Expand Up @@ -209,38 +210,66 @@ protected boolean isVersionNewer(Conflict conflict, AbstractDatabaseWriter write
}

@Override
protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, Throwable e) {
protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, Throwable e, boolean isFallback) {
DefaultDatabaseWriter databaseWriter = (DefaultDatabaseWriter)writer;
IDatabasePlatform platform = databaseWriter.getPlatform();
ISqlTemplate sqlTemplate = platform.getSqlTemplate();
int count = 0;

if (e != null && sqlTemplate.isUniqueKeyViolation(e)) {
Table targetTable = writer.getTargetTable();
log.info("Unique key violation on table {} during {} with batch {}. Attempting to correct.",
targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId());
Map<String, String> values = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA);
List<Column> whereColumns = targetTable.getPrimaryKeyColumnsAsList();
List<String> whereValues = new ArrayList<String>();
for (Column column : whereColumns) {
whereValues.add(values.get(column.getName()));
}
boolean[] nullKeyValues = new boolean[whereColumns.size()];
DmlStatement countStmt = platform.createDmlStatement(DmlType.COUNT, targetTable.getCatalog(), targetTable.getSchema(),
targetTable.getName(), whereColumns.toArray(new Column[0]), targetTable.getPrimaryKeyColumns(), nullKeyValues,
databaseWriter.getWriterSettings().getTextColumnExpression());
Object[] objectValues = platform.getObjectValues(databaseWriter.getBatch().getBinaryEncoding(),
whereValues.toArray(new String[0]), whereColumns.toArray(new Column[0]));
int pkCount = queryForInt(platform, databaseWriter, countStmt.getSql(), objectValues);
boolean isUniqueKeyBlocking = false;
boolean isPrimaryKeyBlocking = false;

for (IIndex index : targetTable.getIndices()) {
if (index.isUnique()) {
log.info("Correcting for possible violation of unique index {} on table {} during {} with batch {}", index.getName(),
targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId());
count += deleteUniqueConstraintRow(platform, sqlTemplate, databaseWriter, targetTable, index, data);
if ((!isFallback && data.getDataEventType().equals(DataEventType.UPDATE)) ||
(isFallback && data.getDataEventType().equals(DataEventType.INSERT))) {
Map<String, String> pkValues = data.toColumnNameValuePairs(targetTable.getPrimaryKeyColumnNames(), CsvData.PK_DATA);
boolean isPkChanged = false;
for (Map.Entry<String, String> entry : pkValues.entrySet()) {
String newValue = values.get(entry.getKey());
if (!StringUtils.equals(newValue, entry.getValue())) {
isPkChanged = true;
break;
}
}
if (isPkChanged && pkCount > 0) {
isPrimaryKeyBlocking = true;
} else {
isUniqueKeyBlocking = true;
}
} else if ((!isFallback && data.getDataEventType().equals(DataEventType.INSERT)) ||
(isFallback && data.getDataEventType().equals(DataEventType.UPDATE))) {
isUniqueKeyBlocking = pkCount == 0;
}

if (data.getDataEventType().equals(DataEventType.UPDATE)) {
// Primary key is preventing our update, so we delete the blocking row
log.info("Correcting for possible violation of primary key on table {} during {} with batch {}", targetTable.getName(),
data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId());
if (isUniqueKeyBlocking) {
log.info("Unique key violation on table {} during {} with batch {}. Attempting to correct.",
targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId());

Map<String, String> values = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA);
List<Column> whereColumns = targetTable.getPrimaryKeyColumnsAsList();
List<String> whereValues = new ArrayList<String>();

for (Column column : whereColumns) {
whereValues.add(values.get(column.getName()));
}
count += deleteRow(platform, sqlTemplate, databaseWriter, targetTable, whereColumns, whereValues, false);
for (IIndex index : targetTable.getIndices()) {
if (index.isUnique()) {
log.info("Correcting for possible violation of unique index {} on table {} during {} with batch {}", index.getName(),
targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId());
count += deleteUniqueConstraintRow(platform, sqlTemplate, databaseWriter, targetTable, index, data);
}
}
} else if (isPrimaryKeyBlocking) {
log.info("Correcting for update violation of primary key on table {} during {} with batch {}", targetTable.getName(),
data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId());
count += deleteRow(platform, sqlTemplate, databaseWriter, targetTable, whereColumns, whereValues, false);
}
}
return count != 0;
Expand Down Expand Up @@ -353,10 +382,8 @@ public List<TableRow> execute(ISqlTransaction transaction) {
if (visited.add(foreignTableRow)) {
Table foreignTable = foreignTableRow.getTable();

log.info("Remove foreign row "
+ "catalog '{}' schema '{}' foreign table name '{}' fk name '{}' where sql '{}' "
+ "to correct table '{}' for column '{}'",
foreignTable.getCatalog(), foreignTable.getSchema(), foreignTable.getName(), foreignTableRow.getFkName(), foreignTableRow.getWhereSql(),
log.info("Remove foreign row from table '{}' fk name '{}' where sql '{}' to correct table '{}' for column '{}'",
foreignTable.getFullyQualifiedTableName(), foreignTableRow.getFkName(), foreignTableRow.getWhereSql(),
targetTable.getName(), foreignTableRow.getReferenceColumnName());

DatabaseInfo info = platform.getDatabaseInfo();
Expand Down Expand Up @@ -385,6 +412,14 @@ public Row execute(ISqlTransaction transaction) {
});
}

protected int queryForInt(IDatabasePlatform platform, DefaultDatabaseWriter databaseWriter, String sql, Object... values) {
return doInTransaction(platform, databaseWriter, new ITransactionCallback<Integer>() {
public Integer execute(ISqlTransaction transaction) {
return transaction.queryForInt(sql, values);
}
});
}

private ISqlTransaction getTransaction(IDatabasePlatform platform, DefaultDatabaseWriter databaseWriter) {
// There is code in DefaultDatabaseWriter.insert() that sets up the last error of SQLIntegrityConstraintViolationException
// when an insert of a record returns a count of 0 records inserted (with no SQL exception).
Expand Down

0 comments on commit 3c01948

Please sign in to comment.