diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java index bb1db98e40..bcae9ff571 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java @@ -64,15 +64,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu } if (isWinner) { - if (writer.getContext().getLastError() != null) { - performChainedFallbackForInsert(writer, data, conflict); - } else { - try { - performFallbackToUpdate(writer, data, conflict, true); - } catch (ConflictException e) { - performChainedFallbackForInsert(writer, data, conflict); - } - } + performChainedFallbackForInsert(writer, data, conflict); } else if (!conflict.isResolveRowOnly()) { throw new IgnoreBatchException(); } @@ -108,16 +100,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu } if (isWinner) { - if (writer.getContext().getLastError() != null) { - performChainedFallbackForUpdate(writer, data, conflict, true); - } else { - try { - // original update was 0 rows, so we'll try to update without conflict detection - performFallbackToUpdate(writer, data, conflict, false); - } catch (ConflictException e) { - performChainedFallbackForUpdate(writer, data, conflict, true); - } - } + performChainedFallbackForUpdate(writer, data, conflict, true); } else if (!conflict.isResolveRowOnly()) { throw new IgnoreBatchException(); } @@ -201,7 +184,17 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu } protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, CsvData data, Conflict conflict) { - if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) { + boolean isFallback = false; + if (writer.getContext().getLastError() == null) { + try { + isFallback = true; + performFallbackToUpdate(writer, data, conflict, true); + return; + } catch (ConflictException e) { + } + } + + if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), isFallback)) { // unique index violation, we remove blocking rows, and try again try { performFallbackToInsert(writer, data, conflict, true); @@ -214,7 +207,7 @@ protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, Cs // standard fallback to update when insert gets primary key violation performFallbackToUpdate(writer, data, conflict, true); } catch (ConflictException e) { - if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) { + if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), true)) { // unique index violation, we remove blocking rows, and try again performFallbackToUpdate(writer, data, conflict, true); } else { @@ -225,8 +218,19 @@ protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, Cs } protected void performChainedFallbackForUpdate(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, boolean overrideToUsePkData) { + boolean isFallback = false; + if (writer.getContext().getLastError() == null) { + try { + // original update was 0 rows, so we'll try to update without conflict detection + isFallback = true; + performFallbackToUpdate(writer, data, conflict, false); + return; + } catch (ConflictException e) { + } + } + if (conflict.getDetectType() == DetectConflict.USE_PK_DATA || overrideToUsePkData) { - if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) { + if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), isFallback)) { try { // unique index violation, we remove blocking rows, and try again performFallbackToUpdate(writer, data, conflict, true); @@ -243,7 +247,7 @@ protected void performChainedFallbackForUpdate(AbstractDatabaseWriter writer, Cs // standard fallback to insert when update gets zero rows performFallbackToInsert(writer, withoutOldData, conflict, true); } catch (ConflictException e) { - if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError())) { + if (checkForUniqueKeyViolation(writer, data, conflict, writer.getContext().getLastError(), true)) { // unique index violation, we remove blocking rows, and try again performFallbackToInsert(writer, withoutOldData, conflict, true); } else { @@ -382,7 +386,7 @@ protected void performFallbackToInsert(AbstractDatabaseWriter writer, CsvData cs } } - abstract protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData csvData, Conflict conflict, Throwable ex); + abstract protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData csvData, Conflict conflict, Throwable ex, boolean isFallback); abstract protected boolean checkForForeignKeyChildExistsViolation(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, Throwable ex); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java index 7a54aa9943..b28d7a9f8c 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; import org.jumpmind.db.model.Column; import org.jumpmind.db.model.IIndex; import org.jumpmind.db.model.IndexColumn; @@ -209,7 +210,7 @@ protected boolean isVersionNewer(Conflict conflict, AbstractDatabaseWriter write } @Override - protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, Throwable e) { + protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvData data, Conflict conflict, Throwable e, boolean isFallback) { DefaultDatabaseWriter databaseWriter = (DefaultDatabaseWriter)writer; IDatabasePlatform platform = databaseWriter.getPlatform(); ISqlTemplate sqlTemplate = platform.getSqlTemplate(); @@ -217,30 +218,58 @@ protected boolean checkForUniqueKeyViolation(AbstractDatabaseWriter writer, CsvD if (e != null && sqlTemplate.isUniqueKeyViolation(e)) { Table targetTable = writer.getTargetTable(); - log.info("Unique key violation on table {} during {} with batch {}. Attempting to correct.", - targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId()); + Map values = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA); + List whereColumns = targetTable.getPrimaryKeyColumnsAsList(); + List whereValues = new ArrayList(); + for (Column column : whereColumns) { + whereValues.add(values.get(column.getName())); + } + boolean[] nullKeyValues = new boolean[whereColumns.size()]; + DmlStatement countStmt = platform.createDmlStatement(DmlType.COUNT, targetTable.getCatalog(), targetTable.getSchema(), + targetTable.getName(), whereColumns.toArray(new Column[0]), targetTable.getPrimaryKeyColumns(), nullKeyValues, + databaseWriter.getWriterSettings().getTextColumnExpression()); + Object[] objectValues = platform.getObjectValues(databaseWriter.getBatch().getBinaryEncoding(), + whereValues.toArray(new String[0]), whereColumns.toArray(new Column[0])); + int pkCount = queryForInt(platform, databaseWriter, countStmt.getSql(), objectValues); + boolean isUniqueKeyBlocking = false; + boolean isPrimaryKeyBlocking = false; - for (IIndex index : targetTable.getIndices()) { - if (index.isUnique()) { - log.info("Correcting for possible violation of unique index {} on table {} during {} with batch {}", index.getName(), - targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId()); - count += deleteUniqueConstraintRow(platform, sqlTemplate, databaseWriter, targetTable, index, data); + if ((!isFallback && data.getDataEventType().equals(DataEventType.UPDATE)) || + (isFallback && data.getDataEventType().equals(DataEventType.INSERT))) { + Map pkValues = data.toColumnNameValuePairs(targetTable.getPrimaryKeyColumnNames(), CsvData.PK_DATA); + boolean isPkChanged = false; + for (Map.Entry entry : pkValues.entrySet()) { + String newValue = values.get(entry.getKey()); + if (!StringUtils.equals(newValue, entry.getValue())) { + isPkChanged = true; + break; + } } + if (isPkChanged && pkCount > 0) { + isPrimaryKeyBlocking = true; + } else { + isUniqueKeyBlocking = true; + } + } else if ((!isFallback && data.getDataEventType().equals(DataEventType.INSERT)) || + (isFallback && data.getDataEventType().equals(DataEventType.UPDATE))) { + isUniqueKeyBlocking = pkCount == 0; } - if (data.getDataEventType().equals(DataEventType.UPDATE)) { - // Primary key is preventing our update, so we delete the blocking row - log.info("Correcting for possible violation of primary key on table {} during {} with batch {}", targetTable.getName(), - data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId()); + if (isUniqueKeyBlocking) { + log.info("Unique key violation on table {} during {} with batch {}. Attempting to correct.", + targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId()); - Map values = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA); - List whereColumns = targetTable.getPrimaryKeyColumnsAsList(); - List whereValues = new ArrayList(); - - for (Column column : whereColumns) { - whereValues.add(values.get(column.getName())); - } - count += deleteRow(platform, sqlTemplate, databaseWriter, targetTable, whereColumns, whereValues, false); + for (IIndex index : targetTable.getIndices()) { + if (index.isUnique()) { + log.info("Correcting for possible violation of unique index {} on table {} during {} with batch {}", index.getName(), + targetTable.getName(), data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId()); + count += deleteUniqueConstraintRow(platform, sqlTemplate, databaseWriter, targetTable, index, data); + } + } + } else if (isPrimaryKeyBlocking) { + log.info("Correcting for update violation of primary key on table {} during {} with batch {}", targetTable.getName(), + data.getDataEventType().toString(), writer.getContext().getBatch().getNodeBatchId()); + count += deleteRow(platform, sqlTemplate, databaseWriter, targetTable, whereColumns, whereValues, false); } } return count != 0; @@ -353,10 +382,8 @@ public List execute(ISqlTransaction transaction) { if (visited.add(foreignTableRow)) { Table foreignTable = foreignTableRow.getTable(); - log.info("Remove foreign row " - + "catalog '{}' schema '{}' foreign table name '{}' fk name '{}' where sql '{}' " - + "to correct table '{}' for column '{}'", - foreignTable.getCatalog(), foreignTable.getSchema(), foreignTable.getName(), foreignTableRow.getFkName(), foreignTableRow.getWhereSql(), + log.info("Remove foreign row from table '{}' fk name '{}' where sql '{}' to correct table '{}' for column '{}'", + foreignTable.getFullyQualifiedTableName(), foreignTableRow.getFkName(), foreignTableRow.getWhereSql(), targetTable.getName(), foreignTableRow.getReferenceColumnName()); DatabaseInfo info = platform.getDatabaseInfo(); @@ -385,6 +412,14 @@ public Row execute(ISqlTransaction transaction) { }); } + protected int queryForInt(IDatabasePlatform platform, DefaultDatabaseWriter databaseWriter, String sql, Object... values) { + return doInTransaction(platform, databaseWriter, new ITransactionCallback() { + public Integer execute(ISqlTransaction transaction) { + return transaction.queryForInt(sql, values); + } + }); + } + private ISqlTransaction getTransaction(IDatabasePlatform platform, DefaultDatabaseWriter databaseWriter) { // There is code in DefaultDatabaseWriter.insert() that sets up the last error of SQLIntegrityConstraintViolationException // when an insert of a record returns a count of 0 records inserted (with no SQL exception).