diff --git a/src/main/scala/kafka4m/consumer/KafkaConsumerFeed.scala b/src/main/scala/kafka4m/consumer/KafkaConsumerFeed.scala
deleted file mode 100644
index dade274..0000000
--- a/src/main/scala/kafka4m/consumer/KafkaConsumerFeed.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-package kafka4m.consumer
-
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
-
-import args4c.implicits._
-import com.typesafe.config.Config
-import com.typesafe.scalalogging.StrictLogging
-import monix.execution.Scheduler
-import monix.reactive.{Observable, Pipe}
-import org.apache.kafka.clients.consumer.ConsumerRecord
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
-import scala.util.control.NonFatal
-
-object KafkaConsumerFeed {
-  def apply(config: Config)(implicit sched: Scheduler): KafkaConsumerFeed[String, Array[Byte]] = {
-    val consumerAccess: KafkaAccess[RichKafkaConsumer[String, Array[Byte]]] = KafkaAccess(RichKafkaConsumer.byteArrayValues(config))
-    val topic = config.getString("kafka4m.consumer.topic")
-    val feedTimeout = config.asFiniteDuration("kafka4m.consumer.feedTimeout")
-    val commandQueueSize = config.getInt("kafka4m.consumer.commandQueueSize")
-    val queue = new ArrayBlockingQueue[ConsumerRecord[String, Array[Byte]]](commandQueueSize)
-    apply(topic, queue, feedTimeout, consumerAccess)
-  }
-
-  def apply[K, V](topic: String, consumerAccess: KafkaAccess[RichKafkaConsumer[K, V]])(implicit sched: Scheduler): KafkaConsumerFeed[K, V] = {
-    apply(topic, new ArrayBlockingQueue[ConsumerRecord[K, V]](1000), 1.second, consumerAccess)
-  }
-
-  def apply[K, V](topic: String, data: BlockingQueue[ConsumerRecord[K, V]], feedTimeout: FiniteDuration, consumerAccess: KafkaAccess[RichKafkaConsumer[K, V]])(
-      implicit sched: Scheduler) = {
-
-    val feed = new KafkaConsumerFeed[K, V](topic, data, feedTimeout, consumerAccess)
-    sched.execute(feed)
-    feed
-  }
-}
-
-case class KafkaConsumerFeed[K, V] private (topic: String,
-                                            data: BlockingQueue[ConsumerRecord[K, V]],
-                                            feedTimeout: FiniteDuration,
-                                            consumerAccess: KafkaAccess[RichKafkaConsumer[K, V]])(implicit sched: Scheduler)
-    extends Runnable
-    with AutoCloseable
-    with StrictLogging {
-  private val cancelled = new AtomicBoolean(false)
-
-  def unicast: Observable[ConsumerRecord[K, V]] = {
-    val (input, output) = Pipe.publish[ConsumerRecord[K, V]].unicast
-    sched.execute(() => {
-      while (!cancelled.get()) {
-        val next = data.take()
-        Await.result(input.onNext(next), feedTimeout)
-      }
-    })
-    output
-  }
-
-  def run(): Unit = {
-    try {
-      Await.result(consumerAccess(_.subscribe(topic, RebalanceListener)), feedTimeout * 10)
-    } catch {
-      case NonFatal(e) =>
-        logger.error(s"ERROR DURING SUBSCRIPTION: $e")
-        throw e
-    }
-    loop()
-    logger.error("KafkaConsumerFeed complete!")
-  }
-
-  private def loop(): Unit = {
-    while (!cancelled.get()) {
-      try {
-        logger.trace("polling kafka")
-        val future: Future[Iterable[ConsumerRecord[K, V]]] = consumerAccess(_.poll())
-        val d8a = Await.result(future, feedTimeout)
-        // blocking put ... wait if the queue is full
-        d8a.foreach(data.put)
-      } catch {
-        case NonFatal(e) =>
-          logger.error("kafka feed loop error", e)
-      }
-    }
-  }
-
-  override def close(): Unit = {
-    consumerAccess.close()
-  }
-}
diff --git a/src/test/scala/kafka4m/BaseKafka4mDockerSpec.scala b/src/test/scala/kafka4m/BaseKafka4mDockerSpec.scala
index 5b6aa1e..e2d3e21 100644
--- a/src/test/scala/kafka4m/BaseKafka4mDockerSpec.scala
+++ b/src/test/scala/kafka4m/BaseKafka4mDockerSpec.scala
@@ -9,6 +9,6 @@ import scala.concurrent.duration._
 
 abstract class BaseKafka4mDockerSpec extends BaseKafkaSpec with ScalaFutures with BeforeAndAfterAll with GivenWhenThen with StrictLogging {
 
-  override def testTimeout: FiniteDuration = 5.seconds
+  override def testTimeout: FiniteDuration = 15.seconds
 
 }
diff --git a/src/test/scala/kafka4m/consumer/KafkaConsumerFeedTest.scala b/src/test/scala/kafka4m/consumer/KafkaConsumerFeedTest.scala
deleted file mode 100644
index b7aaeaa..0000000
--- a/src/test/scala/kafka4m/consumer/KafkaConsumerFeedTest.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package kafka4m.consumer
-
-import com.typesafe.config.{Config, ConfigFactory}
-import kafka4m.BaseKafka4mDockerSpec
-import kafka4m.producer.RichKafkaProducer
-import kafka4m.util.{Schedulers, Using}
-import org.apache.kafka.clients.producer.RecordMetadata
-
-class KafkaConsumerFeedTest extends BaseKafka4mDockerSpec {
-
-  "KafkaConsumerFeed.unicast" should {
-    "produce a stream of events" in {
-      Schedulers.using { implicit sched =>
-        val topic = s"${getClass.getSimpleName}${System.currentTimeMillis}".filter(_.isLetterOrDigit)
-        val config = KafkaConsumerFeedTest.testConfig(topic)
-        Using(RichKafkaProducer.byteArrayValues(config)) { producer =>
-          val first: RecordMetadata = producer.sendAsync(topic, "foo", "first record".getBytes).futureValue
-          first.offset() shouldBe 0L
-
-          val second = producer.sendAsync(topic, "bar", "second record".getBytes).futureValue
-          second.offset() shouldBe 1L
-
-          Using(KafkaConsumerFeed(config)) { consumerFeed: KafkaConsumerFeed[String, Array[Byte]] =>
-            val received = consumerFeed.unicast.take(2).toListL.runToFuture.futureValue
-            received.head.offset() shouldBe 0
-            received.tail.head.offset() shouldBe 1
-          }
-        }
-      }
-    }
-  }
-}
-
-object KafkaConsumerFeedTest {
-  def testConfig(topic: String): Config = ConfigFactory.parseString(s"""kafka4m.admin.topic : $topic
-                                                                       |
-                                                                       |kafka4m.consumer.topic : $topic
-                                                                       |kafka4m.consumer.group.id : "test"$topic
-                                                                       |kafka4m.consumer.application.id : "test"$topic
-                                                                       |kafka4m.consumer.auto.offset.reset : earliest
-                                                                       |
-                                                                       |kafka4m.producer.topic : $topic
-                                                                       |""".stripMargin).withFallback(ConfigFactory.load())
-}
diff --git a/src/test/scala/kafka4m/consumer/RichKafkaConsumerTest.scala b/src/test/scala/kafka4m/consumer/RichKafkaConsumerTest.scala
index 1ef1725..65426ae 100644
--- a/src/test/scala/kafka4m/consumer/RichKafkaConsumerTest.scala
+++ b/src/test/scala/kafka4m/consumer/RichKafkaConsumerTest.scala
@@ -2,6 +2,7 @@ package kafka4m.consumer
 
 import java.util.concurrent.atomic.AtomicLong
 
+import com.typesafe.config.{Config, ConfigFactory}
 import kafka4m.BaseKafka4mDockerSpec
 import kafka4m.admin.RichKafkaAdmin
 import kafka4m.producer.RichKafkaProducer
@@ -10,6 +11,7 @@ import kafka4m.util.Schedulers
 
 import scala.concurrent.Future
 
 class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
+  import RichKafkaConsumerTest.testConfig
 
   private val id = new AtomicLong(System.currentTimeMillis)
 
@@ -21,7 +23,7 @@
    "report current assignments and partitions" in {
      Schedulers.using { implicit sched =>
        val topic = nextTopic()
-       val config = KafkaConsumerFeedTest.testConfig(topic)
+       val config = testConfig(topic)
 
        val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)
        RichKafkaAdmin(config).createTopicSync(topic, testTimeout)
@@ -36,7 +38,7 @@
    "return the assignmentPartitions" in {
      Schedulers.using { implicit sched =>
        val topic = nextTopic()
-       val config = KafkaConsumerFeedTest.testConfig(topic)
+       val config = testConfig(topic)
 
        val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)
        RichKafkaAdmin(config).createTopicSync(topic, testTimeout)
@@ -50,7 +52,7 @@
      Schedulers.using { implicit sched =>
        Given("Some messages in a topic")
        val topic = nextTopic()
-       val config = KafkaConsumerFeedTest.testConfig(topic)
+       val config = testConfig(topic)
 
        val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)
        val producer = RichKafkaProducer.byteArrayValues(config)
@@ -84,7 +86,7 @@
      Schedulers.using { implicit sched =>
        Given("Some messages in a topic")
        val topic = nextTopic()
-       val config = KafkaConsumerFeedTest.testConfig(topic)
+       val config = testConfig(topic)
 
        val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)
        val producer = RichKafkaProducer.byteArrayValues(config)
@@ -121,3 +123,15 @@
     }
   }
 }
+
+object RichKafkaConsumerTest {
+  def testConfig(topic: String): Config = ConfigFactory.parseString(s"""kafka4m.admin.topic : $topic
+                                                                       |
+                                                                       |kafka4m.consumer.topic : $topic
+                                                                       |kafka4m.consumer.group.id : "test"$topic
+                                                                       |kafka4m.consumer.application.id : "test"$topic
+                                                                       |kafka4m.consumer.auto.offset.reset : earliest
+                                                                       |
+                                                                       |kafka4m.producer.topic : $topic
+                                                                       |""".stripMargin).withFallback(ConfigFactory.load())
+}
diff --git a/src/test/scala/kafka4m/io/FileSinkTest.scala b/src/test/scala/kafka4m/io/FileSinkTest.scala
index f6f5279..f7c4365 100644
--- a/src/test/scala/kafka4m/io/FileSinkTest.scala
+++ b/src/test/scala/kafka4m/io/FileSinkTest.scala
@@ -2,6 +2,7 @@ package kafka4m.io
 
 import java.util.Base64
 
+import com.typesafe.scalalogging.StrictLogging
 import eie.io._
 import kafka4m.BaseKafka4mSpec
 import kafka4m.util.Schedulers
@@ -12,7 +13,7 @@ import monix.reactive.Observable
 
 import scala.collection.mutable.ListBuffer
 
-class FileSinkTest extends BaseKafka4mSpec {
+class FileSinkTest extends BaseKafka4mSpec with StrictLogging {
 
   "FileSink.base64" should {
     "write to a zip file quickly" in {
@@ -35,9 +36,10 @@
       import concurrent.duration._
       var lastCount = 0
       val throughputObserved = ListBuffer[Int]()
-      val expectedRecordsPerSecond = 800000
+      val expectedRecordsPerSecond = 200000
       s.scheduleAtFixedRate(1.second, 1.second) {
         val throughput = count - lastCount
+        logger.info(s"$throughput / second")
         throughputObserved += throughput
         lastCount = count
       }
@@ -45,6 +47,7 @@
       var isDone = false
       val noteWhenDone = Task.delay[Unit] { isDone = true }
       val task: Cancelable = input.doOnNext(_ => inc).takeWhileNotCanceled(stop).guarantee(noteWhenDone).subscribe(sink)(s)
+
       eventually {
         throughputObserved.exists(_ > expectedRecordsPerSecond) shouldBe true
       }