Skip to content

Commit

Permalink
SYMMETRICDS-532 - only fallback to update if the error is truly a pri…
Browse files Browse the repository at this point in the history
…mary key violation.
  • Loading branch information
chenson42 committed Oct 23, 2011
1 parent 86a1325 commit fef8852
Show file tree
Hide file tree
Showing 18 changed files with 151 additions and 157 deletions.
Expand Up @@ -165,6 +165,10 @@ abstract public class AbstractDbDialect implements IDbDialect {

protected int queryTimeoutInSeconds = 300;

protected int[] primaryKeyViolationCodes;

protected String[] primaryKeyViolationSqlStates;

protected List<IDatabaseUpgradeListener> databaseUpgradeListeners = new ArrayList<IDatabaseUpgradeListener>();

protected AbstractDbDialect() {
Expand Down Expand Up @@ -1688,5 +1692,58 @@ public StatementBuilder createStatementBuilder(DmlType type, String tableName, C
return new StatementBuilder(type, tableName, keys,
columns,
preFilteredColumns, isDateOverrideToTimestamp(), getIdentifierQuoteString());
}

public void setPrimaryKeyViolationCodes(int[] primaryKeyViolationCodes) {
this.primaryKeyViolationCodes = primaryKeyViolationCodes;
}

public void setPrimaryKeyViolationSqlStates(String[] primaryKeyViolationSqlStates) {
this.primaryKeyViolationSqlStates = primaryKeyViolationSqlStates;
}

public boolean isPrimaryKeyViolation(Exception ex) {
boolean primaryKeyViolation = false;
if (primaryKeyViolationCodes != null || primaryKeyViolationSqlStates != null) {
SQLException sqlEx = findSQLException(ex);
if (sqlEx != null) {
if (primaryKeyViolationCodes != null) {
int errorCode = sqlEx.getErrorCode();
for (int primaryKeyViolationCode : primaryKeyViolationCodes) {
if (primaryKeyViolationCode == errorCode) {
primaryKeyViolation = true;
break;
}
}
}

if (primaryKeyViolationSqlStates != null) {
String sqlState = sqlEx.getSQLState();
if (sqlState != null) {
for (String primaryKeyViolationSqlState : primaryKeyViolationSqlStates) {
if (primaryKeyViolationSqlState != null
&& primaryKeyViolationSqlState.equals(sqlState)) {
primaryKeyViolation = true;
break;
}
}
}
}
}
}

return primaryKeyViolation;
}

protected SQLException findSQLException(Throwable ex) {
if (ex instanceof SQLException) {
return (SQLException)ex;
} else {
Throwable cause = ex.getCause();
if (cause != null && !cause.equals(ex)) {
return findSQLException(ex);
}
}
return null;
}
}
Expand Down
Expand Up @@ -334,5 +334,12 @@ public long insertWithGeneratedKey(JdbcTemplate jdbcTemplate, final String sql,

public StatementBuilder createStatementBuilder(DmlType type, String tableName, Column[] keys, Column[] columns,
Column[] preFilteredColumns);

/**
* Check to see if the passed in exception (or a nested exception) was caused by a primary key violation.
* @param ex The exception to check
* @return true if the exception was caused by a primary key violation
*/
public boolean isPrimaryKeyViolation(Exception ex);

}
Expand Down
Expand Up @@ -58,7 +58,6 @@
import org.jumpmind.symmetric.statistic.StatisticConstants;
import org.jumpmind.symmetric.util.AppUtils;
import org.jumpmind.symmetric.util.CsvUtils;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;

Expand Down Expand Up @@ -374,33 +373,44 @@ protected int insert(String[] tokens) {
}

if (continueToLoad) {
String keyValues[] = context.getTableTemplate().parseKeys(tokens, 1);
boolean enableFallbackUpdate = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
try {
stats.startTimer();
rows = context.getTableTemplate().insert(context, columnValues, keyValues);
if (rows <= 0) {
throw new DataIntegrityViolationException("Insert was not processed");
}
} catch (DataIntegrityViolationException e) {
// TODO: modify sql-error-codes.xml for unique constraint vs. foreign key
if (enableFallbackUpdate) {
if (log.isDebugEnabled()) {
log.debug("LoaderInsertingFailedUpdating", context.getTableName(), ArrayUtils.toString(tokens));
}
stats.incrementFallbackUpdateCount();
rows = context.getTableTemplate().update(context, columnValues, keyValues);
String keyValues[] = context.getTableTemplate().parseKeys(tokens, 1);
boolean attemptFallbackUpdate = false;
RuntimeException insertException = null;
try {
stats.startTimer();
try {
rows = context.getTableTemplate().insert(context, columnValues, keyValues);
if (rows == 0) {
throw new SymmetricException("LoaderFallbackUpdateFailed", e, context.getTableTemplate().getTable().toVerboseString(), ArrayUtils
.toString(tokens), ArrayUtils.toString(keyValues));
}
} else {
log.error("LoaderInsertingFailed", context.getTableName(), ArrayUtils.toString(tokens));
throw e;
}
} finally {
stats.incrementDatabaseMillis(stats.endTimer());
}
attemptFallbackUpdate = parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
}
} catch (RuntimeException e) {
insertException = e;
attemptFallbackUpdate = dbDialect.isPrimaryKeyViolation(e)
&& parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
if (!attemptFallbackUpdate) {
throw e;
}
}

if (attemptFallbackUpdate) {
if (log.isDebugEnabled()) {
log.debug("LoaderInsertingFailedUpdating", context.getTableName(),
ArrayUtils.toString(tokens));
}
stats.incrementFallbackUpdateCount();
rows = context.getTableTemplate().update(context, columnValues, keyValues);
if (rows == 0) {
throw new SymmetricException("LoaderFallbackUpdateFailed", insertException, context
.getTableTemplate().getTable().toVerboseString(),
ArrayUtils.toString(tokens), ArrayUtils.toString(keyValues));
}
}

} finally {
stats.incrementDatabaseMillis(stats.endTimer());
}
}
return rows;
}
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.jumpmind.symmetric.load.IMissingTableHandler;
import org.jumpmind.symmetric.load.StatementBuilder.DmlType;
import org.jumpmind.symmetric.load.TableTemplate;
import org.springframework.dao.DataIntegrityViolationException;

public class TransformDataLoader extends AbstractTransformer implements IBuiltInExtensionPoint,
IDataLoaderFilter, IMissingTableHandler {
Expand Down Expand Up @@ -109,30 +108,42 @@ protected void apply(IDataLoaderContext context,
tableTemplate.setKeyNames(data.getKeyNames());
switch (data.getTargetDmlType()) {
case INSERT:
boolean enableFallbackUpdate = parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
Table table = tableTemplate.getTable();
boolean attemptFallbackUpdate = false;
RuntimeException insertException = null;
try {
if (data.isGeneratedIdentityNeeded()) {
if (log.isDebugEnabled()) {
log.debug("TransformEnablingGeneratedIdentity", table.getName());
try {
if (data.isGeneratedIdentityNeeded()) {
if (log.isDebugEnabled()) {
log.debug("TransformEnablingGeneratedIdentity", table.getName());
}
dbDialect.revertAllowIdentityInserts(context.getJdbcTemplate(), table);
} else if (table.hasAutoIncrementColumn()) {
dbDialect.allowIdentityInserts(context.getJdbcTemplate(), table);
}
dbDialect.revertAllowIdentityInserts(context.getJdbcTemplate(), table);
} else if (table.hasAutoIncrementColumn()) {
dbDialect.allowIdentityInserts(context.getJdbcTemplate(), table);
}

if (tableTemplate.insert(context, data.getColumnValues(), data.getKeyValues()) <= 0) {
throw new DataIntegrityViolationException("Insert not executed");
if (tableTemplate.insert(context, data.getColumnValues(),
data.getKeyValues()) == 0) {
attemptFallbackUpdate = parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
}
} catch (RuntimeException ex) {
insertException = ex;
if (dbDialect.isPrimaryKeyViolation(ex)
&& parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE)) {
attemptFallbackUpdate = true;
} else {
throw ex;
}
}
} catch (DataIntegrityViolationException ex) {
if (enableFallbackUpdate) {
if (attemptFallbackUpdate) {
List<TransformedData> newlyTransformedDatas = transform(DmlType.UPDATE,
context, data.getTransformation(), data.getSourceKeyValues(),
data.getOldSourceValues(), data.getSourceValues());
for (TransformedData newlyTransformedData : newlyTransformedDatas) {
if (newlyTransformedData.hasSameKeyValues(data.getKeyValues()) ||
data.isGeneratedIdentityNeeded()) {
if (newlyTransformedData.hasSameKeyValues(data.getKeyValues())
|| data.isGeneratedIdentityNeeded()) {
if (newlyTransformedData.getKeyNames() != null
&& newlyTransformedData.getKeyNames().length > 0) {
tableTemplate.setColumnNames(newlyTransformedData
Expand All @@ -142,27 +153,28 @@ protected void apply(IDataLoaderContext context,
newlyTransformedData.getColumnValues(),
newlyTransformedData.getKeyValues())) {
throw new SymmetricException("LoaderFallbackUpdateFailed",
ex, tableTemplate.getTable().toVerboseString(),
insertException, tableTemplate.getTable()
.toVerboseString(),
ArrayUtils.toString(data.getColumnValues()),
ArrayUtils.toString(data.getKeyValues()));
}
} else {
// If not keys are specified we are going to
// assume that this is intentional and we
// will simply log a warning and not fail.
log.warn("Message", ex.getMessage());
log.warn("Message", insertException.getMessage());
log.warn("TransformNoPrimaryKeyDefinedNoUpdate",
newlyTransformedData.getTransformation()
.getTransformId());
}
} else {
log.debug("TransformMatchingFallbackNotFound", DmlType.UPDATE.name());
log.debug("TransformMatchingFallbackNotFound",
DmlType.UPDATE.name());
}
}

} else {
throw ex;
}

} finally {
if (table.hasAutoIncrementColumn()) {
dbDialect.revertAllowIdentityInserts(context.getJdbcTemplate(), table);
Expand Down
Expand Up @@ -10,6 +10,7 @@
<bean id="db2zSeriesDialect"
class="org.jumpmind.symmetric.db.db2.Db2zSeriesDbDialect"
scope="prototype">
<property name="primaryKeyViolationCodes" value="-803"/>
<property name="tablePrefix" value="$[sym.sync.table.prefix]" />
<property name="defaultSchema" value="$[sym.db.default.schema]" />
<property name="userName" value="$[sym.db.user]" />
Expand Down
Expand Up @@ -10,6 +10,7 @@
<bean id="db2Dialect"
class="org.jumpmind.symmetric.db.db2.Db2DbDialect"
scope="prototype">
<property name="primaryKeyViolationCodes" value="-803"/>
<property name="tablePrefix" value="$[sym.sync.table.prefix]" />
<property name="defaultSchema" value="$[sym.db.default.schema]" />
<property name="parameterService" ref="parameterService" />
Expand Down
Expand Up @@ -8,6 +8,7 @@

<bean id="derbyDialect" class="org.jumpmind.symmetric.db.derby.DerbyDbDialect"
scope="prototype">
<property name="primaryKeyViolationCodes" value="23505"/>
<property name="tablePrefix" value="$[sym.sync.table.prefix]" />
<property name="parameterService" ref="parameterService" />
<property name="defaultSchema" value="$[sym.db.default.schema]" />
Expand Down
Expand Up @@ -7,7 +7,8 @@
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd" default-lazy-init="true">

<bean id="firebirdDialect" class="org.jumpmind.symmetric.db.firebird.FirebirdDbDialect"
scope="prototype">
scope="prototype">
<property name="primaryKeyViolationCodes" value="335544665"/>
<property name="tablePrefix" value="$[sym.sync.table.prefix]" />
<property name="parameterService" ref="parameterService" />
<property name="defaultSchema" value="$[sym.db.default.schema]" />
Expand Down

0 comments on commit fef8852

Please sign in to comment.