Commit

Merge ae6873a into efe8cc8
dfakhritdinov committed Feb 17, 2021
2 parents efe8cc8 + ae6873a commit 19c3d1c
Showing 21 changed files with 335 additions and 223 deletions.
4 changes: 2 additions & 2 deletions build.sbt
@@ -36,8 +36,8 @@ lazy val core = (project in file("core"))
KafkaJournal.journal,
KafkaJournal.persistence,
MeowMtl.effects,
Monocle.`macro` % Test,
Monocle.core % Test,
Monocle.`macro`,
Monocle.core,
catsHelper,
kafkaLauncher % IntegrationTest,
munit,
@@ -1,18 +1,18 @@
package com.evolutiongaming.kafka.flow

import cats.effect.Blocker
import cats.effect.IO
import cats.effect.Resource
import cats.syntax.option._
import cats.effect.{Blocker, IO, Resource}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.StartKafka
import com.evolutiongaming.kafka.flow.kafka.KafkaModule
import com.evolutiongaming.skafka.consumer.ConsumerConfig
import com.evolutiongaming.kafka.journal.KafkaConfig
import com.evolutiongaming.smetrics.CollectorRegistry
import monocle.macros.syntax.lens._
import scribe.{Level, Logger}
import weaver._

import scala.concurrent.ExecutionContext
import scala.util.Try
import scribe.Level
import scribe.Logger
import weaver._

object SharedResources extends GlobalResourcesInit {

@@ -29,7 +29,8 @@ object SharedResources extends GlobalResourcesInit {
implicit val timer = IO.timer(executor)

// we use default config here, because we will launch Kafka locally
val config = ConsumerConfig()
val config = KafkaConfig.default
.lens(_.producer.common.clientId).set("UnsubscribeSpec-producer".some)

val start = IO {
// set root logging to WARN level to avoid spamming the logs
@@ -45,7 +46,7 @@ object SharedResources extends GlobalResourcesInit {
_ <- Resource.make(start) { shutdown => IO(shutdown()) }
blocker <- Blocker[IO]
kafka <- Resource.liftF(LogOf.slf4j[IO]) flatMap { implicit logOf =>
KafkaModule.of[IO]("SharedResources", config, CollectorRegistry.empty, blocker)
KafkaModule.of[IO]("SharedResources", "SharedResources-groupId", config, CollectorRegistry.empty, blocker)
}
_ <- store.putR(kafka)
} yield ()
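Taken together, the shared test resources now start from kafka-journal's KafkaConfig, tweak it with Monocle lens syntax (which is presumably why Monocle moves out of the Test scope in build.sbt above), and hand the consumer group id to KafkaModule.of up front. A minimal sketch of the resulting wiring, using only names that appear in the diff and assuming the same implicit ContextShift, Timer and executor setup as the surrounding file:

    import cats.effect.{Blocker, IO, Resource}
    import cats.syntax.option._
    import com.evolutiongaming.catshelper.LogOf
    import com.evolutiongaming.kafka.flow.kafka.KafkaModule
    import com.evolutiongaming.kafka.journal.KafkaConfig
    import com.evolutiongaming.smetrics.CollectorRegistry
    import monocle.macros.syntax.lens._

    // the default config is enough because Kafka is launched locally;
    // only the producer client id is overridden, via a Monocle lens
    val config = KafkaConfig.default
      .lens(_.producer.common.clientId).set("UnsubscribeSpec-producer".some)

    // the group id is now passed to KafkaModule once, not per consumerOf call
    val kafkaModule = for {
      blocker <- Blocker[IO]
      kafka   <- Resource.liftF(LogOf.slf4j[IO]) flatMap { implicit logOf =>
        KafkaModule.of[IO]("SharedResources", "SharedResources-groupId", config, CollectorRegistry.empty, blocker)
      }
    } yield kafka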
@@ -1,37 +1,27 @@
package com.evolutiongaming.kafka.flow

import ShutdownSpec._
import cats.data.NonEmptySet
import cats.effect.IO
import cats.effect.Resource
import cats.effect.concurrent.Deferred
import cats.effect.concurrent.Ref
import cats.effect.{IO, Resource}
import cats.effect.concurrent.{Deferred, Ref}
import cats.syntax.all._
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.ShutdownSpec._
import com.evolutiongaming.kafka.journal.ConsRecords
import com.evolutiongaming.retry.Retry
import com.evolutiongaming.skafka.CommonConfig
import com.evolutiongaming.skafka.Offset
import com.evolutiongaming.skafka.Partition
import com.evolutiongaming.skafka.producer.ProducerConfig
import com.evolutiongaming.skafka.{Offset, Partition}
import com.evolutiongaming.skafka.producer.ProducerRecord
import com.evolutiongaming.sstream.Stream
import weaver.GlobalResources

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import weaver.GlobalResources

class ShutdownSpec(val globalResources: GlobalResources) extends KafkaSpec {

test("call and complete onPartitionsRevoked after shutdown started") { kafka =>
implicit val retry = Retry.empty[IO]

val producerConfig = ProducerConfig(
common = CommonConfig(
clientId = Some("UnsubscribeSpec-producer")
)
)

def send = kafka.producerOf(producerConfig) use { producer =>
def send = kafka.producerOf use { producer =>
val record = ProducerRecord[String, String]("UnsubscribeSpec-topic")
producer.send(record).flatten
}
@@ -45,7 +35,7 @@ class ShutdownSpec(val globalResources: GlobalResources) extends KafkaSpec {
_ <- Stream.lift(send)
// wait for record to be processed
_ <- KafkaFlow.stream(
consumer = kafka.consumerOf("UnsubscribeSpec-groupId"),
consumer = kafka.consumerOf,
flowOf = ConsumerFlowOf[IO](
topic = "UnsubscribeSpec-topic",
flowOf = flowOf
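With the client id and group id now configured once in KafkaModule, the test no longer builds its own ProducerConfig or passes a group id: kafka.producerOf and kafka.consumerOf are used as-is. A condensed sketch of the updated test body, where kafka is the shared KafkaModule handle provided by KafkaSpec and flowOf is the TopicFlowOf defined elsewhere in the test:

    implicit val retry = Retry.empty[IO]

    // producer: no per-test ProducerConfig any more, the client id comes from KafkaConfig
    def send = kafka.producerOf use { producer =>
      val record = ProducerRecord[String, String]("UnsubscribeSpec-topic")
      producer.send(record).flatten
    }

    // consumer: no group id argument any more, it is configured in KafkaModule
    val records = KafkaFlow.stream(
      consumer = kafka.consumerOf,
      flowOf   = ConsumerFlowOf[IO](
        topic  = "UnsubscribeSpec-topic",
        flowOf = flowOf
      )
    )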
@@ -5,11 +5,11 @@ import cats.data.NonEmptySet
import cats.effect.Resource
import cats.syntax.all._
import com.evolutiongaming.catshelper.{Log, LogOf}
import com.evolutiongaming.kafka.flow.kafka.Consumer
import com.evolutiongaming.kafka.journal.ConsRecords
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.consumer.ConsumerRecords
import com.evolutiongaming.skafka.consumer.{Consumer, ConsumerRecords}
import com.evolutiongaming.sstream.Stream
import scodec.bits.ByteVector

/** Represents everything stateful happening on one `Consumer` */
trait ConsumerFlow[F[_]] {
@@ -33,7 +33,7 @@ object ConsumerFlow {
* journal in the format of `Kafka Journal` library.
*/
def of[F[_]: MonadThrow: LogOf](
consumer: Consumer[F],
consumer: Consumer[F, String, ByteVector],
topic: Topic,
flowOf: TopicFlowOf[F],
config: ConsumerFlowConfig
@@ -50,7 +50,7 @@ object ConsumerFlow {
* journal in the format of `Kafka Journal` library.
*/
def of[F[_]: MonadThrow: LogOf](
consumer: Consumer[F],
consumer: Consumer[F, String, ByteVector],
topics: NonEmptySet[Topic],
flowOf: TopicFlowOf[F],
config: ConsumerFlowConfig
@@ -70,14 +70,14 @@ object ConsumerFlow {
* journal in the format of `Kafka Journal` library.
*/
def apply[F[_]: MonadThrow: LogOf](
consumer: Consumer[F],
consumer: Consumer[F, String, ByteVector],
flows: Map[Topic, TopicFlow[F]],
config: ConsumerFlowConfig
): ConsumerFlow[F] = new ConsumerFlow[F] {

val subscribe =
flows.keySet.toList.toNel match {
case Some(topics) => consumer.subscribe(topics.toNes, RebalanceListener[F](consumer, flows))
case Some(topics) => consumer.subscribe(topics.toNes, RebalanceListener[F](consumer, flows).some)
case None => new IllegalArgumentException("Parameter flows cannot be empty").raiseError[F, Unit]
}

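The changes above drop kafka-flow's own kafka.Consumer wrapper in favour of skafka's Consumer[F, String, ByteVector], and subscribe now takes the rebalance listener as an Option. A small sketch of constructing a ConsumerFlow against the new signature; it assumes the code sits in the com.evolutiongaming.kafka.flow package and that the consumer, topic flow factory and config are already available:

    import cats.effect.IO
    import com.evolutiongaming.catshelper.LogOf
    import com.evolutiongaming.skafka.consumer.Consumer
    import scodec.bits.ByteVector

    // hypothetical helper; the point is the consumer parameter type,
    // which is now skafka's Consumer[F, String, ByteVector]
    def flow(
      consumer: Consumer[IO, String, ByteVector],
      topicFlowOf: TopicFlowOf[IO],
      config: ConsumerFlowConfig
    )(implicit logOf: LogOf[IO]) =
      ConsumerFlow.of[IO](
        consumer = consumer,
        topic    = "my-topic",
        flowOf   = topicFlowOf,
        config   = config
      )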
@@ -5,12 +5,13 @@ import cats.effect.Resource
import com.evolutiongaming.catshelper.BracketThrowable
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.skafka.Topic
import kafka.Consumer
import com.evolutiongaming.skafka.consumer.Consumer
import scodec.bits.ByteVector

/** Factory which creates `ConsumerFlow` instances */
trait ConsumerFlowOf[F[_]] {

def apply(consumer: Consumer[F]): Resource[F, ConsumerFlow[F]]
def apply(consumer: Consumer[F, String, ByteVector]): Resource[F, ConsumerFlow[F]]

}
object ConsumerFlowOf {
@@ -12,8 +12,10 @@ import com.evolutiongaming.random.Random
import com.evolutiongaming.retry.OnError
import com.evolutiongaming.retry.Retry
import com.evolutiongaming.retry.Strategy
import com.evolutiongaming.skafka.consumer.Consumer
import com.evolutiongaming.sstream.Stream
import kafka.Consumer
import scodec.bits.ByteVector

import scala.concurrent.duration._

object KafkaFlow {
@@ -30,7 +32,7 @@ object KafkaFlow {
* potential errors may be lost.
*/
def retryOnError[F[_]: Concurrent: Timer: LogOf](
consumer: Resource[F, Consumer[F]],
consumer: Resource[F, Consumer[F, String, ByteVector]],
flowOf: ConsumerFlowOf[F],
): Resource[F, F[Unit]] = {

@@ -63,7 +65,7 @@
* second time.
*/
def stream[F[_]: BracketThrowable: Retry](
consumer: Resource[F, Consumer[F]],
consumer: Resource[F, Consumer[F, String, ByteVector]],
flowOf: ConsumerFlowOf[F],
): Stream[F, ConsRecords] =
for {
@@ -81,7 +83,7 @@
* potential errors may be lost.
*/
def resource[F[_]: Concurrent: Retry](
consumer: Resource[F, Consumer[F]],
consumer: Resource[F, Consumer[F, String, ByteVector]],
flowOf: ConsumerFlowOf[F],
): Resource[F, F[Unit]] =
stream(consumer, flowOf).drain.background
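All three KafkaFlow entry points (retryOnError, stream and resource) now take a Resource of the skafka consumer directly. A minimal sketch of the stream variant, mirroring the signature above and assuming it lives next to KafkaFlow in the com.evolutiongaming.kafka.flow package:

    import cats.effect.Resource
    import com.evolutiongaming.catshelper.BracketThrowable
    import com.evolutiongaming.kafka.journal.ConsRecords
    import com.evolutiongaming.retry.Retry
    import com.evolutiongaming.skafka.consumer.Consumer
    import com.evolutiongaming.sstream.Stream
    import scodec.bits.ByteVector

    // the consumer resource is now a plain skafka consumer, not the old kafka-flow wrapper
    def records[F[_]: BracketThrowable: Retry](
      consumer: Resource[F, Consumer[F, String, ByteVector]],
      flowOf: ConsumerFlowOf[F]
    ): Stream[F, ConsRecords] =
      KafkaFlow.stream(consumer = consumer, flowOf = flowOf)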
@@ -5,14 +5,14 @@ import cats.data.NonEmptySet
import cats.syntax.all._
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.ConsumerFlow.log
import com.evolutiongaming.kafka.flow.kafka.Consumer
import com.evolutiongaming.skafka.consumer.{RebalanceListener => SRebalanceListener}
import com.evolutiongaming.skafka.consumer.{Consumer, RebalanceListener => SRebalanceListener}
import com.evolutiongaming.skafka.{Partition, Topic, TopicPartition}
import scodec.bits.ByteVector

object RebalanceListener {

def apply[F[_]: Monad: LogOf](
consumer: Consumer[F],
consumer: Consumer[F, String, ByteVector],
flows: Map[Topic, TopicFlow[F]]
): SRebalanceListener[F] = new SRebalanceListener[F] {

19 changes: 9 additions & 10 deletions core/src/main/scala/com/evolutiongaming/kafka/flow/TopicFlow.scala
@@ -6,14 +6,13 @@ import cats.effect.concurrent.Ref
import cats.effect.{Concurrent, Resource}
import cats.syntax.all._
import com.evolutiongaming.catshelper.DataHelper._
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal.ConsRecords
import com.evolutiongaming.kafka.journal.PartitionOffset
import com.evolutiongaming.scache.Cache
import com.evolutiongaming.scache.Releasable
import com.evolutiongaming.skafka.{Offset, OffsetAndMetadata, Partition, Topic, TopicPartition}
import kafka.Consumer
import com.evolutiongaming.catshelper.{Log, LogOf}
import com.evolutiongaming.kafka.journal.{ConsRecords, PartitionOffset}
import com.evolutiongaming.scache.{Cache, Releasable}
import com.evolutiongaming.skafka.consumer.Consumer
import com.evolutiongaming.skafka._
import scodec.bits.ByteVector

import scala.collection.immutable.SortedSet

trait TopicFlow[F[_]] {
@@ -39,7 +38,7 @@
object TopicFlow {

def of[F[_]: Concurrent: Parallel: LogOf](
consumer: Consumer[F],
consumer: Consumer[F, String, ByteVector],
topic: Topic,
partitionFlowOf: PartitionFlowOf[F]
): Resource[F, TopicFlow[F]] = for {
@@ -51,7 +50,7 @@
} yield flow

private def of[F[_]: Concurrent: Parallel: Log](
consumer: Consumer[F],
consumer: Consumer[F, String, ByteVector],
topic: Topic,
partitionFlowOf: PartitionFlowOf[F],
cache: Cache[F, Partition, PartitionFlow[F]],
@@ -1,15 +1,15 @@
package com.evolutiongaming.kafka.flow

import cats.Parallel
import cats.effect.Concurrent
import cats.effect.Resource
import cats.effect.{Concurrent, Resource}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.skafka.Topic
import kafka.Consumer
import com.evolutiongaming.skafka.consumer.Consumer
import scodec.bits.ByteVector

trait TopicFlowOf[F[_]] {

def apply(consumer: Consumer[F], topic: Topic): Resource[F, TopicFlow[F]]
def apply(consumer: Consumer[F, String, ByteVector], topic: Topic): Resource[F, TopicFlow[F]]

}
object TopicFlowOf {
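TopicFlowOf gets the same treatment: its apply now receives the skafka consumer. A sketch of a hand-rolled instance that delegates to the TopicFlow.of constructor shown earlier; the factory name is hypothetical, partitionFlowOf is assumed to be provided by the application, and the library may already ship an equivalent:

    import cats.Parallel
    import cats.effect.{Concurrent, Resource}
    import com.evolutiongaming.catshelper.LogOf
    import com.evolutiongaming.skafka.Topic
    import com.evolutiongaming.skafka.consumer.Consumer
    import scodec.bits.ByteVector

    // hypothetical factory, assumed to live in the com.evolutiongaming.kafka.flow package
    def topicFlowOf[F[_]: Concurrent: Parallel: LogOf](
      partitionFlowOf: PartitionFlowOf[F]
    ): TopicFlowOf[F] = new TopicFlowOf[F] {
      def apply(consumer: Consumer[F, String, ByteVector], topic: Topic): Resource[F, TopicFlow[F]] =
        TopicFlow.of(consumer, topic, partitionFlowOf)
    }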

This file was deleted.

This file was deleted.
