Skip to content

Commit

Permalink
0004742: Postgres conflict but can't ignore row
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jan 5, 2021
1 parent c8931bc commit d421f75
Showing 1 changed file with 42 additions and 14 deletions.
Expand Up @@ -83,6 +83,8 @@ public class DefaultDatabaseWriter extends AbstractDatabaseWriter {
protected Object[] currentDmlValues;

protected LogSqlBuilder logSqlBuilder = new LogSqlBuilder();

protected boolean isRequiresSavePointsInTransaction;

public DefaultDatabaseWriter(IDatabasePlatform platform) {
this(platform, null, null);
Expand All @@ -96,6 +98,7 @@ public DefaultDatabaseWriter(IDatabasePlatform platform,
IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings) {
super(conflictResolver, settings);
this.platform = platform;
isRequiresSavePointsInTransaction = platform.getDatabaseInfo().isRequiresSavePointsInTransaction();
}

public IDatabasePlatform getPlatform() {
Expand Down Expand Up @@ -220,6 +223,11 @@ protected void rollback() {
@Override
protected LoadStatus insert(CsvData data) {
try {
if (isRequiresSavePointsInTransaction && conflictResolver != null && conflictResolver.isIgnoreRow(this, data)) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
return LoadStatus.SUCCESS;
}

statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
if (requireNewStatement(DmlType.INSERT, data, false, true, null)) {
this.lastUseConflictDetection = true;
Expand All @@ -240,14 +248,16 @@ protected LoadStatus insert(CsvData data) {
if (count > 0) {
return LoadStatus.SUCCESS;
} else {
context.put(CUR_DATA, getCurData(getTransaction()));
context.setLastError(getInsertException(data, values));
findAndThrowInsertException(data, values);
return LoadStatus.CONFLICT;
}
} catch (SqlException ex) {
if (isRequiresSavePointsInTransaction && !context.getContext().containsKey(TRANSACTION_ABORTED)) {
context.put(TRANSACTION_ABORTED, true);
}
if (getPlatform().getSqlTemplate().isUniqueKeyViolation(ex)) {
context.put(CONFLICT_ERROR, ex);
context.put(CUR_DATA,getCurData(getTransaction()));
context.put(CUR_DATA, getCurData(getTransaction()));
context.setLastError(ex);
return LoadStatus.CONFLICT;
} else {
Expand All @@ -262,26 +272,31 @@ protected LoadStatus insert(CsvData data) {
}
}

private SqlException getInsertException(CsvData data, String[] values) {
SqlException ret = null;
if (getPlatform().getDatabaseInfo().isRequiresSavePointsInTransaction()) {
private void findAndThrowInsertException(CsvData data, String[] values) throws SqlException {
if (isRequiresSavePointsInTransaction) {
try {
getTransaction().execute("savepoint sym");
getTransaction().prepare(currentDmlStatement.getSql(false));
getTransaction().addRow(data, currentDmlValues, currentDmlStatement.getTypes());
} catch (SqlException ex) {
ret = ex;
} catch (SqlException e) {
getTransaction().execute("rollback to savepoint sym");
throw e;
} finally {
getTransaction().execute("release savepoint sym");
getTransaction().prepare(currentDmlStatement.getSql());
context.put(TRANSACTION_ABORTED, false);
}
}
return ret;
}

@Override
protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
try {
if (isRequiresSavePointsInTransaction && conflictResolver != null && conflictResolver.isIgnoreRow(this, data)) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
return LoadStatus.SUCCESS;
}

statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
Conflict conflict = writerSettings.pickConflict(this.targetTable, batch);
Map<String, String> lookupDataMap = null;
Expand Down Expand Up @@ -380,12 +395,16 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
if (count > 0) {
return LoadStatus.SUCCESS;
} else {
context.put(CUR_DATA,null); // since a delete conflicted, there's no row to delete, so no cur data.
context.put(CUR_DATA, null); // since a delete conflicted, there's no row to delete, so no cur data.
return LoadStatus.CONFLICT;
}
} catch (RuntimeException ex) {
if (isRequiresSavePointsInTransaction && ex instanceof SqlException) {
context.put(TRANSACTION_ABORTED, true);
}
if (getPlatform().getSqlTemplate().isForeignKeyChildExistsViolation(ex)) {
context.put(CUR_DATA,null); // since a delete conflicted, there's no row to delete, so no cur data.
context.put(CONFLICT_ERROR, ex);
context.put(CUR_DATA, null); // since a delete conflicted, there's no row to delete, so no cur data.
context.setLastError(ex);
return LoadStatus.CONFLICT;
} else {
Expand All @@ -404,6 +423,11 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
@Override
protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
try {
if (isRequiresSavePointsInTransaction && conflictResolver != null && conflictResolver.isIgnoreRow(this, data)) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
return LoadStatus.SUCCESS;
}

statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
String[] rowData = getRowData(data, CsvData.ROW_DATA);
String[] oldData = getRowData(data, CsvData.OLD_DATA);
Expand Down Expand Up @@ -551,12 +575,16 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC
if (count > 0) {
return LoadStatus.SUCCESS;
} else {
context.put(CUR_DATA,getCurData(getTransaction()));
context.put(CUR_DATA, getCurData(getTransaction()));
return LoadStatus.CONFLICT;
}
} catch (SqlException ex) {
if (isRequiresSavePointsInTransaction) {
context.put(TRANSACTION_ABORTED, true);
}
if ((getPlatform().getSqlTemplate().isUniqueKeyViolation(ex) || getPlatform().getSqlTemplate().isForeignKeyChildExistsViolation(ex))) {
context.put(CUR_DATA,getCurData(getTransaction()));
context.put(CONFLICT_ERROR, ex);
context.put(CUR_DATA, getCurData(getTransaction()));
context.setLastError(ex);
return LoadStatus.CONFLICT;
} else {
Expand Down Expand Up @@ -811,7 +839,7 @@ protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDe
}

if (e instanceof DataTruncationException) {
logDataTruncation(data, failureMessage);
logDataTruncation(data, failureMessage);
}
data.writeCsvDataDetails(failureMessage);

Expand Down

0 comments on commit d421f75

Please sign in to comment.