Skip to content

Commit

Permalink
Merge eedb4bc into 76294d7
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Mar 6, 2024
2 parents 76294d7 + eedb4bc commit 816b7a9
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object EventualCassandra {
): Resource[F, EventualJournal[F]] = {

def journal(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = {
of(config.schema, origin, metrics, config.consistencyConfig.toCassandraConsistencyConfig)
of(config.schema, origin, metrics, config.consistencyConfig)
}

for {
Expand All @@ -75,7 +75,7 @@ object EventualCassandra {
schemaConfig: SchemaConfig,
origin: Option[Origin],
metrics: Option[EventualJournal.Metrics[F]],
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[EventualJournal[F]] = {

for {
Expand Down Expand Up @@ -193,7 +193,7 @@ object EventualCassandra {
schema: Schema,
segmentNrsOf: SegmentNrsOf[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[Statements[F]] = {
for {
selectRecords <- JournalStatements.SelectRecords.of[F](schema.journal, consistencyConfig)
Expand Down Expand Up @@ -222,7 +222,7 @@ object EventualCassandra {
schema: Schema,
segmentNrsOf: SegmentNrsOf[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[MetaJournalStatements[F]] = {
of(schema.metaJournal, segmentNrsOf, segments, consistencyConfig)
}
Expand All @@ -231,7 +231,7 @@ object EventualCassandra {
metaJournal: TableName,
segmentNrsOf: SegmentNrsOf[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[MetaJournalStatements[F]] = {
for {
selectJournalHead <- cassandra.MetaJournalStatements.SelectJournalHead.of[F](metaJournal, consistencyConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession : ToTry : JsonCodec.Encode](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[InsertRecords[F]] = {

implicit val encodeTry: JsonCodec.Encode[Try] = JsonCodec.Encode.summon[F].mapK(ToTry.functionK)
Expand Down Expand Up @@ -148,7 +148,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession : ToTry : JsonCodec.Decode](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read): F[SelectRecords[F]] = {
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[SelectRecords[F]] = {

implicit val encodeTry: JsonCodec.Decode[Try] = JsonCodec.Decode.summon[F].mapK(ToTry.functionK)
implicit val decodeByNameByteVector: DecodeByName[ByteVector] = DecodeByName[Array[Byte]]
Expand Down Expand Up @@ -242,7 +242,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[DeleteTo[F]] = {

val query =
Expand Down Expand Up @@ -280,7 +280,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Delete[F]] = {

val query =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Insert[F]] = {

val query =
Expand Down Expand Up @@ -122,7 +122,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectJournalHead[F]] = {

val query =
Expand Down Expand Up @@ -164,7 +164,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectJournalPointer[F]] = {

val query =
Expand Down Expand Up @@ -207,7 +207,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[IdByTopicAndExpireOn[F]] = {

val query =
Expand Down Expand Up @@ -244,7 +244,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[IdByTopicAndCreated[F]] = {

val query =
Expand Down Expand Up @@ -281,7 +281,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[IdByTopicAndSegment[F]] = {

val query =
Expand Down Expand Up @@ -321,7 +321,7 @@ object MetaJournalStatements {

object Update {

def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write
def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Update[F]] = {

val query =
Expand Down Expand Up @@ -367,7 +367,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdateSeqNr[F]] = {

val query =
Expand Down Expand Up @@ -414,7 +414,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdateExpiry[F]] = {

val query =
Expand Down Expand Up @@ -455,7 +455,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdateDeleteTo[F]] = {

val query =
Expand Down Expand Up @@ -494,7 +494,7 @@ object MetaJournalStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdatePartitionOffset[F]] = {
s"""
|UPDATE ${ name.toCql }
Expand Down Expand Up @@ -528,7 +528,7 @@ object MetaJournalStatements {

object Delete {

def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Delete[F]] = {
def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[Delete[F]] = {

val query =
s"""
Expand Down Expand Up @@ -562,7 +562,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[DeleteExpiry[F]] = {

val query =
Expand Down Expand Up @@ -598,7 +598,7 @@ object MetaJournalStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectIds[F]] = {
for {
prepared <- s"SELECT id FROM ${ name.toCql } WHERE topic = ? AND segment = ?".prepare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Pointer2Statements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectTopics[F]] = {

val query = s"""SELECT DISTINCT topic, partition FROM ${ name.toCql }""".stripMargin
Expand Down Expand Up @@ -76,7 +76,7 @@ object Pointer2Statements {
}
}

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[Select[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[Select[F]] = {
s"""
|SELECT created FROM ${ name.toCql }
|WHERE topic = ?
Expand Down Expand Up @@ -105,7 +105,7 @@ object Pointer2Statements {

object SelectOffset {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[SelectOffset[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[SelectOffset[F]] = {

val query =
s"""
Expand Down Expand Up @@ -138,7 +138,7 @@ object Pointer2Statements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Insert[F]] = {

val query =
Expand Down Expand Up @@ -173,7 +173,7 @@ object Pointer2Statements {

object Update {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Update[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[Update[F]] = {

val query =
s"""
Expand Down Expand Up @@ -207,7 +207,7 @@ object Pointer2Statements {

object UpdateCreated {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[UpdateCreated[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[UpdateCreated[F]] = {
s"""
|UPDATE ${ name.toCql }
|SET offset = ?, created = ?, updated = ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object PointerStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Insert[F]] = {

val query =
Expand Down Expand Up @@ -73,7 +73,7 @@ object PointerStatements {

object Update {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Update[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[Update[F]] = {

val query =
s"""
Expand Down Expand Up @@ -118,7 +118,7 @@ object PointerStatements {
}
}

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[Select[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[Select[F]] = {
s"""
|SELECT created FROM ${ name.toCql }
|WHERE topic = ?
Expand Down Expand Up @@ -147,7 +147,7 @@ object PointerStatements {

object SelectOffset {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[SelectOffset[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[SelectOffset[F]] = {

val query =
s"""
Expand Down Expand Up @@ -180,7 +180,7 @@ object PointerStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectTopics[F]] = {

val query = s"""SELECT DISTINCT topic FROM ${ name.toCql }""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ object ReplicatedCassandra {
): F[ReplicatedJournal[F]] = {

for {
schema <- SetupSchema[F](config.schema, origin, config.consistencyConfig.toCassandraConsistencyConfig)
statements <- Statements.of[F](schema, config.consistencyConfig.toCassandraConsistencyConfig)
schema <- SetupSchema[F](config.schema, origin, config.consistencyConfig)
statements <- Statements.of[F](schema, config.consistencyConfig)
log <- LogOf[F].apply(ReplicatedCassandra.getClass)
expiryService <- ExpiryService.of[F]
} yield {
Expand Down Expand Up @@ -540,15 +540,15 @@ object ReplicatedCassandra {

def of[F[_]: Monad: CassandraSession](
schema: Schema,
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[MetaJournalStatements[F]] = {
of[F](schema.metaJournal, consistencyConfig)
}


def of[F[_]: Monad: CassandraSession](
metaJournal: TableName,
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[MetaJournalStatements[F]] = {

for {
Expand Down Expand Up @@ -694,7 +694,7 @@ object ReplicatedCassandra {

def of[F[_]: Monad: CassandraSession: ToTry: JsonCodec.Encode](
schema: Schema,
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[Statements[F]] = {
for {
insertRecords <- JournalStatements.InsertRecords.of[F](schema.journal, consistencyConfig.write)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object SetupSchema {
def apply[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
config: SchemaConfig,
origin: Option[Origin],
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[Schema] = {

def createSchema(implicit cassandraSync: CassandraSync[F]) = CreateSchema(config)
Expand All @@ -53,7 +53,7 @@ object SetupSchema {
cassandraSync <- CassandraSync.of[F](config.keyspace, config.locksTable, origin)
ab <- createSchema(cassandraSync)
(schema, fresh) = ab
settings <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig)
settings <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig.toCassandraConsistencyConfig)
_ <- migrate(schema, fresh, settings, cassandraSync)
} yield schema
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import cats.{Applicative, Monad, ~>}
import com.evolutiongaming.catshelper.DataHelper._
import com.evolutiongaming.catshelper.{ApplicativeThrowable, FromTry, Log, MeasureDuration, MonadThrowable}
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraConsistencyConfig
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession, ExpireOn, MetaJournalStatements, SegmentNr}
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.StreamHelper._
Expand All @@ -19,6 +18,7 @@ import com.evolutiongaming.smetrics.MetricsHelper._
import com.evolutiongaming.smetrics._

import scala.concurrent.duration.FiniteDuration
import com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandraConfig

trait PurgeExpired[F[_]] {

Expand All @@ -33,7 +33,7 @@ object PurgeExpired {
producerConfig: ProducerConfig,
tableName: TableName,
metrics: Option[Metrics[F]],
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): Resource[F, PurgeExpired[F]] = {

implicit val fromAttempt = FromAttempt.lift[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SettingsIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers
cassandraSession: CassandraSession[F]) = {

for {
schema <- SetupSchema[F](config, origin, CassandraConsistencyConfig.default)
schema <- SetupSchema[F](config, origin, EventualCassandraConfig.ConsistencyConfig.default)
settings <- SettingsCassandra.of[F](schema.setting, origin, CassandraConsistencyConfig.default)
} yield settings
}
Expand Down

0 comments on commit 816b7a9

Please sign in to comment.