diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 6c84fbc..cebc6e8 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -65,6 +65,9 @@ kafka4m {
     # The number of records appended to a file before we request the writer be flushed. This is purely a performance
     # configuration, as the writer is flushed when the bucket is closed.
     numberOfAppendsBeforeWriterFlush: 1000
+
+    # if set to a non-zero value, only this many records will be consumed from kafka before the stream completes
+    limit: 0
   }
 }
diff --git a/src/main/scala/kafka4m/Kafka4mApp.scala b/src/main/scala/kafka4m/Kafka4mApp.scala
index 249cfb5..faa772d 100644
--- a/src/main/scala/kafka4m/Kafka4mApp.scala
+++ b/src/main/scala/kafka4m/Kafka4mApp.scala
@@ -73,6 +73,11 @@ object Kafka4mApp extends ConfigApp with StrictLogging {
     }
   }
 
+  /** Read data from kafka and write it to a local disk
+    * @param config the kafka4m root configuration
+    * @param scheduler the scheduler to run on
+    * @return an observable of the buckets and paths written
+    */
   def readFromKafka(config: Config)(implicit scheduler: Scheduler): Observable[(TimeBucket, Path)] = {
     Base64Writer(config).partition(kafka4m.read(config))
   }
@@ -81,6 +86,11 @@ object Kafka4mApp extends ConfigApp with StrictLogging {
     logger.info(s"Wrote $perSecond / second, $total total")
   }
 
+  /** Write data into kafka using the 'kafka4m.etl.intoKafka' config entry
+    * @param config the root configuration
+    * @param scheduler the scheduler to run on
+    * @return a cancelable for the throughput report and a future of the total number of records written
+    */
   def writeToKafka(config: Config)(implicit scheduler: Scheduler): (Cancelable, CancelableFuture[Long]) = {
     val data: Observable[(String, Array[Byte])] = FileSource(config)
 
diff --git a/src/main/scala/kafka4m/io/Base64Writer.scala b/src/main/scala/kafka4m/io/Base64Writer.scala
index be5252d..b54212c 100644
--- a/src/main/scala/kafka4m/io/Base64Writer.scala
+++ b/src/main/scala/kafka4m/io/Base64Writer.scala
@@ -21,14 +21,19 @@ import scala.concurrent.duration._
   * @param timeBucketMinutes see comments in reference.conf
   * @tparam A the record type
   */
-case class Base64Writer[A: HasTimestamp: Show](dir: Path, recordsReceivedBeforeClosingBucket: Int, numberOfAppendsBeforeWriterFlush: Int, timeBucketMinutes: Int) {
+case class Base64Writer[A: HasTimestamp: Show](dir: Path,
+                                               recordsReceivedBeforeClosingBucket: Int,
+                                               numberOfAppendsBeforeWriterFlush: Int,
+                                               timeBucketMinutes: Int,
+                                               limit: Option[Long]) {
 
   def asEvents(input: Observable[A]): Observable[AppendEvent[A]] = {
     TimePartitionState.appendEvents(input, recordsReceivedBeforeClosingBucket, timeBucketMinutes.minutes)
   }
 
   def partition(input: Observable[A]): Observable[(TimeBucket, Path)] = {
-    write(asEvents(input))
+    val limited = limit.fold(input)(input.take)
+    write(asEvents(limited))
   }
 
   def write(events: Observable[AppendEvent[A]]): Observable[(TimeBucket, Path)] = {
@@ -60,7 +65,8 @@ object Base64Writer extends StrictLogging {
       dir = dataDir,
       recordsReceivedBeforeClosingBucket = fromKafkaConfig.getInt("recordsReceivedBeforeClosingBucket"),
       numberOfAppendsBeforeWriterFlush = fromKafkaConfig.getInt("numberOfAppendsBeforeWriterFlush"),
-      timeBucketMinutes = fromKafkaConfig.getInt("timeBucketMinutes")
+      timeBucketMinutes = fromKafkaConfig.getInt("timeBucketMinutes"),
+      limit = Option(fromKafkaConfig.getLong("limit")).filter(_ > 0)
     )
   }
 }
diff --git a/src/main/scala/kafka4m/io/FileSource.scala b/src/main/scala/kafka4m/io/FileSource.scala
index 24dae00..3cb9ba7 100644
--- a/src/main/scala/kafka4m/io/FileSource.scala
+++ b/src/main/scala/kafka4m/io/FileSource.scala
@@ -52,7 +52,7 @@ object FileSource {
   private def unlimited(conf: EtlConfig): Observable[(String, Array[Byte])] = {
     val dir = Paths.get(conf.dataDir)
 
-    def all: Observable[(String, Array[Byte])] =
+    def all: Observable[(String, Array[Byte])] = {
       if (conf.cache) {
         val data: List[(String, Array[Byte])] = cacheDirContents(dir)
         if (conf.repeat) {
@@ -65,6 +65,7 @@ object FileSource {
           file.getFileName.toString -> Files.readAllBytes(file)
         }
       }
+    }
 
     if (conf.repeat) {
       val LastDot = "(.*?)\\.(.*)".r
diff --git a/src/main/scala/kafka4m/io/TextAppenderObserver.scala b/src/main/scala/kafka4m/io/TextAppenderObserver.scala
index 420d60c..6775e80 100644
--- a/src/main/scala/kafka4m/io/TextAppenderObserver.scala
+++ b/src/main/scala/kafka4m/io/TextAppenderObserver.scala
@@ -43,10 +43,6 @@ object TextAppenderObserver {
     }
   }
 
-  def asFileName(first: ZonedDateTime, bucket: TimeBucket) = {
-    s"${first.getYear}-${first.getMonthValue}-${first.getDayOfMonth}__${bucket.hour}hr_${bucket.fromMinute}-${bucket.toMinute}.txt"
-  }
-
   def fromEvents[A: HasTimestamp: Show](dir: Path, flushEvery: Int, appendEvents: Observable[AppendEvent[A]]): Observable[(TimeBucket, Path)] = {
     appendEvents
       .scan(GroupState[A](dir, flushEvery, Map.empty) -> Seq.empty[Notification[(TimeBucket, Path)]]) {
@@ -69,7 +65,7 @@ object TextAppenderObserver {
             appender.appendLine(text)
             NoOp
           case None =>
-            val file = dir.resolve(asFileName(HasTimestamp[A].timestamp(data), bucket))
+            val file = dir.resolve(bucket.asFileName(HasTimestamp[A].timestamp(data)))
             val appender = new TextAppenderObserver(file, flushEvery)
             appender.appendLine(text)
             copy(byBucket = byBucket.updated(bucket, appender)) -> Nil
diff --git a/src/main/scala/kafka4m/package.scala b/src/main/scala/kafka4m/package.scala
index 1e8f5cf..cf9add2 100644
--- a/src/main/scala/kafka4m/package.scala
+++ b/src/main/scala/kafka4m/package.scala
@@ -3,7 +3,7 @@ import kafka4m.admin.RichKafkaAdmin
 import kafka4m.consumer.RichKafkaConsumer
 import kafka4m.producer.AsProducerRecord._
 import kafka4m.producer.{AsProducerRecord, RichKafkaProducer}
-import kafka4m.util.{Env, Props}
+import kafka4m.util.{Props, Schedulers}
 import monix.eval.Task
 import monix.reactive.{Consumer, Observable}
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -47,17 +47,14 @@ package object kafka4m {
     * @return an Observable of data coming from kafka. The offsets, etc will be controlled by the kafka4m.consumer configuration, which includes default offset strategy, etc.
     */
   def read(config: Config): Observable[ConsumerRecord[Key, Bytes]] = {
-    val env = Env(config)
+    val scheduler = Schedulers.io()
 
-    val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)(env.io)
+    val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)(scheduler)
 
     val topic = Props.topic(config, "consumer")
     consumer.subscribe(topic)
 
-    val closeMe = Task.delay {
-      consumer.close()
-      env.close()
-    }
+    val closeMe = Task.delay(scheduler.shutdown())
 
     consumer.asObservable.guarantee(closeMe)
   }
diff --git a/src/main/scala/kafka4m/partitions/TimeBucket.scala b/src/main/scala/kafka4m/partitions/TimeBucket.scala
index 56eb841..c7223ef 100644
--- a/src/main/scala/kafka4m/partitions/TimeBucket.scala
+++ b/src/main/scala/kafka4m/partitions/TimeBucket.scala
@@ -8,7 +8,11 @@ import java.time.ZonedDateTime
   * @param fromMinute
   * @param toMinute
   */
-final case class TimeBucket(hour: Int, fromMinute: Int, toMinute: Int)
+final case class TimeBucket(hour: Int, fromMinute: Int, toMinute: Int) {
+  def asFileName(first: ZonedDateTime) = {
+    s"${first.getYear}-${first.getMonthValue}-${first.getDayOfMonth}__${hour}hr_${fromMinute}-${toMinute}.txt"
+  }
+}
 object TimeBucket {
   def apply(minutes: Int, epochMilli: Long): TimeBucket = {
     apply(minutes, utcForEpochMillis(epochMilli))
diff --git a/src/main/scala/kafka4m/util/Env.scala b/src/main/scala/kafka4m/util/Env.scala
deleted file mode 100644
index df99653..0000000
--- a/src/main/scala/kafka4m/util/Env.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package kafka4m.util
-
-import com.typesafe.config.Config
-import com.typesafe.scalalogging.LazyLogging
-import monix.execution.Scheduler
-import monix.execution.schedulers.SchedulerService
-
-final case class Env(config: Config, compute: Scheduler, io: Scheduler) extends AutoCloseable {
-  override def close(): Unit = {
-    Env.close(compute)
-    Env.close(io)
-  }
-}
-
-object Env extends LazyLogging {
-  def apply(config: Config): Env = {
-    new Env(config, Schedulers.compute(), Schedulers.io())
-  }
-
-  def close(s: Scheduler) = s match {
-    case ss: SchedulerService => ss.shutdown()
-    case ac: AutoCloseable => ac.close()
-    case other =>
-      logger.warn(s"NOT closing scheduler $other")
-  }
-
-}
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index 4ab4b73..12db095 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -12,7 +12,7 @@ kafka4m {
   intoKafka {
     dataDir: "."
     cache: false
-    rateLimitPerSecond: 3
+    rateLimitPerSecond: 0
     limit: 100
     repeat: true
   }
diff --git a/src/test/scala/kafka4m/Kafka4mAppTest.scala b/src/test/scala/kafka4m/Kafka4mAppTest.scala
index cb1270b..329cd77 100644
--- a/src/test/scala/kafka4m/Kafka4mAppTest.scala
+++ b/src/test/scala/kafka4m/Kafka4mAppTest.scala
@@ -15,9 +15,9 @@ class Kafka4mAppTest extends BaseKafka4mDockerSpec {
   "Kafka4mApp.main" should {
     "read and write from kafka" in {
       import eie.io._
-      val dir = "target/Kafka4mAppTest/input".asPath.mkDirs()
-      val f1 = dir.resolve("input.txt").text = "This is the first input file"
-      val f2 = dir.resolve("input.txt").text = "This is another"
+      val dir = s"target/Kafka4mAppTest-${System.currentTimeMillis}/input".asPath.mkDirs()
+
+      dir.resolve("input.txt").text = "This is the first input file"
 
       val outputDir = dir.getParent.resolve("output")
 
@@ -30,14 +30,16 @@ class Kafka4mAppTest extends BaseKafka4mDockerSpec {
            |  intoKafka {
           |    dataDir: "${dir.toAbsolutePath}"
           |    cache: true
-          |    rateLimitPerSecond: 10
-          |    limit: 1000
+          |    rateLimitPerSecond: 0
+          |    limit: 200
           |    repeat: true
           |  }
           |  fromKafka {
           |    dataDir: "${outputDir.toAbsolutePath}"
           |    timeBucketMinutes: 1
+          |    limit: 200
           |    recordsReceivedBeforeClosingBucket: 10
+          |    numberOfAppendsBeforeWriterFlush: 1
           |  }
           |}""".stripMargin
 
@@ -46,11 +48,15 @@ class Kafka4mAppTest extends BaseKafka4mDockerSpec {
 
       try {
        eventually {
-          outputDir.children.size should be > 1
+          outputDir.children.size should be > 0
+        }
+        eventually {
+          outputDir.children.head.lines.size shouldBe 200
         }
       } finally {
         writerJob.cancel()
         readerJob.cancel()
+        dir.delete()
       }
     }
   }
@@ -66,13 +72,13 @@ class Kafka4mAppTest extends BaseKafka4mDockerSpec {
       val (report, task) = Kafka4mApp.writeToKafka(conf1)(s)
       val numWritten: Long = task.futureValue
       report.cancel()
-      numWritten shouldBe 4L
+      numWritten shouldBe 100L
 
       Then("We should be able to read out the data from that topic")
       val bucketWrites: Observable[(TimeBucket, Path)] = {
         val writer = Base64Writer(conf1)
-        val kafkaData = kafka4m.read(conf1).dump("from kafka").take(numWritten)
-        val readEvents: Observable[AppendEvent[ConsumerRecord[Key, Bytes]]] = writer.asEvents(kafkaData).dump("\treadEvents")
+        val kafkaData = kafka4m.read(conf1).take(numWritten)
+        val readEvents: Observable[AppendEvent[ConsumerRecord[Key, Bytes]]] = writer.asEvents(kafkaData)
         writer.write(readEvents :+ ForceFlushBuckets[ConsumerRecord[Key, Bytes]]())
       }
       val bucketsAndPaths: List[(TimeBucket, Path)] = bucketWrites.toListL.runToFuture(s).futureValue
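
Note on the new limit behaviour: a minimal sketch, not part of the patch above, of how the limiting introduced in Base64Writer behaves. A `limit` of 0 in the config is mapped to `None` by `Option(fromKafkaConfig.getLong("limit")).filter(_ > 0)`, so the stream is unbounded; any positive value becomes `Some(n)` and is applied with `limit.fold(input)(input.take)`. The names below (`LimitSketch`, `applyLimit`, `records`) are illustrative only and do not exist in the codebase.

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object LimitSketch extends App {
  // mirrors the patch: 0 (or below) means "no cap", any positive value truncates the stream via take
  def applyLimit[A](input: Observable[A], rawLimit: Long): Observable[A] = {
    val limit: Option[Long] = Option(rawLimit).filter(_ > 0)
    limit.fold(input)(input.take)
  }

  // a stand-in for the kafka records: ten synthetic elements
  val records = Observable.range(0, 10)

  println(applyLimit(records, 3).toListL.runSyncUnsafe()) // List(0, 1, 2)
  println(applyLimit(records, 0).toListL.runSyncUnsafe()) // List(0, 1, ..., 9)
}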