Skip to content

Commit

Permalink
io free monad
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Nov 13, 2016
1 parent 88b8ffa commit cee378c
Show file tree
Hide file tree
Showing 23 changed files with 960 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@ import scala.util.Try
import io.getquill.context.sql.SqlContext
import io.getquill.context.sql.idiom.SqlIdiom
import io.getquill.NamingStrategy
import io.getquill.monad.ScalaFutureIOMonad

abstract class AsyncContext[D <: SqlIdiom, N <: NamingStrategy, C <: Connection](pool: PartitionedConnectionPool[C])
extends SqlContext[D, N]
with Decoders
with Encoders {
with Encoders
with ScalaFutureIOMonad {

private val logger: Logger =
Logger(LoggerFactory.getLogger(classOf[AsyncContext[_, _, _]]))

override type PrepareRow = List[Any]
override type ResultRow = RowData

override type RunQueryResult[T] = Future[List[T]]
override type RunQuerySingleResult[T] = Future[T]
override type RunActionResult = Future[Long]
override type RunActionReturningResult[T] = Future[T]
override type RunBatchActionResult = Future[List[Long]]
override type RunBatchActionReturningResult[T] = Future[List[T]]
override type Result[T] = Future[T]
override type RunQueryResult[T] = List[T]
override type RunQuerySingleResult[T] = T
override type RunActionResult = Long
override type RunActionReturningResult[T] = T
override type RunBatchActionResult = List[Long]
override type RunBatchActionReturningResult[T] = List[T]

override def close = {
Await.result(pool.close, Duration.Inf)
Expand All @@ -61,6 +64,12 @@ abstract class AsyncContext[D <: SqlIdiom, N <: NamingStrategy, C <: Connection]
f(TransactionalExecutionContext(ec, c))
}

override def unsafePerformIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] =
transactional match {
case false => super.unsafePerformIO(io)
case true => transaction(super.unsafePerformIO(io)(_))
}

def executeQuery[T](sql: String, prepare: List[Any] => List[Any] = identity, extractor: RowData => T = identity[RowData] _)(implicit ec: ExecutionContext): Future[List[T]] = {
logger.info(sql)
withConnection(_.sendPreparedStatement(sql, prepare(List()))).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,30 @@ import com.typesafe.config.Config
import scala.collection.JavaConverters._
import io.getquill.context.cassandra.CassandraSessionContext
import com.datastax.driver.core.Cluster
import io.getquill.monad.ScalaFutureIOMonad

class CassandraAsyncContext[N <: NamingStrategy](
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)
extends CassandraSessionContext[N](cluster, keyspace, preparedStatementCacheSize) {
extends CassandraSessionContext[N](cluster, keyspace, preparedStatementCacheSize)
with ScalaFutureIOMonad {

def this(config: CassandraContextConfig) = this(config.cluster, config.keyspace, config.preparedStatementCacheSize)
def this(config: Config) = this(CassandraContextConfig(config))
def this(configPrefix: String) = this(LoadConfig(configPrefix))

override type RunQueryResult[T] = Future[List[T]]
override type RunQuerySingleResult[T] = Future[T]
override type RunActionResult = Future[Unit]
override type RunBatchActionResult = Future[Unit]
override type Result[T] = Future[T]
override type RunQueryResult[T] = List[T]
override type RunQuerySingleResult[T] = T
override type RunActionResult = Unit
override type RunBatchActionResult = Unit

override def unsafePerformIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] = {
if (transactional) logger.warn("Cassandra doesn't support transactions, ignoring `io.transactional`")
unsafePerformIO(io)
}

def executeQuery[T](cql: String, prepare: BoundStatement => BoundStatement = identity, extractor: Row => T = identity[Row] _)(implicit ec: ExecutionContext): Future[List[T]] =
session.executeAsync(prepare(super.prepare(cql)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ class CassandraStreamContext[N <: NamingStrategy](
def this(config: Config) = this(CassandraContextConfig(config))
def this(configPrefix: String) = this(LoadConfig(configPrefix))

override type RunQueryResult[T] = Observable[T]
override type RunQuerySingleResult[T] = Observable[T]
override type RunActionResult = Observable[Unit]
override type RunBatchActionResult = Observable[Unit]
override type Result[T] = Observable[T]
override type RunQueryResult[T] = T
override type RunQuerySingleResult[T] = T
override type RunActionResult = Unit
override type RunBatchActionResult = Unit

protected def page(rs: ResultSet): Task[Iterable[Row]] = Task.defer {
val available = rs.getAvailableWithoutFetching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,31 @@ import io.getquill.util.LoadConfig
import io.getquill.context.cassandra.CassandraSessionContext
import scala.collection.JavaConverters._
import com.datastax.driver.core.Cluster
import io.getquill.monad.SyncIOMonad

class CassandraSyncContext[N <: NamingStrategy](
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)
extends CassandraSessionContext[N](cluster, keyspace, preparedStatementCacheSize) {
extends CassandraSessionContext[N](cluster, keyspace, preparedStatementCacheSize)
with SyncIOMonad {

def this(config: CassandraContextConfig) = this(config.cluster, config.keyspace, config.preparedStatementCacheSize)
def this(config: Config) = this(CassandraContextConfig(config))
def this(configPrefix: String) = this(LoadConfig(configPrefix))

override type Result[T] = T
override type RunQueryResult[T] = List[T]
override type RunQuerySingleResult[T] = T
override type RunActionResult = Unit
override type RunBatchActionResult = Unit

override def unsafePerformIO[T](io: IO[T, _], transactional: Boolean = false): Result[T] = {
if (transactional) logger.warn("Cassandra doesn't support transactions, ignoring `io.transactional`")
unsafePerformIO(io)
}

def executeQuery[T](cql: String, prepare: BoundStatement => BoundStatement = identity, extractor: Row => T = identity[Row] _): List[T] =
session.execute(prepare(super.prepare(cql)))
.all.asScala.toList.map(extractor)
Expand Down
7 changes: 5 additions & 2 deletions quill-core/src/main/scala/io/getquill/MirrorContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ package io.getquill
import io.getquill.context.Context
import io.getquill.context.mirror.{ MirrorDecoders, MirrorEncoders, Row }
import io.getquill.idiom.{ Idiom => BaseIdiom }

import scala.util.{ Failure, Success, Try }
import io.getquill.monad.SyncIOMonad

class MirrorContextWithQueryProbing[Idiom <: BaseIdiom, Naming <: NamingStrategy]
extends MirrorContext[Idiom, Naming] with QueryProbing

class MirrorContext[Idiom <: BaseIdiom, Naming <: NamingStrategy]
extends Context[Idiom, Naming]
with MirrorEncoders
with MirrorDecoders {
with MirrorDecoders
with SyncIOMonad {

override type PrepareRow = Row
override type ResultRow = Row

override type Result[T] = T
override type RunQueryResult[T] = QueryMirror[T]
override type RunQuerySingleResult[T] = QueryMirror[T]
override type RunActionResult = ActionMirror
Expand Down
13 changes: 7 additions & 6 deletions quill-core/src/main/scala/io/getquill/context/Context.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ trait Context[Idiom <: io.getquill.idiom.Idiom, Naming <: NamingStrategy]
extends Closeable
with CoreDsl {

type Result[T]
type RunQuerySingleResult[T]
type RunQueryResult[T]
type RunActionResult
Expand All @@ -23,12 +24,12 @@ trait Context[Idiom <: io.getquill.idiom.Idiom, Naming <: NamingStrategy]

def probe(statement: String): Try[_]

def run[T](quoted: Quoted[T]): RunQuerySingleResult[T] = macro QueryMacro.runQuerySingle[T]
def run[T](quoted: Quoted[Query[T]]): RunQueryResult[T] = macro QueryMacro.runQuery[T]
def run(quoted: Quoted[Action[_]]): RunActionResult = macro ActionMacro.runAction
def run[T](quoted: Quoted[ActionReturning[_, T]]): RunActionReturningResult[T] = macro ActionMacro.runActionReturning[T]
def run(quoted: Quoted[BatchAction[Action[_]]]): RunBatchActionResult = macro ActionMacro.runBatchAction
def run[T](quoted: Quoted[BatchAction[ActionReturning[_, T]]]): RunBatchActionReturningResult[T] = macro ActionMacro.runBatchActionReturning[T]
def run[T](quoted: Quoted[T]): Result[RunQuerySingleResult[T]] = macro QueryMacro.runQuerySingle[T]
def run[T](quoted: Quoted[Query[T]]): Result[RunQueryResult[T]] = macro QueryMacro.runQuery[T]
def run(quoted: Quoted[Action[_]]): Result[RunActionResult] = macro ActionMacro.runAction
def run[T](quoted: Quoted[ActionReturning[_, T]]): Result[RunActionReturningResult[T]] = macro ActionMacro.runActionReturning[T]
def run(quoted: Quoted[BatchAction[Action[_]]]): Result[RunBatchActionResult] = macro ActionMacro.runBatchAction
def run[T](quoted: Quoted[BatchAction[ActionReturning[_, T]]]): Result[RunBatchActionReturningResult[T]] = macro ActionMacro.runBatchActionReturning[T]

protected def handleSingleResult[T](list: List[T]) =
list match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package io.getquill.context.mirror
import java.time.LocalDate
import java.util.Date

import io.getquill.MirrorContext

import scala.reflect.ClassTag
import io.getquill.context.Context

trait MirrorDecoders {
this: MirrorContext[_, _] =>
this: Context[_, _] =>

override type PrepareRow = Row
override type ResultRow = Row
override type Decoder[T] = MirrorDecoder[T]

case class MirrorDecoder[T](decoder: BaseDecoder[T]) extends BaseDecoder[T] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package io.getquill.context.mirror

import java.time.LocalDate
import java.util.Date

import io.getquill.MirrorContext
import io.getquill.context.Context

trait MirrorEncoders {
this: MirrorContext[_, _] =>
this: Context[_, _] =>

override type PrepareRow = Row
override type ResultRow = Row
override type Encoder[T] = MirrorEncoder[T]

case class MirrorEncoder[T](encoder: BaseEncoder[T]) extends BaseEncoder[T] {
Expand Down
126 changes: 126 additions & 0 deletions quill-core/src/main/scala/io/getquill/monad/IOMonad.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package io.getquill.monad

import scala.language.experimental.macros
import scala.collection.generic.CanBuildFrom
import scala.language.higherKinds
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import io.getquill.context.Context

sealed trait Effect

object Effect {
trait Read extends Effect
trait Write extends Effect
trait Transaction extends Effect
}

trait IOMonad {
this: Context[_, _] =>

type Effect = io.getquill.monad.Effect
val Effect = io.getquill.monad.Effect

def runIO[T](quoted: Quoted[T]): IO[RunQuerySingleResult[T], Effect.Read] = macro IOMonadMacro.runIO
def runIO[T](quoted: Quoted[Query[T]]): IO[RunQueryResult[T], Effect.Read] = macro IOMonadMacro.runIO
def runIO(quoted: Quoted[Action[_]]): IO[RunActionResult, Effect.Write] = macro IOMonadMacro.runIO
def runIO[T](quoted: Quoted[ActionReturning[_, T]]): IO[RunActionReturningResult[T], Effect.Write] = macro IOMonadMacro.runIO
def runIO(quoted: Quoted[BatchAction[Action[_]]]): IO[RunBatchActionResult, Effect.Write] = macro IOMonadMacro.runIO
def runIO[T](quoted: Quoted[BatchAction[ActionReturning[_, T]]]): IO[RunBatchActionReturningResult[T], Effect.Write] = macro IOMonadMacro.runIO

case class Run[T, E <: Effect](f: () => Result[T]) extends IO[T, E]
protected case class FromTry[T](t: Try[T]) extends IO[T, Effect]
protected case class Sequence[A, M[X] <: TraversableOnce[X], E <: Effect](in: M[IO[A, E]], cbfIOToResult: CanBuildFrom[M[IO[A, E]], Result[A], M[Result[A]]], cbfResultToValue: CanBuildFrom[M[Result[A]], A, M[A]]) extends IO[M[A], E]
protected case class TransformWith[T, S, E1 <: Effect, E2 <: Effect](io: IO[T, E1], f: Try[T] => IO[S, E2]) extends IO[S, E1 with E2]
protected case class Transactional[T, E <: Effect](io: IO[T, E]) extends IO[T, E with Effect.Transaction]

object IO {

def fromTry[T](result: Try[T]): IO[T, Effect] = FromTry(result)

def sequence[A, M[X] <: TraversableOnce[X], E <: Effect](in: M[IO[A, E]])(implicit cbfIOToResult: CanBuildFrom[M[IO[A, E]], Result[A], M[Result[A]]], cbfResultToValue: CanBuildFrom[M[Result[A]], A, M[A]]): IO[M[A], E] =
Sequence(in, cbfIOToResult, cbfResultToValue)

val unit: IO[Unit, Effect] = fromTry(Success(()))

def zip[T, E1 <: Effect, S, E2 <: Effect](a: IO[T, E1], b: IO[S, E2]): IO[(T, S), E1 with E2] =
sequence(List(a, b)).map {
case a :: b :: Nil => (a.asInstanceOf[T], b.asInstanceOf[S])
case other => throw new IllegalStateException("Sequence returned less than two elements")
}

def failed[T](exception: Throwable): IO[T, Effect] = fromTry(Failure(exception))

def successful[T](result: T): IO[T, Effect] = fromTry(Success(result))

def apply[T](body: => T): IO[T, Effect] = fromTry(Try(body))

def foldLeft[T, R, E <: Effect](ios: collection.immutable.Iterable[IO[T, E]])(zero: R)(op: (R, T) => R): IO[R, E] =
sequence(ios).map(_.foldLeft(zero)(op))

def reduceLeft[T, R >: T, E <: Effect](ios: collection.immutable.Iterable[IO[T, E]])(op: (R, T) => R): IO[R, E] =
sequence(ios).map(_.reduceLeft(op))

def traverse[A, B, M[X] <: TraversableOnce[X], E <: Effect](in: M[A])(fn: A => IO[B, E])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): IO[M[B], E] =
sequence(in.map(fn)).map(r => cbf().++=(r).result)
}

sealed trait IO[+T, -E <: Effect] {

def transactional: IO[T, E with Effect.Transaction] = Transactional(this)

def transformWith[S, E2 <: Effect](f: Try[T] => IO[S, E2]): IO[S, E with E2] =
TransformWith(this, f)

def transform[S](f: Try[T] => Try[S]): IO[S, E] =
transformWith { r =>
IO.fromTry(f(r))
}

def lowerFromTry[U](implicit ev: T => Try[U]) =
map(ev).flatMap {
case Success(v) => IO.successful(v)
case Failure(e) => IO.failed(e)
}

def liftToTry: IO[Try[T], E] =
transformWith(IO.successful)

def failed: IO[Throwable, E] =
transform {
case Failure(t) => Success(t)
case Success(v) => Failure(new NoSuchElementException("IO.failed not completed with a throwable."))
}

def map[S](f: T => S): IO[S, E] = transform(_.map(f))

def flatMap[S, E2 <: Effect](f: T => IO[S, E2]): IO[S, E with E2] =
transformWith {
case Success(s) => f(s)
case Failure(_) => this.asInstanceOf[IO[S, E with E2]]
}

def filter(p: T => Boolean): IO[T, E] =
map { r => if (p(r)) r else throw new NoSuchElementException("IO.filter predicate is not satisfied") }

final def withFilter(p: T => Boolean): IO[T, E] = filter(p)

def collect[S](pf: PartialFunction[T, S]): IO[S, E] =
map {
r => pf.applyOrElse(r, (t: T) => throw new NoSuchElementException("IO.collect partial function is not defined at: " + t))
}

def recover[U >: T](pf: PartialFunction[Throwable, U]): IO[U, E] =
transform { _ recover pf }

def recoverWith[U >: T, E2 <: Effect](pf: PartialFunction[Throwable, IO[U, E2]]): IO[U, E with E2] =
transformWith {
case Failure(t) => pf.applyOrElse(t, (_: Throwable) => this)
case Success(_) => this
}

def zip[S, E2 <: Effect](io: IO[S, E2]): IO[(T, S), E with E2] =
IO.zip(this, io)
}
}
11 changes: 11 additions & 0 deletions quill-core/src/main/scala/io/getquill/monad/IOMonadMacro.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.getquill.monad

import io.getquill.util.Messages._
import scala.reflect.macros.blackbox.{ Context => MacroContext }

class IOMonadMacro(val c: MacroContext) {
import c.universe._

def runIO(quoted: Tree): Tree =
c.debug(q"${c.prefix}.Run(() => ${c.prefix}.run($quoted))")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.getquill.monad

import scala.util.Failure
import scala.util.Success
import io.getquill.context.Context
import scala.concurrent.Future
import scala.concurrent.ExecutionContext

trait ScalaFutureIOMonad extends IOMonad {
this: Context[_, _] =>

type Result[T] = Future[T]

def unsafePerformIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] =
io match {
case FromTry(v) => Future.fromTry(v)
case Run(f) => f()
case Sequence(in, cbfIOToResult, cbfResultToValue) =>
val builder = cbfIOToResult()
in.foreach(builder += unsafePerformIO(_))
Future.sequence(builder.result)(cbfResultToValue, ec)
case TransformWith(a, fA) =>
unsafePerformIO(a)
.map(Success(_))
.recover { case ex => Failure(ex) }
.flatMap(v => unsafePerformIO(fA(v)))
case Transactional(io) =>
unsafePerformIO(io, transactional = true)
}
}
Loading

0 comments on commit cee378c

Please sign in to comment.