Commit 2f2a1c2: Clarify documentation more
Z1kkurat committed Jul 26, 2021
1 parent 97a64dc
Showing 6 changed files with 58 additions and 99 deletions.
@@ -106,85 +106,6 @@ class StatefulProcessingWithKafkaSpec(val globalRead: GlobalRead) extends KafkaS
}
}

- // private def kafkaPersistence: Resource[IO, Persistence] = for {
- //   cache <- Cache.loading[IO, TopicPartition, Persistence]
- // } yield new Persistence {
- //
- //   def reset: IO[Unit] = cache.clear.flatten
- //
- //   // TODO: use 2 partitions to verify correct state recovery, that for key1 input-topic:0 state-topic:0 is used
- //   // for key2 input-topic:1 state-topic:1, test case to check that key2 is removed when reached final state (when FoldOption returns None)
- //   // state topic must have the same number of partitions as input topic, and same keys,
- //   // so the data would be co-partitioned
- //   val stateTopic = "state-topic-StatefulProcessingWithKafkaSpec"
- //
- //   // looks like keyStateOf.all is our entry point to recover state for the partition
- //   // PartitionFlow init:
- //   // keys = keyStateOf.all(topicPartition)
- //   // _ <- Log[F].info("partition recovery started")
- //   // ...
- //   // keys.foldM(0) { (count, key) =>
- //   //   stateOf(timestamp, key) as (count + 1)
- //   // }
- //
- //   case object ImpossibleError extends NoStackTrace
- //
- //   val keysOf: KeysOf[IO, KafkaKey] = new KeysOf[IO, KafkaKey] {
- //
- //     // KeyStateOf.eagerRecovery is not using this apply, it only needs to know how to load all keys
- //     def apply(key: KafkaKey) = ??? // fail fast, safe as it is not used in KeyStateOf.eagerRecovery
- //
- //     def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = {
- //       import Boilerplate._
- //       // FIXME keys are never deleted from the cache (should be removed on consumer rebalance listener partitions revoked)
- //       com.evolutiongaming.sstream.Stream.lift {
- //         cache
- //           .getOrUpdateReleasable(topicPartition) {
- //             // recover state for topicPartition
- //             Releasable.of(
- //               for {
- //                 _ <- Resource.liftF(IO.unit)
- //                 blocker <- Blocker[IO]
- //                 producer <- ProducerOf[IO](blocker.blockingContext).apply(
- //                   ProducerConfig.Default.copy(common =
- //                     ProducerConfig.Default.common.copy(
- //                       clientId = s"$topicPartition-producer".some
- //                     )
- //                   )
- //                 )
- //                 // use recovery consumer
- //                 // read state from recovery topic
- //                 stateRestoreConsumerConfig = ConsumerConfig(
- //                   autoCommit = false,
- //                   autoOffsetReset = AutoOffsetReset.Earliest
- //                 )
- //                 kafkaPersistence = KafkaPersistence.of[IO, State](
- //                   ConsumerOf[IO](blocker.blockingContext),
- //                   stateRestoreConsumerConfig,
- //                   stateTopic,
- //                   producer
- //                 )
- //                 p <- Resource.liftF(kafkaPersistence.ofPartition(topicPartition.partition))
- //
- //               } yield new Persistence {
- //                 def keysOf = p.keysOf
- //                 def snapshotPersistenceOf = p.snapshots
- //                 def reset: IO[Unit] = IO.unit
- //               }
- //             )
- //           }
- //           .map(p => p.keysOf.all(applicationId, groupId, topicPartition))
- //       }.flatten
- //     }
- //   }
- //
- //   def snapshotPersistenceOf: SnapshotPersistenceOf[IO, KafkaKey, State, ConsRecord] =
- //     (key: KafkaKey, timestamps: Timestamps[IO]) => {
- //       cache
- //         .getOrElse(key.topicPartition, IO.raiseError(ImpossibleError))
- //         .flatMap(_.snapshotPersistenceOf.apply(key, timestamps))
- //     }
- // }

test("stateful processing using in-memory persistence") { kafka =>
// using unique input topic name per test as weaver is running tests in parallel
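The commented-out notes deleted above record how eager recovery works: `keyStateOf.all` is the entry point that lists every key of an assigned partition, and touching each key's state forces it to be restored before regular processing starts. A minimal sketch of that loop, assuming hypothetical `allKeys`/`recoverKey` stand-ins for the real `keyStateOf.all` and `stateOf`:

```scala
import cats.Monad
import cats.syntax.all._

// Sketch only, not the library's actual code: list all keys of the assigned
// partition, recover each one, and return how many keys were restored
// (PartitionFlow logs a similar count after "partition recovery started").
def recoverPartition[F[_]: Monad, K](
  allKeys: F[List[K]],      // e.g. keys read back from the snapshot topic partition
  recoverKey: K => F[Unit]  // restores the state of a single key
): F[Int] =
  allKeys.flatMap { keys =>
    keys.foldM(0) { (count, key) =>
      recoverKey(key) as (count + 1)
    }
  }
```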
@@ -31,7 +31,7 @@ object KafkaPartitionPersistence {
acc.asRight[BytesByKey].pure[F]
case _ =>
consumer
- .poll(10.millis)
+ .poll(10.millis) // TODO: make poll timeout configurable
.map(
_.values.values
.flatMap(_.toIterable)
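The TODO above flags the hard-coded `10.millis` poll interval. A sketch of what a configurable version could look like; the `Consumer` trait and `pollTimeout` parameter are simplified stand-ins, not the library's actual API:

```scala
import scala.concurrent.duration._
import cats.Monad
import cats.syntax.all._

// Simplified stand-in for the real consumer: poll returns key/value pairs.
trait Consumer[F[_]] {
  def poll(timeout: FiniteDuration): F[List[(String, Array[Byte])]]
}

// Read the partition to its end, keeping the latest value per key;
// the former hard-coded 10.millis is now just the default of a parameter.
def readToEnd[F[_]: Monad](
  consumer: Consumer[F],
  endReached: F[Boolean],
  pollTimeout: FiniteDuration = 10.millis
): F[Map[String, Array[Byte]]] =
  Monad[F].tailRecM(Map.empty[String, Array[Byte]]) { acc =>
    endReached.ifM(
      acc.asRight[Map[String, Array[Byte]]].pure[F],
      consumer.poll(pollTimeout).map(records => (acc ++ records).asLeft[Map[String, Array[Byte]]])
    )
  }
```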
@@ -30,7 +30,7 @@ object KafkaPersistenceModule {
/** Creates an instance of [[KafkaPersistenceModule]] for state recovery from a specific partition of a snapshot
* Kafka 'compacted' ([[https://kafka.apache.org/documentation/#compaction official documentation]]) topic.
* The exposed `keysOf` and `persistenceOf` implementations will perform cached reading of all the snapshot data
- * in that partition.
+ * in that partition to the end without committing offsets.
* This implementation is to be used only with `eagerRecovery` strategy since it relies on the order of state
* recovery actions during partition assignment. See the implementation details below.
*
@@ -45,19 +45,15 @@
* Removing a value for a specific key from a cache is safe at that point since state recovery is performed only once -
* either during initialization when a partition is assigned (and there is a snapshot for a key) or when the journal record is first seen (no snapshot for a key previously).
*
- * @param consumerOf
- * @param producerOf
- * @param consumerConfig
- * @param producerConfig
- * @param snapshotTopicPartition
- * @tparam F
- * @tparam S
- * @return
+ * @param consumerOf Kafka consumer factory to create snapshot-reading consumers
+ * @param producerOf Kafka producer factory to create producers for saving snapshots
+ * @param consumerConfig Kafka consumer config for snapshot-reading consumers
+ * @param producerConfig Kafka producer config for snapshot-writing producers
+ * @param snapshotTopicPartition the snapshot topic-partition to read and write snapshots
*
* @see [[com.evolutiongaming.kafka.flow.PartitionFlow.of]] for implementation details of key fetching and state recovery for a partition
* @see [[com.evolutiongaming.kafka.flow.KeyStateOf.eagerRecovery]] for implementation details of constructing [[com.evolutiongaming.kafka.flow.KeyState]] for a specific key
* @see [[com.evolutiongaming.kafka.flow.KeyFlow.of]] for implementation details of state recovery for a specific key
- *
*/
def caching[F[_]: LogOf: Concurrent: FromBytes[*[_], String]: ToBytes[*[_], S], S: FromBytes[F, *]](
consumerOf: ConsumerOf[F],
@@ -100,7 +96,7 @@
): F[SnapshotPersistenceOf[F, KafkaKey, S, ConsRecord]] = {
LogOf[F].apply(classOf[SnapshotPersistenceOf[F, KafkaKey, S, ConsRecord]]).map { implicit log =>
implicit val producer_ = producer
- val read = KafkaSnapshotReadDatabase[F, S](snapshotTopicPartition.topic, key => cache.remove(key).flatten)
+ val read = KafkaSnapshotReadDatabase.of[F, S](snapshotTopicPartition.topic, key => cache.remove(key).flatten)

val snapshotsOf = new SnapshotsOf[F, KafkaKey, S] {
override def apply(key: KafkaKey): F[Snapshots[F, S]] =
@@ -110,7 +106,7 @@
key = key,
database = SnapshotDatabase(
read = read,
- write = KafkaSnapshotWriteDatabase[F, S](snapshotTopicPartition)
+ write = KafkaSnapshotWriteDatabase.of[F, S](snapshotTopicPartition)
),
buffer = buffer
)
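The scaladoc above explains why reading through `cache.remove(key).flatten` is safe: each key is recovered at most once, so the snapshot can be handed out and dropped from memory in a single atomic step. A toy illustration of that read-once behaviour using a plain `Ref` (cats-effect 2, matching the `Blocker` usage elsewhere in this commit) rather than the real `Cache` API:

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref // cats-effect 2
import scodec.bits.ByteVector

// Toy cache: snapshots are stored once during recovery and consumed once.
final class OneShotSnapshotCache(ref: Ref[IO, Map[String, ByteVector]]) {

  def put(key: String, bytes: ByteVector): IO[Unit] =
    ref.update(_ + (key -> bytes))

  // Return the cached snapshot (if any) and forget it in the same step,
  // mirroring `key => cache.remove(key).flatten` in the diff above.
  def getAndRemove(key: String): IO[Option[ByteVector]] =
    ref.modify(m => (m - key, m.get(key)))
}

object OneShotSnapshotCache {
  def of: IO[OneShotSnapshotCache] =
    Ref.of[IO, Map[String, ByteVector]](Map.empty).map(new OneShotSnapshotCache(_))
}
```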
@@ -8,7 +8,7 @@ import com.evolutiongaming.skafka.{FromBytes, Topic}
import scodec.bits.ByteVector

object KafkaSnapshotReadDatabase {
- def apply[F[_]: Monad, S: FromBytes[F, *]](
+ def of[F[_]: Monad, S: FromBytes[F, *]](
snapshotTopic: Topic,
getState: String => F[Option[ByteVector]]
): SnapshotReadDatabase[F, KafkaKey, S] =
@@ -9,14 +9,14 @@ import com.evolutiongaming.skafka.producer.{Producer, ProducerRecord}
import com.evolutiongaming.skafka.{ToBytes, TopicPartition}

object KafkaSnapshotWriteDatabase {
- def apply[F[_]: FromTry: Monad: Producer, S: ToBytes[F, *]](
+ def of[F[_]: FromTry: Monad: Producer, S: ToBytes[F, *]](
snapshotTopicPartition: TopicPartition
): SnapshotWriteDatabase[F, KafkaKey, S] = new SnapshotWriteDatabase[F, KafkaKey, S] {
- override def persist(key: KafkaKey, snapshot: S) = produce(key, snapshot.some)
+ override def persist(key: KafkaKey, snapshot: S): F[Unit] = produce(key, snapshot.some)

- override def delete(key: KafkaKey) = produce(key, none)
+ override def delete(key: KafkaKey): F[Unit] = produce(key, none)

- private def produce(key: KafkaKey, snapshot: Option[S]) = {
+ private def produce(key: KafkaKey, snapshot: Option[S]): F[Unit] = {
val record = new ProducerRecord(
topic = snapshotTopicPartition.topic,
partition = snapshotTopicPartition.partition.some,
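Both `persist` and `delete` above funnel into `produce` with an `Option[S]`: `Some` writes the serialized snapshot under the key, while `None` writes a `null` value, i.e. a tombstone that log compaction uses to drop the key. A sketch of the same pattern with the plain kafka-clients producer instead of skafka (the method name and the blocking `get` are illustrative only):

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def produceSnapshot(
  producer: KafkaProducer[String, Array[Byte]],
  topic: String,
  partition: Int, // same partition number as the input topic, to stay co-partitioned
  key: String,
  snapshot: Option[Array[Byte]] // None means "delete this key"
): Unit = {
  val record = new ProducerRecord[String, Array[Byte]](
    topic,
    partition,
    key,
    snapshot.orNull // None => null => tombstone on the compacted topic
  )
  producer.send(record).get() // block for the ack; real code would suspend this in F
  ()
}
```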
@@ -20,6 +20,49 @@ package object kafkapersistence {
def empty: BytesByKey = Map.empty
}

+ /** Create a [[PartitionFlowOf]] with a snapshot-based persistence and recovery from a Kafka
+  * [[https://kafka.apache.org/documentation/#compaction compacted topic]].
+  * State is restored eagerly on partition assignment by reading the content of a snapshot topic to the end
+  * without committing offsets.
+  *
+  * Note that the snapshot topic should have the same number of partitions as the input topic, since state
+  * recovery is performed based on the number of the assigned input-topic partition (state for partition N of
+  * the input topic will be restored from partition N of the snapshot topic).
+  *
+  * Example usage:
+  * {{{
+  * val timerFlowOf: TimerFlowOf = ...
+  * val timersOf: TimersOf = ...
+  * val persistenceModuleOf = KafkaPersistenceModuleOf.caching(consumerOf, producerOf, consumerConfig, producerConfig, snapshotTopic)
+  * val businessLogicFold: FoldOption[F, State, ConsRecord] = ... // your business logic lives in this fold
+  * val tick: TickOption[F, State] = ... // optional additional Tick to change state; use TickOption.id if not needed
+  * val partitionFlowConfig: PartitionFlowConfig = ... // additional configuration for partition flow
+  *
+  * val partitionFlowOf = kafkaEagerRecovery[F, State](
+  *   kafkaPersistenceModuleOf = persistenceModuleOf,
+  *   applicationId = "appId",
+  *   groupId = "groupId",
+  *   timersOf = timersOf,
+  *   timerFlowOf = timerFlowOf,
+  *   fold = businessLogicFold,
+  *   partitionFlowConfig = partitionFlowConfig,
+  *   tick = tick
+  * )
+  *
+  * val topicFlowOf = TopicFlowOf(partitionFlowOf)
+  *
+  * val kafkaFlow: Resource[F, F[Unit]] = KafkaFlow.resource(
+  *   consumer = ...,
+  *   flowOf = ConsumerFlowOf[F](
+  *     topic = inputTopic,
+  *     flowOf = topicFlowOf
+  *   )
+  * )
+  * kafkaFlow.use(_ => ...)
+  * }}}
+  *
+  * For a complete usage example, refer to the integration test `StatefulProcessingWithKafkaSpec`.
+  */
def kafkaEagerRecovery[F[_]: Concurrent: Timer: Parallel: LogOf, S](
kafkaPersistenceModuleOf: KafkaPersistenceModuleOf[F, S],
applicationId: String,
@@ -29,7 +72,7 @@
fold: FoldOption[F, S, ConsRecord],
tick: TickOption[F, S],
partitionFlowConfig: PartitionFlowConfig
- ): PartitionFlowOf[F] = {
+ ): PartitionFlowOf[F] =
new PartitionFlowOf[F] {
override def apply(
topicPartition: TopicPartition,
@@ -59,7 +102,6 @@
} yield partitionFlow
}
}
- }

private[kafkapersistence] implicit class ConsumerConfigCompanionOps(
val self: ConsumerConfig.type
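The new scaladoc requires the snapshot topic to be co-partitioned with the input topic. A hypothetical startup assertion for that requirement, using the plain Java `AdminClient` from kafka-clients (this check is illustrative and not part of this commit):

```scala
import java.util.Arrays
import org.apache.kafka.clients.admin.AdminClient

// Fail fast if the snapshot topic does not have the same partition count
// as the input topic, since state for input partition N is restored from
// snapshot partition N.
def assertCoPartitioned(admin: AdminClient, inputTopic: String, snapshotTopic: String): Unit = {
  val descriptions = admin.describeTopics(Arrays.asList(inputTopic, snapshotTopic)).all.get
  val inputPartitions    = descriptions.get(inputTopic).partitions.size
  val snapshotPartitions = descriptions.get(snapshotTopic).partitions.size
  require(
    inputPartitions == snapshotPartitions,
    s"$snapshotTopic has $snapshotPartitions partitions, expected $inputPartitions (same as $inputTopic)"
  )
}
```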