Skip to content

Commit

Permalink
Adding configuration options for Kafka Streams key and value serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
mjayprateek committed Oct 14, 2019
1 parent 64e6d33 commit e00b1b9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
17 changes: 17 additions & 0 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:with-key-val-serdes {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
Expand Down
11 changes: 8 additions & 3 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,26 @@
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- get-serde [default-serde]
(if (some? default-serde) default-serde (.getName (.getClass (Serdes/ByteArray)))))

(defn- properties [{:keys [application-id
bootstrap-servers
stream-threads-count
auto-offset-reset-config
buffered-records-per-partition
commit-interval-ms
upgrade-from
changelog-topic-replication-factor]}]
changelog-topic-replication-factor
default-key-serde
default-value-serde]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
(.put StreamsConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers)
(.put StreamsConfig/NUM_STREAM_THREADS_CONFIG (int stream-threads-count))
(.put StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/ByteArray))))
(.put StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/ByteArray))))
(.put StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (get-serde default-key-serde))
(.put StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (get-serde default-value-serde))
(.put StreamsConfig/DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG IngestionTimeExtractor)
(.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition))
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
Expand Down

0 comments on commit e00b1b9

Please sign in to comment.