Skip to content

Commit

Permalink
Implement ZStream support (#53)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Ritter <alex@undo.app>
  • Loading branch information
gaelrenoux and xQwexx committed Mar 24, 2024
1 parent cfc719f commit 82bb584
Show file tree
Hide file tree
Showing 14 changed files with 543 additions and 35 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,26 @@ val es: ErrorStrategies =



### Streaming

When the wrapped framework handle streaming, you can convert the framework's stream to a `ZStream` using `tzioStream`.
To provide a `Connection` to the ZIO stream, you can either consume the stream into a ZIO first (then use the same functions as above), or use the `Database`'s streaming methods.

The methods `transactionOrDieStream` and `autoCommitStream` work in the same way as the similar, non-stream method.
Note that for transactions, only the `OrDie` variant exists: this is because ZIO's acquire-release mechanism for stream does not allow to pass errors that occur during the acquire-release phase in the error channel.
Defects in the stream due to connection errors can only be caught after the `ZStream` has been consumed into a `ZIO`.

```scala
import io.github.gaelrenoux.tranzactio.doobie._
import zio._
val queryStream: ZStream[Connection, Error, Person] = tzioStream { sql"""SELECT given_name, family_name FROM person""".query[Person].stream }.mapError(transform)
val zStream: ZStream[Database, DbException, Person] = Database.transactionOrDieStream(queryStream)
```

You can see a full example in the `examples` submodule (in `LayeredAppStreaming`).



### Multiple Databases

Some applications use multiple databases.
Expand All @@ -339,7 +359,6 @@ You only need to provide a different marker type for each database you use.

```scala
import io.github.gaelrenoux.tranzactio.doobie._
import javax.sql.DataSource
import zio._

trait Db1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.github.gaelrenoux.tranzactio

import zio.ZIO.attemptBlocking
import zio._
import zio.stream.ZStream

import java.sql.Connection
import javax.sql.DataSource
Expand All @@ -14,8 +15,14 @@ object ConnectionSource {
def runTransaction[R, E, A](task: Connection => ZIO[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A]

def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A]

def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A]

def runAutoCommitStream[R, E, A](task: Connection => ZStream[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A]
}

/** ConnectionSource with standard behavior. Children class need to implement `getConnection`. */
Expand All @@ -39,16 +46,37 @@ object ConnectionSource {
}
}

def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = {
ZStream
.acquireReleaseExitWith(openConnection.tap(c => setAutoCommit(c, autoCommit = false)).orDie) {
case (c, Exit.Success(_)) => commitConnection(c).tapEither(_ => closeConnection(c)).orDie
case (c, Exit.Failure(cause)) if cause.isDie => closeConnection(c).orDie // No commit, no rollback in case of a defect, just close the connection
case (c, Exit.Failure(_)) => (if (commitOnFailure) commitConnection(c) else rollbackConnection(c)).tapEither(_ => closeConnection(c)).orDie
}
.flatMap { (c: Connection) => task(c) }
}

def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
ZIO.acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie) { (c: Connection) =>
setAutoCommit(c, autoCommit = true)
.mapError(Left(_))
setAutoCommit(c, autoCommit = true).mapError(Left(_))
.zipRight {
task(c).mapError(Right(_))
}
}

def runAutoCommitStream[R, E, A](task: Connection => ZStream[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
ZStream
.acquireReleaseWith(openConnection.mapError(Left(_)))(closeConnection(_).orDie)
.flatMap { (c: Connection) =>
ZStream.fromZIO(setAutoCommit(c, autoCommit = true).mapError(Left(_)))
.crossRight {
task(c).mapError(Right(_))
}
}

// TODO handle error reporting when retrying

private def bottomErrorStrategy(implicit errorStrategies: ErrorStrategiesRef) =
Expand Down Expand Up @@ -116,6 +144,10 @@ object ConnectionSource {
super.runTransaction(task, commitOnFailure)
}

override def runTransactionOrDieStream[R, E, A](task: Connection => ZStream[R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] =
super.runTransactionOrDieStream(task, commitOnFailure) // TODO Could not find a way to use the semaphore here

override def runAutoCommit[R, E, A](task: Connection => ZIO[R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
semaphore.withPermit {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.gaelrenoux.tranzactio


import zio.stream.ZStream
import zio.{Tag, Trace, ZIO, ZLayer}

import javax.sql.DataSource
Expand All @@ -20,6 +21,15 @@ abstract class DatabaseModuleBase[Connection, Database <: DatabaseOps.ServiceOps
}
}

override def transactionOrDieStream[R, E, A](
stream: => ZStream[Connection with R, E, A],
commitOnFailure: => Boolean = false
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[Database with R, E, A] = {
ZStream.serviceWithStream[Database] { db =>
db.transactionOrDieStream[R, E, A](stream, commitOnFailure)
}
}

override def autoCommit[R, E, A](
zio: => ZIO[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[Database with R, Either[DbException, E], A] = {
Expand All @@ -28,6 +38,14 @@ abstract class DatabaseModuleBase[Connection, Database <: DatabaseOps.ServiceOps
}
}

override def autoCommitStream[R, E, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[Database with R, Either[DbException, E], A] = {
ZStream.serviceWithStream[Database] { db =>
db.autoCommitStream[R, E, A](stream)
}
}

/** Creates a Database Layer which requires an existing ConnectionSource. */
def fromConnectionSource(implicit dbContext: DbContext, trace: Trace): ZLayer[ConnectionSource, Nothing, Database]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.gaelrenoux.tranzactio

import zio.stream.ZStream
import zio.{Cause, Trace, ZIO}

/** Operations for a Database, based on a few atomic operations. Can be used both by the actual DB service, or by the DB
Expand Down Expand Up @@ -58,6 +59,12 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] =
transactionOrDie[R, E, A](zio, commitOnFailure)

/** As `transactionOrDie`, for ZStream instances instead of ZIO instances. */
def transactionOrDieStream[R <: Any, E, A](
stream: => ZStream[Connection with R, E, A],
commitOnFailure: => Boolean = false
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A]

/** Provides that ZIO with a Connection. All DB action in the ZIO will be auto-committed. Failures in the initial
* ZIO will be wrapped in a Right in the error case of the resulting ZIO, with connection errors resulting in a
* failure with the exception wrapped in a Left.
Expand All @@ -74,6 +81,14 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, Either[DbException, E], A] =
autoCommit[R, E, A](zio)

/** As `autoCommit`, for ZStream instances instead of ZIO instances.
*
* This method should be implemented by subclasses, to provide the connection.
*/
def autoCommitStream[R, E, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, Either[DbException, E], A]

/** As `autoCommit`, but exceptions are simply widened to a common failure type. The resulting failure type is a
* superclass of both DbException and the error type of the inital ZIO. */
final def autoCommitOrWiden[R, E >: DbException, A](
Expand All @@ -87,6 +102,12 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] =
autoCommitOrWiden[R, E, A](zio)

/** As `autoCommitOrWiden`, for ZStream instances instead of ZIO instances. */
final def autoCommitOrWidenStream[R, E >: DbException, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] =
autoCommitStream[R, E, A](stream).mapError(_.merge)

/** As `autoCommit`, but errors when handling the connections are treated as defects instead of failures. */
final def autoCommitOrDie[R, E, A](
zio: => ZIO[Connection with R, E, A]
Expand All @@ -99,6 +120,17 @@ trait DatabaseOps[Connection, R0] {
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZIO[R with R0, E, A] =
autoCommitOrDie[R, E, A](zio)

/** As `autoCommitOrDie`, for ZStream instances instead of ZIO instances. */
final def autoCommitOrDieStream[R, E, A](
stream: => ZStream[Connection with R, E, A]
)(implicit errorStrategies: ErrorStrategiesRef = ErrorStrategies.Parent, trace: Trace): ZStream[R with R0, E, A] =
autoCommitStream[R, E, A](stream).mapErrorCause { cause =>
cause.flatMap {
case Left(dbError) => Cause.die(dbError, cause.trace)
case Right(error) => Cause.fail(error, cause.trace)
}
}

}

object DatabaseOps {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.gaelrenoux.tranzactio

import zio.{Tag, ZEnvironment, ZIO, Trace}
import zio.stream.ZStream
import zio.{Tag, Trace, ZEnvironment, ZIO}

import java.sql.{Connection => JdbcConnection}

Expand All @@ -23,6 +24,16 @@ abstract class DatabaseServiceBase[Connection: Tag](connectionSource: Connection
}, commitOnFailure)
}

override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] =
ZStream.environmentWithStream[R] { r =>
runTransactionOrDieStream({ (c: JdbcConnection) =>
ZStream.fromZIO(connectionFromJdbc(c))
.map(r ++ ZEnvironment(_))
.flatMap(stream.provideEnvironment(_))
}, commitOnFailure)
}

override def autoCommit[R, E, A](zio: => ZIO[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
ZIO.environmentWithZIO[R] { r =>
Expand All @@ -33,5 +44,15 @@ abstract class DatabaseServiceBase[Connection: Tag](connectionSource: Connection
}
}

override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
ZStream.environmentWithStream[R] { r =>
runAutoCommitStream { (c: JdbcConnection) =>
ZStream.fromZIO(connectionFromJdbc(c))
.map(r ++ ZEnvironment(_))
.flatMap(stream.provideEnvironment(_))
}
}

}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.gaelrenoux.tranzactio

import zio.{Trace, ZEnvironment, ZIO, ZLayer, Tag}
import zio.stream.ZStream
import zio.{Tag, Trace, ZEnvironment, ZIO, ZLayer}

/**
* This is a typed database, to use when you have multiple databases in your application. Simply provide a marker type,
Expand All @@ -12,11 +13,17 @@ class DatabaseTBase[M: Tag, Connection](underlying: DatabaseOps.ServiceOps[Conne
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R with Any, Either[DbException, E], A] =
underlying.transaction[R, E, A](task, commitOnFailure = commitOnFailure)

override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R with Any, E, A] =
underlying.transactionOrDieStream[R, E, A](stream, commitOnFailure = commitOnFailure)

override def autoCommit[R, E, A](task: => ZIO[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R with Any, Either[DbException, E], A] =
underlying.autoCommit[R, E, A](task)

override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R with Any, Either[DbException, E], A] =
underlying.autoCommitStream[R, E, A](stream)
}

object DatabaseTBase {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.gaelrenoux.tranzactio

import zio.{Tag, Trace}
import zio.Trace


/** A specific wrapper package for one specific library (e.g. Doobie). */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.github.gaelrenoux.tranzactio.test

import zio.stream.ZStream

import io.github.gaelrenoux.tranzactio._
import zio.{Tag, ZEnvironment, ZIO, ZLayer, Trace}
import zio.{Tag, Trace, ZEnvironment, ZIO, ZLayer}

/** Testing utilities on the Database module. */
trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Connection, DatabaseOps.ServiceOps[Connection], DbContext] {
Expand Down Expand Up @@ -32,13 +34,29 @@ trait DatabaseModuleTestOps[Connection, DbContext] extends DatabaseModuleBase[Co
.mapError(Right(_))
}

override def transactionOrDieStream[R, E, A](stream: => ZStream[Connection with R, E, A], commitOnFailure: => Boolean = false)
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, E, A] = {
ZStream.fromZIO(noConnection).flatMap { c =>
ZStream.environmentWith[R](_ ++ ZEnvironment(c))
.flatMap(stream.provideEnvironment(_))
}
}

override def autoCommit[R, E, A](zio: => ZIO[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZIO[R, Either[DbException, E], A] =
noConnection.flatMap { c =>
ZIO.environmentWith[R](_ ++ ZEnvironment(c))
.flatMap(zio.provideEnvironment(_))
.mapError(Right(_))
}

override def autoCommitStream[R, E, A](stream: => ZStream[Connection with R, E, A])
(implicit errorStrategies: ErrorStrategiesRef, trace: Trace): ZStream[R, Either[DbException, E], A] =
ZStream.fromZIO(noConnection).flatMap { c =>
ZStream.environmentWith[R](_ ++ ZEnvironment(c))
.flatMap(stream.provideEnvironment(_))
.mapError(Right(_))
}
}
}
}
Loading

0 comments on commit 82bb584

Please sign in to comment.