Skip to content

Commit

Permalink
Merge pull request #79 from mjayprateek/ziggurat-java
Browse files Browse the repository at this point in the history
Ziggurat - Java
  • Loading branch information
theanirudhvyas committed Aug 20, 2019
2 parents b251300 + 6003da8 commit e3b55ac
Show file tree
Hide file tree
Showing 15 changed files with 453 additions and 23 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/target
/classes
/checkouts
pom.xml
/coverage
pom.xml.asc
pom.xml
*.jar
*.class
/.lein-*
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This change

## Unreleased Changes

## 3.0.0-alpha.5 - 2019-08-20
- Exposes Java methods for init, config, producer, sentry, metrics, fixtures namespaces
- Adds util methods for converting data types between java and Clojure

## 3.0.0-alpha.4 - 2019-07-29
- Remove old metrics from being sent

Expand Down
7 changes: 4 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.0.0-alpha.4"
(defproject tech.gojek/ziggurat "3.0.0-alpha.5"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down Expand Up @@ -39,8 +39,9 @@
:password :env/clojars_password
:sign-releases false}]]
:java-source-paths ["src/com"]
:aliases {"test-all" ["with-profile" "default:+1.8:+1.9" "test"]
"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--coveralls"]}
:aliases {"test-all" ["with-profile" "default:+1.8:+1.9" "test"]
"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--coveralls"]}
:aot [ziggurat.init ziggurat.config ziggurat.producer ziggurat.sentry ziggurat.metrics ziggurat.fixtures]
:profiles {:uberjar {:aot :all
:global-vars {*warn-on-reflection* true}}
:test {:jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
Expand Down
1 change: 1 addition & 0 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}

24 changes: 23 additions & 1 deletion src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
(:require [clojure.edn :as edn]
[clojure.java.io :as io]
[clonfig.core :as clonfig]
[mount.core :refer [defstate]]))
[mount.core :refer [defstate]]
[ziggurat.util.java-util :as util])
(:gen-class
:name tech.gojek.ziggurat.internal.Config
:methods [^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]))

(def config-file "config.edn")

Expand Down Expand Up @@ -76,3 +81,20 @@
(defn channel-retry-config [topic-entity channel]
(get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry]))

(defn- java-response
"When returning config from -get or -getIn, we can either return a Map or string (based on the key/keys passed).
Since we do not want to pass a ClojureMap to a Java application, we check whether the config-vals (config to be returned)
is a string or a PersistentHashMap. If it is a PersistentHashMap, we convert it to a Java Map and then return it."
[config-vals]
(if (map? config-vals)
(util/clojure->java-map config-vals)
config-vals))

(defn -getIn [^java.lang.Iterable keys]
(let [config-vals (get-in config (util/list-of-keywords keys))]
(java-response config-vals)))

(defn -get [^String key]
(let [config-vals (get config (keyword key))]
(java-response config-vals)))

9 changes: 8 additions & 1 deletion src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
[ziggurat.producer :as producer :refer [kafka-producers]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]))
[ziggurat.streams :as streams]
[ziggurat.util.java-util :as util])
(:gen-class
:name tech.gojek.ziggurat.internal.Init
:methods [^{:static true} [init [java.util.Map] void]]))

(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
Expand Down Expand Up @@ -187,3 +191,6 @@
(log/error e)
(stop stop-fn modes)
(System/exit 1)))))

(defn -init [args]
(main (util/java-map->clojure-map args)))
31 changes: 29 additions & 2 deletions src/ziggurat/metrics.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojure.walk :refer [stringify-keys]]
[ziggurat.config :refer [ziggurat-config]])
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.util.java-util :as util])
(:import com.gojek.metrics.datadog.DatadogReporter
[com.gojek.metrics.datadog.transport UdpTransport UdpTransport$Builder]
[io.dropwizard.metrics5 Histogram Meter MetricName MetricRegistry]
java.util.concurrent.TimeUnit))
java.util.concurrent.TimeUnit)
(:gen-class
:name tech.gojek.ziggurat.internal.Metrics
:methods [^{:static true} [incrementCount [String String] void]
^{:static true} [incrementCount [String String java.util.Map] void]
^{:static true} [decrementCount [String String] void]
^{:static true} [decrementCount [String String java.util.Map] void]
^{:static true} [reportTime [String long] void]
^{:static true} [reportTime [String long java.util.Map] void]]))

(defonce metrics-registry
(MetricRegistry.))
Expand Down Expand Up @@ -91,3 +100,21 @@
(.stop ^DatadogReporter reporter)
(.close ^UdpTransport transport)
(log/info "Stopped statsd reporter")))

(defn -incrementCount
([metric-namespace metric]
(increment-count metric-namespace metric))
([metric-namespace metric additional-tags]
(increment-count metric-namespace metric (util/java-map->clojure-map additional-tags))))

(defn -decrementCount
([metric-namespace metric]
(decrement-count metric-namespace metric))
([metric-namespace metric additional-tags]
(decrement-count metric-namespace metric (util/java-map->clojure-map additional-tags))))

(defn -reportTime
([metric-namespace time-val]
(report-time metric-namespace time-val))
([metric-namespace time-val additional-tags]
(report-time metric-namespace time-val (util/java-map->clojure-map additional-tags))))
20 changes: 16 additions & 4 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,19 @@
- max.in.flight.requests.per.connection
- enable.idempotencecd
Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
Please see [producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
for a complete list of all producer configs available in Kafka."

(:require [ziggurat.config :refer [ziggurat-config]]
[clojure.tools.logging :as log]
[mount.core :refer [defstate]])
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties)))
[mount.core :refer [defstate]]
[ziggurat.util.java-util :refer [get-key]])
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties))
(:gen-class
:name tech.gojek.ziggurat.internal.Producer
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]))

(defn- producer-properties-from-config [{:keys [bootstrap-servers
acks
Expand Down Expand Up @@ -121,3 +126,10 @@
(let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")]
(log/error error-msg)
(throw (ex-info error-msg {:stream-config-key stream-config-key}))))))

(defn -send
([stream-config-key topic key value]
(send (get-key stream-config-key) topic key value))

([stream-config-key topic partition key value]
(send (get-key stream-config-key) topic partition key value)))
8 changes: 7 additions & 1 deletion src/ziggurat/sentry.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
(:require [clojure.tools.logging :as log]
[mount.core :refer [defstate]]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]))
[ziggurat.config :refer [ziggurat-config]])
(:gen-class
:name tech.gojek.ziggurat.internal.Sentry
:methods [^{:static true} [reportError [Throwable String] java.util.concurrent.Future]]))

(defn create-sentry-reporter []
(let [sentry-config (merge (:sentry (ziggurat-config))
Expand All @@ -25,3 +28,6 @@
still want to report to Sentry."
[^Throwable error & msgs]
`(sentry/report-error sentry-reporter ~error ~@msgs))

(defn -reportError [^Throwable error msg]
(report-error error msg))
84 changes: 84 additions & 0 deletions src/ziggurat/util/java_util.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
(ns ziggurat.util.java-util
(:require [clojure.string :as str]
[clojure.walk :refer [stringify-keys postwalk]])
(:import (java.util Map Arrays)))

(declare java-map->clojure-map)
(declare create-clojure-vector)
(declare create-clojure-vector-from-array)

(defn- is-java-array?
[array]
(str/starts-with? (.getName (.getClass array)) "["))

(defn java-list->clojure-vector
[^java.lang.Iterable java-list]
(let [cloj-seq (seq java-list)]
(vec (map (fn [x]
(cond
(instance? java.util.Map x) (java-map->clojure-map x)
(instance? java.lang.Iterable x) (java-list->clojure-vector x)
:else x))
cloj-seq))))

(defn java-array->clojure-vector
[java-array]
(java-list->clojure-vector (Arrays/asList java-array)))

(defn get-key
[key]
(if (str/starts-with? key ":")
(keyword (subs key 1))
key))

(defn list-of-keywords
[keys]
(map keyword (seq keys)))

(defn output-transformer-fn
"This function returns a function which transforms the output of the provided
function `func`, if the output is an instance of `java.util.Map`."
[func]
(fn [& args]
(let [result (apply func args)]
(if (instance? Map result)
(java-map->clojure-map result)
result))))

(defn java-map->clojure-map
"A util method for converting a Java HashMap (Map) to a clojure hash-map.
This but can be used to convert any Java HashMap where following
are required:
1) An inner value of type `java.util.Map` need to be
converted to Clojure's hash-map (or `clojure.lang.APersistentMap`).
2) An inner value which either is a Java Array or extends
`java.lang.Iterable` (e.g. ArraLists, HashSets) need to
be converted to Clojure sequences.
3) An inner value which is a Clojure function but returns a java.util.Map
as output. This method will wrap the function using `output-transformer-fn`
to return the output as a clojure map.
It supports conversion of Java Maps to Clojure's `APersistent` type;
and Java Lists and Arrays or anything which extends `java.lang.Iterable`
to clojure's PersistentVector type."
[^Map java-map]
(reduce
(fn [map entry]
(let [key (.getKey entry)
value (.getValue entry)]
(cond (instance? Map value) (assoc map (get-key key) (java-map->clojure-map value))
(instance? java.lang.Iterable value) (assoc map (get-key key) (java-list->clojure-vector value))
(instance? clojure.lang.IFn value) (assoc map (get-key key) (output-transformer-fn value))
(is-java-array? value) (assoc map (get-key key) (java-array->clojure-vector value))
:else (assoc map (get-key key) value))))
(hash-map)
(.toArray (.entrySet java-map))))

(defn clojure->java-map [clojure-map]
"A util method to convert nested clojure.lang.PersistentArrayMap (or clojure.lang.PersistentHashMap to java.util.HashMap. It only converts a map and does not affect lists and vectors."
(let [string-keys-map (stringify-keys clojure-map)]
(postwalk (fn [value]
(if (map? value)
(java.util.HashMap. value)
value))
string-keys-map)))
37 changes: 35 additions & 2 deletions test/ziggurat/config_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [clojure.test :refer :all]
[ziggurat.config :refer :all]
[mount.core :as mount]
[clonfig.core :as clonfig]))
[clonfig.core :as clonfig])
(:import (java.util ArrayList)))

(deftest config-from-env-test
(testing "calls clonfig"
Expand Down Expand Up @@ -67,4 +68,36 @@
(with-redefs [config-from-env (fn [_] config-values-from-env)]
(mount/start #'config)
(is (= (-> config-values-from-env :ziggurat :stream-router topic-entity :channels channel :retry)
(channel-retry-config topic-entity channel)))))))
(channel-retry-config topic-entity channel)))
(mount/stop)))))

(deftest java-config-get-test
(testing "It fetches the correct values for a given config"
(let [mocked-config {:a "Apple"
:m {:b "Bell"
:n {:c "Cat"}}}
config-keys-list (doto (ArrayList.)
(.add "m")
(.add "b"))]
(with-redefs [config-from-env (constantly mocked-config)]
(mount/start #'config)
(is (= "Bell" (-getIn config-keys-list)))
(is (= "Apple" (-get "a")))
(mount/stop))))
(testing "-get returns a Java.util.HashMap when the requested config is a clojure map"
(let [mocked-config {:a {:b "abcd"}}]
(with-redefs [config-from-env (constantly mocked-config)]
(mount/start #'config)
(is (instance? java.util.HashMap (-get "a")))
(is (= (.get (-get "a") "b") "abcd"))
(mount/stop))))
(testing "-getin returns a Java.util.HashMap when the requested config is a clojure map"
(let [mocked-config {:a {:b "foo"}
:c {:d {:e "bar"}}}
config-keys-list (doto (ArrayList.)
(.add "c")
(.add "d"))]
(with-redefs [config-from-env (constantly mocked-config)]
(mount/start #'config)
(is (instance? java.util.HashMap (-getIn config-keys-list)))
(mount/stop)))))
21 changes: 19 additions & 2 deletions test/ziggurat/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
[langohr.queue :as lq])
(:import (java.util Properties)
(org.apache.kafka.clients.producer ProducerConfig)
(org.apache.kafka.clients.consumer ConsumerConfig)))
(org.apache.kafka.clients.consumer ConsumerConfig))
(:gen-class
:name tech.gojek.ziggurat.internal.test.Fixtures
:methods [^{:static true} [mountConfig [] void]
^{:static true} [mountProducer [] void]
^{:static true} [unmountAll [] void]]))

(defn mount-config []
(-> (mount/only [#'config/config])
Expand Down Expand Up @@ -122,4 +127,16 @@
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer")
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))]
(f))))
(mount/stop))
(mount/stop))

(defn unmount-all []
(mount/stop))

(defn -mountConfig []
(mount-config))

(defn -mountProducer []
(mount-producer))

(defn -unmountAll []
(unmount-all))
Loading

0 comments on commit e3b55ac

Please sign in to comment.