Skip to content

Commit

Permalink
Modify Simple Ledger save balances after rekey
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles Reese committed Sep 25, 2019
1 parent e7e7a21 commit de98739
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 81 deletions.
67 changes: 43 additions & 24 deletions examples/simple-ledger/dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,68 @@
which libs are actually required."
(:gen-class)
(:require [clojure.string :as str]
[clojure.tools.logging :refer [info]]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt prep init reset reset-all]]
[jackdaw.admin :as ja]
[jackdaw.serdes :as js]
[jackdaw.repl :refer :all]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf]
simple-ledger)
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))
[simple-ledger :as sl]))


(def repl-config
"The development config.
When the 'dev' alias is active, this config will be used."
{:topology {:topology-builder simple-ledger/topology-builder
:xform simple-ledger/xf
:swap-fn jxf/kv-store-swap-fn}
{:topics {:client-config (select-keys sl/streams-config ["bootstrap.servers"])
:topic-metadata {:entry-added
{:topic-name "entry-requested"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:transaction-pending
{:topic-name "transaction-pending"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}

:topics {:streams-config simple-ledger/streams-config
:client-config (select-keys simple-ledger/streams-config
["bootstrap.servers"])
:topology (ig/ref :topology)}
:transaction-added
{:topic-name "transaction-added"
:partition-count 15
:replication-factor 1
:key-serde (js/edn-serde)
:value-serde (js/edn-serde)}}}

:app {:streams-config simple-ledger/streams-config
:topology {:topology-builder sl/topology-builder
:xforms [#'sl/xf-split-entries #'sl/xf-running-balances]
:swap-fn jxf/kv-store-swap-fn}

:app {:streams-config sl/streams-config
:topology (ig/ref :topology)
:topics (ig/ref :topics)}})


(defmethod ig/init-key :topology [_ {:keys [topology-builder xform swap-fn]}]
(let [streams-builder (j/streams-builder)]
((topology-builder topic-metadata #(xform % swap-fn)) streams-builder)))
(integrant.repl/set-prep! (constantly repl-config))


(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}]
(with-open [client (ja/->AdminClient client-config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata))

(defmethod ig/init-key :topics [_ {:keys [streams-config client-config topology]
:as opts}]
(let [topic-metadata (topology->topic-metadata topology streams-config)]
(with-open [client (ja/->AdminClient client-config)]
(ja/create-topics! client (vals topic-metadata)))
(assoc opts :topic-metadata topic-metadata)))
(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}]
(let [xform-map (reduce-kv (fn [m k v]
(let [k (keyword (str (:ns (meta v)))
(str (:name (meta v))))]
(assoc m k #(v % jxf/kv-store-swap-fn))))
{}
xforms)
streams-builder (j/streams-builder)]
((topology-builder topic-metadata xform-map) streams-builder)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
(let [streams-app (j/kafka-streams topology streams-config)]
Expand All @@ -66,6 +88,3 @@
(destroy-state-stores streams-config)
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
(re-delete-topics (:client-config topics) re)))


(integrant.repl/set-prep! (constantly repl-config))
144 changes: 94 additions & 50 deletions examples/simple-ledger/src/simple_ledger.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@
[clojure.tools.logging :refer [info]]
[jackdaw.serdes :as js]
[jackdaw.streams :as j]
[jackdaw.streams.xform :as jxf])
(:import org.apache.kafka.streams.kstream.ValueTransformer
[org.apache.kafka.streams.state KeyValueStore Stores]
org.apache.kafka.streams.StreamsBuilder))
[jackdaw.streams.xform :as jxf]))


(defn xf-split-entries
[_ _]
(map (fn [[k {:keys [debit-account credit-account amount] :as entry}]]
[[debit-account
{:account-name debit-account
:debit-credit-indicator :dr
:amount amount}]
[credit-account
{:account-name credit-account
:debit-credit-indicator :cr
:amount amount}]])))

(defn next-balances
[starting-balances {:keys [debit-account
credit-account
amount]
:as entry}]
(-> starting-balances
(update debit-account (fnil - 0) amount)
(update credit-account (fnil + 0) amount)))
[starting-balances {:keys [account-name debit-credit-indicator amount]
:as transaction}]
(let [op (if (= :dr debit-credit-indicator) - +)]
(update starting-balances account-name (fnil op 0) amount)))

(defn xf-running-balances
[state swap-fn]
Expand All @@ -27,38 +33,28 @@
([] (rf))
([result] (rf result))
([result input]
(let [{:keys [debit-account credit-account] :as entry} input
next (as-> input %
(let [[k v] input
{:keys [account-name debit-credit-indicator amount] :as entry} v
next (as-> entry %
(swap-fn state next-balances %)
(select-keys % [debit-account credit-account])
(hash-map :entry entry :current-balances %))]
(select-keys % [account-name])
((juxt (comp first keys) (comp first vals)) %)
(zipmap [:account-name :current-balance] %)
(assoc % :starting-balance (if (= :dr debit-credit-indicator)
(+ amount (:current-balance %))
(- amount (:current-balance %))))
(vector k %)
(vector %))]
(rf result next))))))

(defn balances->transactions
[{{:keys [debit-account amount]} :entry
current-balances :current-balances}]
(map (fn [[k v]]
[k {:account-name k
:starting-balance (if (= debit-account k)
(+ amount v)
(- amount v))
:current-balance v}])
current-balances))

(defn xf
[state swap-fn]
(comp
(xf-running-balances state swap-fn)
(map balances->transactions)))


(comment
;; Use this comment block to explore the Simple Ledger using Clojure
;; transducers.

;; Launch a Clojure REPL:
;; ```
;; cd <path-to-jackdaw>/examples/xf-word-count
;; cd <path-to-jackdaw>/examples/simple-ledger
;; clj -A:dev
;; ```

Expand All @@ -67,15 +63,17 @@

;; Evaluate the form:
(def coll
[{:debit-account "tech"
:credit-account "cash"
:amount 1000}
{:debit-account "cash"
:credit-account "sales"
:amount 2000}])
[[nil {:debit-account "tech"
:credit-account "cash"
:amount 1000}]
[nil {:debit-account "cash"
:credit-account "sales"
:amount 2000}]])

;; Let's record the entries. Evaluate the form:
(transduce (xf (atom {}) swap!) concat coll)
(->> coll
(transduce (xf-split-entries nil nil) concat)
(transduce (xf-running-balances (atom {}) swap!) concat))

;; You should see output like the following:

Expand All @@ -102,7 +100,10 @@
;; KeyValueStore interface with overrides for get and put."

;; Evaluate the form:
(transduce (xf (jxf/fake-kv-store {}) jxf/kv-store-swap-fn) concat coll)
(->> coll
(transduce (xf-split-entries nil nil) concat)
(transduce (xf-running-balances (jxf/fake-kv-store {})
jxf/kv-store-swap-fn) concat))

;; You should see the same output.
)
Expand All @@ -114,11 +115,13 @@
"cache.max.bytes.buffering" "0"})

(defn topology-builder
[{:keys [entry-requested transaction-added] :as topics} xf]
[{:keys [entry-requested transaction-pending transaction-added] :as topics} xforms]
(fn [builder]
(jxf/add-state-store! builder)
(-> (j/kstream builder entry-requested)
(jxf/transduce-kstream xf)
(jxf/transduce-kstream (::xf-split-entries xforms))
(j/through transaction-pending)
(jxf/transduce-kstream (::xf-running-balances xforms))
(j/to transaction-added))
builder))

Expand All @@ -128,7 +131,7 @@
;; processing application.

;; For more details on dynamic development, see the comment block in
;; <path-to-jackdaw>/examples/simple-ledger/src/simple_ledger.clj
;; <path-to-jackdaw>/examples/word-count/src/word_count.clj

;; Start ZooKeeper and Kafka:
;; ```
Expand Down Expand Up @@ -160,9 +163,14 @@
;; Evaluate the form:
(get-keyvals (:transaction-added topic-metadata))

;; You should see output like the following:
;; You should see output like the following. Notice transaction
;; order is not preserved:

;; (["tech"
;; (["sales"
;; {:account-name "sales"
;; :starting-balance 0
;; :current-balance 2000}]
;; ["tech"
;; {:account-name "tech"
;; :starting-balance 0
;; :current-balance -1000}]
Expand All @@ -172,10 +180,46 @@
;; :current-balance 1000}]
;; ["cash"
;; {:account-name "cash"
;; :starting-balance 1000,
;; :current-balance -1000}]
;; ["sales"
;; :starting-balance 1000
;; :current-balance -1000}])


;; The `transaction-added` topic has 15 partitions. Let's see how
;; the records distributed. Evaluate the form:
(->> (get-records (:transaction-added topic-metadata))
(map (fn [x]
(select-keys x [:key :offset :partition :value]))))

;; You should see output like the following. The balances are spread
;; across partitions 0, 11, and 14. Transaction order is preserved
;; only for each account. There is no global order.

;; ({:key "sales"
;; :offset 0
;; :partition 0
;; :value
;; {:account-name "sales"
;; :starting-balance 0
;; :current-balance 2000}])
;; :current-balance 2000}}
;; {:key "tech"
;; :offset 0
;; :partition 11
;; :value
;; {:account-name "tech"
;; :starting-balance 0
;; :current-balance -1000}}
;; {:key "cash"
;; :offset 0
;; :partition 14
;; :value
;; {:account-name "cash"
;; :starting-balance 0
;; :current-balance 1000}}
;; {:key "cash"
;; :offset 1
;; :partition 14
;; :value
;; {:account-name "cash"
;; :starting-balance 1000
;; :current-balance -1000}})
)
7 changes: 3 additions & 4 deletions examples/word-count/dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
:topics (ig/ref :topics)}})


(integrant.repl/set-prep! (constantly repl-config))


(defmethod ig/init-key :topology [_ {:keys [topology-builder]}]
(let [streams-builder (j/streams-builder)]
((topology-builder topic-metadata) streams-builder)))
Expand All @@ -49,10 +52,6 @@

(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}]
(j/close streams-app)
;; BUG: Does not delete state on reset!
(destroy-state-stores streams-config)
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
(re-delete-topics (:client-config topics) re)))


(integrant.repl/set-prep! (constantly repl-config))
6 changes: 3 additions & 3 deletions examples/xf-word-count/dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
[xf-word-count :as xfwc]))


(integrant.repl/set-prep! (constantly repl-config))


(def repl-config
"The development config.
When the 'dev' alias is active, this config will be used."
Expand Down Expand Up @@ -64,6 +67,3 @@
(destroy-state-stores streams-config)
(let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
(re-delete-topics (:client-config topics) re)))


(integrant.repl/set-prep! (constantly repl-config))

0 comments on commit de98739

Please sign in to comment.