Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message payload to have retry-count present right from when it's created. #235

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

## 3.16.0
- Uses protobuf (replaces nippy) for serializing and deserializing while publishing to and consuming from RabbitMQ

## 3.15.0
- Includes a `StreamsUncaughtExceptionHandler` which shuts down the client in case of an uncaught exception.
- Introduces a new stream-route config `:stream-thread-exception-response` which lets user control the behaviour of `StreamsUncaughtExceptionHandler`.
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ test-cluster: setup-cluster
coverage: setup
lein code-coverage
docker-compose down

proto:
protoc -I=resources --java_out=test/ resources/proto/example.proto
protoc -I=resources --java_out=test/ resources/proto/person.proto
protoc -I=resources --java_out=src/ resources/proto/message-payload.proto
16 changes: 8 additions & 8 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "3.15.0"
(defproject tech.gojek/ziggurat "3.16.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down Expand Up @@ -35,8 +35,7 @@
[org.clojure/clojure "1.10.3"]
[org.clojure/tools.logging "1.1.0"]
[nrepl/nrepl "0.8.3"]
[clojusc/protobuf "3.5.1-v1.1"]
[org.flatland/protobuf "0.8.1"]
[clojusc/protobuf "3.5.1-v1.1" :exclusions [com.google.protobuf/protobuf-java]]
[prismatic/schema "1.1.12"]
[clj-statsd "0.4.0"]
[ring/ring "1.9.3"]
Expand All @@ -53,7 +52,8 @@
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
[metosin/ring-swagger-ui "3.46.0"]]
[metosin/ring-swagger-ui "3.46.0"]
[com.google.protobuf/protobuf-java "3.17.3"] ] ;; NOTE: protoc should have same version as this
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
:username :env/clojars_username
:password :env/clojars_password
Expand All @@ -62,18 +62,18 @@
:pedantic? :warn
:java-source-paths ["src/com"]
:aliases {"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--lcov"]}
:aot [ziggurat.kafka-consumer.invalid-return-type-exception ziggurat.init ziggurat.config ziggurat.producer ziggurat.sentry ziggurat.metrics ziggurat.fixtures]
:aot [ziggurat.kafka-consumer.invalid-return-type-exception]
:profiles {:uberjar {:aot :all
:global-vars {*warn-on-reflection* true}
:pedantic? :abort}
:test {:java-source-paths ["src/com" "test/com"]
:jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
:dependencies [[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
[org.clojure/test.check "1.1.0"]]
[org.clojure/test.check "1.1.0"]
[com.google.protobuf/protobuf-java "3.17.3"]] ;; NOTE: protoc should have same version as this
:plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[lein-ancient "0.6.15"]
Expand Down
2 changes: 1 addition & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
:worker-count [10 :int]
:queue-size [10 :int]
:thread-termination-wait-s [1 :int]}
:rabbit-mq-connection {:host "127.0.0.1"
:rabbit-mq-connection {:host "localhost"
:port [5672 :int]
:username "guest"
:password "guest"
Expand Down
29 changes: 29 additions & 0 deletions resources/proto/example.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
syntax = "proto3";

package com.gojek.test.proto;

option java_package = "com.gojek.test.proto";
option java_outer_classname = "Example";

message Photo {
int32 id = 1;
string path = 2;
optional bytes image = 6;

message Label {
string item = 1;
bool exists = 2;
}

message Attr {
string key = 1;
optional string val = 2;
}

message Tag {
int32 person_id = 1;
optional int32 y_coord = 3;
optional int32 width = 4;
optional int32 height = 5;
}
}
12 changes: 12 additions & 0 deletions resources/proto/message-payload.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";
// use protoc v3.17.3 to compile the proto
package com.ziggurat.proto;

option java_package = "com.ziggurat.proto";
option java_outer_classname = "MessagePayloadProto";

message MessagePayload {
bytes message = 1;
string topic_entity = 2;
int32 retry_count = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ message Person {
string email = 3;
string likes = 4;
google.protobuf.Struct characters = 5;
}
}