Transducers are Coming #200

Open
wants to merge 8 commits into
base: master
1 change: 1 addition & 0 deletions examples/pipe/.gitignore
@@ -0,0 +1 @@
log/
95 changes: 1 addition & 94 deletions examples/pipe/README.md
@@ -1,96 +1,3 @@
# Pipe

This tutorial contains a simple stream processing application using Jackdaw and Kafka Streams.

## Setting up

Before starting, it is recommended to install the Confluent Platform CLI which can be obtained from [https://www.confluent.io/download/](https://www.confluent.io/download/).

To install Clojure: [https://clojure.org/guides/getting_started](https://clojure.org/guides/getting_started).

## Project structure

The project structure looks like this:
```
$ tree pipe
pipe
├── README.md
├── deps.edn
├── dev
│   └── system.clj
├── src
│   └── pipe.clj
└── test
    └── pipe_test.clj
```

The `deps.edn` file describes the project's dependencies and source paths.

The `system.clj` file contains functions to start, stop, and reset the app. These are required by the `user` namespace for interactive development and should not be invoked directly.

The `pipe.clj` file describes the app and topology. Pipe reads from a Kafka topic called "input", logs the key and value, and writes to a Kafka topic called "output":
```
(defn build-topology
  [builder]
  (-> (j/kstream builder (topic-config "input"))
      (j/peek (fn [[k v]]
                (info (str {:key k :value v}))))
      (j/to (topic-config "output")))
  builder)
```

The `pipe_test.clj` file contains a test.

## Running the app

Let's get started! Fire up a Clojure REPL and load the `pipe` namespace. Then, start ZooKeeper and Kafka. If these services are already running, you may skip this step:
```
user> (confluent/start)
INFO zookeeper is up (confluent:288)
INFO kafka is up (confluent:288)
nil
```

Now, start the app.
```
user> (start)
INFO topic 'input' is created (jackdaw.admin.client:288)
INFO topic 'output' is created (jackdaw.admin.client:288)
INFO pipe is up (pipe:288)
{:app #object[org.apache.kafka.streams.KafkaStreams 0x225dcbb9 "org.apache.kafka.streams.KafkaStreams@225dcbb9"]}
```

The `user/start` function creates two Kafka topics needed by Pipe and starts it.

For the full list of topics, type:
```
user> (get-topics)
#{"output" "__confluent.support.metrics" "input"}
```

With the app running, place a new record on the input stream:
```
user> (publish (topic-config "input") nil "this is a pipe")
INFO {:key nil, :value "this is a pipe"} (pipe:288)
nil
```
Pipe logs the key and value to the standard output.

To read from the output stream:
```
user> (get-keyvals (topic-config "output"))
((nil "this is a pipe"))
```

This concludes this tutorial.

## Interactive development

For interactive development, reload the file and invoke `user/reset`. This stops the app, deletes the topics and internal state using a regex, recreates the topics, and restarts the app. The details are in the `system` namespace.
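
The regex-based topic cleanup can be sketched in plain Clojure. This is a minimal illustration, not the code in the `system` namespace: `topics-pattern` is a hypothetical helper, and applying the pattern with `re-matches` is an assumption about how topic names would be selected.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: builds one alternation pattern from topic names.
;; Names are joined unescaped, so this assumes they contain no regex
;; metacharacters.
(defn topics-pattern
  [topic-names]
  (re-pattern (str "(" (str/join "|" topic-names) ")")))

(def re (topics-pattern ["input" "output"]))

;; Keep only the topics whose full name matches the pattern.
(def matching
  (filterv #(re-matches re %)
           ["input" "output" "__confluent.support.metrics"]))
```

With whole-string matching, only the app's own topics are selected and the Confluent metrics topic is left alone.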

## Running tests

To run tests, load the `pipe-test` namespace and invoke a test runner using your editor, or from the command line:
```
clj -Atest
```
This example creates a simple stream processing application using transducers.
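
For readers new to transducers, here is a minimal sketch in plain Clojure. It uses only `clojure.core`, not Jackdaw, and the names `seen` and `passthrough-xf` are hypothetical. It mirrors what Pipe does: observe each `[k v]` pair and pass it through unchanged.

```clojure
;; Hypothetical illustration: not part of the Jackdaw API.
;; Collects what the transducer observed, standing in for a log.
(def seen (atom []))

(def passthrough-xf
  "A transducer that records each [k v] pair and passes it on unchanged."
  (map (fn [[k v]]
         (swap! seen conj {:key k :value v})
         [k v])))

;; The same transducer could be applied to any reducible source;
;; here it runs over an in-memory vector of records.
(def output (into [] passthrough-xf [[nil "this is a pipe"]]))
```

Because a transducer is independent of its input source, the same `passthrough-xf` can be exercised against a plain collection in a test before it is wired into a topology.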
40 changes: 25 additions & 15 deletions examples/pipe/deps.edn
@@ -1,18 +1,28 @@
{:deps
{fundingcircle/jackdaw {:mvn/version "0.6.4"}
org.apache.kafka/kafka-streams {:mvn/version "2.1.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"}
org.clojure/clojure {:mvn/version "1.10.0"}
org.clojure/tools.logging {:mvn/version "0.4.1"}}
{:paths
["src" "resources"]

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}

:paths
["src" "test" "dev" "../dev"]
:deps
{fundingcircle/jackdaw {:mvn/version "0.6.9-transducers-SNAPSHOT"
:exclusions [org.apache.zookeeper/zookeeper]}
org.clojure/clojure {:mvn/version "1.10.1"}
org.clojure/tools.logging {:mvn/version "0.4.1"}
org.apache.kafka/kafka-streams {:mvn/version "2.3.0"}
org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.3.0"}
ch.qos.logback/logback-classic {:mvn/version "1.2.3"}
integrant {:mvn/version "0.7.0"}}

:aliases
{:test {:extra-deps {com.cognitect/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}}
{:dev
{:extra-paths ["dev" "../../dev"]
:extra-deps {integrant/repl {:mvn/version "0.3.1"}
danlentz/clj-uuid {:mvn/version "0.1.7"
:exclusions [primitive-math]}}}

:test
{:extra-paths ["test"]
:extra-deps {com.cognitect/test-runner {:git/url "https://github.com/cognitect-labs/test-runner.git"
:sha "028a6d41ac9ac5d5c405dfc38e4da6b4cc1255d5"}}
:main-opts ["-m" "cognitect.test-runner"]}}

:mvn/repos
{"confluent" {:url "https://packages.confluent.io/maven/"}}}
49 changes: 0 additions & 49 deletions examples/pipe/dev/system.clj

This file was deleted.

83 changes: 83 additions & 0 deletions examples/pipe/dev/user.clj
@@ -0,0 +1,83 @@
(ns user
  "Use this namespace for interactive development.

  This namespace requires libs needed to reset the app and helpers
  from `jackdaw.repl`. WARNING: Do not use `clj-refactor` (or
  equivalent) to clean this namespace since these tools cannot tell
  which libs are actually required."
  (:gen-class)
  (:require [clojure.string :as str]
            [clojure.tools.logging :refer [info]]
            [integrant.core :as ig]
            [integrant.repl :refer [clear go halt prep init reset reset-all]]
            [jackdaw.admin :as ja]
            [jackdaw.serdes :as js]
            [jackdaw.repl :refer :all]
            [jackdaw.streams :as j]
            [jackdaw.streams.xform :as jxf]
            [pipe]))


(def repl-config
  "The development config.
  When the 'dev' alias is active, this config will be used."
  {:topics {:client-config (select-keys pipe/streams-config ["bootstrap.servers"])
            :topic-metadata {:input
                             {:topic-name "input"
                              :partition-count 15
                              :replication-factor 1
                              :key-serde (js/edn-serde)
                              :value-serde (js/edn-serde)}

                             :output
                             {:topic-name "output"
                              :partition-count 15
                              :replication-factor 1
                              :key-serde (js/edn-serde)
                              :value-serde (js/edn-serde)}}}

   :topology {:topology-builder pipe/topology-builder
              :xforms [#'pipe/xf]
              :swap-fn jxf/kv-store-swap-fn}

   :app {:streams-config pipe/streams-config
         :topology (ig/ref :topology)
         :topics (ig/ref :topics)}})


(integrant.repl/set-prep! (constantly repl-config))


(defmethod ig/init-key :topics [_ {:keys [client-config topic-metadata] :as opts}]
  (with-open [client (ja/->AdminClient client-config)]
    (ja/create-topics! client (vals topic-metadata)))
  (assoc opts :topic-metadata topic-metadata))

(defmethod ig/init-key :topology [_ {:keys [topology-builder xforms swap-fn]}]
  (let [xform-map (reduce-kv (fn [m k v]
                               (let [k (keyword (str (:ns (meta v)))
                                                (str (:name (meta v))))]
                                 (assoc m k #(v % jxf/kv-store-swap-fn))))
                             {}
                             xforms)
        streams-builder (j/streams-builder)]
    ((topology-builder topic-metadata xform-map) streams-builder)))

(defmethod ig/init-key :app [_ {:keys [streams-config topology] :as opts}]
  (let [streams-app (j/kafka-streams topology streams-config)]
    (j/start streams-app)
    (assoc opts :streams-app streams-app)))

(defmethod ig/halt-key! :topics [_ {:keys [client-config topic-metadata]}]
  (let [re (re-pattern (str "(" (->> topic-metadata
                                     keys
                                     (map name)
                                     (str/join "|"))
                            ")"))]
    (re-delete-topics client-config re)))

(defmethod ig/halt-key! :app [_ {:keys [streams-config topics streams-app]}]
  (j/close streams-app)
  (destroy-state-stores streams-config)
  (let [re (re-pattern (str "(" (get streams-config "application.id") ")"))]
    (re-delete-topics (:client-config topics) re)))
1 change: 1 addition & 0 deletions examples/pipe/resources/logback.xml