Skip to content

Commit

Permalink
introduce locking
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpeklak committed Sep 23, 2015
1 parent f4ea2ee commit 05f390f
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 30 deletions.
117 changes: 88 additions & 29 deletions src/main/scala/smt/Handling.scala
Expand Up @@ -8,6 +8,7 @@ import sbt.Logger
import scalaz.{-\/, EitherT}
import smt.migration.{HashedMigrationSeq, MigrationInfo, Migration, Up}
import smt.db.ConnectionAction.HasConnection
import smt.db.LockAction.HasLock
import smt.db.AddAction.{HasUser, HasRemark}
import smt.describe.DescribeAction.HasLogger

Expand All @@ -22,39 +23,62 @@ trait StateHandling[T] extends DbAction[T] {
lazy val hasLogger: HasLogger[ConnectionDep] = _.logger
}

lazy val connectionLockHandling = new ConnectionHandling[(ConnectionDep, String)] {
lazy val hasConnection: HasConnection[(ConnectionDep, String)] = _._1.c
lazy val hasLogger: HasLogger[(ConnectionDep, String)] = _._1.logger
lazy val hasLock: HasLock[(ConnectionDep, String)] = _._2
}

def openConnection: EDKleisli[ConnectionDep] = {
for {
c <- connection()
l <- eAsk.map(hasLogger)
} yield ConnectionDep(c, l)
}

def state(): EDKleisli[Seq[MigrationInfo]] = {
def withConnection[U](action: connectionHandling.EDKleisli[U]): EDKleisli[U] = {
import ekSyntax._
import connectionHandling.eSyntax._

openConnection >=> ((connectionHandling.init() >> connectionHandling.state()) andFinally connectionHandling.close())
openConnection >=> ((connectionHandling.init() >> action) andFinally connectionHandling.close())
}

def latestCommon(mhs: HashedMigrationSeq): EDKleisli[Option[connectionHandling.Common]] = {
import ekSyntax._
import connectionHandling.eSyntax._
def acqLock: connectionHandling.EDKleisli[(ConnectionDep, String)] = {
for {
t <- connectionHandling.eAsk
l <- connectionHandling.acquireLock()
} yield (t, l)
}

openConnection >=> ((connectionHandling.init() >> connectionHandling.latestCommon(mhs)) andFinally connectionHandling.close())
def rlsLock: connectionLockHandling.EDKleisli[Unit] = {
for {
t <- connectionLockHandling.eAsk
_ <- connectionLockHandling.releaseLock(t._2)
} yield ()
}

def common(mhs: HashedMigrationSeq): EDKleisli[connectionHandling.CommonMigrations] = {
import ekSyntax._
import connectionHandling.eSyntax._
def withLock[U](action: connectionLockHandling.EDKleisli[U]): connectionHandling.EDKleisli[U] = {
import connectionHandling._
import connectionHandling.ekSyntax._

openConnection >=> ((connectionHandling.init() >> connectionHandling.common(mhs)) andFinally connectionHandling.close())
acqLock >=> {
import connectionLockHandling._
import connectionLockHandling.eSyntax._
action andFinally rlsLock
}
}

def state(): EDKleisli[Seq[MigrationInfo]] = withConnection(connectionHandling.state())

def latestCommon(mhs: HashedMigrationSeq): EDKleisli[Option[connectionHandling.Common]] = withConnection(connectionHandling.latestCommon(mhs))

def common(mhs: HashedMigrationSeq): EDKleisli[connectionHandling.CommonMigrations] = withConnection(connectionHandling.common(mhs))

def applyScript(scr: migration.Script): EDKleisli[Unit] = {
import ekSyntax._
import connectionHandling.eSyntax._

openConnection >=> (connectionHandling.applyScript(scr, Up) andFinally connectionHandling.close())
openConnection >=> (withLock(connectionLockHandling.applyScript(scr, Up)) andFinally connectionHandling.close())
}
}

Expand All @@ -72,27 +96,62 @@ trait Handling[T] extends DbAction[T] with DescribeAction[T] with ReportersActio
lazy val hasRemark: HasRemark[(T, Connection)] = t => handling.hasRemark(t._1)
}

lazy val connectionLockHandling = new AddHandling[(T, Connection, String)] {
lazy val hasConnection: HasConnection[(T, Connection, String)] = _._2
lazy val hasLogger: HasLogger[(T, Connection, String)] = t => handling.hasLogger(t._1)
lazy val hasUser: HasUser[(T, Connection, String)] = t => handling.hasUser(t._1)
lazy val hasRemark: HasRemark[(T, Connection, String)] = t => handling.hasRemark(t._1)
lazy val hasLock: HasLock[(T, Connection, String)] = _._3
}

def applyMigrationsAndReport(ms: Seq[Migration], imo: Option[(Int, String)], arb: Boolean, runTests: Boolean): EDKleisli[Unit] = {

def openConnection: EDKleisli[(T, Connection)] = {
for {
t <- eAsk
c <- connection()
} yield Tuple2(t, c)
}

def withConnection[U](action: connectionHandling.namedMoveTypes.EWDKleisli[U]): namedMoveTypes.EWDKleisli[U] = {
import namedMoveTypes._
import namedMoveTypes.ewSyntax._

namedMoveTypes.liftE(openConnection) >=> {
import connectionHandling.namedMoveTypes._
import connectionHandling.namedMoveTypes.ewSyntax2._
(liftE(connectionHandling.init()) >> action) andFinally liftE(connectionHandling.close())
}
}

def acqLock: connectionHandling.EDKleisli[(T, Connection, String)] = {
for {
t <- connectionHandling.eAsk
l <- connectionHandling.acquireLock()
} yield (t._1, t._2, l)
}

def rlsLock: connectionLockHandling.EDKleisli[Unit] = {
for {
t <- connectionLockHandling.eAsk
_ <- connectionLockHandling.releaseLock(t._3)
} yield ()
}

def withLock[U](action: connectionLockHandling.namedMoveTypes.EWDKleisli[U]): connectionHandling.namedMoveTypes.EWDKleisli[U] = {
import connectionHandling.namedMoveTypes._
import connectionHandling.namedMoveTypes.ewSyntax._

connectionHandling.namedMoveTypes.liftE(acqLock) >=> {
import connectionLockHandling.namedMoveTypes._
import connectionLockHandling.namedMoveTypes.ewSyntax2._
action andFinally liftE(rlsLock)
}
}

EitherT[DKleisli, String, Unit](
for {
nmse <- {
{
import namedMoveTypes._
import namedMoveTypes.ewSyntax._

{
for {
t <- namedMoveTypes.liftE(eAsk)
c <- namedMoveTypes.liftE(connection())
} yield {
Tuple2(t, c)
}
} >=> {
import connectionHandling.namedMoveTypes._
liftE(connectionHandling.init()) >> connectionHandling.applyMigrations(ms, imo, arb, runTests) >> liftE(connectionHandling.close())
}
}.run.run
}
nmse <- withConnection(withLock(connectionLockHandling.applyMigrations(ms, imo, arb, runTests))).run.run

(nms, e) = nmse

Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/smt/Writing.scala
Expand Up @@ -8,6 +8,8 @@ sealed trait Writing

sealed trait MoveState extends Writing

case class Lock(name: String) extends Writing

case class UpMoveState(
appliedUps: List[Script] = Nil,
appliedUpsWithDowns: List[Script] = Nil,
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/smt/db/Connection.scala
Expand Up @@ -7,6 +7,8 @@ import sbt.Logger
trait Connection {

def init(logger: Logger)(): String \/ Unit

def acquireLock(logger: Logger)(): String \/ String

def state(logger: Logger): String \/ Seq[MigrationInfo]

Expand All @@ -24,5 +26,7 @@ trait Connection {

def testScript(logger: Logger)(script: Script): String \/ Unit

def releaseLock(logger: Logger)(lock: String): String \/ Unit

def close(logger: Logger)(): String \/ Unit
}
4 changes: 4 additions & 0 deletions src/main/scala/smt/db/ConnectionAction.scala
Expand Up @@ -16,6 +16,8 @@ trait ConnectionAction[T] extends ActionTypes[T] {

def init(): EDKleisli[Unit] = EDKleisli(d => hasConnection(d).init(hasLogger(d)))

def acquireLock(): EDKleisli[String] = EDKleisli(d => hasConnection(d).acquireLock(hasLogger(d)))

def state(): EDKleisli[Seq[MigrationInfo]] = EDKleisli(d => hasConnection(d).state(hasLogger(d)))

def downs(hash: Seq[Byte]): EDKleisli[Seq[Script]] = EDKleisli(d => hasConnection(d).downs(hasLogger(d))(hash))
Expand All @@ -32,6 +34,8 @@ trait ConnectionAction[T] extends ActionTypes[T] {

def doTest(test: Test): EDKleisli[Unit] = EDKleisli(d => test.run(hasConnection(d))(hasLogger(d)))

def releaseLock(lock: String): EDKleisli[Unit] = EDKleisli(d => hasConnection(d).releaseLock(hasLogger(d))(lock))

def close(): EDKleisli[Unit] = EDKleisli(d => hasConnection(d).close(hasLogger(d))())
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/smt/db/LockAction.scala
@@ -0,0 +1,5 @@
package smt.db

object LockAction {
type HasLock[α] = α => String
}
12 changes: 11 additions & 1 deletion src/main/scala/smt/db/impl/SqlDatabase.scala
Expand Up @@ -8,7 +8,7 @@ import Util._
import collection.Map.empty
import smt.db.{Connection, Database}
import smt.migration.{Script, MigrationInfo, Direction}
import scalaz.\/
import scalaz.{\/-, \/}
import scala.collection.immutable.Stream.Empty
import sbt.Logger

Expand Down Expand Up @@ -140,6 +140,16 @@ abstract class SqlConnection(protected val cnx: JConnection,
if (!doesDownTableExist()) createDownTable()
}

def acquireLock(logger: Logger)(): \/[String, String] = {
logger.info("Acquiring lock")
\/-("")
}

def releaseLock(logger: Logger)(lock: String): \/[String, Unit] = {
logger.info("Releasing lock")
\/-()
}

def add(logger: Logger)(migrationInfo: MigrationInfo): String \/ Unit = fromTryCatch {
val mm = withStatement(cnx)(st => {
mapResultSet(st.executeQuery(queryMigrationTableString))(rs => {
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/smt/db/impl/TestDatabase.scala
Expand Up @@ -24,6 +24,10 @@ class TestConnection extends Connection {
\/-()
}

def acquireLock(logger: Logger)(): \/[String, String] = \/-("")

def releaseLock(logger: Logger)(lock: String): \/[String, Unit] = \/-()

def add(logger: Logger)(migrationInfo: MigrationInfo): String \/ Unit = {
logger.info("adding " + migrationInfo)
s = s :+ migrationInfo
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/smt/util/ActionTypes.scala
Expand Up @@ -51,6 +51,8 @@ trait ActionTypes[D] {

lazy val ewSyntax = KleisliStack.EitherTWriterTKleisli[String, W].tKleisliSyntax[Future, D]

lazy val ewSyntax2 = EitherTHaerte.eitherTSyntax[WDKleisli, String]

// the implicits are lazy, otherwise I get NullPointerExceptions
implicit lazy val wInstance = WriterT.writerTMonad[DKleisli, W]

Expand Down

0 comments on commit 05f390f

Please sign in to comment.