-
Notifications
You must be signed in to change notification settings - Fork 64
/
timestamp_transformer.clj
36 lines (32 loc) · 1.76 KB
/
timestamp_transformer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
(ns ziggurat.timestamp-transformer
(:require [ziggurat.kafka-delay :refer :all]
[ziggurat.util.time :refer :all])
(:import [org.apache.kafka.streams KeyValue]
[org.apache.kafka.streams.kstream Transformer]
[org.apache.kafka.streams.processor TimestampExtractor ProcessorContext]))
(defn- message-to-process? [message-timestamp oldest-processed-message-in-s]
(let [current-time (get-current-time-in-millis)
allowed-time (- current-time (* 1000 oldest-processed-message-in-s))]
(> message-timestamp allowed-time)))
(deftype IngestionTimeExtractor [] TimestampExtractor
(extract [_ record _]
(let [ingestion-time (get-timestamp-from-record record)]
(if (neg? ingestion-time)
(get-current-time-in-millis)
ingestion-time))))
(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespaces oldest-processed-message-in-s additional-tags] Transformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-key record-value]
(let [message-time (.timestamp processor-context)]
(when (message-to-process? message-time oldest-processed-message-in-s)
(calculate-and-report-kafka-delay metric-namespaces message-time additional-tags)
(KeyValue/pair record-key record-value))))
(punctuate [_ _] nil)
(close [_] nil))
(defn create
([metric-namespace process-message-since-in-s]
(create metric-namespace process-message-since-in-s nil))
([metric-namespace process-message-since-in-s additional-tags]
(TimestampTransformer. nil metric-namespace process-message-since-in-s additional-tags)))