Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support to Upsert/Insert Ignore on PDB #389

Merged
merged 7 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.feedzai.commons.sql.abstraction.util.Constants;
import com.feedzai.commons.sql.abstraction.util.PreparedStatementCapsule;
import com.ibm.db2.jcc.am.SqlSyntaxErrorException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

import java.sql.Connection;
Expand Down Expand Up @@ -425,17 +426,92 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
logger.trace(insertStatement);
logger.trace(insertReturnStatement);

PreparedStatement ps, psReturn, psWithAutoInc;
PreparedStatement ps = null, psReturn = null, psWithAutoInc = null, psUpsert;
diogo-anjos-fdz marked this conversation as resolved.
Show resolved Hide resolved
try {

ps = conn.prepareStatement(insertStatement);
psReturn = conn.prepareStatement(insertReturnStatement);
psWithAutoInc = conn.prepareStatement(insertWithAutoInc);

return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc).setAutoIncColumn(returning);
final String upsert = buildUpsertStatement(entity, columns, values);
psUpsert = conn.prepareStatement(upsert);

return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
.setAutoIncColumn(returning)
.setUpsert(psUpsert);

} catch (final IllegalArgumentException e) {
logger.error("Returning entity without an UPSERT/MERGE prepared statement.", e);
return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
.setAutoIncColumn(returning);

} catch (final SQLException ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
}

}

/**
* Helper method to create an upsert statement for this engine.
*
* @param entity The entity.
* @param columns The columns of this entity.
* @param values The values of the entity.
*
* @return An upsert statement.
*/
private String buildUpsertStatement(final DbEntity entity, final List<String> columns, final List<String> values) {

if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) {
throw new IllegalArgumentException("The MERGE command was not created because the entity has no primary keys. Skipping statement creation.");
}

final List<String> merge = new ArrayList<>();
merge.add("MERGE INTO " + quotize(entity.getName()) + " AS dest");

merge.add("USING (VALUES(" + join(values, ", ") + ")) AS src (" + join(columns, ", ") + ")");

final List<String> primaryKeys = entity.getPkFields()
.stream()
.map(com.feedzai.commons.sql.abstraction.util.StringUtils::quotize)
.collect(Collectors.toList());

final List<String> primaryKeysOnClause = primaryKeys
.stream()
.map(pk -> String.format("dest.%s = src.%s", pk, pk))
.collect(Collectors.toList());

merge.add("ON ( " + join(primaryKeysOnClause, " AND ") + " )");

merge.add("WHEN MATCHED THEN");
merge.add("UPDATE");

final List<String> columnsWithoutPKs = new ArrayList<>(columns);
columnsWithoutPKs.removeAll(primaryKeys);

final String columnsToUpdate = columnsWithoutPKs
.stream()
.map(column -> String.format("%s = src.%s", column, column))
.collect(Collectors.joining(", "));

merge.add("SET " + columnsToUpdate);

merge.add("WHEN NOT MATCHED THEN");

final String insertColumns = columns.stream().map(column -> String.format("%s", column)).collect(Collectors.joining(", "));
merge.add("INSERT (" + insertColumns + ")");

final String insertColumnValues = columns.stream().map(value -> String.format("src.%s", value)).collect(Collectors.joining(", "));
merge.add("VALUES (" + insertColumnValues + ")");

return join(merge, " ");

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,6 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
insertIntoWithAutoInc.add("(" + join(columnsWithAutoInc, ", ") + ")");
insertIntoWithAutoInc.add("VALUES (" + join(valuesWithAutoInc, ", ") + ")");

final String statementWithMerge = buildUpsertStatement(entity, columns, values);

final String statement = join(insertInto, " ");
// The H2 DB doesn't implement INSERT RETURNING. Therefore, we just create a dummy statement, which will
// never be invoked by this implementation.
Expand All @@ -473,23 +471,34 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
logger.trace(statement);


final PreparedStatement ps, psReturn, psWithAutoInc, psMerge;
PreparedStatement ps = null, psReturn = null, psWithAutoInc = null, psMerge;
diogo-anjos-fdz marked this conversation as resolved.
Show resolved Hide resolved
try {

// Generate keys when the table has at least 1 column with auto generate value.
final int generateKeys = columnWithAutoIncName != null? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS;
final int generateKeys = columnWithAutoIncName != null ? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS;
ps = this.conn.prepareStatement(statement, generateKeys);
psReturn = this.conn.prepareStatement(insertReturnStatement, generateKeys);
psWithAutoInc = this.conn.prepareStatement(statementWithAutoInt, generateKeys);

final String statementWithMerge = buildUpsertStatement(entity, columns, values);
psMerge = this.conn.prepareStatement(statementWithMerge, generateKeys);

return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
// The auto incremented column must be set, so when persisting a row, it's possible to retrieve its value
// by consulting the column name from this MappedEntity.
.setAutoIncColumn(columnWithAutoIncName)
.setUpsert(psMerge);
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
// The auto incremented column must be set, so when persisting a row, it's possible to retrieve its value
// by consulting the column name from this MappedEntity.
.setAutoIncColumn(columnWithAutoIncName)
.setUpsert(psMerge);

} catch (final IllegalArgumentException e) {
logger.error("Returning entity without an UPSERT/MERGE prepared statement.", e);
return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc);

} catch (final SQLException ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
}
Expand All @@ -507,7 +516,7 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
private String buildUpsertStatement(final DbEntity entity, final List<String> columns, final List<String> values) {

if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) {
return "";
throw new IllegalArgumentException("The MERGE command was not created because the entity has no primary keys. Skipping statement creation.");
}

final List<String> mergeInto = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.feedzai.commons.sql.abstraction.util.StringUtils.md5;
import static com.feedzai.commons.sql.abstraction.util.StringUtils.quotize;
Expand Down Expand Up @@ -391,18 +392,72 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
logger.trace(statement);
logger.trace(statementWithAutoInt);

final PreparedStatement ps, psReturn, psWithAutoInc;
PreparedStatement ps = null, psReturn = null, psWithAutoInc = null, psUpsert;
diogo-anjos-fdz marked this conversation as resolved.
Show resolved Hide resolved
try {
ps = conn.prepareStatement(statement, Statement.RETURN_GENERATED_KEYS);
psReturn = conn.prepareStatement(insertReturnStatement);
psWithAutoInc = conn.prepareStatement(statementWithAutoInt);

return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc);
final String upsertStatement = buildUpsertStatement(entity, columns, values);
psUpsert = conn.prepareStatement(upsertStatement);

return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
.setUpsert(psUpsert);

} catch (final IllegalArgumentException e) {
logger.error("Returning entity without an UPSERT/MERGE prepared statement.", e);
return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc);

} catch (final SQLException ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
}
}

/**
* Helper method to create an insert statement on duplicate key conflict for this engine.
*
* @param entity The entity.
* @param columns The columns of this entity.
* @param values The values of the entity.
*
* @return An insert statement on duplicate key conflict.
*/
private String buildUpsertStatement(final DbEntity entity, final List<String> columns, final List<String> values) {

if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) {
throw new IllegalArgumentException("The MERGE command was not created because the entity has no primary keys. Skipping statement creation.");
}

List<String> insertIntoIgnoring = new ArrayList<>();
insertIntoIgnoring.add("INSERT INTO");
insertIntoIgnoring.add(quotize(entity.getName(), escapeCharacter()));

insertIntoIgnoring.add("(" + join(columns, ", ") + ")");
insertIntoIgnoring.add("VALUES (" + join(values, ", ") + ")");

final List<String> primaryKeys = entity.getPkFields().stream().map(pk -> quotize(pk, escapeCharacter())).collect(Collectors.toList());
insertIntoIgnoring.add("ON DUPLICATE KEY UPDATE");

final List<String> columnsWithoutPKs = new ArrayList<>(columns);
columnsWithoutPKs.removeAll(primaryKeys);

final String columnsToUpdate = columnsWithoutPKs
.stream()
.map(column -> String.format("%s = VALUES(%s)", column, column))
.collect(Collectors.joining(", "));

insertIntoIgnoring.add(columnsToUpdate);

return join(insertIntoIgnoring, " ");

}

@Override
protected void dropSequences(DbEntity entity) throws DatabaseEngineException {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.feedzai.commons.sql.abstraction.engine.handler.OperationFault;
import com.feedzai.commons.sql.abstraction.engine.handler.QueryExceptionHandler;
import com.feedzai.commons.sql.abstraction.engine.impl.postgresql.PostgresSqlQueryExceptionHandler;
import com.feedzai.commons.sql.abstraction.util.StringUtils;

import java.util.stream.Collectors;
import org.postgresql.Driver;
Expand Down Expand Up @@ -406,39 +407,53 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
final String insertStatement = join(insertInto, " ");
final String insertReturnStatement = join(insertIntoReturn, " ");
final String statementWithAutoInt = join(insertIntoWithAutoInc, " ");
final String upsert = buildUpsertStatement(entity, columns, values);

logger.trace(insertStatement);
logger.trace(insertReturnStatement);
logger.trace(upsert);

PreparedStatement ps, psReturn, psWithAutoInc, psUpsert;
PreparedStatement ps = null, psReturn = null, psWithAutoInc = null, psUpsert;
diogo-anjos-fdz marked this conversation as resolved.
Show resolved Hide resolved
try {

ps = conn.prepareStatement(insertStatement);
psReturn = conn.prepareStatement(insertReturnStatement);
psWithAutoInc = conn.prepareStatement(statementWithAutoInt);

final String upsert = buildUpsertStatement(entity, columns, values);
psUpsert = conn.prepareStatement(upsert);

return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc).setUpsert(psUpsert).setAutoIncColumn(returning);
return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
.setUpsert(psUpsert)
.setAutoIncColumn(returning);

} catch (final IllegalArgumentException e) {
logger.error("Returning entity without an UPSERT/MERGE prepared statement.", e);
return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
.setAutoIncColumn(returning);

} catch (final SQLException ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
}
}

/**
* Helper method to create a insert on conflict statement that updates for this engine.
* Helper method to create an insert statement on duplicate key conflict for this engine.
*
* @param entity The entity.
* @param columns The columns of this entity.
* @param values The values of the entity.
*
* @return A insert on conflict statement.
* @return An insert statement on duplicate key conflict.
*/
private String buildUpsertStatement(final DbEntity entity, final List<String> columns, final List<String> values) {

if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) {
return "";
throw new IllegalArgumentException("The MERGE command was not created because the entity has no primary keys. Skipping statement creation.");
}

List<String> insertIntoIgnoring = new ArrayList<>();
Expand All @@ -448,7 +463,7 @@ private String buildUpsertStatement(final DbEntity entity, final List<String> co
insertIntoIgnoring.add("(" + join(columns, ", ") + ")");
insertIntoIgnoring.add("VALUES (" + join(values, ", ") + ")");

final List<String> primaryKeys = entity.getPkFields().stream().map(pk -> quotize(pk)).collect(Collectors.toList());
final List<String> primaryKeys = entity.getPkFields().stream().map(StringUtils::quotize).collect(Collectors.toList());
insertIntoIgnoring.add("ON CONFLICT (" + join(primaryKeys, ", ") + ")");

insertIntoIgnoring.add("DO UPDATE");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void batchInsertDuplicateFlushWithDBErrorTest() throws Exception {
*/
@Test
public void batchInsertOnIgnoreDuplicateFlushTest() throws Exception {
assumeTrue(ImmutableList.of("h2", "postresql").contains(dbConfig.vendor));
assumeTrue(ImmutableList.of("postresql", "mysql", "cockroach", "db2").contains(dbConfig.vendor) && dbConfig.vendor.startsWith("h2"));
final TestBatchListener batchListener = new TestBatchListener();
final int numTestEntries = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void setUp() throws DatabaseFactoryException {
@Test
public void closingAnEngineShouldFlushAndCloseInsertPSsAndCloseCachedPSs(@Capturing final Statement preparedStatementMock)
throws DatabaseEngineException, SQLException, NameAlreadyExistsException {
assumeTrue(!ImmutableList.of("postgresql", "cockroach").contains(config.vendor) && !config.vendor.startsWith("h2"));
assumeTrue(!ImmutableList.of("postgresql", "cockroach", "mysql", "db2").contains(config.vendor) && !config.vendor.startsWith("h2"));

engine.addEntity(buildEntity("ENTITY-1"));
engine.addEntity(buildEntity("ENTITY-2"));
Expand Down
Loading