Implemented ConcurrentStream to support only committing offsets when load-balanced tasks all complete successfully

This includes commits for:
 contiguous ordering
 multithread support

It also introduces ConsumerAccess to ensure thread-safe Kafka consumer access (a minimal sketch of that trait follows below).
aaronp committed Mar 7, 2020
1 parent 1d7e7d1 commit 9ddae11
Showing 38 changed files with 1,522 additions and 579 deletions.
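
For reference, here is a minimal sketch of the new ConsumerAccess contract as it can be inferred from its uses in this diff. The trait's actual definition is not part of this excerpt, so treat the exact shape as an approximation.

    package kafka4m.consumer

    import scala.concurrent.Future

    /** Serialises access to the underlying RichKafkaConsumer: callers hand over a thunk,
      * which is run on the consumer's own thread, and the result comes back as a Future.
      */
    trait ConsumerAccess {
      type Key
      type Value

      def withConsumer[A](thunk: RichKafkaConsumer[Key, Value] => A): Future[A]
    }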
41 changes: 28 additions & 13 deletions src/main/resources/reference.conf
@@ -27,6 +27,9 @@ kafka4m {
# Should the producer simply log/continue on serialization errors?
continueOnError: false

# should the producer be closed when the consumer completes
closeOnComplete : true

key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
value.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
}
@@ -63,6 +66,17 @@ kafka4m {
# when 'asObservable' is set on a consumer, this controls whether the consumer should be closed when the
# observable is completed.
closeOnComplete = true

# Used for the command queue when sending the RichKafkaConsumer commands from different threads.
# Those commands should be infrequent - typically when a job run by ConcurrentStream completes and
# wants to commit offsets. The job pool for the ConcurrentStream defaults to the number of cores,
# so this should be a sensible default.
#
# If set to 0 or a negative number then this buffer will be unbounded.
commandBufferCapacity : 100

# should we automatically subscribe when creating a RichKafkaConsumer
subscribeOnConnect : true
}
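
As an aside, here is a hedged sketch of reading the consumer settings above with the Typesafe Config API. The ConsumerSettings case class and the "kafka4m.consumer" path are assumptions made for illustration; kafka4m's real settings classes may look different.

    import com.typesafe.config.{Config, ConfigFactory}

    // illustrative only: not kafka4m's actual settings class
    final case class ConsumerSettings(closeOnComplete: Boolean, commandBufferCapacity: Int, subscribeOnConnect: Boolean)

    object ConsumerSettings {
      def fromRootConfig(rootConfig: Config = ConfigFactory.load()): ConsumerSettings = {
        val c = rootConfig.getConfig("kafka4m.consumer") // assumed path for this block
        ConsumerSettings(
          closeOnComplete = c.getBoolean("closeOnComplete"),
          commandBufferCapacity = c.getInt("commandBufferCapacity"),
          subscribeOnConnect = c.getBoolean("subscribeOnConnect")
        )
      }
    }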

# instructs the admin client what to do when a topic is missing
@@ -74,19 +88,6 @@ kafka4m {
timeout: 10s
}

# the 'streams' configuration is used for reading from Kafka
streams {
topic = ${kafka4m.topic}
bootstrap.servers = ${kafka4m.bootstrap.servers}
bootstrap.servers: ${?KAFKA4M_BOOTSTRAP}

application.id: "kafka4m-app"
default.key.serde: "org.apache.kafka.common.serialization.Serdes$StringSerde"
default.value.serde: "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"
auto.offset.reset: earliest
auto.offset.reset: ${?KAFKA4M_AUTO_OFFSET}
}


jmx {
client {
@@ -103,6 +104,20 @@ kafka4m {
}
}

jobs {
# this controls the timeout for writing offsets back to Kafka.
# in ConcurrentStream, this determines how long to wait for the last job to complete before giving up
awaitJobTimeout : 10s

# how long should we pause between checks of the last job's completion status?
# essentially a poll frequency used while waiting up to awaitJobTimeout
retryDuration : 100ms

# how frequently should we commit completed task offsets back to kafka
# set to 0 to commit as frequently as possible
minCommitFrequency = 100
}
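
To make the relationship between awaitJobTimeout and retryDuration concrete, here is a small, hypothetical polling loop of the kind these comments describe. It is not ConcurrentStream's actual waiting logic, just an illustration of the two settings.

    import scala.concurrent.duration._

    object AwaitExample {
      // Poll 'isComplete' every 'retryDuration' until it returns true or 'awaitJobTimeout' elapses.
      // Returns whether the last job completed within the timeout.
      def awaitLastJob(isComplete: () => Boolean,
                       awaitJobTimeout: FiniteDuration = 10.seconds,
                       retryDuration: FiniteDuration = 100.millis): Boolean = {
        val deadline = awaitJobTimeout.fromNow
        var done = isComplete()
        while (!done && deadline.hasTimeLeft()) {
          Thread.sleep(retryDuration.toMillis)
          done = isComplete()
        }
        done
      }
    }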

# We add some ETL functionality to kafka4m for reading/writing data in and out of Kafka
etl {

4 changes: 2 additions & 2 deletions src/main/scala/kafka4m/Kafka4mApp.scala
@@ -88,9 +88,9 @@ object Kafka4mApp extends ConfigApp with StrictLogging {

val kafkaData: Observable[ConsumerRecord[Key, Bytes]] = {
if (stats.enabled) {
kafka4m.readRecords(config).doOnNext(stats.onReadFromKafka)
kafka4m.readRecords[ConsumerRecord[Key, Bytes]](config).doOnNext(stats.onReadFromKafka)
} else {
kafka4m.readRecords(config)
kafka4m.readRecords[ConsumerRecord[Key, Bytes]](config)
}
}

4 changes: 1 addition & 3 deletions src/main/scala/kafka4m/admin/RichKafkaAdmin.scala
@@ -85,9 +85,7 @@ final class RichKafkaAdmin(val admin: AdminClient) extends AutoCloseable with St
val result = admin.listConsumerGroups()
val kfuture = result.all()
val future: Future[Seq[ConsumerGroupListing]] = Future {
val list = kfuture.get().asScala.toSeq
println(s"consumerGroups is ${list}")
list
kfuture.get().asScala.toSeq
}
val cancel = Cancelable.apply(() => kfuture.cancel(true))
CancelableFuture(future, cancel)
93 changes: 18 additions & 75 deletions src/main/scala/kafka4m/consumer/AckableRecord.scala
@@ -1,39 +1,39 @@
package kafka4m.consumer

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

import cats.Functor
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler}
import kafka4m.data.PartitionOffsetState
import monix.reactive.Observable
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future

/**
* Represents a message with the components needed to commit the offsets/partitions to Kafka
* @param consumer
* @param access
* @param offset
* @param record
* @tparam A
*/
final case class AckableRecord[A] private (consumer: RichKafkaConsumer[_, _], offset: PartitionOffsetState, record: A) {
final case class AckableRecord[A] private[consumer] (access: ConsumerAccess, offset: PartitionOffsetState, record: A) extends ConsumerAccess {

override type Key = access.Key
override type Value = access.Value

/**
* Commits the current partition offset + 1 to Kafka so that, should we disconnect, we'll receive the next message
* @return a future of the offsets
*/
def commitPosition(): Future[Map[TopicPartition, OffsetAndMetadata]] = {
consumer.commitAsync(offset.incOffsets())
}
def commitPosition(): Future[Map[TopicPartition, OffsetAndMetadata]] = commit(offset.incOffsets())

def commit(offset: PartitionOffsetState) = commitAsync(offset).flatten

/**
* If we commit this offset, then on reconnect we would receive this same message again
*
* @return the commit future
*/
def commitConsumedPosition(): Future[Map[TopicPartition, OffsetAndMetadata]] = {
consumer.commitAsync(offset)
commit(offset)
}

/**
@@ -42,49 +42,16 @@ final case class AckableRecord[A] private (consumer: RichKafkaConsumer[_, _], of
* @tparam B
* @return
*/
final def map[B](f: A => B): AckableRecord[B] = copy(consumer, offset, f(record))
}

object AckableRecord {
final def map[B](f: A => B): AckableRecord[B] = copy(access, offset, f(record))

/** Creates a stream of ack-able records for this consumer
*
* @param closeOnComplete a flag to determine whether the kafka consumer should be closed when the stream completes
* @param kafkaScheduler the single-threaded Kafka scheduler
* @param makeConsumer a function for creating a consumer
* @tparam K the key type
* @tparam V the value type
* @return a stream of [[AckableRecord]]s
*/
def apply[K, V](closeOnComplete: Boolean, kafkaScheduler: Scheduler = singleThreadedScheduler())(
makeConsumer: => RichKafkaConsumer[K, V]): Observable[AckableRecord[ConsumerRecord[K, V]]] = {
val withConsumer: Observable[(RichKafkaConsumer[K, V], ConsumerRecord[K, V])] = singleThreadObservable(closeOnComplete, kafkaScheduler)(makeConsumer)
def commitAsync(state: PartitionOffsetState) = withConsumer(_.commitAsync(state))

withOffsets(withConsumer).map {
case (state, (consumer, record)) =>
new AckableRecord[ConsumerRecord[K, V]](consumer, state, record)
}
override def withConsumer[A](thunk: RichKafkaConsumer[Key, Value] => A): Future[A] = {
access.withConsumer(thunk)
}
}

/**
* Create a kafka consumer observable which will ensure all the 'consumer.poll(...)' calls happen on a single thread
* (lest we incur the wrath of apache Kafka "one and only one thread can use a KafkaConsumer" rule
*
* @param closeOnComplete
* @param kafkaScheduler
* @param makeConsumer
* @tparam K
* @tparam V
* @return
*/
def singleThreadObservable[K, V](closeOnComplete: Boolean, kafkaScheduler: Scheduler = singleThreadedScheduler())(
makeConsumer: => RichKafkaConsumer[K, V]): Observable[(RichKafkaConsumer[K, V], ConsumerRecord[K, V])] = {
Observable.delay(makeConsumer).executeOn(kafkaScheduler, true).flatMap { kafkaConsumer =>
kafkaConsumer.asObservable(closeOnComplete)(kafkaScheduler).map { record =>
(kafkaConsumer, record)
}
}
}
object AckableRecord {

/**
* combine the records with a means of tracking the offsets
@@ -104,28 +71,4 @@ object AckableRecord {
}
}

def singleThreadedScheduler(name: String = "SingleThreadForKafkaRead"): SchedulerService = {
Scheduler(singleThreadedExecutor(name), ExecutionModel.SynchronousExecution)
}

def singleThreadedExecutor(name: String): ExecutorService = {
singleThreadedExecutor { thread =>
thread.setName(name)
thread.setDaemon(true)
thread
}
}

def singleThreadedExecutor(prepare: Thread => Thread): ExecutorService = {
Executors.newSingleThreadExecutor(new ThreadFactory {
override def newThread(r: Runnable): Thread = {
prepare(new Thread(r))
}
})
}

implicit object AckableRecordFunctor extends Functor[AckableRecord] {
override def map[A, B](fa: AckableRecord[A])(f: A => B): AckableRecord[B] = fa.map(f)
}

}
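
To show how the commit semantics above might be used downstream, here is a hedged processing sketch. The handle and processAll names and the source Observable are stand-ins for illustration; only AckableRecord's record, commitPosition() and commitConsumedPosition() come from the code in this file.

    package kafka4m.consumer

    import monix.eval.Task
    import monix.reactive.Observable
    import org.apache.kafka.clients.consumer.ConsumerRecord

    object AckableRecordExample {
      // stand-in for real business logic
      def handle(record: ConsumerRecord[String, Array[Byte]]): Unit = ()

      // Process each record, then commit offset + 1 so that a reconnect resumes with the
      // *next* message; committing via commitConsumedPosition() would replay this one instead.
      def processAll(records: Observable[AckableRecord[ConsumerRecord[String, Array[Byte]]]]): Observable[Unit] =
        records.mapEval { ackable =>
          Task(handle(ackable.record)).flatMap { _ =>
            Task.deferFuture(ackable.commitPosition()).map(_ => ())
          }
        }
    }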
27 changes: 27 additions & 0 deletions src/main/scala/kafka4m/consumer/BytesDecoder.scala
@@ -0,0 +1,27 @@
package kafka4m.consumer

import java.nio.charset.StandardCharsets

import kafka4m.Bytes

/**
* If we have a BytesDecoder, then we can have a [[RecordDecoder]].
* Decoders chain.
*
* @tparam A the result type
*/
trait BytesDecoder[A] {
def decode(bytes: kafka4m.Bytes): A
}

object BytesDecoder {
def apply[A](implicit instance: BytesDecoder[A]): BytesDecoder[A] = instance

def lift[A](f: kafka4m.Bytes => A): BytesDecoder[A] = new BytesDecoder[A] {
override def decode(bytes: Bytes): A = f(bytes)
}

implicit object StringDecoder extends BytesDecoder[String] {
override def decode(bytes: Bytes): String = new String(bytes, StandardCharsets.UTF_8)
}
}
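
A short usage sketch, assuming only what is defined above and that kafka4m.Bytes is the usual Array[Byte] alias (as the StringDecoder suggests); the lengthDecoder here is a made-up example instance.

    import java.nio.charset.StandardCharsets
    import kafka4m.consumer.BytesDecoder

    object BytesDecoderExample {
      // summon the built-in instance: bytes -> UTF-8 String
      val asString: String = BytesDecoder[String].decode("hello".getBytes(StandardCharsets.UTF_8))

      // lift an arbitrary function into a decoder, e.g. decoding bytes to their length
      implicit val lengthDecoder: BytesDecoder[Int] = BytesDecoder.lift(_.length)
      val size: Int = BytesDecoder[Int].decode(Array[Byte](1, 2, 3)) // 3
    }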
26 changes: 26 additions & 0 deletions src/main/scala/kafka4m/consumer/ComputeResult.scala
@@ -0,0 +1,26 @@
package kafka4m.consumer

import kafka4m.consumer.ConcurrentStream.ZipOffset

import scala.concurrent.Future

/**
* A tuple of all the information at hand from a [[ConcurrentStream]] computation
*
* @param localConsumedOffset the local (zipped) index of this record within the consumed stream
* @param kafkaRecord the ack-able record read from Kafka
* @param taskResult the result produced by the task run for this record
* @tparam A the input record type
* @tparam B the task result type
*/
case class ComputeResult[A, B](localConsumedOffset: ZipOffset, kafkaRecord: AckableRecord[A], taskResult: B) extends ConsumerAccess {
def offset = kafkaRecord.offset
override type Key = kafkaRecord.Key
override type Value = kafkaRecord.Value
def input: A = kafkaRecord.record
override def toString: String = s"ComputeResult #$localConsumedOffset (${kafkaRecord.record} -> $taskResult)"

override def withConsumer[A](thunk: RichKafkaConsumer[kafkaRecord.access.Key, kafkaRecord.access.Value] => A): Future[A] = {
kafkaRecord.withConsumer(thunk)
}
}
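
Finally, a hedged sketch of consuming ComputeResult values downstream. The results Observable is assumed to come from ConcurrentStream (not shown in this excerpt), and the log-then-commit policy here is purely illustrative.

    package kafka4m.consumer

    import monix.eval.Task
    import monix.reactive.Observable

    object ComputeResultExample {
      // Log each result, then commit this record's offsets so a restart resumes after it.
      def logAndCommit[A, B](results: Observable[ComputeResult[A, B]]): Observable[ComputeResult[A, B]] =
        results.doOnNext { result =>
          Task(println(s"#${result.localConsumedOffset}: ${result.input} -> ${result.taskResult}")).flatMap { _ =>
            Task.deferFuture(result.kafkaRecord.commitPosition()).map(_ => ())
          }
        }
    }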
