Skip to content

Commit

Permalink
Fixes #8 Explicit closing sql statements to prevent leakage
Browse files Browse the repository at this point in the history
  • Loading branch information
mbknor committed Oct 23, 2015
1 parent fcca224 commit 95baa15
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
Expand Up @@ -225,6 +225,11 @@ class JdbcAsyncWriteJournal extends AsyncWriteJournal with ActorLogging {


override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {

if (log.isDebugEnabled) {
log.debug("JdbcAsyncWriteJournal doWriteMessages messages: " + messages.size)
}

val promise = Promise[Seq[Try[Unit]]]()
promise.success(messages.map {
atomicWrite =>
Expand All @@ -239,7 +244,13 @@ class JdbcAsyncWriteJournal extends AsyncWriteJournal with ActorLogging {
JournalEntryDto(processorIdSplitter().split(p.persistenceId), p.sequenceNr, serialization.serialize(p).get, payloadJson.getOrElse(null))
}

repo().insertPersistentReprList(dtoList)
try {
repo().insertPersistentReprList(dtoList)
} catch {
case e:Exception =>
log.error(e, s"Error while persisting ${dtoList.size} PersistentReprs")
throw e
}
}
})
promise.future
Expand Down
Expand Up @@ -94,20 +94,22 @@ class StorageRepoImpl(sql2o: Sql2o, schemaName: String, errorHandler:JdbcJournal
dtoList.foreach {
dto =>
val insert = c.createQuery(sql)
.addParameter("typePath", dto.processorId.typePath)
.addParameter("id", dto.processorId.id)
.addParameter("sequenceNr", dto.sequenceNr)
.addParameter("persistentRepr", dto.persistentRepr)
.addParameter("payload_write_only", dto.payloadWriteOnly)

try {
insert.executeUpdate

insert.addParameter("typePath", dto.processorId.typePath)
.addParameter("id", dto.processorId.id)
.addParameter("sequenceNr", dto.sequenceNr)
.addParameter("persistentRepr", dto.persistentRepr)
.addParameter("payload_write_only", dto.payloadWriteOnly)
.executeUpdate
} catch {
case e: Sql2oException => {
val exception = new Exception("Error updating journal for processorId=" + dto.processorId + " and sequenceNr=" + dto.sequenceNr + ": " + e.getMessage, e)
errorHandler.onError(e)
throw exception
}
} finally {
insert.close()
}
}

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -62,7 +62,7 @@ lazy val akkaToolsJsonSerializingDependencies = Seq(

lazy val akkaToolsJdbcJournalDependencies = Seq(
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"org.sql2o" % "sql2o" % "1.5.2"
"org.sql2o" % "sql2o" % "1.5.4"
)

lazy val akkaToolsClusterDependencies = Seq(
Expand Down

0 comments on commit 95baa15

Please sign in to comment.