Commit a08bb6f: Added fromKafka limit

aaronp committed Sep 17, 2019
1 parent 01aa21a
Showing 10 changed files with 50 additions and 54 deletions.
3 changes: 3 additions & 0 deletions src/main/resources/reference.conf
@@ -65,6 +65,9 @@ kafka4m {
# The number of records appended to a file before we request the writer be flushed. This is purely a performance
# configuration, as the writer is flushed when the bucket is closed.
numberOfAppendsBeforeWriterFlush: 1000

+# If set to a non-zero value, at most this many records will be consumed from kafka before the stream completes. Zero means no limit.
+limit: 0
}

}
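The new `limit` key defaults to zero, which keeps today's unbounded behaviour. As a hedged sketch of how a caller might override it (the `kafka4m.fromKafka.limit` path follows the test config further down; the value 500 is illustrative):

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Cap the fromKafka stream at 500 records; 0 (the default) leaves it unbounded.
val config: Config = ConfigFactory
  .parseString("kafka4m.fromKafka.limit = 500")
  .withFallback(ConfigFactory.load())
```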
10 changes: 10 additions & 0 deletions src/main/scala/kafka4m/Kafka4mApp.scala
@@ -73,6 +73,11 @@ object Kafka4mApp extends ConfigApp with StrictLogging {
}
}

+/** Read data from kafka and write it to local disk
+  * @param config the kafka4m root configuration
+  * @param scheduler the scheduler on which the kafka consumer will run
+  * @return an observable of the time buckets and file paths written
+  */
def readFromKafka(config: Config)(implicit scheduler: Scheduler): Observable[(TimeBucket, Path)] = {
Base64Writer(config).partition(kafka4m.read(config))
}
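A rough usage sketch for `readFromKafka` (the scheduler setup and the println are illustrative, not part of this commit):

```scala
import com.typesafe.config.ConfigFactory
import kafka4m.Kafka4mApp
import monix.execution.Scheduler

implicit val sched: Scheduler = Scheduler.io()

// Each element is a flushed time bucket together with the file it was written to.
Kafka4mApp.readFromKafka(ConfigFactory.load()).foreach {
  case (bucket, path) => println(s"bucket $bucket written to $path")
}
```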
@@ -81,6 +86,11 @@ object Kafka4mApp extends ConfigApp with StrictLogging {
logger.info(s"Wrote $perSecond / second, $total total")
}

+/** Write data into kafka using the 'kafka4m.etl.intoKafka' config entry
+  * @param config the root configuration
+  * @param scheduler the scheduler on which the upload will run
+  * @return a cancelable for the periodic throughput report paired with a future of the total number of records written
+  */
def writeToKafka(config: Config)(implicit scheduler: Scheduler): (Cancelable, CancelableFuture[Long]) = {
val data: Observable[(String, Array[Byte])] = FileSource(config)

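And a matching sketch for `writeToKafka`, mirroring how the test below consumes the returned pair (a throughput reporter plus a future of the count written):

```scala
import com.typesafe.config.ConfigFactory
import kafka4m.Kafka4mApp
import monix.execution.Scheduler

implicit val sched: Scheduler = Scheduler.io()

val (reporter, task) = Kafka4mApp.writeToKafka(ConfigFactory.load())
task.foreach { total =>
  println(s"wrote $total records to kafka")
  reporter.cancel() // stop the periodic throughput logging once the upload completes
}
```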
12 changes: 9 additions & 3 deletions src/main/scala/kafka4m/io/Base64Writer.scala
@@ -21,14 +21,19 @@ import scala.concurrent.duration._
* @param timeBucketMinutes see comments in reference.conf
* @tparam A the record type
*/
-case class Base64Writer[A: HasTimestamp: Show](dir: Path, recordsReceivedBeforeClosingBucket: Int, numberOfAppendsBeforeWriterFlush: Int, timeBucketMinutes: Int) {
+case class Base64Writer[A: HasTimestamp: Show](dir: Path,
+                                               recordsReceivedBeforeClosingBucket: Int,
+                                               numberOfAppendsBeforeWriterFlush: Int,
+                                               timeBucketMinutes: Int,
+                                               limit: Option[Long]) {

def asEvents(input: Observable[A]): Observable[AppendEvent[A]] = {
TimePartitionState.appendEvents(input, recordsReceivedBeforeClosingBucket, timeBucketMinutes.minutes)
}

def partition(input: Observable[A]): Observable[(TimeBucket, Path)] = {
-    write(asEvents(input))
+    val limited = limit.fold(input)(input.take)
+    write(asEvents(limited))
}

def write(events: Observable[AppendEvent[A]]): Observable[(TimeBucket, Path)] = {
@@ -60,7 +65,8 @@ object Base64Writer extends StrictLogging {
dir = dataDir,
recordsReceivedBeforeClosingBucket = fromKafkaConfig.getInt("recordsReceivedBeforeClosingBucket"),
numberOfAppendsBeforeWriterFlush = fromKafkaConfig.getInt("numberOfAppendsBeforeWriterFlush"),
-      timeBucketMinutes = fromKafkaConfig.getInt("timeBucketMinutes")
+      timeBucketMinutes = fromKafkaConfig.getInt("timeBucketMinutes"),
+      limit = Option(fromKafkaConfig.getLong("limit")).filter(_ > 0)
)
}
}
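The feature rests on two small idioms: `limit.fold(input)(input.take)` leaves the stream untouched for `None` and caps it for `Some(n)`, while `Option(raw).filter(_ > 0)` maps the config's zero-means-unlimited convention onto that `Option`. A self-contained sketch (the `limitFrom` helper is illustrative):

```scala
import monix.reactive.Observable

def limitFrom(raw: Long): Option[Long] = Option(raw).filter(_ > 0)

val input: Observable[Long] = Observable.range(0, 10)

// None -> the original, unbounded stream; Some(n) -> at most n elements
val unbounded = limitFrom(0L).fold(input)(input.take) // all 10 elements
val capped    = limitFrom(3L).fold(input)(input.take) // emits 0, 1, 2
```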
3 changes: 2 additions & 1 deletion src/main/scala/kafka4m/io/FileSource.scala
@@ -52,7 +52,7 @@ object FileSource {

private def unlimited(conf: EtlConfig): Observable[(String, Array[Byte])] = {
val dir = Paths.get(conf.dataDir)
-def all: Observable[(String, Array[Byte])] =
+def all: Observable[(String, Array[Byte])] = {
if (conf.cache) {
val data: List[(String, Array[Byte])] = cacheDirContents(dir)
if (conf.repeat) {
@@ -65,6 +65,7 @@
file.getFileName.toString -> Files.readAllBytes(file)
}
}
+}

if (conf.repeat) {
val LastDot = "(.*?)\\.(.*)".r
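The truncated hunk above introduces a `LastDot` regex that splits a file name into base name and extension, presumably so repeated passes over the same directory can derive distinct record keys; the diff does not show that use. The split itself works like this:

```scala
val LastDot = "(.*?)\\.(.*)".r

// Hypothetical helper: suffix the base name while keeping the extension.
def rename(fileName: String): String = fileName match {
  case LastDot(base, extension) => s"$base-1.$extension" // "input.txt" -> "input-1.txt"
  case other                    => other                 // no dot: leave unchanged
}
```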
6 changes: 1 addition & 5 deletions src/main/scala/kafka4m/io/TextAppenderObserver.scala
@@ -43,10 +43,6 @@ object TextAppenderObserver {
}
}

-def asFileName(first: ZonedDateTime, bucket: TimeBucket) = {
-  s"${first.getYear}-${first.getMonthValue}-${first.getDayOfMonth}__${bucket.hour}hr_${bucket.fromMinute}-${bucket.toMinute}.txt"
-}
-
def fromEvents[A: HasTimestamp: Show](dir: Path, flushEvery: Int, appendEvents: Observable[AppendEvent[A]]): Observable[(TimeBucket, Path)] = {
appendEvents
.scan(GroupState[A](dir, flushEvery, Map.empty) -> Seq.empty[Notification[(TimeBucket, Path)]]) {
@@ -69,7 +65,7 @@
appender.appendLine(text)
NoOp
case None =>
-            val file = dir.resolve(asFileName(HasTimestamp[A].timestamp(data), bucket))
+            val file = dir.resolve(bucket.asFileName(HasTimestamp[A].timestamp(data)))
val appender = new TextAppenderObserver(file, flushEvery)
appender.appendLine(text)
copy(byBucket = byBucket.updated(bucket, appender)) -> Nil
11 changes: 4 additions & 7 deletions src/main/scala/kafka4m/package.scala
@@ -3,7 +3,7 @@ import kafka4m.admin.RichKafkaAdmin
import kafka4m.consumer.RichKafkaConsumer
import kafka4m.producer.AsProducerRecord._
import kafka4m.producer.{AsProducerRecord, RichKafkaProducer}
-import kafka4m.util.{Env, Props}
+import kafka4m.util.{Props, Schedulers}
import monix.eval.Task
import monix.reactive.{Consumer, Observable}
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -47,17 +47,14 @@ package object kafka4m {
* @return an Observable of data coming from kafka. The offsets, etc will be controlled by the kafka4m.consumer configuration, which includes default offset strategy, etc.
*/
def read(config: Config): Observable[ConsumerRecord[Key, Bytes]] = {
-    val env = Env(config)
+    val scheduler = Schedulers.io()

-    val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)(env.io)
+    val consumer: RichKafkaConsumer[String, Array[Byte]] = RichKafkaConsumer.byteArrayValues(config)(scheduler)

val topic = Props.topic(config, "consumer")
consumer.subscribe(topic)

-    val closeMe = Task.delay {
-      consumer.close()
-      env.close()
-    }
+    val closeMe = Task.delay(scheduler.shutdown())
consumer.asObservable.guarantee(closeMe)
}

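With `Env` gone, the scheduler is created inline and `guarantee` owns the cleanup: the finalizer task runs whether the stream completes, fails, or is cancelled. A toy sketch of the pattern (names are illustrative):

```scala
import monix.eval.Task
import monix.reactive.Observable

val records  = Observable.fromIterable(List("a", "b", "c"))
val finalize = Task.delay(println("shutting down the scheduler"))

// The finalizer runs exactly once: on completion, on error, or on cancellation.
val guarded: Observable[String] = records.guarantee(finalize)
```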
6 changes: 5 additions & 1 deletion src/main/scala/kafka4m/partitions/TimeBucket.scala
@@ -8,7 +8,11 @@ import java.time.ZonedDateTime
* @param fromMinute the minute of the hour at which this bucket starts
* @param toMinute the minute of the hour at which this bucket ends
*/
-final case class TimeBucket(hour: Int, fromMinute: Int, toMinute: Int)
+final case class TimeBucket(hour: Int, fromMinute: Int, toMinute: Int) {
+  def asFileName(first: ZonedDateTime) = {
+    s"${first.getYear}-${first.getMonthValue}-${first.getDayOfMonth}__${hour}hr_${fromMinute}-${toMinute}.txt"
+  }
+}
object TimeBucket {
def apply(minutes: Int, epochMilli: Long): TimeBucket = {
apply(minutes, utcForEpochMillis(epochMilli))
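With `asFileName` now a method on `TimeBucket`, callers pass only the timestamp. For example (values are illustrative):

```scala
import java.time.{ZoneOffset, ZonedDateTime}

val bucket    = TimeBucket(hour = 13, fromMinute = 10, toMinute = 20)
val firstSeen = ZonedDateTime.of(2019, 9, 17, 13, 12, 0, 0, ZoneOffset.UTC)

val fileName = bucket.asFileName(firstSeen) // "2019-9-17__13hr_10-20.txt"
```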
27 changes: 0 additions & 27 deletions src/main/scala/kafka4m/util/Env.scala

This file was deleted.

2 changes: 1 addition & 1 deletion src/test/resources/application.conf
@@ -12,7 +12,7 @@ kafka4m {
intoKafka {
dataDir: "."
cache: false
-    rateLimitPerSecond: 3
+    rateLimitPerSecond: 0
limit: 100
repeat: true
}
24 changes: 15 additions & 9 deletions src/test/scala/kafka4m/Kafka4mAppTest.scala
@@ -15,9 +15,9 @@ class Kafka4mAppTest extends BaseKafka4mDockerSpec {
"Kafka4mApp.main" should {
"read and write from kafka" in {
import eie.io._
val dir = "target/Kafka4mAppTest/input".asPath.mkDirs()
val f1 = dir.resolve("input.txt").text = "This is the first input file"
val f2 = dir.resolve("input.txt").text = "This is another"
val dir = s"target/Kafka4mAppTest-${System.currentTimeMillis}/input".asPath.mkDirs()

dir.resolve("input.txt").text = "This is the first input file"

val outputDir = dir.getParent.resolve("output")

@@ -30,14 +30,16 @@
| intoKafka {
| dataDir: "${dir.toAbsolutePath}"
| cache: true
-| rateLimitPerSecond: 10
-| limit: 1000
+| rateLimitPerSecond: 0
+| limit: 200
| repeat: true
| }
| fromKafka {
| dataDir: "${outputDir.toAbsolutePath}"
| timeBucketMinutes: 1
+| limit: 200
| recordsReceivedBeforeClosingBucket: 10
+| numberOfAppendsBeforeWriterFlush : 1
| }
|}""".stripMargin

@@ -46,11 +48,15 @@

try {
eventually {
-          outputDir.children.size should be > 1
+          outputDir.children.size should be > 0
}
+        eventually {
+          outputDir.children.head.lines.size shouldBe 200
+        }
} finally {
writerJob.cancel()
readerJob.cancel()
+        dir.delete()
}
}
}
@@ -66,13 +72,13 @@
val (report, task) = Kafka4mApp.writeToKafka(conf1)(s)
val numWritten: Long = task.futureValue
report.cancel()
-      numWritten shouldBe 4L
+      numWritten shouldBe 100L

Then("We should be able to read out the data from that topic")
val bucketWrites: Observable[(TimeBucket, Path)] = {
val writer = Base64Writer(conf1)
-        val kafkaData = kafka4m.read(conf1).dump("from kafka").take(numWritten)
-        val readEvents: Observable[AppendEvent[ConsumerRecord[Key, Bytes]]] = writer.asEvents(kafkaData).dump("\treadEvents")
+        val kafkaData = kafka4m.read(conf1).take(numWritten)
+        val readEvents: Observable[AppendEvent[ConsumerRecord[Key, Bytes]]] = writer.asEvents(kafkaData)
writer.write(readEvents :+ ForceFlushBuckets[ConsumerRecord[Key, Bytes]]())
}
val bucketsAndPaths: List[(TimeBucket, Path)] = bucketWrites.toListL.runToFuture(s).futureValue
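The `:+ ForceFlushBuckets(...)` appended to the read pipeline emits one final event so that buckets which have not yet reached their time window are still flushed when the bounded `take(numWritten)` stream completes; this reading is inferred from the surrounding test rather than stated in the diff.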
