Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fr33m0nk]: adds the macro method and bumps the release version
Browse files Browse the repository at this point in the history
  • Loading branch information
Prashant Sinha committed Apr 28, 2021
1 parent 34d3cf0 commit 03b9689
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 58 deletions.
11 changes: 1 addition & 10 deletions .github/workflows/publish_clojars.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
# This is a basic workflow to help you get started with Actions

name: Clojars Publish

# Controls when the action will run.
on:
workflow_dispatch:

jobs:

build:

publish:
runs-on: ubuntu-latest


steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2
- name: Publish
env:
Expand Down
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,19 @@ That said, it is tested with the following versions of the dependencies:

This library provides convenience in namespace `kafka-cluster-test-utility.core`
, which can be used in tests.
Following test is an example of this library in action.

#### Usage as a tool to speed up development
```clojure
(require '[kafka-cluster-test-utility.core :as core])

(core/with-embedded-kafka-cluster 3 ["test-1"]
(core/send-with-producer "test-topic" (.getBytes "hello dost"))
(core/with-consumer-read-one "test-topic" 2)
(comment (do something))
(comment (do omething more)))
```

#### Usage as test fixture
```clojure
(ns kafka-cluster-test-utility.core-test
(:require [clojure.test :refer :all]
Expand Down
12 changes: 6 additions & 6 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject net.clojars.fr33m0nk/kafka-cluster-test-utility "0.2.0"
(defproject net.clojars.fr33m0nk/kafka-cluster-test-utility "0.3.0"
:description "Embedded Kafka Cluster and Protobuf util"
:url "https://github.com/fr33m0nk/kafka-cluster-test-utility"
:license {:name "Eclipse Public License"
Expand All @@ -13,17 +13,17 @@
[org.clojure/clojure "1.10.1" :scope "provided"]
[org.apache.kafka/kafka_2.13 "2.8.0" :scope "provided"]
[org.apache.kafka/kafka-streams "2.8.0" :scope "provided"]
[org.apache.kafka/kafka-clients "2.8.0" :scope "provided"]]
[org.apache.kafka/kafka-clients "2.8.0" :scope "provided"]
[org.apache.kafka/kafka_2.13 "2.8.0" :classifier "test" :scope "provided"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :scope "provided"]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test" :scope "provided"]]
:plugins [[lein-cloverage "1.0.13"]
[lein-shell "0.5.0"]
[lein-ancient "0.6.15"]
[lein-changelog "0.3.2"]]
:profiles {:uberjar {:aot :all}
:dev {:global-vars {*warn-on-reflection* true}
:dependencies [[org.tensorflow/proto "1.15.0"]
[org.apache.kafka/kafka_2.13 "2.8.0" :classifier "test" :scope "provided"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :scope "provided"]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test" :scope "provided"]]}
:dependencies [[org.tensorflow/proto "1.15.0"]]}
:provided {:dependencies []}}
:deploy-repositories [["clojars" {:url "https://repo.clojars.org"
:username :env/clojars_user
Expand Down
24 changes: 18 additions & 6 deletions src/kafka_cluster_test_utility/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,26 @@
(defn with-embedded-kafka-cluster-and-topics
"Convenience method for returning a function which takes a function as parameter.
Returned function would start embedded Kafka cluster as well as create the provided topics.
This can be used in test fixtures."
This can be used as a test fixtures."
[number-of-brokers & topics]
(fn [f]
(try (let [kafka-cluster (cluster/start-cluster number-of-brokers)]
(topic/recreate-topics (:cluster kafka-cluster) topics)
(f))
(finally
(cluster/stop-cluster)))))
(try
(cluster/start-cluster number-of-brokers)
(topic/recreate-topics topics)
(f)
(finally
(cluster/stop-cluster)))))

(defmacro with-embedded-kafka-cluster
"Macro that wraps and executes body within after starting Kafka cluster creating topics"
[number-of-brokers topic-list & body]
{:pre [(sequential? topic-list)]}
`(try
(cluster/start-cluster ~number-of-brokers)
(topic/recreate-topics ~topic-list)
~@body
(finally
(cluster/stop-cluster))))

(defn send-with-producer [topic message]
"Produces the provided message to Kafka topic provided.
Expand Down
13 changes: 4 additions & 9 deletions src/kafka_cluster_test_utility/kafka_cluster.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
(ns kafka-cluster-test-utility.kafka-cluster
(:require [kafka-cluster-test-utility.kafka-cluster-state :as s])
(:require
[kafka-cluster-test-utility.kafka-cluster-state :as s]
[kafka-cluster-test-utility.utility :as utils])
(:import (org.apache.kafka.streams.integration.utils EmbeddedKafkaCluster)))

(defmacro when-let*
([bindings & body]
(if (seq bindings)
`(when-let [~(first bindings) ~(second bindings)]
(when-let* ~(drop 2 bindings) ~@body))
`(do ~@body))))

(defn- call-method
[obj method-name & args]
(let [m (first (filter (fn [x] (.. x getName (equals (name method-name))))
Expand Down Expand Up @@ -54,7 +49,7 @@
(defn get-bootstrap-server
"Returns the bootstrap servers if embedded Kafka cluster is running else nil."
[]
(when-let* [running? (:running? @s/state)
(utils/when-let* [running? (:running? @s/state)
kafka-cluster (:cluster @s/state)]
(.bootstrapServers kafka-cluster)))

Expand Down
50 changes: 30 additions & 20 deletions src/kafka_cluster_test_utility/topic_management.clj
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
(ns kafka-cluster-test-utility.topic-management
(:require [kafka-cluster-test-utility.kafka-cluster-state :as s]
[kafka-cluster-test-utility.utility :as utils])
(:import (org.apache.kafka.streams.integration.utils EmbeddedKafkaCluster)))

(defn create-topics
"Creates topic(s) on supplied Kafka cluster"
[kafka-cluster ^EmbeddedKafkaCluster & topic-names]
(try (.createTopics kafka-cluster (into-array topic-names))
true
(catch Exception e
false)))
[& topic-names]
(utils/when-let* [running? (:running? @s/state)
kafka-cluster (:cluster @s/state)]
(try (.createTopics ^EmbeddedKafkaCluster kafka-cluster (into-array topic-names))
true
(catch Exception e
false))))

(defn delete-topics
"Deletes topic(s) on supplied Kafka cluster"
[kafka-cluster ^EmbeddedKafkaCluster & topic-names]
(try (.deleteTopicsAndWait kafka-cluster (into-array topic-names))
true
(catch Exception e
false)))
[& topic-names]
(utils/when-let* [running? (:running? @s/state)
kafka-cluster (:cluster @s/state)]
(try (.deleteTopicsAndWait ^EmbeddedKafkaCluster kafka-cluster (into-array topic-names))
true
(catch Exception e
false))))

(defn recreate-topic
"Recreates topic on supplied Kafka cluster"
[kafka-cluster ^EmbeddedKafkaCluster topic-name]
(try
(delete-topics kafka-cluster topic-name)
(finally
(create-topics kafka-cluster topic-name))))
[topic-name]
(utils/when-let* [running? (:running? @s/state)
kafka-cluster (:cluster @s/state)]
(try
(delete-topics topic-name)
(finally
(create-topics topic-name)))))

(defn recreate-topics
"Recreates topics on supplied Kafka cluster"
[kafka-cluster ^EmbeddedKafkaCluster topic-names]
[topic-names]
{:pre [(sequential? topic-names)]}
(try
(apply (partial delete-topics kafka-cluster) topic-names)
(finally
(apply (partial create-topics kafka-cluster) topic-names))))
(utils/when-let* [running? (:running? @s/state)
kafka-cluster (:cluster @s/state)]
(try
(apply delete-topics topic-names)
(finally
(apply create-topics topic-names)))))



7 changes: 7 additions & 0 deletions src/kafka_cluster_test_utility/utility.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
(org.apache.kafka.clients.consumer ConsumerConfig)
(java.util UUID)))

(defmacro when-let*
([bindings & body]
(if (seq bindings)
`(when-let [~(first bindings) ~(second bindings)]
(when-let* ~(drop 2 bindings) ~@body))
`(do ~@body))))

(def default-properties
{ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"
ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer"
Expand Down
12 changes: 12 additions & 0 deletions test/kafka_cluster_test_utility/core_macro_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(ns kafka-cluster-test-utility.core-macro-test
(:require [clojure.test :refer :all]
[kafka-cluster-test-utility.core :as core]))

(deftest with-embedded-kafka-cluster-test
(testing "should produce and consume message with in the with-embedded-kafka-cluster macro"
(core/with-embedded-kafka-cluster 3
["test-1"]
(core/send-with-producer "test-topic" (.getBytes "hello dost"))
(is (= "hello dost"
(->> (core/with-consumer-read-one "test-topic" 2)
String.))))))
10 changes: 8 additions & 2 deletions test/kafka_cluster_test_utility/core_test.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
(ns kafka-cluster-test-utility.core-test
(:require [clojure.test :refer :all]
[kafka-cluster-test-utility.core :as core]
[kafka-cluster-test-utility.utility :as utility])
[kafka-cluster-test-utility.utility :as utility]
[clojure.string :as str])
(:import [org.tensorflow.util.testlog PlatformInfo]))

(use-fixtures :once (core/with-embedded-kafka-cluster-and-topics 3 "test-topic"))

(deftest with-embedded-kafka-cluster-test
(deftest with-embedded-kafka-cluster-and-topics-test
(testing "should produce and consume message from cluster"
(let [message {:release "4.19.0"}]
(core/send-with-producer "test-topic" (utility/clj-map->bytes PlatformInfo message))
Expand All @@ -24,3 +25,8 @@
(is (= [first-message second-message third-message]
(->> (core/with-consumer-read-multiple "test-topic" 2)
(map #(String. %))))))))

(deftest set-env-bootstrap-servers-test
(testing "should set the environment variable with values"
(core/set-env-bootstrap-servers "BOOTSRAPSERVERS")
(is (str/starts-with? (System/getenv "BOOTSRAPSERVERS") "localhost"))))
8 changes: 4 additions & 4 deletions test/kafka_cluster_test_utility/topic_management_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
(deftest create-topics-test
(try (let [kafka-cluster (cluster/start-cluster 3)]
(testing "should create topics in a kafka cluster"
(is (true? (tm/create-topics (:cluster kafka-cluster) "test-1" "test-2"))))
(is (true? (tm/create-topics "test-1" "test-2"))))
(testing "should delete topics in a kafka cluster"
(is (true? (tm/delete-topics (:cluster kafka-cluster) "test-1" "test-2"))))
(is (true? (tm/delete-topics "test-1" "test-2"))))
(testing "should recreate topic in a kafka cluster"
(is (true? (tm/recreate-topics (:cluster kafka-cluster) ["test-1" "test-2"]))))
(is (true? (tm/recreate-topics ["test-1" "test-2"]))))
(testing "should recreate topics in a kafka cluster"
(is (true? (tm/recreate-topic (:cluster kafka-cluster) "test-1")))))
(is (true? (tm/recreate-topic "test-1")))))
(finally (cluster/stop-cluster))))

0 comments on commit 03b9689

Please sign in to comment.