Skip to content

Commit

Permalink
Adds helper functions for working with transducers and an example.
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles Reese committed Sep 23, 2019
1 parent 9b5bbd0 commit 3430581
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/xf-word-count/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
log/
3 changes: 3 additions & 0 deletions examples/xf-word-count/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# XF Word Count

This is the classic 'word count' example done using transducers.
28 changes: 28 additions & 0 deletions examples/xf-word-count/deps.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{:paths
["src" "resources"]

:deps
{fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT"
:exclusions [org.apache.zookeeper/zookeeper]}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/tools.logging {:mvn/version "0.4.1"}
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}
integrant {:mvn/version "0.7.0"}}

:aliases
{:dev
{:extra-paths ["dev" "../../dev"]
:extra-deps {integrant/repl {:mvn/version "0.3.1"}
danlentz/clj-uuid {:mvn/version "0.1.7"
:exclusions [primitive-math]}}}

:test
{:extra-paths ["test"]
:extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}}
71 changes: 71 additions & 0 deletions examples/xf-word-count/dev/user.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
(ns user
"Use this namespace for interactive development.
This namespace requires libs needed to reset the app and helpers
from `jackdaw.repl`. WARNING: Do no use `clj-refactor` (or
equivalent) to clean this namespace since these tools cannot tell
which libs are actually required."
(:gen-class)
(:require [clojure.string :as str]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[jackdaw.admin :as ja]
[jackdaw.serdes :as js]
[jackdaw.repl :refer :all]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf]
[xf-word-count :as xfwc])
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))


(def repl-config
"The development config.
When the 'dev' alias is active, this config will be used."
{:topology {:topology-builder xfwc/topology-builder
:xform xfwc/xf
:swap-fn jxf/kv-store-swap-fn}

:topics {:streams-config xfwc/streams-config
:client-config (select-keys xfwc/streams-config
["bootstrap.servers"])
:topology (ig/ref :topology)}

:app {:streams-config xfwc/streams-config
:topology (ig/ref :topology)
:topics (ig/ref :topics)}})


(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}]
(let [streams-builder (j/streams-builder)]
((topology-builder topic-metadata #(xform % swap-fn)) streams-builder)))

(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology]
:as opts}]
(let [topic-metadata (topology->topic-metadata topology streams-config)]
(with-open [client (ja/->AdminClient client-config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
(let [streams-app (j/kafka-streams topology streams-config)]
(j/start streams-app)
(assoc opts :streams-app streams-app)))

(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}]
(let [re (re-pattern (str "(" (->> topic-metadata
keys
(map name)
(str/join "|"))
")"))]
(re-delete-topics client-config re)))

(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}]
(j/close streams-app)
(destroy-state-stores streams-config)
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
(re-delete-topics (:client-config topics) re)))


(integrant.repl/set-prep! (constantly repl-config))
1 change: 1 addition & 0 deletions examples/xf-word-count/resources/logback.xml
144 changes: 144 additions & 0 deletions examples/xf-word-count/src/xf_word_count.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
(ns xf-word-count
"This is the classic 'word count' example done using transducers."
(:gen-class)
(:require [clojure.string :as str]
[clojure.tools.logging :refer [info]]
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf])
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))


(defn xf-running-total
[state swap-fn]
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(let [next (as-> input %
(swap-fn state #(merge-with (fnil + 0) %1 %2) %)
(select-keys % (keys input))
(map vec %))]
(rf result next))))))

(defn xf
[state swap-fn]
(comp
(map (fn [x] (str/split x #" ")))
(map frequencies)
(xf-running-total state swap-fn)))


(comment
;; Use this comment block to explore Word Count using Clojure
;; transducers.

;; Launch a Clojure REPL:
;; ```
;; cd <path-to-jackdaw>/examples/xf-word-count
;; clj -A:dev
;; ```

;; Emacs users: Open a project file, e.g. this one, and enter
;; `M-x cider-jack-in`.

;; Evaluate the form:
(def coll
["inside every large program"
"is a small program"
"struggling to get out"])

;; Let's counts the words. Evaluate the form:
(transduce (xf (atom {}) swap!) concat coll)

;; You should see output like the following:
;; (["inside" 1]
;; ["every" 1]
;; ["large" 1]
;; ["program" 1]
;; ["is" 1]
;; ["a" 1]
;; ["small" 1]
;; ["program" 2]
;; ["struggling" 1]
;; ["to" 1]
;; ["get" 1]
;; ["out" 1])

;; This time, let's count the words using
;; `jackdaw.streams.xform/fake-kv-store` which implements the
;; KeyValueStore interface with overrides for get and put."

;; Evaluate the form:
(transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll)

;; You should see the same output.
)


(def streams-config
{"application.id" "xf-word-count"
"bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092")
"cache.max.bytes.buffering" "0"})

(defn topology-builder
[{:keys [input output] :as topics} xf]
(fn [builder]
(jxf/add-state-store! builder)
(-> (j/kstream builder input)
(jxf/transduce-kstream xf)
(j/to output))
builder))


(comment
;; Use this comment block to explore Word Count as a stream
;; processing application.

;; For more details on dynamic development, see the comment block in
;; <path-to-jackdaw>/examples/word-count/src/word_count.clj

;; Start ZooKeeper and Kafka:
;; ```
;; <path-to-directory>/bin/confluent local start kafka
;; ```

;; Launch a Clojure REPL:
;; ```
;; cd <path-to-jackdaw>/examples/xf-word-count
;; clj -A:dev
;; ```

;; Emacs users: Open a project file, e.g. this one, and enter
;; `M-x cider-jack-in`.

;; Evaluate the form:
(reset)

;; Evaluate the form:
(let [coll ["inside every large program"
"is a small program"
"struggling to get out"]]
(doseq [x coll]
(publish (:input topic-metadata) nil x)))

;; Evaluate the form:
(get-keyvals (:output topic-metadata))

;; You should see output like the following:
;; (["inside" 1]
;; ["every" 1]
;; ["large" 1]
;; ["program" 1]
;; ["is" 1]
;; ["a" 1]
;; ["small" 1]
;; ["program" 2]
;; ["struggling" 1]
;; ["to" 1]
;; ["get" 1]
;; ["out" 1])
)
76 changes: 76 additions & 0 deletions examples/xf-word-count/test/xf_word_count_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
(ns xf-word-count-test
(:gen-class)
(:require [clojure.test :refer [deftest is]]
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.protocols :as jsp]
[jackdaw.test :as jt]
[jackdaw.test.fixtures :as jtf]
[xf-word-count :as xfwc])
(:import java.util.Properties
org.apache.kafka.streams.TopologyTestDriver))

(def test-config
{:broker-config {"bootstrap.servers" "localhost:9092"}
:topic-metadata {:input
{:topic-name "input"
:partition-count 1
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:output
{:topic-name "output"
:partition-count 1
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}}
:app-config xfwc/streams-config
:enable? (System/getenv "BOOTSTRAP_SERVERS")})

(defn topology-builder
[topic-metadata]
(xfwc/topology-builder topic-metadata #(xfwc/xf % xfwc/kv-store-swap-fn)))

(defn props-for
[x]
(doto (Properties.)
(.putAll (reduce-kv (fn [m k v]
(assoc m (str k) (str v)))
{}
x))))

(def mock-transport-config
{:driver (let [streams-builder (j/streams-builder)
topology ((topology-builder (:topic-metadata test-config)) streams-builder)]
(TopologyTestDriver. (.build (j/streams-builder* topology))
(props-for (:app-config test-config))))})

(def test-transport
(jt/mock-transport mock-transport-config (:topic-metadata test-config)))

(defn done?
[journal]
(= 12 (count (get-in journal [:topics :output]))))

(def commands
[[:write! :input "inside every large program" {:key-fn (constantly "")}]
[:write! :input "is a small program" {:key-fn (constantly "")}]
[:write! :input "struggling to get out" {:key-fn (constantly "")}]
[:watch done? {:timeout 2000}]])

(defn word-count
[journal word]
(->> (get-in journal [:topics :output])
(filter (fn [x] (= word (:key x))))
last
:value))

(deftest test-xf-word-count
(jtf/with-fixtures [(jtf/integration-fixture topology-builder test-config)]
(jackdaw.test/with-test-machine test-transport
(fn [machine]
(let [{:keys [results journal]} (jackdaw.test/run-test machine commands)]

(is (= 1 (word-count journal "large")))
(is (= 2 (word-count journal "program"))))))))
63 changes: 63 additions & 0 deletions src/jackdaw/streams/xform.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
(ns jackdaw.streams.xform
"Helper functions for working with transducers."
(:gen-class)
(:require [jackdaw.serdes :as js]
[jackdaw.streams :as j])
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))

(defn fake-kv-store
"Creates an instance of org.apache.kafka.streams.state.KeyValueStore
with overrides for get and put."
[init]
(let [store (volatile! init)]
(reify KeyValueStore
(get [_ k]
(clojure.core/get @store k))

(put [_ k v]
(vswap! store assoc k v)))))

(defn kv-store-swap-fn
"Takes an instance of KeyValueStore, a function f, and map m, and
updates the store in a manner similar to `clojure.core/swap!`."
[^KeyValueStore store f m]
(let [prev (reduce (fn [m k]
(assoc m k (.get store k)))
{}
(keys m))
next (f prev m)]
(doall (map (fn [[k v]] (.put store k v)) next))
next))

(defn value-transformer
"Creates an instance of org.apache.kafka.streams.kstream.ValueTransformer
with overrides for init, transform, and close."
[xf]
(let [ctx (atom nil)]
(reify
ValueTransformer
(init [_ context]
(reset! ctx context))
(transform [_ v]
(let [^KeyValueStore store (.getStateStore @ctx "transducer")]
(first (into [] (xf store) [v]))))
(close [_]))))

(defn transduce-kstream
[kstream xf]
"Takes a kstream and xf and transduces the stream."
(-> kstream
(j/transform-values (fn [] (value-transformer xf)) ["transducer"])
(j/flat-map (fn [[_ v]] v))))

(defn add-state-store!
[builder]
"Takes a builder and adds a state store."
(doto ^StreamsBuilder (j/streams-builder* builder)
(.addStateStore (Stores/keyValueStoreBuilder
(Stores/persistentKeyValueStore "transducer")
(js/edn-serde)
(js/edn-serde))))
builder)

0 comments on commit 3430581

Please sign in to comment.