Skip to content
master
Go to file
Code
This branch is 39 commits ahead of Genscape:master.

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
 
 
 
 
 
 
 
 

README.md

Gregor

Lightweight Clojure wrapper for Apache Kafka Consumer + Producer APIs.

 :dependencies [[org.clojure/clojure "1.10.0"]
                [org.apache.kafka/kafka_2.12 "2.1.1"]]

cljdoc Badge

Clojars Project

CHANGELOG

Gregor wraps most of the Java API for the Kafka Producer and New Consumer The goal of this project is to stay very close to the Kafka API instead of adding more advanced features.

Example

Here's an example of at-least-once processing (using mount):

(ns gregor-sample-app.core
  (:gen-class)
  (:require [clojure.repl :as repl]
            [gregor.core :as gregor]
            [mount.core :as mount :refer [defstate]]))

(def run (atom true))

(defstate consumer
  :start (gregor/consumer "localhost:9092"
                          "testgroup"
                          ["test-topic"]
                          {"auto.offset.reset" "earliest"
                           "enable.auto.commit" "false"})
  :stop (gregor/close consumer))

(defstate producer
  :start (gregor/producer "localhost:9092")
  :stop (gregor/close producer))

(defn -main
  [& args]
  (mount/start)
  (repl/set-break-handler! (fn [sig] (reset! run false)))
  (while @run
    (let [consumer-records (gregor/poll consumer)
          values (process-records consumer-records)]
      (doseq [v values]
        (gregor/send producer "other-topic" v))
      (gregor/commit-offsets! consumer)))
  (mount/stop))

Transformations over consumer records are applied in process-records. Each record in the seq returned by poll is a map. Here's an example with a JSON object as the :value:

{:value "{\"foo\":42}"
 :key nil
 :partition 0
 :topic "test-topic"
 :offset 939}

Producing

Gregor provides the send function for asynchronously sending a record to a topic. There are multiple arities which correspond to those of the ProducerRecord Java constructor. If you'd like to provide a callback to be invoked when the send has been acknowledged use send-then instead.

Topic Management

Create a topic:

(create-topic {:connection-string "localhost:2181"} "some-topic" {})

That empty map can be used to specify configuration for number of topic partitions, replication factor,

Delete a topic:

(delete-topic {:connection-string "localhost:2181"} "some-topic")

Query about a topic's existence:

(topic-exists? {:connection-string "localhost:2181"} "some-topic")

List existing topics:

(topics {:connection-string "localhost:2181"})

License

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

About

Lightweight Clojure bindings for Apache Kafka

Topics

Resources

License

Packages

No packages published