Commit 96b0436: Merge 2f2a1c2 into 30b4fa3

terjokhin committed Jul 26, 2021
2 parents: 30b4fa3 + 2f2a1c2
Showing 27 changed files with 1,076 additions and 373 deletions.
8 changes: 6 additions & 2 deletions build.sbt
@@ -77,7 +77,8 @@ lazy val `persistence-cassandra` = (project in file("persistence-cassandra"))
      weaver % IntegrationTest,
    ),
    Defaults.itSettings,
-    IntegrationTest / testFrameworks += new TestFramework("weaver.framework.CatsEffect")
+    IntegrationTest / testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
+    IntegrationTest / fork := true
  )

lazy val `persistence-kafka` = (project in file("persistence-kafka"))
@@ -89,10 +90,13 @@ lazy val `persistence-kafka` = (project in file("persistence-kafka"))
    libraryDependencies ++= Seq(
      Monocle.core,
      Monocle.`macro`,
+      kafkaLauncher % IntegrationTest,
+      scribe % IntegrationTest,
      weaver % IntegrationTest,
    ),
    Defaults.itSettings,
-    IntegrationTest / testFrameworks += new TestFramework("weaver.framework.CatsEffect")
+    IntegrationTest / testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
+    IntegrationTest / fork := true
  )

lazy val docs = (project in file("kafka-flow-docs"))
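The new `IntegrationTest / fork := true` setting makes sbt run the integration tests in a separate JVM instead of inside sbt's own process, which isolates whatever the tests launch (here, the Kafka launcher and Cassandra) from sbt's classloaders and lets the test JVM be configured on its own. A minimal sketch of such a forked setup; the project name, javaOptions and envVars below are illustrative examples, not values from this commit:

// Hypothetical sbt project illustrating forked integration tests.
// Only the mechanism is taken from this commit; the tuning values are examples.
lazy val `example-it` = (project in file("example-it"))
  .configs(IntegrationTest)
  .settings(
    Defaults.itSettings,
    IntegrationTest / testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
    IntegrationTest / fork := true,                                // run ITs in a separate JVM
    IntegrationTest / javaOptions += "-Xmx1G",                     // hypothetical: heap for the forked JVM
    IntegrationTest / envVars := Map("KAFKA_HOST" -> "localhost")  // hypothetical: env for the forked JVM
  )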
core/src/main/scala/com/evolutiongaming/kafka/flow/ConsumerFlow.scala
@@ -77,7 +77,7 @@ object ConsumerFlow {

    val subscribe =
      flows.keySet.toList.toNel match {
-        case Some(topics) => consumer.subscribe(topics.toNes, RebalanceListener[F](consumer, flows))
+        case Some(topics) => consumer.subscribe(topics.toNes, RebalanceListener[F](flows))
        case None => new IllegalArgumentException("Parameter flows cannot be empty").raiseError[F, Unit]
      }

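The `toNel`/`toNes` step above is the usual cats idiom for turning a possibly-empty collection into the non-empty type an API demands, failing fast otherwise. A self-contained sketch of the same pattern, with IO standing in for ConsumerFlow's abstract F and a stand-in `subscribe` that is not the real consumer API:

import cats.data.NonEmptySet
import cats.effect.IO
import cats.syntax.all._

object SubscribeExample {

  // Stand-in for the real consumer: only the shape matters here.
  def subscribe(topics: NonEmptySet[String]): IO[Unit] =
    IO(println(s"subscribing to $topics"))

  // Convert a possibly-empty key set into the NonEmptySet the API requires,
  // raising a descriptive error when there is nothing to subscribe to.
  def subscribeAll(flows: Map[String, Unit]): IO[Unit] =
    flows.keySet.toList.toNel match {
      case Some(topics) => subscribe(topics.toNes)
      case None         => new IllegalArgumentException("Parameter flows cannot be empty").raiseError[IO, Unit]
    }
}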
core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala
@@ -2,20 +2,18 @@ package com.evolutiongaming.kafka.flow

import cats.Parallel
import cats.data.NonEmptyList
-import cats.effect.Clock
import cats.effect.concurrent.Ref
-import cats.effect.{Concurrent, Resource}
+import cats.effect.{Clock, Concurrent, Resource}
import cats.syntax.all._
import com.evolutiongaming.catshelper.ClockHelper._
-import com.evolutiongaming.catshelper.Log
-import com.evolutiongaming.catshelper.LogOf
+import com.evolutiongaming.catshelper.{Log, LogOf}
+import com.evolutiongaming.kafka.flow.kafka.OffsetToCommit
+import com.evolutiongaming.kafka.flow.timer.Timestamp
import com.evolutiongaming.kafka.journal.ConsRecord
-import com.evolutiongaming.scache.Cache
-import com.evolutiongaming.scache.Releasable
+import com.evolutiongaming.scache.{Cache, Releasable}
import com.evolutiongaming.skafka.{Offset, TopicPartition}
-import kafka.OffsetToCommit

import java.time.Instant
-import timer.Timestamp

trait PartitionFlow[F[_]] {

@@ -60,14 +58,14 @@ object PartitionFlow {
      triggerTimersAt <- Resource.liftF(Ref.of(clock))
      commitOffsetsAt <- Resource.liftF(Ref.of(clock))
      flow <- of(
-        topicPartition,
-        keyStateOf,
-        committedOffset,
-        timestamp,
-        triggerTimersAt,
-        commitOffsetsAt,
-        cache,
-        config
+        topicPartition = topicPartition,
+        keyStateOf = keyStateOf,
+        committedOffset = committedOffset,
+        timestamp = timestamp,
+        triggerTimersAt = triggerTimersAt,
+        commitOffsetsAt = commitOffsetsAt,
+        cache = cache,
+        config = config
      )
    } yield flow

@@ -137,18 +135,20 @@
        )
        stateOf(startedAt, key) flatMap { state =>
          state.timers.set(startedAt) *>
-          state.flow(records) *>
-          state.timers.set(finishedAt) *>
-          state.timers.onProcessed
+            state.flow(records) *>
+            state.timers.set(finishedAt) *>
+            state.timers.onProcessed
        }
      }
      lastRecord = records.last
      maximumOffset <- OffsetToCommit[F](lastRecord.offset)
-      _ <- timestamp.set(Timestamp(
-        clock = clock,
-        watermark = lastRecord.timestampAndType map (_.timestamp),
-        offset = maximumOffset
-      ))
+      _ <- timestamp.set(
+        Timestamp(
+          clock = clock,
+          watermark = lastRecord.timestampAndType map (_.timestamp),
+          offset = maximumOffset
+        )
+      )
    } yield ()

def triggerTimers = for {
@@ -158,7 +158,7 @@
      _ <- states.values.toList.parTraverse_ { state =>
        state flatMap { state =>
          state.timers.set(timestamp) *>
-          state.timers.trigger(state.flow)
+            state.timers.trigger(state.flow)
        }
      }
      _ <- triggerTimersAt update { triggerTimersAt =>
@@ -219,7 +219,7 @@
        offsetToCommit flatMap { offset =>
          offset traverse_ { offset =>
            Log[F].info(s"committing on revoke: $offset") *>
-            PartitionContext[F].scheduleCommit(offset)
+              PartitionContext[F].scheduleCommit(offset)
          }
        }
      } else {
@@ -230,5 +230,4 @@

  }

-
}
core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlowOf.scala
@@ -28,5 +28,4 @@ object PartitionFlowOf {
    implicit val _context = context
    PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config)
  }
-
}
core/src/main/scala/com/evolutiongaming/kafka/flow/RebalanceListener.scala
@@ -1,49 +1,52 @@
package com.evolutiongaming.kafka.flow

-import cats.Monad
import cats.data.NonEmptySet
import cats.syntax.all._
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.ConsumerFlow.log
-import com.evolutiongaming.kafka.flow.kafka.Consumer
-import com.evolutiongaming.skafka.consumer.{RebalanceListener => SRebalanceListener}
+import com.evolutiongaming.skafka.consumer.RebalanceCallback.syntax._
+import com.evolutiongaming.skafka.consumer.{
+  RebalanceCallback,
+  RebalanceListener1WithConsumer,
+  RebalanceListener1 => SRebalanceListener
+}
import com.evolutiongaming.skafka.{Partition, Topic, TopicPartition}

object RebalanceListener {

-  def apply[F[_]: Monad: LogOf](
-    consumer: Consumer[F],
+  def apply[F[_]: LogOf](
    flows: Map[Topic, TopicFlow[F]]
-  ): SRebalanceListener[F] = new SRebalanceListener[F] {
+  ): SRebalanceListener[F] = new RebalanceListener1WithConsumer[F] {

-    def onPartitionsAssigned(topicPartitions: NonEmptySet[TopicPartition]) =
+    def onPartitionsAssigned(topicPartitions: NonEmptySet[TopicPartition]): RebalanceCallback[F, Unit] = {
      groupByTopic(topicPartitions) traverse_ { case (topic, flow, partitions) =>
        for {
-          log <- log[F]
-          _ <- log.prefixed(topic).info(s"$partitions assigned")
+          log <- log.lift
+          _ <- log.prefixed(topic).info(s"$partitions assigned").lift
          partitions <- partitions.toNonEmptyList traverse { partition =>
            consumer.position(TopicPartition(topic, partition)) map (partition -> _)
          }
-          _ <- log.prefixed(topic).info(s"committed offsets: $partitions")
-          _ <- flow.add(partitions.toNes)
+          _ <- log.prefixed(topic).info(s"committed offsets: $partitions").lift
+          _ <- flow.add(partitions.toNes).lift
        } yield ()
      }
+    }

-    def onPartitionsRevoked(topicPartitions: NonEmptySet[TopicPartition]) =
+    def onPartitionsRevoked(topicPartitions: NonEmptySet[TopicPartition]): RebalanceCallback[F, Unit] =
      groupByTopic(topicPartitions) traverse_ { case (topic, flow, partitions) =>
        for {
-          log <- log[F]
-          _ <- log.prefixed(topic).info(s"$partitions revoked, removing from topic flow")
-          _ <- flow.remove(partitions)
+          log <- log.lift
+          _ <- log.prefixed(topic).info(s"$partitions revoked, removing from topic flow").lift
+          _ <- flow.remove(partitions).lift
        } yield ()
      }

-    def onPartitionsLost(topicPartitions: NonEmptySet[TopicPartition]) =
+    def onPartitionsLost(topicPartitions: NonEmptySet[TopicPartition]): RebalanceCallback[F, Unit] =
      groupByTopic(topicPartitions) traverse_ { case (topic, flow, partitions) =>
        for {
-          log <- log[F]
-          _ <- log.prefixed(topic).info(s"$partitions lost, removing from topic flow")
-          _ <- flow.remove(partitions)
+          log <- log[F].lift
+          _ <- log.prefixed(topic).info(s"$partitions lost, removing from topic flow").lift
+          _ <- flow.remove(partitions).lift
        } yield ()
      }

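This change swaps skafka's old `RebalanceListener`, whose callbacks return plain `F[...]` and need the consumer passed in explicitly, for `RebalanceListener1WithConsumer`, whose callbacks describe a `RebalanceCallback` program: consumer operations such as `position` are steps of that program, and ordinary `F[A]` effects are embedded with `.lift`. A minimal sketch of that shape, using only operations that appear in this diff; `logInfo` is a hypothetical effect producer, not part of the library:

import cats.data.NonEmptySet
import cats.syntax.all._
import com.evolutiongaming.skafka.TopicPartition
import com.evolutiongaming.skafka.consumer.RebalanceCallback
import com.evolutiongaming.skafka.consumer.RebalanceCallback.syntax._
import com.evolutiongaming.skafka.consumer.RebalanceListener1WithConsumer

object RebalanceListenerExample {

  // Sketch: log the committed position of every newly assigned partition.
  // `logInfo` is hypothetical; `consumer` is provided by RebalanceListener1WithConsumer.
  def listener[F[_]](logInfo: String => F[Unit]): RebalanceListener1WithConsumer[F] =
    new RebalanceListener1WithConsumer[F] {

      def onPartitionsAssigned(partitions: NonEmptySet[TopicPartition]): RebalanceCallback[F, Unit] =
        partitions.toNonEmptyList traverse_ { partition =>
          for {
            offset <- consumer.position(partition)              // consumer op, already a RebalanceCallback
            _      <- logInfo(s"$partition is at $offset").lift // plain F[Unit], lifted into the callback
          } yield ()
        }

      def onPartitionsRevoked(partitions: NonEmptySet[TopicPartition]): RebalanceCallback[F, Unit] =
        partitions.toNonEmptyList traverse_ { partition =>
          logInfo(s"$partition revoked").lift
        }

      def onPartitionsLost(partitions: NonEmptySet[TopicPartition]): RebalanceCallback[F, Unit] =
        partitions.toNonEmptyList traverse_ { partition =>
          logInfo(s"$partition lost").lift
        }
    }
}

Because the callback carries its own consumer, the listener no longer needs a `Consumer[F]` parameter, which is exactly why ConsumerFlow's call site above shrank to `RebalanceListener[F](flows)`.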
96 changes: 72 additions & 24 deletions core/src/main/scala/com/evolutiongaming/kafka/flow/TopicFlow.scala
@@ -2,8 +2,9 @@ package com.evolutiongaming.kafka.flow

import cats.Parallel
import cats.data.NonEmptySet
-import cats.effect.concurrent.Ref
+import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.{Concurrent, Resource}
+import cats.effect.syntax.all._
import cats.syntax.all._
import com.evolutiongaming.catshelper.DataHelper._
import com.evolutiongaming.catshelper.Log
@@ -14,6 +15,7 @@ import com.evolutiongaming.scache.Cache
import com.evolutiongaming.scache.Releasable
import com.evolutiongaming.skafka.{Offset, OffsetAndMetadata, Partition, Topic, TopicPartition}
import kafka.Consumer
+
import scala.collection.immutable.SortedSet

trait TopicFlow[F[_]] {
@@ -45,8 +47,9 @@
  ): Resource[F, TopicFlow[F]] = for {
    cache <- Cache.loading[F, Partition, PartitionFlow[F]]
    pendingCommits <- Resource.liftF(Ref.of(Map.empty[TopicPartition, OffsetAndMetadata]))
+    semaphore <- Resource.liftF(Semaphore(1))
    flow <- LogResource[F](getClass, topic) flatMap { implicit log =>
-      of(consumer, topic, partitionFlowOf, cache, pendingCommits)
+      of(consumer, topic, partitionFlowOf, cache, pendingCommits, semaphore)
    }
  } yield flow

@@ -55,39 +58,82 @@
    topic: Topic,
    partitionFlowOf: PartitionFlowOf[F],
    cache: Cache[F, Partition, PartitionFlow[F]],
-    pendingCommits: Ref[F, Map[TopicPartition, OffsetAndMetadata]]
+    pendingCommits: Ref[F, Map[TopicPartition, OffsetAndMetadata]],
+    semaphore: Semaphore[F]
  ): Resource[F, TopicFlow[F]] = {

    val commitPending = pendingCommits getAndSet Map.empty flatMap { offsets =>

      def partitionOffsets = offsets map { case (topicPartition, offsetAndMetadata) =>
        PartitionOffset(topicPartition.partition, offsetAndMetadata.offset)
      } mkString (", ")

      offsets.toNem.traverse_ { offsets =>
        Log[F].info(s"commiting pending offsets: $offsets") *>
-        consumer
-          .commit(offsets)
-          .handleErrorWith { error =>
-            Log[F].error(s"consumer.commit failed for $partitionOffsets: $error", error)
-          }
+          consumer
+            .commit(offsets)
+            .handleErrorWith { error =>
+              Log[F].error(s"consumer.commit failed for $partitionOffsets: $error", error)
+            }
      }
    }

    val acquire = new TopicFlow[F] {

-      def apply(records: ConsRecords) = for {
-        partitons <- cache.values
-        _ <- partitons.toList parTraverse { case (partition, flow) =>
+      def apply(records: ConsRecords) = {
+        // The following TODOs are related to graceful shutdown, which is also needed in StatefulProcessingWithKafkaSpec,
+        // but it cannot be implemented properly at the moment, as skafka does not allow committing offsets on partition revocation.
+        // So for now it is just semaphore + uncancelable; after a fix in skafka this could be greatly simplified and improved.
+
+        // TODO `consumer` can be closed at this point because Resource.release has already happened
+        // and TopicFlow.apply is running concurrently.
+        // This can be fixed outside of TopicFlow's scope with code like:
+        /*
+        implicit class Ops[A](val self: F[A]) extends AnyVal {
+          def startAwaitExit: F[Fiber[F, A]] = {
+            for {
+              deferred <- Deferred[F, Unit]
+              fiber <- self.guarantee { deferred.complete(()).handleError { _ => () } }.start
+            } yield {
+              new Fiber[F, A] {
+                def cancel = {
+                  for {
+                    _ <- fiber.cancel
+                    _ <- deferred.get // TODO possible timeout
+                  } yield {}
+                }
+                def join = fiber.join
+              }
+            }
+          }
+          def backgroundAwaitExit: Resource[F, Unit] = {
+            Resource
+              .make { self.startAwaitExit } { _.cancel }
+              .as(())
+          }
+        }
+        */
+
+        // TODO semaphore.withPermit should be part of the KafkaFlow code where we glue the building blocks together and
+        // start a fiber in the background which is doing consumer.poll and eventually calling this TopicFlow.apply;
+        // TopicFlow.apply can be executed concurrently with TopicFlow's resource release.
+        // KafkaFlow related code snippet:
+        //   stream = Stream.around(Retry[F].toFunctionK) *> flow.stream
+        //   records <- stream.drain.background
+        semaphore.withPermit {
          for {
-            flow <- flow
-            topicPartition = TopicPartition(topic, partition)
-            partitionRecords = records.values get topicPartition map (_.toList) getOrElse Nil
-            _ <- flow(partitionRecords)
+            partitons <- cache.values
+            _ <- partitons.toList parTraverse { case (partition, flow) =>
+              for {
+                flow <- flow
+                topicPartition = TopicPartition(topic, partition)
+                partitionRecords = records.values get topicPartition map (_.toList) getOrElse Nil
+                _ <- flow(partitionRecords)
+              } yield ()
+            }
+            _ <- commitPending
          } yield ()
-        }
-        _ <- commitPending
-      } yield ()
+        }.uncancelable
+      }

      def remove(partitions: NonEmptySet[Partition]) = {
        val removePartitions = partitions parTraverse_ (cache.remove(_).flatten)
@@ -117,12 +163,14 @@
    }

    Resource.make(acquire.pure[F]) { _ =>
-      cache.keys flatMap { keys =>
-        val partitions = NonEmptySet.fromSet(SortedSet.empty[Partition] ++ keys.toList)
-        val removeAll = partitions parTraverse_ { partitions =>
-          partitions parTraverse_ (cache.remove(_).flatten)
+      semaphore.withPermit {
+        cache.keys flatMap { keys =>
+          val partitions = NonEmptySet.fromSet(SortedSet.empty[Partition] ++ keys.toList)
+          val removeAll = partitions parTraverse_ { partitions =>
+            partitions parTraverse_ (cache.remove(_).flatten)
+          }
+          removeAll *> commitPending
        }
-        removeAll *> commitPending
      }
    }

Expand Down
