Skip to content

Commit

Permalink
removed consumer feed
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronp committed Sep 16, 2019
1 parent cf2fc96 commit 32cb1e6
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 141 deletions.
90 changes: 0 additions & 90 deletions src/main/scala/kafka4m/consumer/KafkaConsumerFeed.scala

This file was deleted.

2 changes: 1 addition & 1 deletion src/test/scala/kafka4m/BaseKafka4mDockerSpec.scala
Expand Up @@ -9,6 +9,6 @@ import scala.concurrent.duration._

/** Base spec for tests that run against a Docker-hosted Kafka broker.
  *
  * Mixes in ScalaFutures for async assertions and Given/When/Then for BDD-style
  * test narration.
  */
abstract class BaseKafka4mDockerSpec extends BaseKafkaSpec with ScalaFutures with BeforeAndAfterAll with GivenWhenThen with StrictLogging {

  // Docker-backed Kafka can be slow to start/assign partitions; 15 seconds
  // (raised from 5) avoids spurious timeouts on loaded CI machines.
  override def testTimeout: FiniteDuration = 15.seconds

}
44 changes: 0 additions & 44 deletions src/test/scala/kafka4m/consumer/KafkaConsumerFeedTest.scala

This file was deleted.

22 changes: 18 additions & 4 deletions src/test/scala/kafka4m/consumer/RichKafkaConsumerTest.scala
Expand Up @@ -2,6 +2,7 @@ package kafka4m.consumer

import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.{Config, ConfigFactory}
import kafka4m.BaseKafka4mDockerSpec
import kafka4m.admin.RichKafkaAdmin
import kafka4m.producer.RichKafkaProducer
Expand All @@ -10,6 +11,7 @@ import kafka4m.util.Schedulers
import scala.concurrent.Future

class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
import RichKafkaConsumerTest.testConfig

private val id = new AtomicLong(System.currentTimeMillis)

Expand All @@ -21,7 +23,7 @@ class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
"report current assignments and partitions" in {
Schedulers.using { implicit sched =>
val topic = nextTopic()
val config = KafkaConsumerFeedTest.testConfig(topic)
val config = testConfig(topic)
val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)

RichKafkaAdmin(config).createTopicSync(topic, testTimeout)
Expand All @@ -36,7 +38,7 @@ class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
"return the assignmentPartitions" in {
Schedulers.using { implicit sched =>
val topic = nextTopic()
val config = KafkaConsumerFeedTest.testConfig(topic)
val config = testConfig(topic)
val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)

RichKafkaAdmin(config).createTopicSync(topic, testTimeout)
Expand All @@ -50,7 +52,7 @@ class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
Schedulers.using { implicit sched =>
Given("Some messages in a topic")
val topic = nextTopic()
val config = KafkaConsumerFeedTest.testConfig(topic)
val config = testConfig(topic)
val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)

val producer = RichKafkaProducer.byteArrayValues(config)
Expand Down Expand Up @@ -84,7 +86,7 @@ class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
Schedulers.using { implicit sched =>
Given("Some messages in a topic")
val topic = nextTopic()
val config = KafkaConsumerFeedTest.testConfig(topic)
val config = testConfig(topic)
val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)

val producer = RichKafkaProducer.byteArrayValues(config)
Expand Down Expand Up @@ -121,3 +123,15 @@ class RichKafkaConsumerTest extends BaseKafka4mDockerSpec {
}
}
}

object RichKafkaConsumerTest {

  /** Builds a test configuration where the admin, consumer and producer topics
    * all point at the given topic, and the consumer group/application ids are
    * made unique per topic. Unspecified keys fall back to the default
    * application config via `ConfigFactory.load()`.
    *
    * @param topic the kafka topic under test
    * @return the resolved test Config
    */
  def testConfig(topic: String): Config = {
    val overrides =
      s"""kafka4m.admin.topic : $topic
         |
         |kafka4m.consumer.topic : $topic
         |kafka4m.consumer.group.id : "test"$topic
         |kafka4m.consumer.application.id : "test"$topic
         |kafka4m.consumer.auto.offset.reset : earliest
         |
         |kafka4m.producer.topic : $topic
         |""".stripMargin
    val defaults = ConfigFactory.load()
    ConfigFactory.parseString(overrides).withFallback(defaults)
  }
}
7 changes: 5 additions & 2 deletions src/test/scala/kafka4m/io/FileSinkTest.scala
Expand Up @@ -2,6 +2,7 @@ package kafka4m.io

import java.util.Base64

import com.typesafe.scalalogging.StrictLogging
import eie.io._
import kafka4m.BaseKafka4mSpec
import kafka4m.util.Schedulers
Expand All @@ -12,7 +13,7 @@ import monix.reactive.Observable

import scala.collection.mutable.ListBuffer

class FileSinkTest extends BaseKafka4mSpec {
class FileSinkTest extends BaseKafka4mSpec with StrictLogging {

"FileSink.base64" should {
"write to a zip file quickly" in {
Expand All @@ -35,16 +36,18 @@ class FileSinkTest extends BaseKafka4mSpec {
import concurrent.duration._
var lastCount = 0
val throughputObserved = ListBuffer[Int]()
val expectedRecordsPerSecond = 800000
val expectedRecordsPerSecond = 200000
s.scheduleAtFixedRate(1.second, 1.second) {
val throughput = count - lastCount
logger.info(s"$throughput / second")
throughputObserved += throughput
lastCount = count
}
val stop = BooleanCancelable()
var isDone = false
val noteWhenDone = Task.delay[Unit] { isDone = true }
val task: Cancelable = input.doOnNext(_ => inc).takeWhileNotCanceled(stop).guarantee(noteWhenDone).subscribe(sink)(s)

eventually {
throughputObserved.exists(_ > expectedRecordsPerSecond) shouldBe true
}
Expand Down

0 comments on commit 32cb1e6

Please sign in to comment.