Skip to content

Commit

Permalink
Avoid the need to define the codec twice, i.e. like this:
Browse files Browse the repository at this point in the history
    implicit val journalCodec = JsonCodec.default[Try]
    implicit val journalParser = JournalParser.of[F]

    implicit val stateCodec = JsonCodec.default[F]
  • Loading branch information
rtar committed Jan 7, 2021
1 parent 869882b commit 20ad0d1
Showing 1 changed file with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolutiongaming.kafka.flow.cassandra

import cats.Monad
import cats.arrow.FunctionK
import cats.effect.Clock
import cats.syntax.all._
import com.evolutiongaming.cassandra.sync.CassandraSync
Expand All @@ -12,12 +13,13 @@ import com.evolutiongaming.kafka.flow.snapshot.CassandraSnapshots
import com.evolutiongaming.kafka.journal.FromBytes
import com.evolutiongaming.kafka.journal.ToBytes
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession
import scala.util.Try

trait CassandraPersistence[F[_], S] extends PersistenceModule[F, S]
object CassandraPersistence {

/** Creates schema in Cassandra if not there yet */
def withSchema[F[_]: MonadThrowable: Clock, S](
def withSchemaF[F[_]: MonadThrowable: Clock, S](
session: CassandraSession[F],
sync: CassandraSync[F]
)(implicit
Expand All @@ -33,6 +35,24 @@ object CassandraPersistence {
def snapshots = _snapshots
}

/** Creates schema in Cassandra if not there yet
*
* This method uses the same `JsonCodec[Try]` as `JournalParser` does to
* simplify defining the basic application.
*/
def withSchema[F[_]: MonadThrowable: Clock, S](
session: CassandraSession[F],
sync: CassandraSync[F]
)(implicit
fromBytes: FromBytes[Try, S],
toBytes: ToBytes[Try, S]
): F[PersistenceModule[F, S]] = {
val fromTry = FunctionK.liftFunction[Try, F](MonadThrowable[F].fromTry)
implicit val _fromBytes = fromBytes mapK fromTry
implicit val _toBytes = toBytes mapK fromTry
withSchemaF(session, sync)
}

/** Deletes all data in Cassandra */
def truncate[F[_]: Monad](
session: CassandraSession[F],
Expand Down

0 comments on commit 20ad0d1

Please sign in to comment.