-
Notifications
You must be signed in to change notification settings - Fork 64
/
header_transformer_test.clj
35 lines (33 loc) · 1.77 KB
/
header_transformer_test.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
(ns ziggurat.header-transformer-test
(:require [clojure.test :refer [deftest is testing]]
[ziggurat.header-transformer :refer [create]])
(:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]
[org.apache.kafka.streams.processor ProcessorContext]))
(deftest header-transformer-test
  ;; Checks the transformer returned by `create`: once initialised with a
  ;; ProcessorContext, `.transform` is expected to return a map holding the raw
  ;; value, the context's headers, and its topic/timestamp/partition metadata.
  (let [record-topic "topic"
        record-timestamp 1234567890
        record-partition 1
        expected-metadata {:topic record-topic
                           :timestamp record-timestamp
                           :partition record-partition}
        ;; Stub context that only answers the four accessors this test sets up.
        stub-context (fn [record-headers]
                       (reify ProcessorContext
                         (headers [_] record-headers)
                         (topic [_] record-topic)
                         (timestamp [_] record-timestamp)
                         (partition [_] record-partition)))
        ;; Builds a fresh transformer, initialises it against a stub context
        ;; carrying `record-headers`, and returns the result of transforming "val".
        transform-with (fn [record-headers]
                         (let [transformer (create)]
                           (.init transformer (stub-context record-headers))
                           (.transform transformer "val")))]
    (testing "transforms value with passed headers"
      (let [record-headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
            actual (transform-with record-headers)]
        (is (= {:value "val" :headers record-headers :metadata expected-metadata}
               actual))))
    (testing "transforms value with nil headers when not passed"
      (let [actual (transform-with nil)]
        (is (= {:value "val" :headers nil :metadata expected-metadata}
               actual))))))