From 3cd2cc62ef2e6d19b2b971220404a38808d28bc7 Mon Sep 17 00:00:00 2001 From: Victor Camargo Date: Tue, 4 Jul 2023 17:06:53 +0100 Subject: [PATCH] Revert "Add Support to Upsert/Insert Ignore on PDB" This reverts commit d626297e911d4770e18d5cd19c8260edbe70d8a5. --- .../commons/sql/abstraction/batch/AbstractBatch.java | 8 ++++---- .../abstraction/batch/impl/MultithreadedBatch.java | 3 +-- .../sql/abstraction/engine/impl/H2Engine.java | 4 ++-- .../abstraction/engine/impl/PostgreSqlEngine.java | 12 ++++++------ 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java b/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java index c044509e..f092e42e 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java @@ -372,8 +372,8 @@ protected AbstractBatch(final DatabaseEngine de, final int batchSize, final long * @param the type of the second argument to the operation. */ @FunctionalInterface - private interface FlushConsumer { - void accept(T t, R r) throws DatabaseEngineException; + public interface ThrowingBiConsumer { + void accept(T t, R r) throws Exception; } /** @@ -480,7 +480,7 @@ public void flushUpsert() { * * @param processBatch A (throwing) BiConsumer to process the batch entries. */ - private void flush(final FlushConsumer> processBatch) { + private void flush(final ThrowingBiConsumer> processBatch) { this.metricsListener.onFlushTriggered(); final long flushTriggeredMs = System.currentTimeMillis(); List temp; @@ -539,7 +539,7 @@ private void flush(final FlushConsumer> process de.rollback(); } - processBatch.accept(de, temp); + processBatch(de, temp); success = true; } catch (final InterruptedException ex) { diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java b/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java index a71a4e56..deeba3a1 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java @@ -342,8 +342,7 @@ When done, the future removes itself (if done already, all this can be skipped). @Override public void flushUpsert() { - logger.error("Flush ignoring not available for MultithreadedBatch."); - throw new UnsupportedOperationException("Flushing pending batches upserting duplicated entries is not implemented using multiple threads/connections."); + logger.trace("Flush ignoring not available for MultithreadedBatch. Skipping ..."); } /** diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java index 9c149a04..5062f74f 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java @@ -463,7 +463,7 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) insertIntoWithAutoInc.add("(" + join(columnsWithAutoInc, ", ") + ")"); insertIntoWithAutoInc.add("VALUES (" + join(valuesWithAutoInc, ", ") + ")"); - final String statementWithMerge = buildUpsertStatement(entity, columns, values); + final String statementWithMerge = buildMergeStatement(entity, columns, values); final String statement = join(insertInto, " "); // The H2 DB doesn't implement INSERT RETURNING. Therefore, we just create a dummy statement, which will @@ -504,7 +504,7 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) * * @return A merge statement. */ - private String buildUpsertStatement(final DbEntity entity, final List columns, final List values) { + private String buildMergeStatement(final DbEntity entity, final List columns, final List values) { if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) { return ""; diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java index 5170ba62..677cb728 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java @@ -406,21 +406,21 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) final String insertStatement = join(insertInto, " "); final String insertReturnStatement = join(insertIntoReturn, " "); final String statementWithAutoInt = join(insertIntoWithAutoInc, " "); - final String upsert = buildUpsertStatement(entity, columns, values); + final String insertIgnoring = buildInsertOnConflictStatement(entity, columns, values); logger.trace(insertStatement); logger.trace(insertReturnStatement); - logger.trace(upsert); + logger.trace(insertIgnoring); - PreparedStatement ps, psReturn, psWithAutoInc, psUpsert; + PreparedStatement ps, psReturn, psWithAutoInc, psWithInsertIgnoring; try { ps = conn.prepareStatement(insertStatement); psReturn = conn.prepareStatement(insertReturnStatement); psWithAutoInc = conn.prepareStatement(statementWithAutoInt); - psUpsert = conn.prepareStatement(upsert); + psWithInsertIgnoring = conn.prepareStatement(insertIgnoring); - return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc).setUpsert(psUpsert).setAutoIncColumn(returning); + return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc).setUpsert(psWithInsertIgnoring).setAutoIncColumn(returning); } catch (final SQLException ex) { throw new DatabaseEngineException("Something went wrong handling statement", ex); } @@ -435,7 +435,7 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) * * @return A insert on conflict statement. */ - private String buildUpsertStatement(final DbEntity entity, final List columns, final List values) { + private String buildInsertOnConflictStatement(final DbEntity entity, final List columns, final List values) { if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) { return "";