Scala Kafka Client

simonsouter edited this page Aug 3, 2018 · 23 revisions

A minimal Scala wrapper around the Java client API, providing some helpers for convenient configuration the client and usage from Scala.

Resolve

Artifacts are published to bintray here: Bintray Repo To resolve using sbt, add the following resolver to your build.sbt:

resolvers += Resolver.bintrayRepo("cakesolutions", "maven")

And add the dependency:

// Latest release for Kafka 2.0.0:
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "2.0.0"

// Latest release for Kafka 1.1.1:
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "1.1.1"

// Latest release for Kafka 0.11.0.0:
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "0.11.0.0"

// Latest release for Kafka 0.10.x.x:
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "0.10.2.2"

Producer

The Java KafkaProducer is typically setup by creating a Java Properties with required producer configuration. The Scala wrapper provides some convenience functions for creating the producer configuration either directly in code, via Typesafe config or a combination of both.

When delivering messages to the Java KafkaProducer via one of the send() methods, confirmation of successful message delivery is provided by resolving a java.util.concurrent.Future plus an option to provide a org.apache.kafka.clients.producer.Callback. The Scala KafkaProducer wrapper provides alternative methods of obtaining the delivery result via one of two methods:

// Asynchronously send a record to a topic, providing a scala.concurrent.Future containing the result of the operation.
def send(record: ProducerRecord[K, V]): Future[RecordMetadata]
// Asynchronously send a record to a topic and invoke the provided function callback when the send has been acknowledged.
def sendWithCallback(record: ProducerRecord[K, V])(callback: Try[RecordMetadata] => Unit): Unit

Additionally the KafkaProducerRecord object provides a number of constructors (apply methods) to help creating the org.apache.kafka.clients.producer.ProducerRecord required for the send method, such as:

val record = KafkaProducerRecord("topic1", Some("key"), "value")
producer.send(record)

Direct Configuration

The cakesolutions.kafka.KafkaProducer.Conf case class models the configuration required for the KafkaProducer and can be constructed and passed to the cakesolutions.kafka.KafkaProducer factory method to produce a configured KafkaProducer. Key and Value Serializers are also required as documented in the Kafka client docs.

import cakesolutions.kafka.KafkaProducer
import cakesolutions.kafka.KafkaProducer.Conf

// Create a org.apache.kafka.clients.producer.KafkaProducer
val producer = KafkaProducer(
    Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = "localhost:9092")
)

The Conf configuration class provides additional properties with defaults:

import cakesolutions.kafka.KafkaProducer.Conf

Conf(new StringSerializer(), new StringSerializer()
  bootstrapServers = "localhost:9092",
  acks = "all",
  retries = 0,
  batchSize = 16834,
  lingerMs = 1,
  bufferMemory = 33554432
)

Typesafe Configuration

The configuration for the KafkaProducer can also specified in a Typesafe Config file:
Pass a Typesafe Config to cakesolutions.kafka.KafkaProducer.Conf:

application.conf:
{
   bootstrap.servers = "localhost:9092"
}
import cakesolutions.kafka.KafkaProducer
import cakesolutions.kafka.KafkaProducer.Conf

val conf = ConfigFactory.load

// Create a org.apache.kafka.clients.producer.KafkaProducer from properties defined in a Typesafe Config file
val producer = KafkaProducer(
  Conf(conf, new StringSerializer(), new StringSerializer())
)

Additional Config Options

A combination of static properties and Typesafe config properties is also an option. The Typesafe config properties will override Conf parameters:

import cakesolutions.kafka.KafkaProducer
import cakesolutions.kafka.KafkaProducer.Conf

val conf = ConfigFactory.parseString(
  s"""
    | bootstrap.servers = "localhost:9092"
  """.stripMargin)

val producer = KafkaProducer(
  Conf(new StringSerializer(), new StringSerializer(), acks = "all").withConf(conf)
)

Consumer

The KafkaConsumer is typically setup by creating a Java Properties with required consumer configuration. The Scala wrapper provides some convenience functions for creating the consumer configuration either directly in code, via Typesafe config or a combination of both.

Direct Configuration

The cakesolutions.kafka.KafkaConsumer.Conf case class models the configuration required for the KafkaConsumer and can be constructed and passed to the cakesolutions.kafka.KafkaConsumer factory method to produce a configured KafkaConsumer Key and Value Deserialisers are also required as documented in the Kafka client docs.

import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.KafkaConsumer.Conf

// Create a org.apache.kafka.clients.consumer.KafkaConsumer
val consumer = KafkaConsumer(
    Conf(new StringDeserializer(), new StringDeserializer(), bootstrapServers = "localhost:9092")
)

The Conf configuration class provides additional properties with defaults:

import cakesolutions.kafka.KafkaConsumer.Conf

Conf(new StringDeserializer(), new StringDeserializer()
  bootstrapServers = "localhost:9092",
  groupId = "group",
  enableAutoCommit = true,
  autoCommitInterval = 1000,
  sessionTimeoutMs = 30000,
  maxPartitionFetchBytes: String = "262144",
  
  // 0.10.x series only:
  maxPollRecords: Int = Integer.MAX_VALUE,
  
  autoOffsetReset: OffsetResetStrategy = OffsetResetStrategy.LATEST
)

Typesafe Configuration

The configuration for the KafkaConsumer can also specified in a Typesafe Config file:
Pass a Typesafe Config to cakesolutions.kafka.KafkaConsumer.Conf:

application.conf:
{
   bootstrap.servers = "localhost:9092"
}
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.KafkaConsumer.Conf

val conf = ConfigFactory.load

// Create a org.apache.kafka.clients.consumer.KafkaConsumer from properties defined in a Typesafe Config file
val consumer = KafkaConsumer(
  Conf(conf, new StringDeserializer(), new StringDeserializer())
)

Additional Config Options

A combination of static properties and Typesafe config properties is also an option. The Typesafe config properties will override Conf parameters:

import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.KafkaConsumer.Conf

val conf = ConfigFactory.parseString(
  s"""
    | bootstrap.servers = "localhost:9092",
    | group.id = "group1"
  """.stripMargin)

val consumer = KafkaConsumer(
  Conf(new StringDeserializer(), new StringDeserializer(), enableAutoCommit = false).withConf(conf)
)
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.
Press h to open a hovercard with more details.