Skip to content

Commit

Permalink
wip: add RebalanceListener1WithConsumer, add RebalanceCallbackSyntaxO…
Browse files Browse the repository at this point in the history
…ps to allow fa.lift syntax where fa is F[A]
  • Loading branch information
nikitapecasa committed Apr 19, 2021
1 parent a06529e commit cab6806
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ object RebalanceCallback extends RebalanceCallbackInstances with RebalanceCallba
def apply[F[_]]: RebalanceCallback[F, A] = effectAs
}

object implicits {
implicit class RebalanceCallbackSyntaxOps[F[_], A](val self: F[A]) extends AnyVal {
def lift: RebalanceCallback[F, A] = RebalanceCallback.lift(self)
}
}

}

sealed trait RebalanceCallbackApi[F[_]] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,35 @@ trait RebalanceListener1[F[_]] {

}

/**
* Same as [[RebalanceListener1]] but with a `consumer` to allow a better type inference.
*
* {{{
* import RebalanceCallback.implicits._ // to allow writing `someF.lift` instead of `lift(someF)`
*
* def onPartitionsRevoked(partitions: Nes[TopicPartition]) = {
* groupByTopic(partitions) traverse_ {
* case (_, partitions) =>
* for {
* _ <- someF.lift
* partitionsOffsets <- partitions.toNonEmptyList traverse { partition =>
* // fails to compile with `RebalanceCallback.position` variant at
* // _ <- someF2(partitionsOffsets).lift
* // expected type RebalanceCallback[Nothing,?] but found RebalanceCallback[F,Unit]
* consumer.position(partition) map (partition -> _)
* }
* _ <- someF2(partitionsOffsets).lift
* } yield ()
* }
* }
* def someF: F[Unit] = ???
* def someF2(a: Any): F[Unit] = ???
* }}}
*/
trait RebalanceListener1WithConsumer[F[_]] extends RebalanceListener1[F] {
final def consumer: RebalanceCallbackApi[F] = RebalanceCallback.api[F]
}

object RebalanceListener1 {

def empty[F[_]]: RebalanceListener1[F] = const(RebalanceCallback.pure(()))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,76 @@
package com.evolutiongaming.skafka.consumer

import cats.data.{NonEmptySet => Nes}
import java.lang.{Long => LongJ}

import cats.Applicative
import cats.data.{NonEmptyList, NonEmptySet => Nes}
import cats.effect.IO
import com.evolutiongaming.skafka.TopicPartition
import cats.syntax.all._
import com.evolutiongaming.skafka.consumer.DataPoints._
import com.evolutiongaming.skafka.consumer.RebalanceCallback.implicits._
import com.evolutiongaming.skafka.consumer.RebalanceListener1SyntaxSpec._
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import org.apache.kafka.common
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.must.Matchers

import scala.util.Try
class RebalanceListener1SyntaxSpec extends AnyFreeSpec with Matchers {

"type inference to the max" in {
val consumer = RebalanceConsumerJ(new MockConsumer(OffsetResetStrategy.NONE))
val consumer = new ExplodingConsumer {
override def position(partition: common.TopicPartition): LongJ = 0L
}
val tfListener = new TfRebalanceListener1[IO]

RebalanceCallback
.run(tfListener.onPartitionsAssigned(partitions.s).effectAs[IO], consumer) mustBe Try(())
.run(tfListener.onPartitionsAssigned(partitions.s), consumer) mustBe Try(())

RebalanceCallback
.run(tfListener.onPartitionsRevoked(partitions.s), consumer) mustBe Try(())

RebalanceCallback
.run(tfListener.onPartitionsLost(partitions.s), consumer) mustBe Try(())
}

}

object RebalanceListener1SyntaxSpec {
// TODO: add complex example show casing better type inference with RebalanceCallback.api[F]
class TfRebalanceListener1[F[_]] extends RebalanceListener1[F] {
def onPartitionsAssigned(partitions: Nes[TopicPartition]) = RebalanceCallback.empty
def onPartitionsRevoked(partitions: Nes[TopicPartition]) = RebalanceCallback.empty
def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty
class TfRebalanceListener1[F[_]: Applicative] extends RebalanceListener1WithConsumer[F] {

def someF: F[Unit] = ().pure[F]
def someF2(a: Any): F[Unit] = a.pure[F] *> ().pure[F]
def someFO: F[Option[Unit]] = ().some.pure[F]

def onPartitionsAssigned(partitions: Nes[TopicPartition]) =
for {
_ <- someF.lift
_ <- someFO.lift
} yield ()

def onPartitionsRevoked(partitions: Nes[TopicPartition]) = {
groupByTopic(partitions) traverse_ {
case (_, partitions) =>
for {
_ <- someF.lift
partitionsOffsets <- partitions.toNonEmptyList traverse { partition =>
// fails to compile with `RebalanceCallback.position` variant at
// _ <- someF2(partitionsOffsets).lift
// expected type RebalanceCallback[Nothing,?] but found RebalanceCallback[F,Unit]
consumer.position(partition) map (partition -> _)
}
_ <- someF2(partitionsOffsets).lift
} yield ()
}
}

def onPartitionsLost(partitions: Nes[TopicPartition]) = consumer.empty

def groupByTopic(
topicPartitions: Nes[TopicPartition]
): NonEmptyList[(Topic, Nes[TopicPartition])] =
topicPartitions.groupBy(_.topic).toNel

}
}

0 comments on commit cab6806

Please sign in to comment.