diff --git a/Makefile b/Makefile
index e12e60c2..e8768734 100644
--- a/Makefile
+++ b/Makefile
@@ -34,3 +34,8 @@ test-cluster: setup-cluster
coverage: setup
lein code-coverage
docker-compose down
+
+proto:
+ protoc -I=resources --java_out=test/ resources/proto/example.proto
+ protoc -I=resources --java_out=test/ resources/proto/person.proto
+ protoc -I=resources --java_out=src/ resources/proto/message-payload.proto
diff --git a/project.clj b/project.clj
index 3720ec3d..b34c71b3 100644
--- a/project.clj
+++ b/project.clj
@@ -36,7 +36,6 @@
[org.clojure/tools.logging "1.1.0"]
[nrepl/nrepl "0.8.3"]
[clojusc/protobuf "3.5.1-v1.1"]
- [org.flatland/protobuf "0.8.1"]
[prismatic/schema "1.1.12"]
[clj-statsd "0.4.0"]
[ring/ring "1.9.3"]
@@ -53,7 +52,8 @@
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
- [metosin/ring-swagger-ui "3.46.0"]]
+ [metosin/ring-swagger-ui "3.46.0"]
+ [com.google.protobuf/protobuf-java "3.17.3"]] ;; NOTE: protoc should have same version as this
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
:username :env/clojars_username
:password :env/clojars_password
@@ -68,12 +68,12 @@
:pedantic? :abort}
:test {:java-source-paths ["src/com" "test/com"]
:jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
- :dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
- [junit/junit "4.13.2"]
+ :dependencies [[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
- [org.clojure/test.check "1.1.0"]]
+ [org.clojure/test.check "1.1.0"]
+ [com.google.protobuf/protobuf-java "3.17.3"]] ;; NOTE: protoc should have same version as this
:plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[lein-ancient "0.6.15"]
diff --git a/resources/proto/example.proto b/resources/proto/example.proto
new file mode 100644
index 00000000..681fa294
--- /dev/null
+++ b/resources/proto/example.proto
@@ -0,0 +1,29 @@
+syntax = "proto3";
+
+package com.gojek.test.proto;
+
+option java_package = "com.gojek.test.proto";
+option java_outer_classname = "Example";
+
+message Photo {
+ int32 id = 1;
+ string path = 2;
+ optional bytes image = 6;
+
+ message Label {
+ string item = 1;
+ bool exists = 2;
+ }
+
+ message Attr {
+ string key = 1;
+ optional string val = 2;
+ }
+
+ message Tag {
+ int32 person_id = 1;
+ optional int32 y_coord = 3;
+ optional int32 width = 4;
+ optional int32 height = 5;
+ }
+}
diff --git a/resources/proto/message-payload.proto b/resources/proto/message-payload.proto
new file mode 100644
index 00000000..4cc7b429
--- /dev/null
+++ b/resources/proto/message-payload.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+
+package com.ziggurat.proto;
+
+option java_package = "com.ziggurat.proto";
+option java_outer_classname = "MessagePayloadProto";
+
+message MessagePayload {
+ bytes message = 1;
+ string topic_entity = 2;
+ int32 retry_count = 3;
+}
diff --git a/resources/proto/person.proto b/resources/proto/person.proto
new file mode 100644
index 00000000..2202a320
--- /dev/null
+++ b/resources/proto/person.proto
@@ -0,0 +1,13 @@
+syntax = "proto3";
+
+package com.gojek.test.proto;
+
+option java_outer_classname = "PersonTestProto";
+option java_package = "com.gojek.test.proto";
+
+message Person {
+ int32 id = 1;
+ string name = 2;
+ string email = 3;
+ string likes = 4;
+}
diff --git a/src/com/ziggurat/proto/MessagePayloadProto.java b/src/com/ziggurat/proto/MessagePayloadProto.java
new file mode 100644
index 00000000..08d148a3
--- /dev/null
+++ b/src/com/ziggurat/proto/MessagePayloadProto.java
@@ -0,0 +1,762 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: resources/proto/message-payload.proto
+
+package com.ziggurat.proto;
+
+public final class MessagePayloadProto {
+ private MessagePayloadProto() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistryLite registry) {
+ }
+
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ registerAllExtensions(
+ (com.google.protobuf.ExtensionRegistryLite) registry);
+ }
+ public interface MessagePayloadOrBuilder extends
+ // @@protoc_insertion_point(interface_extends:com.ziggurat.proto.MessagePayload)
+ com.google.protobuf.MessageOrBuilder {
+
+ /**
+ * bytes message = 1;
+ * @return The message.
+ */
+ com.google.protobuf.ByteString getMessage();
+
+ /**
+ * string topic_entity = 2;
+ * @return The topicEntity.
+ */
+ java.lang.String getTopicEntity();
+ /**
+ * string topic_entity = 2;
+ * @return The bytes for topicEntity.
+ */
+ com.google.protobuf.ByteString
+ getTopicEntityBytes();
+
+ /**
+ * int32 retry_count = 3;
+ * @return The retryCount.
+ */
+ int getRetryCount();
+ }
+ /**
+ * Protobuf type {@code com.ziggurat.proto.MessagePayload}
+ */
+ public static final class MessagePayload extends
+ com.google.protobuf.GeneratedMessageV3 implements
+ // @@protoc_insertion_point(message_implements:com.ziggurat.proto.MessagePayload)
+ MessagePayloadOrBuilder {
+ private static final long serialVersionUID = 0L;
+ // Use MessagePayload.newBuilder() to construct.
+ private MessagePayload(com.google.protobuf.GeneratedMessageV3.Builder> builder) {
+ super(builder);
+ }
+ private MessagePayload() {
+ message_ = com.google.protobuf.ByteString.EMPTY;
+ topicEntity_ = "";
+ }
+
+ @java.lang.Override
+ @SuppressWarnings({"unused"})
+ protected java.lang.Object newInstance(
+ UnusedPrivateParameter unused) {
+ return new MessagePayload();
+ }
+
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private MessagePayload(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ this();
+ if (extensionRegistry == null) {
+ throw new java.lang.NullPointerException();
+ }
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ case 10: {
+
+ message_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ java.lang.String s = input.readStringRequireUtf8();
+
+ topicEntity_ = s;
+ break;
+ }
+ case 24: {
+
+ retryCount_ = input.readInt32();
+ break;
+ }
+ default: {
+ if (!parseUnknownField(
+ input, unknownFields, extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.ziggurat.proto.MessagePayloadProto.internal_static_com_ziggurat_proto_MessagePayload_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.ziggurat.proto.MessagePayloadProto.internal_static_com_ziggurat_proto_MessagePayload_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.ziggurat.proto.MessagePayloadProto.MessagePayload.class, com.ziggurat.proto.MessagePayloadProto.MessagePayload.Builder.class);
+ }
+
+ public static final int MESSAGE_FIELD_NUMBER = 1;
+ private com.google.protobuf.ByteString message_;
+ /**
+ * bytes message = 1;
+ * @return The message.
+ */
+ @java.lang.Override
+ public com.google.protobuf.ByteString getMessage() {
+ return message_;
+ }
+
+ public static final int TOPIC_ENTITY_FIELD_NUMBER = 2;
+ private volatile java.lang.Object topicEntity_;
+ /**
+ * string topic_entity = 2;
+ * @return The topicEntity.
+ */
+ @java.lang.Override
+ public java.lang.String getTopicEntity() {
+ java.lang.Object ref = topicEntity_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ topicEntity_ = s;
+ return s;
+ }
+ }
+ /**
+ * string topic_entity = 2;
+ * @return The bytes for topicEntity.
+ */
+ @java.lang.Override
+ public com.google.protobuf.ByteString
+ getTopicEntityBytes() {
+ java.lang.Object ref = topicEntity_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ topicEntity_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ public static final int RETRY_COUNT_FIELD_NUMBER = 3;
+ private int retryCount_;
+ /**
+ * int32 retry_count = 3;
+ * @return The retryCount.
+ */
+ @java.lang.Override
+ public int getRetryCount() {
+ return retryCount_;
+ }
+
+ private byte memoizedIsInitialized = -1;
+ @java.lang.Override
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized == 1) return true;
+ if (isInitialized == 0) return false;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ @java.lang.Override
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ if (!message_.isEmpty()) {
+ output.writeBytes(1, message_);
+ }
+ if (!getTopicEntityBytes().isEmpty()) {
+ com.google.protobuf.GeneratedMessageV3.writeString(output, 2, topicEntity_);
+ }
+ if (retryCount_ != 0) {
+ output.writeInt32(3, retryCount_);
+ }
+ unknownFields.writeTo(output);
+ }
+
+ @java.lang.Override
+ public int getSerializedSize() {
+ int size = memoizedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (!message_.isEmpty()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, message_);
+ }
+ if (!getTopicEntityBytes().isEmpty()) {
+ size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, topicEntity_);
+ }
+ if (retryCount_ != 0) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(3, retryCount_);
+ }
+ size += unknownFields.getSerializedSize();
+ memoizedSize = size;
+ return size;
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof com.ziggurat.proto.MessagePayloadProto.MessagePayload)) {
+ return super.equals(obj);
+ }
+ com.ziggurat.proto.MessagePayloadProto.MessagePayload other = (com.ziggurat.proto.MessagePayloadProto.MessagePayload) obj;
+
+ if (!getMessage()
+ .equals(other.getMessage())) return false;
+ if (!getTopicEntity()
+ .equals(other.getTopicEntity())) return false;
+ if (getRetryCount()
+ != other.getRetryCount()) return false;
+ if (!unknownFields.equals(other.unknownFields)) return false;
+ return true;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptor().hashCode();
+ hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
+ hash = (53 * hash) + getMessage().hashCode();
+ hash = (37 * hash) + TOPIC_ENTITY_FIELD_NUMBER;
+ hash = (53 * hash) + getTopicEntity().hashCode();
+ hash = (37 * hash) + RETRY_COUNT_FIELD_NUMBER;
+ hash = (53 * hash) + getRetryCount();
+ hash = (29 * hash) + unknownFields.hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ java.nio.ByteBuffer data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ java.nio.ByteBuffer data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+
+ @java.lang.Override
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder() {
+ return DEFAULT_INSTANCE.toBuilder();
+ }
+ public static Builder newBuilder(com.ziggurat.proto.MessagePayloadProto.MessagePayload prototype) {
+ return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+ }
+ @java.lang.Override
+ public Builder toBuilder() {
+ return this == DEFAULT_INSTANCE
+ ? new Builder() : new Builder().mergeFrom(this);
+ }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code com.ziggurat.proto.MessagePayload}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessageV3.Builder implements
+ // @@protoc_insertion_point(builder_implements:com.ziggurat.proto.MessagePayload)
+ com.ziggurat.proto.MessagePayloadProto.MessagePayloadOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.ziggurat.proto.MessagePayloadProto.internal_static_com_ziggurat_proto_MessagePayload_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.ziggurat.proto.MessagePayloadProto.internal_static_com_ziggurat_proto_MessagePayload_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.ziggurat.proto.MessagePayloadProto.MessagePayload.class, com.ziggurat.proto.MessagePayloadProto.MessagePayload.Builder.class);
+ }
+
+ // Construct using com.ziggurat.proto.MessagePayloadProto.MessagePayload.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessageV3
+ .alwaysUseFieldBuilders) {
+ }
+ }
+ @java.lang.Override
+ public Builder clear() {
+ super.clear();
+ message_ = com.google.protobuf.ByteString.EMPTY;
+
+ topicEntity_ = "";
+
+ retryCount_ = 0;
+
+ return this;
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return com.ziggurat.proto.MessagePayloadProto.internal_static_com_ziggurat_proto_MessagePayload_descriptor;
+ }
+
+ @java.lang.Override
+ public com.ziggurat.proto.MessagePayloadProto.MessagePayload getDefaultInstanceForType() {
+ return com.ziggurat.proto.MessagePayloadProto.MessagePayload.getDefaultInstance();
+ }
+
+ @java.lang.Override
+ public com.ziggurat.proto.MessagePayloadProto.MessagePayload build() {
+ com.ziggurat.proto.MessagePayloadProto.MessagePayload result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ @java.lang.Override
+ public com.ziggurat.proto.MessagePayloadProto.MessagePayload buildPartial() {
+ com.ziggurat.proto.MessagePayloadProto.MessagePayload result = new com.ziggurat.proto.MessagePayloadProto.MessagePayload(this);
+ result.message_ = message_;
+ result.topicEntity_ = topicEntity_;
+ result.retryCount_ = retryCount_;
+ onBuilt();
+ return result;
+ }
+
+ @java.lang.Override
+ public Builder clone() {
+ return super.clone();
+ }
+ @java.lang.Override
+ public Builder setField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ java.lang.Object value) {
+ return super.setField(field, value);
+ }
+ @java.lang.Override
+ public Builder clearField(
+ com.google.protobuf.Descriptors.FieldDescriptor field) {
+ return super.clearField(field);
+ }
+ @java.lang.Override
+ public Builder clearOneof(
+ com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+ return super.clearOneof(oneof);
+ }
+ @java.lang.Override
+ public Builder setRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ int index, java.lang.Object value) {
+ return super.setRepeatedField(field, index, value);
+ }
+ @java.lang.Override
+ public Builder addRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ java.lang.Object value) {
+ return super.addRepeatedField(field, value);
+ }
+ @java.lang.Override
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof com.ziggurat.proto.MessagePayloadProto.MessagePayload) {
+ return mergeFrom((com.ziggurat.proto.MessagePayloadProto.MessagePayload)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(com.ziggurat.proto.MessagePayloadProto.MessagePayload other) {
+ if (other == com.ziggurat.proto.MessagePayloadProto.MessagePayload.getDefaultInstance()) return this;
+ if (other.getMessage() != com.google.protobuf.ByteString.EMPTY) {
+ setMessage(other.getMessage());
+ }
+ if (!other.getTopicEntity().isEmpty()) {
+ topicEntity_ = other.topicEntity_;
+ onChanged();
+ }
+ if (other.getRetryCount() != 0) {
+ setRetryCount(other.getRetryCount());
+ }
+ this.mergeUnknownFields(other.unknownFields);
+ onChanged();
+ return this;
+ }
+
+ @java.lang.Override
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ @java.lang.Override
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.ziggurat.proto.MessagePayloadProto.MessagePayload parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (com.ziggurat.proto.MessagePayloadProto.MessagePayload) e.getUnfinishedMessage();
+ throw e.unwrapIOException();
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+
+ private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * bytes message = 1;
+ * @return The message.
+ */
+ @java.lang.Override
+ public com.google.protobuf.ByteString getMessage() {
+ return message_;
+ }
+ /**
+ * bytes message = 1;
+ * @param value The message to set.
+ * @return This builder for chaining.
+ */
+ public Builder setMessage(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ message_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * bytes message = 1;
+ * @return This builder for chaining.
+ */
+ public Builder clearMessage() {
+
+ message_ = getDefaultInstance().getMessage();
+ onChanged();
+ return this;
+ }
+
+ private java.lang.Object topicEntity_ = "";
+ /**
+ * string topic_entity = 2;
+ * @return The topicEntity.
+ */
+ public java.lang.String getTopicEntity() {
+ java.lang.Object ref = topicEntity_;
+ if (!(ref instanceof java.lang.String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ topicEntity_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * string topic_entity = 2;
+ * @return The bytes for topicEntity.
+ */
+ public com.google.protobuf.ByteString
+ getTopicEntityBytes() {
+ java.lang.Object ref = topicEntity_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ topicEntity_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * string topic_entity = 2;
+ * @param value The topicEntity to set.
+ * @return This builder for chaining.
+ */
+ public Builder setTopicEntity(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ topicEntity_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * string topic_entity = 2;
+ * @return This builder for chaining.
+ */
+ public Builder clearTopicEntity() {
+
+ topicEntity_ = getDefaultInstance().getTopicEntity();
+ onChanged();
+ return this;
+ }
+ /**
+ * string topic_entity = 2;
+ * @param value The bytes for topicEntity to set.
+ * @return This builder for chaining.
+ */
+ public Builder setTopicEntityBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ checkByteStringIsUtf8(value);
+
+ topicEntity_ = value;
+ onChanged();
+ return this;
+ }
+
+ private int retryCount_ ;
+ /**
+ * int32 retry_count = 3;
+ * @return The retryCount.
+ */
+ @java.lang.Override
+ public int getRetryCount() {
+ return retryCount_;
+ }
+ /**
+ * int32 retry_count = 3;
+ * @param value The retryCount to set.
+ * @return This builder for chaining.
+ */
+ public Builder setRetryCount(int value) {
+
+ retryCount_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * int32 retry_count = 3;
+ * @return This builder for chaining.
+ */
+ public Builder clearRetryCount() {
+
+ retryCount_ = 0;
+ onChanged();
+ return this;
+ }
+ @java.lang.Override
+ public final Builder setUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.setUnknownFields(unknownFields);
+ }
+
+ @java.lang.Override
+ public final Builder mergeUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.mergeUnknownFields(unknownFields);
+ }
+
+
+ // @@protoc_insertion_point(builder_scope:com.ziggurat.proto.MessagePayload)
+ }
+
+ // @@protoc_insertion_point(class_scope:com.ziggurat.proto.MessagePayload)
+ private static final com.ziggurat.proto.MessagePayloadProto.MessagePayload DEFAULT_INSTANCE;
+ static {
+ DEFAULT_INSTANCE = new com.ziggurat.proto.MessagePayloadProto.MessagePayload();
+ }
+
+ public static com.ziggurat.proto.MessagePayloadProto.MessagePayload getDefaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+
+ private static final com.google.protobuf.Parser
+ PARSER = new com.google.protobuf.AbstractParser() {
+ @java.lang.Override
+ public MessagePayload parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new MessagePayload(input, extensionRegistry);
+ }
+ };
+
+ public static com.google.protobuf.Parser parser() {
+ return PARSER;
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ @java.lang.Override
+ public com.ziggurat.proto.MessagePayloadProto.MessagePayload getDefaultInstanceForType() {
+ return DEFAULT_INSTANCE;
+ }
+
+ }
+
+ private static final com.google.protobuf.Descriptors.Descriptor
+ internal_static_com_ziggurat_proto_MessagePayload_descriptor;
+ private static final
+ com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internal_static_com_ziggurat_proto_MessagePayload_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n%resources/proto/message-payload.proto\022" +
+ "\022com.ziggurat.proto\"L\n\016MessagePayload\022\017\n" +
+ "\007message\030\001 \001(\014\022\024\n\014topic_entity\030\002 \001(\t\022\023\n\013" +
+ "retry_count\030\003 \001(\005B)\n\022com.ziggurat.protoB" +
+ "\023MessagePayloadProtob\006proto3"
+ };
+ descriptor = com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ });
+ internal_static_com_ziggurat_proto_MessagePayload_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_com_ziggurat_proto_MessagePayload_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+ internal_static_com_ziggurat_proto_MessagePayload_descriptor,
+ new java.lang.String[] { "Message", "TopicEntity", "RetryCount", });
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj
index f49a5c3b..d2930140 100644
--- a/src/ziggurat/init.clj
+++ b/src/ziggurat/init.clj
@@ -1,10 +1,13 @@
(ns ziggurat.init
"Contains the entry point for your application."
- (:require [clojure.tools.logging :as log]
- [mount.core :as mount :refer [defstate]]
+ (:require [clojure.set :as set]
+ [clojure.string :as str]
+ [clojure.tools.logging :as log]
+ [mount.core :as mount]
[schema.core :as s]
- [clojure.set :as set]
[ziggurat.config :refer [ziggurat-config] :as config]
+ [ziggurat.kafka-consumer.consumer-driver :as consumer-driver]
+ [ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.messaging.connection :as messaging-connection :refer [connection]]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
@@ -15,9 +18,7 @@
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.tracer :as tracer]
- [ziggurat.util.java-util :as util]
- [ziggurat.kafka-consumer.executor-service :as executor-service]
- [ziggurat.kafka-consumer.consumer-driver :as consumer-driver])
+ [ziggurat.util.java-util :as util])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
:name tech.gojek.ziggurat.internal.Init))
@@ -179,6 +180,8 @@
{s/Keyword {:handler-fn (s/pred #(fn? %))
s/Keyword (s/pred #(fn? %))}}))
+(declare BatchRoute)
+
(s/defschema BatchRoute
(s/conditional
#(and (seq %)
@@ -188,9 +191,9 @@
(defn- validate-routes-against-config
([routes route-type]
(doseq [[topic-entity handler-map] routes]
- (let [route-config (-> (ziggurat-config)
- (get-in [route-type topic-entity]))
- channels (-> handler-map (dissoc :handler-fn) (keys) (set))
+ (let [route-config (-> (ziggurat-config)
+ (get-in [route-type topic-entity]))
+ channels (-> handler-map (dissoc :handler-fn) (keys) (set))
config-channels (-> (ziggurat-config)
(get-in [route-type topic-entity :channels])
(keys)
@@ -199,7 +202,7 @@
(throw (IllegalArgumentException. (format "Error! Route %s isn't present in the %s config" topic-entity route-type)))
(when-not (set/subset? channels config-channels)
(let [diff (set/difference channels config-channels)]
- (throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (clojure.string/join "," diff) route-type))))))))))
+ (throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (str/join "," diff) route-type))))))))))
(defn validate-routes [stream-routes batch-routes modes]
(when (contains? (set modes) :stream-worker)
@@ -210,13 +213,13 @@
(validate-routes-against-config batch-routes :batch-routes)))
(defn- derive-modes [stream-routes batch-routes actor-routes]
- (let [base-modes [:management-api :worker]]
- (if (and (nil? stream-routes) (nil? batch-routes))
+ (let [base-modes [:management-api :worker]]
+ (when (and (nil? stream-routes) (nil? batch-routes))
(throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args")))
(cond-> base-modes
(some? stream-routes) (conj :stream-worker)
- (some? batch-routes) (conj :batch-worker)
- (some? actor-routes) (conj :api-server))))
+ (some? batch-routes) (conj :batch-worker)
+ (some? actor-routes) (conj :api-server))))
(defn validate-modes [modes stream-routes batch-routes actor-routes]
(let [derived-modes (if-not (empty? modes)
diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj
index e9f1d17d..c82fc33e 100644
--- a/src/ziggurat/mapper.clj
+++ b/src/ziggurat/mapper.clj
@@ -1,11 +1,9 @@
(ns ziggurat.mapper
(:require [clojure.string :as str]
- [sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.messaging.producer :as producer]
[ziggurat.metrics :as metrics]
[ziggurat.new-relic :as nr]
- [ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.util.error :refer [report-error]])
(:import (java.time Instant)))
@@ -89,4 +87,4 @@
(report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))
-(defrecord MessagePayload [message topic-entity])
+(defrecord MessagePayload [message topic-entity]) ;; NOTE: is this still needed?
diff --git a/src/ziggurat/message_payload.clj b/src/ziggurat/message_payload.clj
index d5ef26fc..b77fc2f6 100644
--- a/src/ziggurat/message_payload.clj
+++ b/src/ziggurat/message_payload.clj
@@ -10,3 +10,7 @@
:topic-entity s/Keyword
(s/optional-key :retry-count) s/Int
(s/optional-key :headers) s/Any})
+
+(defn mk-message-payload
+ [msg topic-entity retry-count]
+ {:message (.getBytes (str msg)) :topic-entity (name topic-entity) :retry-count retry-count})
diff --git a/src/ziggurat/messaging/connection.clj b/src/ziggurat/messaging/connection.clj
index 889601ea..18e1101a 100644
--- a/src/ziggurat/messaging/connection.clj
+++ b/src/ziggurat/messaging/connection.clj
@@ -1,19 +1,19 @@
(ns ziggurat.messaging.connection
- (:require [clojure.tools.logging :as log]
+ (:require [clojure.string :as str]
+ [clojure.tools.logging :as log]
[langohr.core :as rmq]
[mount.core :as mount :refer [defstate start]]
[sentry-clj.async :as sentry]
+ [ziggurat.channel :refer [get-keys-for-topic]]
[ziggurat.config :refer [ziggurat-config]]
+ [ziggurat.messaging.util :as util]
[ziggurat.sentry :refer [sentry-reporter]]
- [ziggurat.channel :refer [get-keys-for-topic]]
[ziggurat.tracer :refer [tracer]]
- [ziggurat.messaging.util :as util]
- [clojure.string :as str]
[ziggurat.util.error :refer [report-error]])
(:import [com.rabbitmq.client ShutdownListener Address ListAddressResolver]
- [java.util.concurrent Executors ExecutorService]
+ [com.rabbitmq.client.impl DefaultCredentialsProvider]
[io.opentracing.contrib.rabbitmq TracingConnectionFactory]
- [com.rabbitmq.client.impl DefaultCredentialsProvider]))
+ [java.util.concurrent Executors ExecutorService]))
(defn is-connection-required? []
(let [stream-routes (:stream-routes (mount/args))
@@ -66,6 +66,8 @@
(rmq/close conn))
(log/info "Disconnected from RabbitMQ")))
+(declare connection)
+
(defstate connection
:start (start-connection)
:stop (stop-connection connection))
diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj
index 86a5eb91..ae32ca69 100644
--- a/src/ziggurat/messaging/consumer.clj
+++ b/src/ziggurat/messaging/consumer.clj
@@ -1,45 +1,33 @@
(ns ziggurat.messaging.consumer
- (:require [ziggurat.mapper :as mpr]
- [ziggurat.message-payload :as mp]
- [clojure.tools.logging :as log]
+ (:require [clojure.tools.logging :as log]
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.consumers :as lcons]
- [ziggurat.kafka-consumer.consumer-handler :as ch]
- [schema.core :as s]
- [sentry-clj.async :as sentry]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [get-in-config]]
+ [ziggurat.kafka-consumer.consumer-handler :as ch]
+ [ziggurat.mapper :as mpr]
[ziggurat.messaging.connection :refer [connection]]
- [ziggurat.sentry :refer [sentry-reporter]]
- [ziggurat.messaging.util :refer :all]
+ [ziggurat.messaging.util :refer [prefixed-queue-name prefixed-channel-name]]
[ziggurat.metrics :as metrics]
- [ziggurat.util.error :refer [report-error]]))
-
-(defn- convert-to-message-payload
- "This function is used for migration from Ziggurat Version 2.x to 3.x. It checks if the message is a message payload or a message(pushed by Ziggurat version < 3.0.0) and converts messages to
- message-payload to pass onto the mapper-fn.
+ [ziggurat.middleware.default :as mw]
+ [ziggurat.util.error :refer [report-error]])
+ (:import [com.ziggurat.proto MessagePayloadProto$MessagePayload]))
- If the `:retry-count` key is absent in the `message`, then it puts `0` as the value for `:retry-count` in `MessagePayload`.
- It also converts the topic-entity into a keyword while constructing MessagePayload."
- [message topic-entity]
- (try
- (s/validate mp/message-payload-schema message)
- (catch Exception e
- (log/info "old message format read, converting to message-payload: " message)
- (let [retry-count (or (:retry-count message) 0)
- message-payload (mp/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))]
- (assoc message-payload :retry-count retry-count)))))
+(defn- try-deserialize-message
+ [topic-entity-name message]
+ (let [from-proto (mw/deserialize-message message MessagePayloadProto$MessagePayload topic-entity-name)]
+ (if (nil? from-proto) (nippy/thaw message) from-proto)))
(defn convert-and-ack-message
- "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
- if `ack?` is true."
- [ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity]
+ "De-serializes the message payload (`payload`) using `nippy/thaw` or `proto` (whichever of the two succeeds).
+ Acks the message if `ack?` is true."
+ [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity]
(try
- (let [message (nippy/thaw payload)]
+ (let [message (try-deserialize-message topic-entity payload)]
(when ack?
(lb/ack ch delivery-tag))
- (convert-to-message-payload message topic-entity))
+ message)
(catch Exception e
(lb/reject ch delivery-tag false)
(report-error e "Error while decoding message")
@@ -51,8 +39,8 @@
(lb/ack ch delivery-tag))
(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
- (let [delivery-tag (:delivery-tag meta)
- message-payload (convert-and-ack-message ch meta payload false topic-entity)]
+ (let [delivery-tag (:delivery-tag meta)
+ message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(when message-payload
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
(try
@@ -112,13 +100,13 @@
(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
(lb/qos ch prefetch-count)
- (let [consumer-tag (lcons/subscribe ch
- queue-name
- (message-handler wrapped-mapper-fn topic-entity)
- {:handle-shutdown-signal-fn (fn [consumer_tag reason]
- (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
- :handle-consume-ok-fn (fn [consumer_tag]
- (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})]))
+ (lcons/subscribe ch
+ queue-name
+ (message-handler wrapped-mapper-fn topic-entity)
+ {:handle-shutdown-signal-fn (fn [consumer_tag reason]
+ (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
+ :handle-consume-ok-fn (fn [consumer_tag]
+ (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))}))
(defn start-retry-subscriber* [handler-fn topic-entity]
(when (get-in-config [:retry :enabled])
@@ -144,12 +132,12 @@
"Starts the subscriber to the instant queue of the rabbitmq"
[stream-routes batch-routes]
(doseq [stream-route stream-routes]
- (let [topic-entity (first stream-route)
- handler (-> stream-route second :handler-fn)
- channels (-> stream-route second (dissoc :handler-fn))]
+ (let [topic-entity (first stream-route)
+ handler (-> stream-route second :handler-fn)
+ channels (-> stream-route second (dissoc :handler-fn))]
(start-channels-subscriber channels topic-entity)
(start-retry-subscriber* (mpr/mapper-func handler (keys channels)) topic-entity)))
(doseq [batch-route batch-routes]
- (let [topic-entity (first batch-route)
- handler (-> batch-route second :handler-fn)]
+ (let [topic-entity (first batch-route)
+ handler (-> batch-route second :handler-fn)]
(start-retry-subscriber* (fn [message] (ch/process handler message)) topic-entity))))
diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj
index 699b2105..212d301e 100644
--- a/src/ziggurat/messaging/producer.clj
+++ b/src/ziggurat/messaging/producer.clj
@@ -3,22 +3,20 @@
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.exchange :as le]
- [langohr.queue :as lq]
[langohr.http :as lh]
- [taoensso.nippy :as nippy]
+ [langohr.queue :as lq]
[ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]]
[ziggurat.messaging.connection :refer [connection is-connection-required?]]
- [ziggurat.messaging.util :refer :all]
- [ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.messaging.util :as util]
- [ziggurat.metrics :as metrics])
- (:import (java.io IOException)
- (com.rabbitmq.client AlreadyClosedException)))
+ [ziggurat.metrics :as metrics]
+ [ziggurat.middleware.default :as mwd])
+ (:import (com.rabbitmq.client AlreadyClosedException)
+ (java.io IOException)))
(def MAX_EXPONENTIAL_RETRIES 25)
(defn delay-queue-name [topic-entity queue-name]
- (prefixed-queue-name topic-entity queue-name))
+ (util/prefixed-queue-name topic-entity queue-name))
(defn get-replica-count [host-count]
(int (Math/ceil (/ host-count 2))))
@@ -52,8 +50,8 @@
hosts-vec (util/list-of-hosts rmq-config)
ha-policy-body (get-default-ha-policy rmq-config (get-replica-count (count hosts-vec)))]
(loop [hosts hosts-vec]
- (let [host-endpoint (str "http://" (first hosts) ":" (get rmq-config :admin-port 15672))
- resp (set-ha-policy-on-host host-endpoint username password ha-policy-body exchange-name queue-name)
+ (let [host-endpoint (str "http://" (first hosts) ":" (get rmq-config :admin-port 15672))
+ resp (set-ha-policy-on-host host-endpoint username password ha-policy-body exchange-name queue-name)
remaining-hosts (rest hosts)]
(when (and (nil? resp)
(pos? (count remaining-hosts)))
@@ -78,12 +76,12 @@
(try
(let [props (if dead-letter-exchange
{"x-dead-letter-exchange" dead-letter-exchange}
- {})]
- (let [ch (lch/open connection)]
- (create-queue queue-name props ch)
- (declare-exchange ch exchange-name)
- (bind-queue-to-exchange ch queue-name exchange-name)
- (set-ha-policy queue-name exchange-name (get-in config [:ziggurat :rabbit-mq-connection]))))
+ {})
+ ch (lch/open connection)]
+ (create-queue queue-name props ch)
+ (declare-exchange ch exchange-name)
+ (bind-queue-to-exchange ch queue-name exchange-name)
+ (set-ha-policy queue-name exchange-name (get-in config [:ziggurat :rabbit-mq-connection])))
(catch Exception e
(log/error e "Error while declaring RabbitMQ queues")
(throw e)))))
@@ -113,8 +111,9 @@
[exchange message-payload expiration]
(try
(with-open [ch (lch/open connection)]
- (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
- (properties-for-publish expiration (:headers message-payload))))
+ (let [rmq-msg (mwd/serialize-to-message-payload (:topic-entity message-payload) (dissoc message-payload :headers))
+ props (properties-for-publish expiration (:headers message-payload))]
+ (lb/publish ch exchange "" rmq-msg props)))
false
(catch AlreadyClosedException e
(handle-network-exception e message-payload))
@@ -151,13 +150,14 @@
queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])]
(or channel-queue-timeout-ms queue-timeout-ms)))
-(defn- get-backoff-exponent [retry-count message-retry-count]
+(defn- get-backoff-exponent
"Calculates the exponent using the formula `retry-count` and `message-retry-count`, where `retry-count` is the total retries
possible and `message-retry-count` is the count of retries available for the message.
Caps the value of `retry-count` to MAX_EXPONENTIAL_RETRIES.
Returns 1, if `message-retry-count` is higher than `max(MAX_EXPONENTIAL_RETRIES, retry-count)`."
+ [retry-count message-retry-count]
(let [exponent (- (min MAX_EXPONENTIAL_RETRIES retry-count) message-retry-count)]
(max 1 exponent)))
@@ -181,40 +181,44 @@
(let [exponential-backoff (get-backoff-exponent retry-count message-retry-count)]
(long (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms))))
-(defn get-queue-timeout-ms [message-payload]
+(defn get-queue-timeout-ms
"Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled."
- (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
- retry-count (-> (ziggurat-config) :retry :count)
+ [message-payload]
+ (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
+ retry-count (-> (ziggurat-config) :retry :count)
message-retry-count (:retry-count message-payload)]
(if (= :exponential (-> (ziggurat-config) :retry :type))
(get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms)
queue-timeout-ms)))
-(defn get-channel-queue-timeout-ms [topic-entity channel message-payload]
+(defn get-channel-queue-timeout-ms
"Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled."
+ [topic-entity channel message-payload]
(let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel)
- message-retry-count (:retry-count message-payload)
- channel-retry-count (get-channel-retry-count topic-entity channel)]
+ message-retry-count (:retry-count message-payload)
+ channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (= :exponential (channel-retry-type topic-entity channel))
(get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms)
channel-queue-timeout-ms)))
-(defn get-delay-exchange-name [topic-entity message-payload]
+(defn get-delay-exchange-name
"This function return delay exchange name for retry when using flow without channel. It will return exchange name with retry count as suffix if exponential backoff enabled."
+ [topic-entity message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
- exchange-name (prefixed-queue-name topic-entity exchange-name)
- retry-count (-> (ziggurat-config) :retry :count)]
+ exchange-name (util/prefixed-queue-name topic-entity exchange-name)
+ retry-count (-> (ziggurat-config) :retry :count)]
(if (= :exponential (-> (ziggurat-config) :retry :type))
(let [message-retry-count (:retry-count message-payload)
- backoff-exponent (get-backoff-exponent retry-count message-retry-count)]
- (prefixed-queue-name exchange-name backoff-exponent))
+ backoff-exponent (get-backoff-exponent retry-count message-retry-count)]
+ (util/prefixed-queue-name exchange-name backoff-exponent))
exchange-name)))
-(defn get-channel-delay-exchange-name [topic-entity channel message-payload]
+(defn get-channel-delay-exchange-name
"This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled."
+ [topic-entity channel message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
- exchange-name (prefixed-channel-name topic-entity channel exchange-name)
- channel-retry-count (get-channel-retry-count topic-entity channel)]
+ exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)
+ channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (= :exponential (channel-retry-type topic-entity channel))
(let [message-retry-count (:retry-count message-payload)
exponential-backoff (get-backoff-exponent channel-retry-count message-retry-count)]
@@ -222,85 +226,85 @@
exchange-name)))
(defn publish-to-delay-queue [message-payload]
- (let [topic-entity (:topic-entity message-payload)
- exchange-name (get-delay-exchange-name topic-entity message-payload)
+ (let [topic-entity (:topic-entity message-payload)
+ exchange-name (get-delay-exchange-name topic-entity message-payload)
queue-timeout-ms (get-queue-timeout-ms message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))
(defn publish-to-dead-queue [message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
- topic-entity (:topic-entity message-payload)
- exchange-name (prefixed-queue-name topic-entity exchange-name)]
+ topic-entity (:topic-entity message-payload)
+ exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-instant-queue [message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
- topic-entity (:topic-entity message-payload)
- exchange-name (prefixed-queue-name topic-entity exchange-name)]
+ topic-entity (:topic-entity message-payload)
+ exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-channel-delay-queue [channel message-payload]
- (let [topic-entity (:topic-entity message-payload)
- exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
+ (let [topic-entity (:topic-entity message-payload)
+ exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))
(defn publish-to-channel-dead-queue [channel message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
- topic-entity (:topic-entity message-payload)
- exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
+ topic-entity (:topic-entity message-payload)
+ exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-channel-instant-queue [channel message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
- topic-entity (:topic-entity message-payload)
- exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
+ topic-entity (:topic-entity message-payload)
+ exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))
-(defn retry [{:keys [retry-count topic-entity] :as message-payload}]
+(defn retry [{:keys [retry-count] :as message-payload}]
(when (-> (ziggurat-config) :retry :enabled)
(cond
- (nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count))))
- (pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count)))
+ (nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count))))
+ (pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count)))
(zero? retry-count) (publish-to-dead-queue (assoc message-payload :retry-count (-> (ziggurat-config) :retry :count))))))
(defn retry-for-channel [{:keys [retry-count topic-entity] :as message-payload} channel]
(when (channel-retries-enabled topic-entity channel)
(cond
- (nil? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec (get-channel-retry-count topic-entity channel))))
- (pos? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec retry-count)))
+ (nil? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec (get-channel-retry-count topic-entity channel))))
+ (pos? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec retry-count)))
(zero? retry-count) (publish-to-channel-dead-queue channel (assoc message-payload :retry-count (get-channel-retry-count topic-entity channel))))))
(defn- make-delay-queue [topic-entity]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
- queue-name (delay-queue-name topic-entity queue-name)
- exchange-name (prefixed-queue-name topic-entity exchange-name)
- dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)]
+ queue-name (delay-queue-name topic-entity queue-name)
+ exchange-name (util/prefixed-queue-name topic-entity exchange-name)
+ dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)]
(create-and-bind-queue queue-name exchange-name dead-letter-exchange-name)))
(defn- make-delay-queue-with-retry-count [topic-entity retry-count]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
- queue-name (delay-queue-name topic-entity queue-name)
- exchange-name (prefixed-queue-name topic-entity exchange-name)
- dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)
- sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))]
+ queue-name (delay-queue-name topic-entity queue-name)
+ exchange-name (util/prefixed-queue-name topic-entity exchange-name)
+ dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)
+ sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))]
(doseq [s (range 1 sequence)]
- (create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name))))
+ (create-and-bind-queue (util/prefixed-queue-name queue-name s) (util/prefixed-queue-name exchange-name s) dead-letter-exchange-name))))
(defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count]
- (make-delay-queue-with-retry-count (with-channel-name topic-entity channel) retry-count))
+ (make-delay-queue-with-retry-count (util/with-channel-name topic-entity channel) retry-count))
(defn- make-channel-delay-queue [topic-entity channel]
- (make-delay-queue (with-channel-name topic-entity channel)))
+ (make-delay-queue (util/with-channel-name topic-entity channel)))
(defn- make-queue [topic-identifier queue-type]
(let [{:keys [queue-name exchange-name]} (queue-type (rabbitmq-config))
- queue-name (prefixed-queue-name topic-identifier queue-name)
- exchange-name (prefixed-queue-name topic-identifier exchange-name)]
+ queue-name (util/prefixed-queue-name topic-identifier queue-name)
+ exchange-name (util/prefixed-queue-name topic-identifier exchange-name)]
(create-and-bind-queue queue-name exchange-name)))
(defn- make-channel-queue [topic-entity channel-name queue-type]
- (make-queue (with-channel-name topic-entity channel-name) queue-type))
+ (make-queue (util/with-channel-name topic-entity channel-name) queue-type))
(defn- make-channel-queues [channels topic-entity]
(doseq [channel channels]
@@ -314,20 +318,20 @@
"Please use it only after understanding its risks and implications."
"Its contract can change in the future releases of Ziggurat.")
(make-channel-delay-queue-with-retry-count topic-entity channel (get-channel-retry-count topic-entity channel)))
- (= :linear channel-retry-type) (make-channel-delay-queue topic-entity channel)
- (nil? channel-retry-type) (do
- (log/warn "[Deprecation Notice]: Please note that the configuration for channel retries has changed."
- "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide"
- "Use :type to specify the type of retry mechanism in the channel config.")
- (make-channel-delay-queue topic-entity channel))
- :else (do
- (log/warn "Incorrect keyword for type passed, falling back to linear backoff for channel: " channel)
- (make-channel-delay-queue topic-entity channel)))))))
+ (= :linear channel-retry-type) (make-channel-delay-queue topic-entity channel)
+ (nil? channel-retry-type) (do
+ (log/warn "[Deprecation Notice]: Please note that the configuration for channel retries has changed."
+ "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide"
+ "Use :type to specify the type of retry mechanism in the channel config.")
+ (make-channel-delay-queue topic-entity channel))
+ :else (do
+ (log/warn "Incorrect keyword for type passed, falling back to linear backoff for channel: " channel)
+ (make-channel-delay-queue topic-entity channel)))))))
(defn make-queues [routes]
(when (is-connection-required?)
(doseq [topic-entity (keys routes)]
- (let [channels (get-channel-names routes topic-entity)
+ (let [channels (util/get-channel-names routes topic-entity)
retry-type (retry-type)]
(make-channel-queues channels topic-entity)
(when (-> (ziggurat-config) :retry :enabled)
@@ -345,7 +349,6 @@
"Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide"
"Use :type to specify the type of retry mechanism in the config.")
(make-delay-queue topic-entity))
- :else (do
- (log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity)
- (make-delay-queue topic-entity))))))))
-
+ :else (do
+ (log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity)
+ (make-delay-queue topic-entity))))))))
diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj
index c4ba53fc..63e6bcf6 100644
--- a/src/ziggurat/middleware/default.clj
+++ b/src/ziggurat/middleware/default.clj
@@ -1,13 +1,16 @@
(ns ziggurat.middleware.default
- (:require [protobuf.impl.flatland.mapdef :as protodef]
- [sentry-clj.async :as sentry]
- [ziggurat.config :refer [get-in-config ziggurat-config]]
+ (:require [clojure.edn :as edn]
+ [protobuf.core :as protobuf]
+ [protobuf.impl.flatland.mapdef :as protodef]
+ [ziggurat.config :refer [ziggurat-config]]
+ [ziggurat.message-payload :refer [mk-message-payload]]
[ziggurat.metrics :as metrics]
- [ziggurat.sentry :refer [sentry-reporter]]
- [ziggurat.util.error :refer [report-error]]))
+ [ziggurat.util.error :refer [report-error]])
+ (:import [com.ziggurat.proto MessagePayloadProto$MessagePayload]))
-(defn protobuf-struct->persistent-map [struct]
+(defn protobuf-struct->persistent-map
"This functions converts a protobuf struct in to clojure persistent map recursively"
+ [struct]
(let [fields (:fields struct)
protobuf-value-flattener (fn flatten-value [value]
(first (map (fn [[type val]]
@@ -33,6 +36,13 @@
{}
fields)))
+(defn serialize-message
+ [proto-class topic-entity message]
+ (let [message (mk-message-payload (:message message) topic-entity (:retry-count message))]
+ (protobuf/->bytes (protobuf/create proto-class message))))
+
+(def serialize-to-message-payload (partial serialize-message MessagePayloadProto$MessagePayload))
+
(defn deserialize-message
"This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a
Clojure PersistentHashMap.
@@ -50,7 +60,13 @@
protodef/mapdef->schema
:fields
keys)
- result (select-keys loaded-proto proto-keys)]
+ result (select-keys loaded-proto proto-keys)
+ msg (if (:message result) ;; TODO: please read the TODO above!
+ (edn/read-string (.toStringUtf8 (:message result)))
+ result)
+ te (keyword (get result :topic-entity topic-entity-name))
+ rc (get result :retry-count 0)
+ result {:message msg :topic-entity te :retry-count rc}]
(if flatten-protobuf-struct?
(let [struct-entries (-> proto-klass
protodef/mapdef->schema
diff --git a/src/ziggurat/middleware/json.clj b/src/ziggurat/middleware/json.clj
index cdc7bc4e..9f7f19ce 100644
--- a/src/ziggurat/middleware/json.clj
+++ b/src/ziggurat/middleware/json.clj
@@ -3,9 +3,7 @@
Please see [Ziggurat Middleware](https://github.com/gojek/ziggurat#middleware-in-ziggurat) for more details.
"
(:require [cheshire.core :refer [parse-string]]
- [sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
- [ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.metrics :as metrics]
[ziggurat.util.error :refer [report-error]]))
diff --git a/test/com/gojek/test/proto/Example.java b/test/com/gojek/test/proto/Example.java
new file mode 100644
index 00000000..ae4bfd4b
--- /dev/null
+++ b/test/com/gojek/test/proto/Example.java
@@ -0,0 +1,3017 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: proto/example.proto
+
+package com.gojek.test.proto;
+
+public final class Example {
+ private Example() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistryLite registry) {
+ }
+
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ registerAllExtensions(
+ (com.google.protobuf.ExtensionRegistryLite) registry);
+ }
+ public interface PhotoOrBuilder extends
+ // @@protoc_insertion_point(interface_extends:com.gojek.test.proto.Photo)
+ com.google.protobuf.MessageOrBuilder {
+
+ /**
+ * int32 id = 1;
+ * @return The id.
+ */
+ int getId();
+
+ /**
+ * string path = 2;
+ * @return The path.
+ */
+ java.lang.String getPath();
+ /**
+ * string path = 2;
+ * @return The bytes for path.
+ */
+ com.google.protobuf.ByteString
+ getPathBytes();
+
+ /**
+ * optional bytes image = 6;
+ * @return Whether the image field is set.
+ */
+ boolean hasImage();
+ /**
+ * optional bytes image = 6;
+ * @return The image.
+ */
+ com.google.protobuf.ByteString getImage();
+ }
+ /**
+ * Protobuf type {@code com.gojek.test.proto.Photo}
+ */
+ public static final class Photo extends
+ com.google.protobuf.GeneratedMessageV3 implements
+ // @@protoc_insertion_point(message_implements:com.gojek.test.proto.Photo)
+ PhotoOrBuilder {
+ private static final long serialVersionUID = 0L;
+ // Use Photo.newBuilder() to construct.
+ private Photo(com.google.protobuf.GeneratedMessageV3.Builder> builder) {
+ super(builder);
+ }
+ private Photo() {
+ path_ = "";
+ image_ = com.google.protobuf.ByteString.EMPTY;
+ }
+
+ @java.lang.Override
+ @SuppressWarnings({"unused"})
+ protected java.lang.Object newInstance(
+ UnusedPrivateParameter unused) {
+ return new Photo();
+ }
+
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private Photo(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ this();
+ if (extensionRegistry == null) {
+ throw new java.lang.NullPointerException();
+ }
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ case 8: {
+
+ id_ = input.readInt32();
+ break;
+ }
+ case 18: {
+ java.lang.String s = input.readStringRequireUtf8();
+
+ path_ = s;
+ break;
+ }
+ case 50: {
+ bitField0_ |= 0x00000001;
+ image_ = input.readBytes();
+ break;
+ }
+ default: {
+ if (!parseUnknownField(
+ input, unknownFields, extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.gojek.test.proto.Example.Photo.class, com.gojek.test.proto.Example.Photo.Builder.class);
+ }
+
+ public interface LabelOrBuilder extends
+ // @@protoc_insertion_point(interface_extends:com.gojek.test.proto.Photo.Label)
+ com.google.protobuf.MessageOrBuilder {
+
+ /**
+ * string item = 1;
+ * @return The item.
+ */
+ java.lang.String getItem();
+ /**
+ * string item = 1;
+ * @return The bytes for item.
+ */
+ com.google.protobuf.ByteString
+ getItemBytes();
+
+ /**
+ * bool exists = 2;
+ * @return The exists.
+ */
+ boolean getExists();
+ }
+ /**
+ * Protobuf type {@code com.gojek.test.proto.Photo.Label}
+ */
+ public static final class Label extends
+ com.google.protobuf.GeneratedMessageV3 implements
+ // @@protoc_insertion_point(message_implements:com.gojek.test.proto.Photo.Label)
+ LabelOrBuilder {
+ private static final long serialVersionUID = 0L;
+ // Use Label.newBuilder() to construct.
+ private Label(com.google.protobuf.GeneratedMessageV3.Builder> builder) {
+ super(builder);
+ }
+ private Label() {
+ item_ = "";
+ }
+
+ @java.lang.Override
+ @SuppressWarnings({"unused"})
+ protected java.lang.Object newInstance(
+ UnusedPrivateParameter unused) {
+ return new Label();
+ }
+
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private Label(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ this();
+ if (extensionRegistry == null) {
+ throw new java.lang.NullPointerException();
+ }
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ case 10: {
+ java.lang.String s = input.readStringRequireUtf8();
+
+ item_ = s;
+ break;
+ }
+ case 16: {
+
+ exists_ = input.readBool();
+ break;
+ }
+ default: {
+ if (!parseUnknownField(
+ input, unknownFields, extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_Label_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_Label_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.gojek.test.proto.Example.Photo.Label.class, com.gojek.test.proto.Example.Photo.Label.Builder.class);
+ }
+
+ public static final int ITEM_FIELD_NUMBER = 1;
+ private volatile java.lang.Object item_;
+ /**
+ * string item = 1;
+ * @return The item.
+ */
+ @java.lang.Override
+ public java.lang.String getItem() {
+ java.lang.Object ref = item_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ item_ = s;
+ return s;
+ }
+ }
+ /**
+ * string item = 1;
+ * @return The bytes for item.
+ */
+ @java.lang.Override
+ public com.google.protobuf.ByteString
+ getItemBytes() {
+ java.lang.Object ref = item_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ item_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ public static final int EXISTS_FIELD_NUMBER = 2;
+ private boolean exists_;
+ /**
+ * bool exists = 2;
+ * @return The exists.
+ */
+ @java.lang.Override
+ public boolean getExists() {
+ return exists_;
+ }
+
+ private byte memoizedIsInitialized = -1;
+ @java.lang.Override
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized == 1) return true;
+ if (isInitialized == 0) return false;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ @java.lang.Override
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ if (!getItemBytes().isEmpty()) {
+ com.google.protobuf.GeneratedMessageV3.writeString(output, 1, item_);
+ }
+ if (exists_ != false) {
+ output.writeBool(2, exists_);
+ }
+ unknownFields.writeTo(output);
+ }
+
+ @java.lang.Override
+ public int getSerializedSize() {
+ int size = memoizedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (!getItemBytes().isEmpty()) {
+ size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, item_);
+ }
+ if (exists_ != false) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(2, exists_);
+ }
+ size += unknownFields.getSerializedSize();
+ memoizedSize = size;
+ return size;
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof com.gojek.test.proto.Example.Photo.Label)) {
+ return super.equals(obj);
+ }
+ com.gojek.test.proto.Example.Photo.Label other = (com.gojek.test.proto.Example.Photo.Label) obj;
+
+ if (!getItem()
+ .equals(other.getItem())) return false;
+ if (getExists()
+ != other.getExists()) return false;
+ if (!unknownFields.equals(other.unknownFields)) return false;
+ return true;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptor().hashCode();
+ hash = (37 * hash) + ITEM_FIELD_NUMBER;
+ hash = (53 * hash) + getItem().hashCode();
+ hash = (37 * hash) + EXISTS_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean(
+ getExists());
+ hash = (29 * hash) + unknownFields.hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ java.nio.ByteBuffer data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ java.nio.ByteBuffer data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static com.gojek.test.proto.Example.Photo.Label parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+
+ @java.lang.Override
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder() {
+ return DEFAULT_INSTANCE.toBuilder();
+ }
+ public static Builder newBuilder(com.gojek.test.proto.Example.Photo.Label prototype) {
+ return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+ }
+ @java.lang.Override
+ public Builder toBuilder() {
+ return this == DEFAULT_INSTANCE
+ ? new Builder() : new Builder().mergeFrom(this);
+ }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code com.gojek.test.proto.Photo.Label}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessageV3.Builder implements
+ // @@protoc_insertion_point(builder_implements:com.gojek.test.proto.Photo.Label)
+ com.gojek.test.proto.Example.Photo.LabelOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_Label_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_Label_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.gojek.test.proto.Example.Photo.Label.class, com.gojek.test.proto.Example.Photo.Label.Builder.class);
+ }
+
+ // Construct using com.gojek.test.proto.Example.Photo.Label.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessageV3
+ .alwaysUseFieldBuilders) {
+ }
+ }
+ @java.lang.Override
+ public Builder clear() {
+ super.clear();
+ item_ = "";
+
+ exists_ = false;
+
+ return this;
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return com.gojek.test.proto.Example.internal_static_com_gojek_test_proto_Photo_Label_descriptor;
+ }
+
+ @java.lang.Override
+ public com.gojek.test.proto.Example.Photo.Label getDefaultInstanceForType() {
+ return com.gojek.test.proto.Example.Photo.Label.getDefaultInstance();
+ }
+
+ @java.lang.Override
+ public com.gojek.test.proto.Example.Photo.Label build() {
+ com.gojek.test.proto.Example.Photo.Label result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ @java.lang.Override
+ public com.gojek.test.proto.Example.Photo.Label buildPartial() {
+ com.gojek.test.proto.Example.Photo.Label result = new com.gojek.test.proto.Example.Photo.Label(this);
+ result.item_ = item_;
+ result.exists_ = exists_;
+ onBuilt();
+ return result;
+ }
+
+ @java.lang.Override
+ public Builder clone() {
+ return super.clone();
+ }
+ @java.lang.Override
+ public Builder setField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ java.lang.Object value) {
+ return super.setField(field, value);
+ }
+ @java.lang.Override
+ public Builder clearField(
+ com.google.protobuf.Descriptors.FieldDescriptor field) {
+ return super.clearField(field);
+ }
+ @java.lang.Override
+ public Builder clearOneof(
+ com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+ return super.clearOneof(oneof);
+ }
+ @java.lang.Override
+ public Builder setRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ int index, java.lang.Object value) {
+ return super.setRepeatedField(field, index, value);
+ }
+ @java.lang.Override
+ public Builder addRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ java.lang.Object value) {
+ return super.addRepeatedField(field, value);
+ }
+ @java.lang.Override
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof com.gojek.test.proto.Example.Photo.Label) {
+ return mergeFrom((com.gojek.test.proto.Example.Photo.Label)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(com.gojek.test.proto.Example.Photo.Label other) {
+ if (other == com.gojek.test.proto.Example.Photo.Label.getDefaultInstance()) return this;
+ if (!other.getItem().isEmpty()) {
+ item_ = other.item_;
+ onChanged();
+ }
+ if (other.getExists() != false) {
+ setExists(other.getExists());
+ }
+ this.mergeUnknownFields(other.unknownFields);
+ onChanged();
+ return this;
+ }
+
+ @java.lang.Override
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ @java.lang.Override
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.gojek.test.proto.Example.Photo.Label parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (com.gojek.test.proto.Example.Photo.Label) e.getUnfinishedMessage();
+ throw e.unwrapIOException();
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+
+ private java.lang.Object item_ = "";
+ /**
+ * string item = 1;
+ * @return The item.
+ */
+ public java.lang.String getItem() {
+ java.lang.Object ref = item_;
+ if (!(ref instanceof java.lang.String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ item_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * string item = 1;
+ * @return The bytes for item.
+ */
+ public com.google.protobuf.ByteString
+ getItemBytes() {
+ java.lang.Object ref = item_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ item_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * string item = 1;
+ * @param value The item to set.
+ * @return This builder for chaining.
+ */
+ public Builder setItem(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ item_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * string item = 1;
+ * @return This builder for chaining.
+ */
+ public Builder clearItem() {
+
+ item_ = getDefaultInstance().getItem();
+ onChanged();
+ return this;
+ }
+ /**
+ * string item = 1;
+ * @param value The bytes for item to set.
+ * @return This builder for chaining.
+ */
+ public Builder setItemBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ checkByteStringIsUtf8(value);
+
+ item_ = value;
+ onChanged();
+ return this;
+ }
+
+ private boolean exists_ ;
+ /**
+ * bool exists = 2;
+ * @return The exists.
+ */
+ @java.lang.Override
+ public boolean getExists() {
+ return exists_;
+ }
+ /**
+ * bool exists = 2;
+ * @param value The exists to set.
+ * @return This builder for chaining.
+ */
+ public Builder setExists(boolean value) {
+
+ exists_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * bool exists = 2;
+ * @return This builder for chaining.
+ */
+ public Builder clearExists() {
+
+ exists_ = false;
+ onChanged();
+ return this;
+ }
+ @java.lang.Override
+ public final Builder setUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.setUnknownFields(unknownFields);
+ }
+
+ @java.lang.Override
+ public final Builder mergeUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.mergeUnknownFields(unknownFields);
+ }
+
+
+ // @@protoc_insertion_point(builder_scope:com.gojek.test.proto.Photo.Label)
+ }
+
+ // @@protoc_insertion_point(class_scope:com.gojek.test.proto.Photo.Label)
+ private static final com.gojek.test.proto.Example.Photo.Label DEFAULT_INSTANCE;
+ static {
+ DEFAULT_INSTANCE = new com.gojek.test.proto.Example.Photo.Label();
+ }
+
+ public static com.gojek.test.proto.Example.Photo.Label getDefaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+
+ private static final com.google.protobuf.Parser