Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for TimestampExtractor #302

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## Unreleased
* Add support for TimestampExtractor

### [0.9.5] - [2022-05-26]

* Move away from deprecated class ConsumerRecordFactory (to prepare migration to Kafka Streams 3.2.0)
Expand Down
14 changes: 10 additions & 4 deletions src/jackdaw/streams/interop.clj
Expand Up @@ -3,7 +3,8 @@
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:refer-clojure :exclude [count map reduce group-by merge filter peek])
(:require [jackdaw.streams.protocols :refer :all]
[jackdaw.streams.lambdas :refer :all])
[jackdaw.streams.lambdas :refer :all]
[clojure.string :as str])
(:import [java.util
Collection]
[java.util.regex
Expand All @@ -28,12 +29,17 @@
[org.apache.kafka.streams.state
KeyValueStore Stores]
(org.apache.kafka.streams.processor.api
ProcessorSupplier)))
ProcessorSupplier)
[org.apache.kafka.streams
Topology$AutoOffsetReset]))

(set! *warn-on-reflection* true)

(defn topic->consumed [{:keys [key-serde value-serde]}]
(Consumed/with key-serde value-serde))
(defn topic->consumed [{:keys [key-serde value-serde timestamp-extractor reset-policy]}]
(let [reset-policy (and reset-policy
(Topology$AutoOffsetReset/valueOf
(str/upper-case (name reset-policy))))]
(Consumed/with key-serde value-serde timestamp-extractor reset-policy)))

(defn topic->produced [{:keys [key-serde value-serde partition-fn]}]
(if partition-fn
Expand Down
15 changes: 13 additions & 2 deletions src/jackdaw/streams/specs.clj
Expand Up @@ -10,7 +10,8 @@
IKGroupedStream IKStream IKTable
ITimeWindowedKStream ISessionWindowedKStream]])
(:import org.apache.kafka.common.serialization.Serde
org.apache.kafka.streams.kstream.JoinWindows))
org.apache.kafka.streams.kstream.JoinWindows
org.apache.kafka.streams.processor.TimestampExtractor))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -41,6 +42,12 @@
(def serde?
(partial instance? Serde))

(def timestamp-extractor?
(partial instance? TimestampExtractor))

(def reset-policy?
(partial contains? #{:earliest :latest}))

(def streams-builder?
(partial satisfies? IStreamsBuilder))

Expand All @@ -53,13 +60,17 @@
(s/def ::topic-name string?)
(s/def ::key-serde any?)
(s/def ::value-serde any?)
(s/def ::timestamp-extractor timestamp-extractor?)
(s/def ::reset-policy reset-policy?)
(s/def ::partition-fn fn?)

(s/def ::topic-config
(s/keys :req-un [::topic-name
::key-serde
::value-serde]
:opt-un [::partition-fn]))
:opt-un [::timestamp-extractor
::reset-policy
::partition-fn]))

(s/def ::topic-configs (s/coll-of ::topic-config))

Expand Down
29 changes: 27 additions & 2 deletions test/jackdaw/streams_test.clj
@@ -1,6 +1,7 @@
(ns jackdaw.streams-test
"Tests of the kafka streams wrapper."
(:require [clojure.spec.test.alpha :as stest]
[clojure.spec.alpha :as s]
[clojure.test :refer :all]
[jackdaw.serdes.edn :as jse]
[jackdaw.streams :as k]
Expand All @@ -9,15 +10,18 @@
[jackdaw.streams.lambdas :as lambdas :refer [key-value]]
[jackdaw.streams.lambdas.specs]
[jackdaw.streams.mock :as mock]
[jackdaw.streams.interop :as interop]
[jackdaw.streams.specs :as streams.specs]
[jackdaw.streams.protocols
:refer [IKStream IKTable IStreamsBuilder]]
[jackdaw.streams.specs])
(:import [java.time Duration]
[org.apache.kafka.streams.kstream
JoinWindows SessionWindows TimeWindows Transformer
ValueTransformer]
ValueTransformer Consumed]
org.apache.kafka.streams.StreamsBuilder
[org.apache.kafka.common.serialization Serdes]))
[org.apache.kafka.common.serialization Serdes Serdes$ByteArraySerde]
[org.apache.kafka.streams.processor WallclockTimestampExtractor]))

(set! *warn-on-reflection* false)

Expand Down Expand Up @@ -1434,3 +1438,24 @@
(let [msgs (into {} (mock/get-keyvals driver output-t))]
(is (= 6 (:value (msgs 1))))
(is (= 60 (:value (msgs 2))))))))))

(defn topic->consumed
[x]
{:pre [(s/valid? ::streams.specs/topic-config x)]}
(interop/topic->consumed x))

(deftest topic->consumedTest
(let [base {:topic-name "foo"
:key-serde (Serdes$ByteArraySerde.)
:value-serde (Serdes$ByteArraySerde.)}]
(testing "defaults handled"
(is (instance? Consumed (topic->consumed base))))
(testing "invalid TimestampExtractor"
(is (thrown? java.lang.AssertionError (topic->consumed (assoc base :timestamp-extractor :bad)))))
(testing "valid TimestampExtractor"
(is (instance? Consumed (topic->consumed (assoc base :timestamp-extractor (WallclockTimestampExtractor.))))))
(testing "valid reset-policy"
(is (every? #(instance? Consumed (topic->consumed (assoc base :reset-policy %))) [:earliest :latest])))
(testing "invalid reset-policy"
(is (thrown? java.lang.AssertionError (topic->consumed (assoc base :reset-policy :bad)))))))