bootique-kafka

Integration of the Kafka client and Kafka Streams for Bootique.

Dependencies

Import the Bootique BOM, then add bootique-kafka-client and/or bootique-kafka-streams:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.bootique.bom</groupId>
            <artifactId>bootique-bom</artifactId>
            <version>1.0.RC1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
...

<!-- If using Producer and/or Consumer -->
<dependency>
    <groupId>io.bootique.kafka</groupId>
    <artifactId>bootique-kafka-client</artifactId>
</dependency>

<!-- If using streams -->
<dependency>
    <groupId>io.bootique.kafka</groupId>
    <artifactId>bootique-kafka-streams</artifactId>
</dependency>
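
With the modules on the classpath, a standard Bootique main class picks them up via module auto-loading. A minimal sketch (the class and app names here are illustrative, not part of the library):

import io.bootique.Bootique;

public class MyKafkaApp {

    public static void main(String[] args) {
        // auto-loads all Bootique modules found on the classpath, including the Kafka ones
        Bootique.app(args)
                .autoLoadModules()
                .exec()
                .exit();
    }
}

The configuration described below can then be passed to the app the usual Bootique way, e.g. with --config=myapp.yml.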

Producer/Consumer Configuration

Configure parameters in YAML. Note that practically all of these settings can be overridden when obtaining a specific Producer or Consumer instance via the injectable KafkaProducerFactory and KafkaConsumerFactory shown below. So this is just a collection of defaults for the most typical Producer or Consumer:

kafkaclient:
  # any number of named clusters, specifying comma-separated bootstrap Kafka servers for each.
  clusters:
    cluster1: 127.0.0.1:9092
    cluster2: host1:9092,host2:9092
  consumer:
    autoCommit: true
    autoCommitInterval: "200ms"
    defaultGroup: myappgroup
    sessionTimeout: "2s"
  producer:
    acks: all # either "all" or a numeric value for the minimum acks
    retries: 1
    batchSize: 16384
    lingerMs: 1
    bufferMemory: 33554432
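
The kafkaclient subtree above is regular Bootique configuration, so the same values can also be supplied programmatically as default properties. A hedged sketch, assuming the Guice-based module API of the 1.0.x line (the module name and chosen properties are illustrative):

import com.google.inject.Binder;
import com.google.inject.Module;
import io.bootique.BQCoreModule;

public class KafkaDefaultsModule implements Module {

    @Override
    public void configure(Binder binder) {
        // the "bq." prefix maps a property onto the configuration tree
        BQCoreModule.extend(binder)
                .setProperty("bq.kafkaclient.clusters.cluster1", "127.0.0.1:9092")
                .setProperty("bq.kafkaclient.consumer.defaultGroup", "myappgroup");
    }
}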

Now you can inject producer and consumer factories and create any number of producers and consumers. Producer example:

@Inject
KafkaProducerFactory factory;

public void runProducer() {

    Producer<byte[], String> producer = factory
        .charValueProducer()
        .cluster("cluster2")
        .create();

    producer.send(new ProducerRecord<>("mytopic", "Hi!"));

    // close if there's nothing else to send
    producer.close();
}
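
The Producer returned by the builder is the standard org.apache.kafka.clients.producer.Producer, so the usual Kafka client API applies. For instance, send(..) is asynchronous and accepts an optional callback; a small sketch (the topic name and error handling are illustrative):

producer.send(new ProducerRecord<>("mytopic", "Hi again!"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("stored at offset " + metadata.offset());
    }
});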

Consumer example:

@Inject
KafkaConsumerFactory factory;

public void runConsumer() {
    
    KafkaConsumerRunner<byte[], String> consumer = factory
        .charValueConsumer()
        .cluster("cluster1")
        .group("somegroup")
        .topic("mytopic")
        .pollInterval(Duration.ofSeconds(1))
        .create();

    for (ConsumerRecord<byte[], String> r : consumer) {
        System.out.println(r.topic() + "_" + r.partition() + "_" + r.offset() + ": " + r.value());
    }

    consumer.close();
}
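
Iterating over the consumer blocks while polling, so in a real application the loop above typically runs on its own thread, with close() invoked on shutdown. A minimal sketch using a plain executor (the threading is illustrative and not part of the bootique-kafka API):

private final ExecutorService consumerPool = Executors.newSingleThreadExecutor();

public void startConsumer() {
    // run the blocking polling loop from the example above on a dedicated thread
    consumerPool.submit(this::runConsumer);
}

public void stopConsumer() {
    // on shutdown; whether the runner reacts to interruption depends on its implementation
    consumerPool.shutdownNow();
}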

Streams Configuration

TODO