Skip to content

Commit

Permalink
Save offsets in Batch without bind parameters, #253 (#255)
Browse files Browse the repository at this point in the history
* for better performance since the add-bind approach has round-trips to the db
* basic validation of string parameters
  • Loading branch information
patriknw committed Jul 1, 2022
1 parent 8a720fa commit 2ea4121
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ import reactor.core.publisher.Mono
.asFuture()
}

/**
* Batch update of SQL statements without bind parameters.
*/
def updateBatchInTx(conn: Connection, statements: immutable.IndexedSeq[String])(implicit
ec: ExecutionContext): Future[Int] = {
val batch = conn.createBatch()
statements.foreach(batch.add)
val consumer: BiConsumer[Int, Integer] = (acc, elem) => acc + elem.intValue()
Flux
.from[Result](batch.execute())
.concatMap(_.getRowsUpdated)
.collect(() => 0, consumer)
.asFuture()
}

def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit
ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] =
// connection not intended for concurrent calls, make sure statements are executed one at a time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,22 @@ private[projection] class R2dbcOffsetStore(
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
VALUES (?,?,?,?,?,?, transaction_timestamp())"""

private def insertTimestampOffsetBatchSql(pid: Pid, seqNr: SeqNr, offsetTimestamp: Instant): String = {
def validateStringParam(name: String, value: String): Unit = {
if (value.contains('\''))
throw new IllegalArgumentException(s"Illegal $name parameter [$value]")
}
validateStringParam("projectionId.name", projectionId.name)
validateStringParam("projectionId.key", projectionId.key)
validateStringParam("pid", pid)

val slice = persistenceExt.sliceForPersistenceId(pid)
sql"""
INSERT INTO $timestampOffsetTable
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
VALUES ('${projectionId.name}','${projectionId.key}',$slice,'$pid',$seqNr,'$offsetTimestamp', transaction_timestamp())"""
}

// delete less than a timestamp
private val deleteOldTimestampOffsetSql: String =
sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ?"
Expand Down Expand Up @@ -499,19 +515,15 @@ private[projection] class R2dbcOffsetStore(
// FIXME change to trace
logger.debug("saving timestamp offset [{}], {}", records.last.timestamp, records)

val statement = conn.createStatement(insertTimestampOffsetSql)

if (records.size == 1) {
val statement = conn.createStatement(insertTimestampOffsetSql)
val boundStatement = bindRecord(statement, records.head)
R2dbcExecutor.updateOneInTx(boundStatement)
} else {
// TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
val boundStatement =
records.foldLeft(statement) { (stmt, rec) =>
stmt.add()
bindRecord(stmt, rec)
}
R2dbcExecutor.updateBatchInTx(boundStatement)
val statements = records.map { rec =>
insertTimestampOffsetBatchSql(rec.pid, rec.seqNr, rec.timestamp)
}
R2dbcExecutor.updateBatchInTx(conn, statements)
}
}

Expand Down

0 comments on commit 2ea4121

Please sign in to comment.