Skip to content

Commit

Permalink
Merge f0f161e into 9bafeb0
Browse files Browse the repository at this point in the history
  • Loading branch information
Z1kkurat committed Jan 26, 2022
2 parents 9bafeb0 + f0f161e commit 0b23fb1
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import cats.effect.implicits._
import cats.effect.std.Semaphore
import cats.implicits._
import cats.{Applicative, Monad, MonadError, ~>}
import com.evolutiongaming.catshelper.Blocking.implicits._
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper._
import com.evolutiongaming.skafka.Converters._
Expand Down Expand Up @@ -238,20 +237,32 @@ object Consumer {

private sealed abstract class Main

@deprecated("Use of(ConsumerConfig)", since = "12.0.1")
def of[F[_]: ToTry: ToFuture: Async, K, V](
config: ConsumerConfig,
executorBlocking: ExecutionContext
)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]): Resource[F, Consumer[F, K, V]] = {
implicit val blocking = Blocking.fromExecutionContext(executorBlocking)
val consumer = CreateConsumerJ(config, fromBytesK, fromBytesV)
fromConsumerJ(consumer)
of(config)
}

def of[F[_]: ToTry: ToFuture: Async, K, V](
config: ConsumerConfig
)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]): Resource[F, Consumer[F, K, V]] = {
fromConsumerJ1(CreateConsumerJ(config, fromBytesK, fromBytesV))
}

@deprecated("Use fromConsumerJ1", since = "12.0.1")
def fromConsumerJ[F[_]: ToFuture: ToTry: Blocking: Async, K, V](
consumer: F[ConsumerJ[K, V]]
): Resource[F, Consumer[F, K, V]] = {
fromConsumerJ1(consumer)
}

def fromConsumerJ1[F[_]: ToFuture: ToTry: Async, K, V](
consumer: F[ConsumerJ[K, V]]
): Resource[F, Consumer[F, K, V]] = {

def blocking[A](f: => A) = Sync[F].delay { f }.blocking
def blocking[A](f: => A) = Sync[F].blocking { f }

trait Around {
def apply[A](f: => A): F[A]
Expand All @@ -260,7 +271,7 @@ object Consumer {
for {
serialListeners <- SerialListeners.of[F].toResource
semaphore <- Semaphore(1).toResource
consumer <- consumer.blocking.toResource
consumer <- consumer.toResource
around = new Around {
def apply[A](f: => A) = {
semaphore
Expand All @@ -282,7 +293,7 @@ object Consumer {

def commitLater1(f: OffsetCommitCallback => Unit) = {
Async[F]
.async_[MapJ[TopicPartitionJ, OffsetAndMetadataJ]] { callback =>
.async[MapJ[TopicPartitionJ, OffsetAndMetadataJ]] { callback =>
val offsetCommitCallback = new OffsetCommitCallback {

def onComplete(offsets: MapJ[TopicPartitionJ, OffsetAndMetadataJ], exception: Exception) = {
Expand All @@ -296,9 +307,8 @@ object Consumer {
}
}
}
f(offsetCommitCallback)
Sync[F].blocking(f(offsetCommitCallback)).as(None)
}
.blocking
}

def listenerOf(listener: Option[RebalanceListener[F]]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ trait ConsumerOf[F[_]] {

object ConsumerOf {

@deprecated("Use apply1", since = "12.0.1")
def apply[F[_]: Async: ToTry: ToFuture: MeasureDuration](
executorBlocking: ExecutionContext,
metrics: Option[ConsumerMetrics[F]] = None
): ConsumerOf[F] = apply1(metrics)

def apply1[F[_]: Async: ToTry: ToFuture: MeasureDuration](
metrics: Option[ConsumerMetrics[F]] = None
): ConsumerOf[F] = new ConsumerOf[F] {

def apply[K, V](config: ConsumerConfig)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]) = {
for {
consumer <- Consumer.of[F, K, V](config, executorBlocking)
consumer <- Consumer.of[F, K, V](config)
} yield {
metrics.fold(consumer)(consumer.withMetrics[Throwable])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ object CreateConsumerJ {
): F[ConsumerJ[K, V]] = {
val deserializerK = fromBytesV.asJava
val deserializerV = fromBytesK.asJava
Sync[F].delay { new KafkaConsumer(config.properties, deserializerV, deserializerK) }
Sync[F].blocking { new KafkaConsumer(config.properties, deserializerV, deserializerK) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ object CreateProducerJ {
def apply[F[_]: Sync](config: ProducerConfig): F[ProducerJ[Bytes, Bytes]] = {
val properties = config.properties
val serializer = new ByteArraySerializer()
Sync[F].delay { new KafkaProducer[Bytes, Bytes](properties, serializer, serializer) }
Sync[F].blocking { new KafkaProducer[Bytes, Bytes](properties, serializer, serializer) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import cats.effect.{Resource, Sync, Async, Deferred}
import cats.effect.implicits._
import cats.implicits._
import cats.{Applicative, Functor, MonadError, ~>}
import com.evolutiongaming.catshelper.Blocking.implicits._
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{Blocking, Log, MonadThrowable, ToTry}
import com.evolutiongaming.skafka.Converters._
Expand Down Expand Up @@ -91,20 +90,31 @@ object Producer {
}
}

@deprecated("Use of(ProducerConfig)", since = "12.0.1")
def of[F[_]: ToTry: Async](
config: ProducerConfig,
executorBlocking: ExecutionContext
): Resource[F, Producer[F]] = {
implicit val blocking = Blocking.fromExecutionContext(executorBlocking)
val producer = CreateProducerJ(config)
fromProducerJ1(producer)
of(config)
}

def of[F[_]: ToTry: Async](
config: ProducerConfig
): Resource[F, Producer[F]] = {
val producer = CreateProducerJ(config)
fromProducerJ2(producer)
}

private sealed abstract class Main

@deprecated("Use fromProducerJ2", since = "12.0.1")
def fromProducerJ1[F[_]: Blocking: ToTry: Async](producer: F[ProducerJ[Bytes, Bytes]]): Resource[F, Producer[F]] = {
fromProducerJ2(producer)
}

def fromProducerJ2[F[_]: ToTry: Async](producer: F[ProducerJ[Bytes, Bytes]]): Resource[F, Producer[F]] = {

def blocking[A](f: => A) = Sync[F].delay(f).blocking
def blocking[A](f: => A) = Sync[F].blocking(f)

def apply(producer: ProducerJ[Bytes, Bytes]) = {
new Main with Producer[F] {
Expand Down Expand Up @@ -197,7 +207,7 @@ object Producer {
}

for {
producerJ <- producer.blocking.toResource
producerJ <- producer.toResource
producer = apply(producerJ)
close = blocking { producerJ.close() }
flush = producer.flush.attempt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ trait ProducerOf[F[_]] {

object ProducerOf {

@deprecated("Use apply1", since = "12.0.1")
def apply[F[_]: MeasureDuration: ToTry: Async](
executorBlocking: ExecutionContext,
metrics: Option[ProducerMetrics[F]] = None
): ProducerOf[F] = apply1(metrics = metrics)

def apply1[F[_]: MeasureDuration: ToTry: Async](
metrics: Option[ProducerMetrics[F]] = None
): ProducerOf[F] = new ProducerOf[F] {

def apply(config: ProducerConfig) = {
for {
producer <- Producer.of[F](config, executorBlocking)
producer <- Producer.of[F](config = config)
} yield {
metrics.fold(producer)(producer.withMetrics[Throwable])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class ConsumerSpec extends AnyWordSpec with Matchers {

val consumer: Consumer[IO, Bytes, Bytes] = {
Consumer
.fromConsumerJ(consumerJ.pure[IO])
.fromConsumerJ1(consumerJ.pure[IO])
.allocated
.toTry
.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.{Map => MapJ}
import cats.effect.{Async, Concurrent, Deferred, IO, Sync}
import cats.implicits._
import cats.effect.implicits._
import com.evolutiongaming.catshelper.{Blocking, FromTry, ToFuture, ToTry}
import com.evolutiongaming.catshelper.{FromTry, ToFuture, ToTry}
import com.evolutiongaming.skafka.producer.ProducerConverters._
import com.evolutiongaming.skafka.{Bytes, Partition, TopicPartition}
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetadata => OffsetAndMetadataJ}
Expand All @@ -30,7 +30,7 @@ class ProducerSendSpec extends AsyncFunSuite with Matchers {
}

private def blockAndSend[
F[_]: ToTry: FromTry: ToFuture: Blocking: Async
F[_]: ToTry: FromTry: ToFuture: Async
] = {

val topic = "topic"
Expand Down Expand Up @@ -92,7 +92,7 @@ class ProducerSendSpec extends AsyncFunSuite with Matchers {
def abortTransaction() = {}
}

Producer.fromProducerJ1(producer.pure[F])
Producer.fromProducerJ2(producer.pure[F])
}

def start[A](fa: F[A]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class ProducerSpec extends AnyWordSpec with Matchers {
val producer: Producer[IO] = {
implicit val measureDuration = MeasureDuration.empty[IO]
Producer
.fromProducerJ1[IO](jProducer.pure[IO])
.fromProducerJ2[IO](jProducer.pure[IO])
.allocated
.toTry
.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match
val config = ProducerConfig.Default.copy(acks = acks)
for {
metrics <- ProducerMetrics.of(CollectorRegistry.empty[IO])
producerOf = ProducerOf(executor, metrics("clientId").some).mapK(FunctionK.id, FunctionK.id)
producerOf = ProducerOf.apply1(metrics("clientId").some).mapK(FunctionK.id, FunctionK.id)
producer <- producerOf(config)
} yield {
producer.withLogging(Log.empty)
Expand Down

0 comments on commit 0b23fb1

Please sign in to comment.