Skip to content

Commit

Permalink
add replication factor for internal changelog topic (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
s7saxena authored and kartik7153 committed May 3, 2019
1 parent 9a9d5b1 commit 84140dc
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:oldest-processed-message-in-s [604800 :int]
:proto-class "proto-class"}}
:proto-class "proto-class"
:changelog-topic-replication-factor [3 :int]}}
:datadog {:host "localhost"
:port [8125 :int]
:enabled [false :bool]}
Expand Down Expand Up @@ -170,6 +171,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
* origin-topic - The topic that the stream should read from. This can be a regex that enables you to read from multiple streams and handle the messages in the same way. It is to be kept in mind that the messages from different streams should be of the same proto-class.
* oldest-processed-messages-in-s - The oldest message which will be processed by stream in second. By default the value is 604800 (1 week)
* proto-class - The proto-class of the message so that it can be decompiled before being passed to the mapper function
* changelog-topic-replication-factor - the internal changelog topic replication factor. By default the value is 3
* datadog - The statsd host and port that metrics should be sent to, although the key name is datadog, it supports statsd as well to send metrics.
* sentry - Whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing the mapper-function, an event is sent to sentry. You can skip this flow by disabling it.
* rabbit-mq-connection - The details required to make a connection to rabbitmq. We use rabbitmq for the retry mechanism.
Expand Down
6 changes: 4 additions & 2 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
{:buffered-records-per-partition 10000
:commit-interval-ms 15000
:auto-offset-reset-config "latest"
:oldest-processed-message-in-s 604800})
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3})

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}]
(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms changelog-topic-replication-factor]}]
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config})))
(doto (Properties.)
Expand All @@ -38,6 +39,7 @@
(.put StreamsConfig/DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG IngestionTimeExtractor)
(.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition))
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
(.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int changelog-topic-replication-factor))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)))

(defn- get-metric-namespace [default topic]
Expand Down
4 changes: 2 additions & 2 deletions test/ziggurat/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
(swap! message-received-count inc))
:success)]
(let [topic "topic"
cluster (doto (EmbeddedKafkaCluster. 1) (.start))
cluster (doto (EmbeddedKafkaCluster. 3) (.start))
bootstrap-serves (.bootstrapServers cluster)
times 6
oldest-processed-message-in-s 10
Expand Down Expand Up @@ -61,7 +61,7 @@
(swap! message-received-count inc))
:success)]
(let [topic "topic"
cluster (doto (EmbeddedKafkaCluster. 1) (.start))
cluster (doto (EmbeddedKafkaCluster. 3) (.start))
bootstrap-serves (.bootstrapServers cluster)
times 6
kvs (repeat times (KeyValue/pair (create-photo) (create-photo)))
Expand Down

0 comments on commit 84140dc

Please sign in to comment.