Kafka SDK

A Java SDK for Apache Kafka.

Documentation

Installation

To add a dependency using Maven, use the following:

<dependency>
    <groupId>com.clivern</groupId>
    <artifactId>kafka-sdk</artifactId>
    <version>0.1.0</version>
</dependency>

To add a dependency using Gradle, use the following:

dependencies {
    // The `compile` configuration was removed in Gradle 7; use `implementation`.
    implementation 'com.clivern:kafka-sdk:0.1.0'
}

To add a dependency using Scala SBT, use the following:

libraryDependencies += "com.clivern" % "kafka-sdk" % "0.1.0"

Usage

To create a Kafka topic:

import java.util.HashMap;
import com.clivern.kafka.Configs;
import com.clivern.kafka.Utils;


HashMap<String, String> map = new HashMap<String, String>();
map.put("bootstrap.servers", "localhost:9092");
Utils.createTopic("clivern", Configs.fromMap(map));

Kafka Producer:

import com.clivern.kafka.Configs;
import com.clivern.kafka.Producer;
import com.clivern.kafka.Kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


Configs configs = new Configs();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

Producer producer = (new Kafka()).newProducer(configs);

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("clivern", null, "Hello World " + i);

    producer.send(record).flush();
}

producer.close();

Kafka Consumer:

import com.clivern.kafka.Configs;
import com.clivern.kafka.Consumer;
import com.clivern.kafka.Kafka;
import com.clivern.kafka.HandlerCallbackInterface;
import com.clivern.kafka.FailureCallbackInterface;
import com.clivern.kafka.SuccessCallbackInterface;
import com.clivern.kafka.exception.MissingHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;


Configs configs = new Configs();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "clivern");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer consumer = (new Kafka()).newConsumer(configs);

HandlerCallbackInterface<ConsumerRecord<String, String>> handler =
        (record) -> {
            System.out.println("Message Received: " + record.value());

            // Throw error if message has error
            if (record.value().equals("error")) {
                throw new Exception("Error!");
            }
        };

SuccessCallbackInterface<ConsumerRecord<String, String>> onSuccess =
        (record) -> {
            System.out.println("Message Succeeded: " + record.value());
        };

FailureCallbackInterface<ConsumerRecord<String, String>> onFailure =
        (record, exception) -> {
            System.out.println(
                    "Message " + record.value() + " Failed: " + exception.getMessage());
        };

consumer.subscribe("clivern")
        .handler(handler)
        .onSuccess(onSuccess)
        .onFailure(onFailure)
        .run();
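The consumer example above suggests that the handler runs first, and then either the success or the failure callback is invoked depending on whether the handler threw. The README does not spell this out, so the following is only a minimal sketch of that assumed flow in plain Java. The `CallbackFlowSketch` class, its nested interfaces, and the `dispatch` method are hypothetical names introduced for illustration; they are not part of kafka-sdk.

```java
// Illustrative only: an assumed reading of how the three callbacks relate.
// None of these types come from kafka-sdk.
class CallbackFlowSketch {
    // Hypothetical interfaces mirroring the shapes of the lambdas used above.
    interface Handler<T> {
        void handle(T record) throws Exception;
    }

    interface OnSuccess<T> {
        void call(T record);
    }

    interface OnFailure<T> {
        void call(T record, Exception exception);
    }

    // Run the handler, then report the outcome to exactly one callback.
    static <T> void dispatch(
            T record, Handler<T> handler, OnSuccess<T> onSuccess, OnFailure<T> onFailure) {
        try {
            handler.handle(record);
        } catch (Exception e) {
            // The handler rejected the record: report the failure and stop.
            onFailure.call(record, e);
            return;
        }
        // Called outside the try block so that an exception thrown by the
        // success callback is not misreported as a handler failure.
        onSuccess.call(record);
    }
}
```

Under this reading, a record whose value is "error" would reach only the failure callback, and every other record would reach only the success callback.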

Pub/Sub Pattern:

import com.clivern.kafka.event.MessagePublisher;
import com.clivern.kafka.event.Observer;
import com.clivern.kafka.event.Message;
import com.clivern.kafka.exception.MissingEvent;


try {
    MessagePublisher messagePublisher = new MessagePublisher();

    Observer callback1 =
            (message) -> {
                System.out.println("New Order with Payload:" + message.getContent());
            };

    Observer callback2 =
            (message) -> {
                System.out.println("New Notification with Payload:" + message.getContent());
            };

    messagePublisher.attach("service.newOrder", callback1);
    messagePublisher.attach("service.newNotification", callback2);
    messagePublisher.attach("service.newUser", (message) -> {
        System.out.println("New User with Info:" + message.getContent());
    });

    messagePublisher.notify("service.newOrder", new Message("{}"));
    messagePublisher.notify("service.newNotification", new Message("{}"));
    messagePublisher.notify("service.newUser", new Message("{}"));
} catch(MissingEvent e) {
    // Do something about it
}
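To make the attach/notify flow above concrete, here is a minimal in-memory sketch of the same observer idea in plain Java. It is not the SDK's `MessagePublisher`: the `SketchPublisher` class, its `publish` method, and the use of `IllegalArgumentException` are hypothetical, and it assumes that notifying an event with no attached observers is an error (which is one plausible meaning of `MissingEvent`, though the README does not confirm it).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative stand-in only; not the SDK's MessagePublisher.
class SketchPublisher {
    // Event name -> callbacks registered for that event, in attach order.
    private final Map<String, List<Consumer<String>>> observers = new HashMap<>();

    // Register a callback for an event name.
    void attach(String event, Consumer<String> callback) {
        observers.computeIfAbsent(event, key -> new ArrayList<>()).add(callback);
    }

    // Deliver the payload to every callback attached to the event.
    // Assumption: publishing to an event nobody attached to is an error.
    void publish(String event, String payload) {
        List<Consumer<String>> callbacks = observers.get(event);
        if (callbacks == null) {
            throw new IllegalArgumentException("No observers attached to event: " + event);
        }
        for (Consumer<String> callback : callbacks) {
            callback.accept(payload);
        }
    }
}
```

Keying the observers by event name keeps publishers and subscribers decoupled: the code that publishes "service.newOrder" does not need to know which callbacks, if any, will react to it.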

Versioning

For transparency into our release cycle and to maintain backward compatibility, kafka-sdk follows the Semantic Versioning guidelines, and its release process is predictable and business-friendly.

See the Releases section of our GitHub project for changelogs for each release version of kafka-sdk. It contains summaries of the most noteworthy changes made in each release.

Bug tracker

If you have any suggestions, bug reports, or annoyances, please report them to our issue tracker at https://github.com/clivern/kafka-sdk/issues

Security Issues

If you discover a security vulnerability within kafka-sdk, please send an email to hello@clivern.com

Contributing

We are an open source, community-driven project, so please feel free to join us. See the contributing guidelines for more details.

License

© 2021, Clivern. Released under the Apache License, Version 2.0.

kafka-sdk is authored and maintained by @Clivern.
