Add Support to Upsert/Insert Ignore on PDB #386

Merged · 9 commits · Jun 7, 2023
@@ -365,6 +365,17 @@ protected AbstractBatch(final DatabaseEngine de, final int batchSize, final long
this(de, null, batchSize, batchTimeout, maxAwaitTimeShutdown);
}

/**
* A functional interface to represent a {@link java.util.function.BiConsumer} that throws an exception.
*
* @param <T> the type of the first argument to the operation.
* @param <R> the type of the second argument to the operation.
*/
@FunctionalInterface
public interface ThrowingBiConsumer<T, R> {
void accept(T t, R r) throws Exception;
}
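For illustration, this lets a method reference whose target throws a checked exception be passed around directly, without wrapping it in a try/catch inside a lambda. A minimal sketch (the variable name is hypothetical; the calls are the ones this class already uses):

// Hypothetical sketch: a throwing method reference bound to the new interface.
final ThrowingBiConsumer<DatabaseEngine, List<BatchEntry>> addAll = (engine, entries) -> {
    for (final BatchEntry entry : entries) {
        // addBatch declares DatabaseEngineException, which a plain java.util.function.BiConsumer could not propagate.
        engine.addBatch(entry.getTableName(), entry.getEntityEntry());
    }
};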

/**
* Starts the timer task.
*/
@@ -452,6 +463,24 @@ public void add(final String entityName, final EntityEntry ee) throws DatabaseEn
* @implSpec Same as {@link #flush(boolean)} with {@code false}.
*/
public void flush() {
logger.trace("Start batch flushing entries.");
flush(this::processBatch);
}

/**
* Flushes the pending batches ignoring duplicate entries.
*/
public void flushUpsert() {
logger.trace("Start batch flushing upserting duplicated entries.");
flush((this::processBatchUpsert));
}

/**
* Flushes the pending batches given a processing callback function.
*
* @param processBatch A (throwing) BiConsumer to process the batch entries.
*/
private void flush(final ThrowingBiConsumer<DatabaseEngine, List<BatchEntry>> processBatch) {
this.metricsListener.onFlushTriggered();
final long flushTriggeredMs = System.currentTimeMillis();
List<BatchEntry> temp;
@@ -485,7 +514,7 @@ public void flush() {
this.metricsListener.onFlushStarted(flushTriggeredMs, temp.size());
start = System.currentTimeMillis();

processBatch(de, temp);
processBatch.accept(de, temp);

onFlushFinished(flushTriggeredMs, temp, Collections.emptyList());
logger.trace("[{}] Batch flushed. Took {} ms, {} rows.", name, System.currentTimeMillis() - start, temp.size());
@@ -72,4 +72,29 @@ protected void processBatch(final DatabaseEngine de, final List<BatchEntry> batc
de.flush();
de.commit();
}

/**
* Processes all batch entries ignoring duplicate entries.
*
* @implSpec Same flow as {@link #processBatch(DatabaseEngine, List)}, but adding and flushing the entries with the upsert variants.
*
* @param de The {@link DatabaseEngine} on which to perform the flush.
* @param batchEntries The list of batch entries to be flushed.
* @throws DatabaseEngineException If the operation failed.
*/
protected void processBatchUpsert(final DatabaseEngine de, final List<BatchEntry> batchEntries) throws DatabaseEngineException {
/*
Begin transaction before the addBatch calls, in order to force the retry of the connection if it was lost during
or since the last batch. Otherwise, the addBatch call that uses a prepared statement will fail.
*/
de.beginTransaction();

for (final BatchEntry entry : batchEntries) {
de.addBatchUpsert(entry.getTableName(), entry.getEntityEntry());
}

de.flushUpsert();
de.commit();
}

Comment on lines +85 to +99


DRY - this is duplicated code and should be improved.

(if there is time before the release; I would not make it a priority, just leaving a note)

Contributor Author


This requires some further analysis; right off the bat I can't change it, as from what I saw it could take some refactoring.


ack! - we can address this later.
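For reference, one possible shape for that refactoring, sketched only to capture the idea (the helper name doProcessBatch and the one-argument ThrowingConsumer are hypothetical, not part of this PR):

// Hypothetical sketch of the DRY refactoring discussed above; names are illustrative only.

// Hypothetical one-argument sibling of the ThrowingBiConsumer added in this PR.
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T t) throws Exception;
}

protected void processBatchUpsert(final DatabaseEngine de, final List<BatchEntry> batchEntries) throws DatabaseEngineException {
    doProcessBatch(de, batchEntries, de::addBatchUpsert, DatabaseEngine::flushUpsert);
}

private void doProcessBatch(final DatabaseEngine de,
                            final List<BatchEntry> batchEntries,
                            final ThrowingBiConsumer<String, EntityEntry> addEntry,
                            final ThrowingConsumer<DatabaseEngine> flushEntries) throws DatabaseEngineException {
    try {
        // Begin the transaction before the add calls, as the current implementations do.
        de.beginTransaction();
        for (final BatchEntry entry : batchEntries) {
            addEntry.accept(entry.getTableName(), entry.getEntityEntry());
        }
        flushEntries.accept(de);
        de.commit();
    } catch (final Exception ex) {
        throw new DatabaseEngineException("Error processing batch", ex);
    }
}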

}
@@ -60,4 +60,12 @@ public interface PdbBatch extends AutoCloseable {
* @return A void {@link CompletableFuture} that completes when the flush action finishes.
*/
CompletableFuture<Void> flushAsync() throws Exception;

/**
* Flushes the pending batches, upserting entries to avoid duplicate key violations.
*
* @throws Exception If an error occurs while flushing.
*/
void flushUpsert() throws Exception;

}
@@ -340,6 +340,11 @@ When done, the future removes itself (if done already, all this can be skipped).
}
}

@Override
public void flushUpsert() {
logger.trace("Flush ignoring not available for MultithreadedBatch. Skipping ...");
}


This cannot be that dead silent :)

If this is not implemented and it is still being used, then something is not right; the callee logic should fail fast, so it can be patched as fast as possible. So, I would throw an UnsupportedOperationException.

(at the very least, this needs to be logged as an error)
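Concretely, the fail-fast variant suggested above could look like this (a sketch of the reviewer's suggestion, not what the PR currently does):

@Override
public void flushUpsert() {
    // Fail fast instead of silently skipping, so misuse surfaces immediately.
    throw new UnsupportedOperationException("flushUpsert is not supported by MultithreadedBatch");
}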


/**
* Flushes the given list of batch entries to {@link DatabaseEngine} immediately.
*
@@ -502,6 +502,7 @@ private void closeMappedEntity(final MappedEntity mappedEntity) {
final PreparedStatement insert = mappedEntity.getInsert();
final PreparedStatement insertReturning = mappedEntity.getInsertReturning();
final PreparedStatement insertWithAutoInc = mappedEntity.getInsertWithAutoInc();
final PreparedStatement upsert = mappedEntity.getUpsert();

if (!insert.isClosed()) {
insert.executeBatch();
@@ -515,6 +516,10 @@ private void closeMappedEntity(final MappedEntity mappedEntity) {
insertWithAutoInc.executeBatch();
}

if (upsert != null && !upsert.isClosed()) {
upsert.executeBatch();
}

} catch (final SQLException e) {
logger.debug(String.format("Failed to flush before closing mapped entity '%s'",
mappedEntity.getEntity().getName()), e);
@@ -948,6 +953,26 @@ public synchronized void flush() throws DatabaseEngineException {
}
}

/**
* Flushes the batches for all the registered entities, upserting any duplicated entries.
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
@Override
public synchronized void flushUpsert() throws DatabaseEngineException {
/*
* Reconnect on this method does not make sense since a new connection will have nothing to flush.
*/

try {
for (MappedEntity me : entities.values()) {
me.getUpsert().executeBatch();
}
} catch (final Exception ex) {
throw getQueryExceptionHandler().handleException(ex, "Something went wrong while flushing");
}
}

/**
* Commits the current transaction. You should only call this method if you've previously called {@link AbstractDatabaseEngine#beginTransaction()}.
*
@@ -1314,6 +1339,27 @@ public synchronized void addBatch(final String name, final EntityEntry entry) th

}

@Override
public synchronized void addBatchUpsert(final String name, final EntityEntry entry) throws DatabaseEngineException {
try {

final MappedEntity me = entities.get(name);

if (me == null) {
throw new DatabaseEngineException(String.format("Unknown entity '%s'", name));
}

PreparedStatement ps = me.getUpsert();

entityToPreparedStatementForBatch(me.getEntity(), ps, entry, true);

ps.addBatch();
} catch (final Exception ex) {
throw new DatabaseEngineException("Error adding to batch", ex);
}

}

/**
* Translates the given entry entity to the prepared statement when used in the context of batch updates.
* <p>
@@ -187,6 +187,13 @@ public interface DatabaseEngine extends AutoCloseable {
*/
void flush() throws DatabaseEngineException;

/**
* Flushes the batches for all the registered entities, upserting duplicated entries.
*
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
void flushUpsert() throws DatabaseEngineException;

/**
* Commits the current transaction. You should only call this method if you've previously called
* {@link DatabaseEngine#beginTransaction()}.
@@ -385,6 +392,15 @@ default AbstractBatch createBatch(final int batchSize, final long batchTimeout,
*/
void addBatch(final String name, final EntityEntry entry) throws DatabaseEngineException;

/**
* Adds an entry to the batch, upserting duplicate entries.
*
* @param name The entity name.
* @param entry The entry to persist.
* @throws DatabaseEngineException If something goes wrong while persisting data.
*/
void addBatchUpsert(final String name, final EntityEntry entry) throws DatabaseEngineException;
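As a usage sketch, a caller working directly against the engine would follow the same sequence that processBatchUpsert uses internally (the entity name "MY_ENTITY" and the entry variable are hypothetical placeholders):

// Sketch only: mirrors the internal flow of processBatchUpsert.
engine.beginTransaction();
engine.addBatchUpsert("MY_ENTITY", entry); // staged on the entity's upsert prepared statement
engine.flushUpsert();                      // executes the pending upsert batch
engine.commit();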

/**
* Executes the given query.
*
@@ -50,6 +50,10 @@ public class MappedEntity implements AutoCloseable {
* The prepared statement to insert new values.
*/
private PreparedStatement insertReturning = null;
/**
* The prepared statement to upsert new values, avoiding duplicate key violations.
*/
private PreparedStatement upsert = null;
/**
* The auto increment column, if it exists.
*/
@@ -145,13 +149,35 @@ public PreparedStatement getInsertWithAutoInc() {
* Sets the insert statement auto inc columns.
*
* @param insertWithAutoInc The insert statement with auto inc columns.
* @return This mapped entity;
* @see DatabaseEngine#persist(String, EntityEntry, boolean)
*/
public MappedEntity setInsertWithAutoInc(final PreparedStatement insertWithAutoInc) {
closeQuietly(this.insertWithAutoInc);
this.insertWithAutoInc = insertWithAutoInc;


return this;
}

/**
* Gets the prepared statement for upsert operation.
*
* @return The upsert statement.
*/
public PreparedStatement getUpsert() {
return upsert;
}

/**
* Sets the upsert statement.
*
* @param upsert The upsert statement
* @return This mapped entity
*/
public MappedEntity setUpsert(final PreparedStatement upsert) {
closeQuietly(this.upsert);
this.upsert = upsert;

return this;
}

@@ -215,5 +241,6 @@ public void close() throws Exception {
closeQuietly(this.insert);
closeQuietly(this.insertWithAutoInc);
closeQuietly(this.insertReturning);
closeQuietly(this.upsert);
}
}
@@ -463,6 +463,7 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity)
insertIntoWithAutoInc.add("(" + join(columnsWithAutoInc, ", ") + ")");
insertIntoWithAutoInc.add("VALUES (" + join(valuesWithAutoInc, ", ") + ")");

final String statementWithMerge = buildMergeStatement(entity, columns, values);

final String statement = join(insertInto, " ");
// The H2 DB doesn't implement INSERT RETURNING. Therefore, we just create a dummy statement, which will
@@ -472,27 +473,54 @@
logger.trace(statement);


final PreparedStatement ps, psReturn, psWithAutoInc;
final PreparedStatement ps, psReturn, psWithAutoInc, psMerge;
try {

// Generate keys when the table has at least 1 column with auto generate value.
final int generateKeys = columnWithAutoIncName != null? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS;
ps = this.conn.prepareStatement(statement, generateKeys);
psReturn = this.conn.prepareStatement(insertReturnStatement, generateKeys);
psWithAutoInc = this.conn.prepareStatement(statementWithAutoInt, generateKeys);

psMerge = this.conn.prepareStatement(statementWithMerge, generateKeys);
return new MappedEntity()
.setInsert(ps)
.setInsertReturning(psReturn)
.setInsertWithAutoInc(psWithAutoInc)
// The auto incremented column must be set, so when persisting a row, it's possible to retrieve its value
// by consulting the column name from this MappedEntity.
.setAutoIncColumn(columnWithAutoIncName);
.setAutoIncColumn(columnWithAutoIncName)
.setUpsert(psMerge);
victorcmg marked this conversation as resolved.
Show resolved Hide resolved
} catch (final SQLException ex) {
throw new DatabaseEngineException("Something went wrong handling statement", ex);
}
}

/**
* Helper method to create a merge statement for this engine.
*
* @param entity The entity.
* @param columns The columns of this entity.
* @param values The values of the entity.
*
* @return A merge statement.
*/
private String buildMergeStatement(final DbEntity entity, final List<String> columns, final List<String> values) {

if (entity.getPkFields().isEmpty() || columns.isEmpty() || values.isEmpty()) {
return "";
}

final List<String> mergeInto = new ArrayList<>();
mergeInto.add("MERGE INTO");
mergeInto.add(quotize(entity.getName()));

mergeInto.add("(" + join(columns, ", ") + ")");
mergeInto.add("VALUES (" + join(values, ", ") + ")");

return join(mergeInto, " ");

}
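For a hypothetical entity MY_ENTITY with columns ID and NAME, and assuming the columns and values lists arrive already quotized and holding the usual "?" JDBC placeholders (as they do for the plain insert statement), the method would produce something along the lines of:

MERGE INTO "MY_ENTITY" ("ID", "NAME") VALUES (?, ?)

H2 resolves a MERGE without an explicit KEY clause against the table's primary key, which is also why the method returns an empty string when the entity has no primary key fields.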

@Override
protected void dropSequences(DbEntity entity) {
/*
Expand Down