Skip to content

Commit

Permalink
Merge pull request #114 from campudus/consistent-db-timeouts
Browse files Browse the repository at this point in the history
Consistent database timeouts
  • Loading branch information
alexvetter committed Feb 1, 2016
2 parents 2598f3e + d40db28 commit 237071f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 40 deletions.
26 changes: 6 additions & 20 deletions src/main/scala/com/campudus/tableaux/database/database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ trait DatabaseHandler[O <: DomainObject, ID] extends DatabaseQuery with Database
}

object DatabaseConnection {
val DEFAULT_TIMEOUT = 5000L

type ScalaTransaction = io.vertx.scala.Transaction

def apply(verticle: ScalaVerticle, connection: SQLConnection): DatabaseConnection = {
Expand Down Expand Up @@ -113,14 +111,8 @@ class DatabaseConnection(val verticle: ScalaVerticle, val connection: SQLConnect
def begin(): Future[Transaction] = connection.transaction().map(Transaction)

def transactional[A](fn: TransFunc[A]): Future[A] = {
import com.campudus.tableaux.helper.TimeoutScheduler._

import scala.concurrent.duration.DurationInt

val random = Random.nextInt()

for {
transaction <- begin().withTimeout(DurationInt(1).seconds, s"Transaction-Begin")
transaction <- begin()

(transaction, result) <- {
fn(transaction) recoverWith {
Expand All @@ -129,10 +121,10 @@ class DatabaseConnection(val verticle: ScalaVerticle, val connection: SQLConnect
transaction.rollback()
Future.failed(e)
}
}.withTimeout(DurationInt(1).seconds, s"Transactional-Fn $random")
}

_ <- {
transaction.commit().withTimeout(DurationInt(2).seconds, s"Transactional-Commit $random")
transaction.commit()
}
} yield {
result
Expand Down Expand Up @@ -195,14 +187,6 @@ class DatabaseConnection(val verticle: ScalaVerticle, val connection: SQLConnect
throw new DatabaseException(s"Command $command in Statement $stmt not supported", "error.database.command_not_supported")
}

import io.vertx.scala.FunctionConverters._
val timerId = vertx.setTimer(10000, { d: java.lang.Long => logger.error(s"doMagicQuery $command $returning $stmt exceeded the delay") })

future.onComplete({
case _ =>
vertx.cancelTimer(timerId)
})

future.map({
case r: UpdateResult => {
mapUpdateResult(command, r.toJson)
Expand Down Expand Up @@ -254,7 +238,9 @@ class DatabaseConnection(val verticle: ScalaVerticle, val connection: SQLConnect
Json.obj(
"status" -> "ok",
"rows" -> results.size(),
"message" -> s"SELECT ${results.size()}",
"message" -> s"SELECT ${
results.size()
}",
"fields" -> columnNames,
"results" -> results
)
Expand Down
43 changes: 23 additions & 20 deletions src/main/scala/io/vertx/scala/SQLConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ sealed trait DatabaseAction extends VertxAccess {
}

object SQLConnection {
val QUERY_TIMEOUT = 5000
val LEASE_TIMEOUT = 10000

type JSQLConnection = io.vertx.ext.sql.SQLConnection
Expand All @@ -57,22 +58,28 @@ object SQLConnection {
}

class SQLConnection(val verticle: ScalaVerticle, private val config: JsonObject) extends DatabaseAction {

import com.campudus.tableaux.helper.TimeoutScheduler._

/* TODO: For now it's createNonShared, otherwise stopping the verticle will last for ever because Test will create many SQLConnection not just the verticle */
val client = PostgreSQLClient.createNonShared(vertx, config)

def transaction(): Future[Transaction] = {
for {
connection <- connection()
_ <- setAutoUpdate(connection, autoCommit = false)
_ <- setAutoUpdate(connection, autoCommit = false).withTimeout(SQLConnection.QUERY_TIMEOUT, "transaction")
} yield new Transaction(verticle, connection)
}

override protected def connection(): Future[JSQLConnection] = client.getConnection(_: Handler[AsyncResult[JSQLConnection]])
private def connect(): Future[JSQLConnection] = client.getConnection(_: Handler[AsyncResult[JSQLConnection]])

override protected def connection(): Future[JSQLConnection] =
connect().withTimeout(SQLConnection.QUERY_TIMEOUT, "connect")

override def execute(sql: String): Future[Unit] = wrap {
conn =>
for {
_ <- execute(conn, sql)
_ <- execute(conn, sql).withTimeout(SQLConnection.QUERY_TIMEOUT, "execute")
} yield ()
}

Expand All @@ -83,7 +90,7 @@ class SQLConnection(val verticle: ScalaVerticle, private val config: JsonObject)
private def query(sql: String, params: Option[JsonArray]): Future[ResultSet] = wrap {
conn =>
for {
resultSet <- query(conn, sql, params)
resultSet <- query(conn, sql, params).withTimeout(SQLConnection.QUERY_TIMEOUT, "query")
} yield {
resultSet
}
Expand All @@ -96,7 +103,7 @@ class SQLConnection(val verticle: ScalaVerticle, private val config: JsonObject)
private def update(sql: String, params: Option[JsonArray]): Future[UpdateResult] = wrap {
conn =>
for {
updateResult <- update(conn, sql, params)
updateResult <- update(conn, sql, params).withTimeout(SQLConnection.QUERY_TIMEOUT, "update")
} yield {
updateResult
}
Expand All @@ -123,8 +130,6 @@ class Transaction(val verticle: ScalaVerticle, private val conn: JSQLConnection)

import com.campudus.tableaux.helper.TimeoutScheduler._

import scala.concurrent.duration.DurationInt

val connectionTimerId = vertx.setTimer(SQLConnection.LEASE_TIMEOUT, {
d: java.lang.Long =>
logger.error(s"Lease timeout exceeded")
Expand All @@ -146,31 +151,31 @@ class Transaction(val verticle: ScalaVerticle, private val conn: JSQLConnection)
override def execute(sql: String): Future[Unit] = {
execute(conn, sql)
.recoverDatabaseException("execute")
.withTimeout(10000, "execute")
.withTimeout(SQLConnection.QUERY_TIMEOUT, "execute")
}

override def query(sql: String): Future[ResultSet] = {
query(conn, sql, None)
.recoverDatabaseException("query")
.withTimeout(10000, "query")
.withTimeout(SQLConnection.QUERY_TIMEOUT, "query")
}

override def query(sql: String, params: JsonArray): Future[ResultSet] = {
query(conn, sql, Some(params))
.recoverDatabaseException("query")
.withTimeout(10000, "query")
.withTimeout(SQLConnection.QUERY_TIMEOUT, "query")
}

override def update(sql: String): Future[UpdateResult] = {
update(conn, sql, None)
.recoverDatabaseException("update")
.withTimeout(10000, "update")
.withTimeout(SQLConnection.QUERY_TIMEOUT, "update")
}

override def update(sql: String, params: JsonArray): Future[UpdateResult] = {
update(conn, sql, Some(params))
.recoverDatabaseException("update")
.withTimeout(10000, "update")
.withTimeout(SQLConnection.QUERY_TIMEOUT, "update")
}

override def close(_conn: JSQLConnection): Future[Unit] = {
Expand All @@ -180,23 +185,21 @@ class Transaction(val verticle: ScalaVerticle, private val conn: JSQLConnection)

def commit(): Future[Unit] = {
(for {
t <- asyncVoidToFuture(conn.commit(_: AsyncVoid)).withTimeout(DurationInt(1).seconds, "commit")
b <- close(conn)
} yield {
()
}) recoverDatabaseException "commit" recoverWith {
_ <- asyncVoidToFuture(conn.commit(_: AsyncVoid)).withTimeout(SQLConnection.QUERY_TIMEOUT, "commit")
_ <- close(conn)
} yield ()) recoverDatabaseException "commit" recoverWith {
case e =>
rollback()
Future.failed(e)
} withTimeout(10000, "commit")
} withTimeout(SQLConnection.QUERY_TIMEOUT, "commit")
}

def rollback(): Future[Unit] = {
(for {
_ <- asyncVoidToFuture(conn.rollback(_: Handler[AsyncResult[Void]])).withTimeout(DurationInt(1).seconds, "rollback")
_ <- asyncVoidToFuture(conn.rollback(_: Handler[AsyncResult[Void]])).withTimeout(SQLConnection.QUERY_TIMEOUT, "rollback")
_ <- close(conn)
} yield {
()
}) recoverDatabaseException "rollback" withTimeout(10000, "commit")
}) recoverDatabaseException "rollback" withTimeout(SQLConnection.QUERY_TIMEOUT, "rollback")
}
}

0 comments on commit 237071f

Please sign in to comment.