diff --git a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDataWrapper.java b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDataWrapper.java index f4b87f5ea..3316787fc 100644 --- a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDataWrapper.java +++ b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDataWrapper.java @@ -16,8 +16,11 @@ */ package io.cloudevents.protobuf; +import com.google.protobuf.Any; import com.google.protobuf.Message; +import java.util.Arrays; + class ProtoDataWrapper implements ProtoCloudEventData { private final Message protoMessage; @@ -35,4 +38,49 @@ public Message getMessage() { public byte[] toBytes() { return protoMessage.toByteArray(); } + + @Override + public boolean equals(Object obj) { + + if (this == obj) { + return (true); + } + + if (!(obj instanceof ProtoDataWrapper)) { + return (false); + } + + // Now compare the actual data + ProtoDataWrapper rhs = (ProtoDataWrapper) obj; + + if (this.getMessage() == rhs.getMessage()){ + return true; + } + + // This is split out for readability. + // Compare the content in terms onf an 'Any'. + // - Verify the types match + // - Verify the values match. + + final Any lhsAny = getAsAny(this.getMessage()); + final Any rhsAny = getAsAny(rhs.getMessage()); + + final boolean typesMatch = (ProtoSupport.extractMessageType(lhsAny).equals(ProtoSupport.extractMessageType(rhsAny))); + + if (typesMatch) { + return lhsAny.getValue().equals(rhsAny.getValue()); + } else { + return false; + } + } + + private Any getAsAny(Message m) { + + if (m instanceof Any) { + return (Any) m; + } + + return Any.pack(m); + + } } diff --git a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDeserializer.java b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDeserializer.java index dd55973d5..dc1f54c27 100644 --- a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDeserializer.java +++ b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoDeserializer.java @@ -105,7 +105,7 @@ public , R> R read( data = BytesCloudEventData.wrap(raw); break; case PROTO_DATA: - data = new ProtoAccessor(this.protoCe); + data = new ProtoDataWrapper(this.protoCe.getProtoData()); break; case DATA_NOT_SET: break; @@ -130,22 +130,4 @@ private OffsetDateTime covertProtoTimestamp(com.google.protobuf.Timestamp timest return instant.atOffset(ZoneOffset.UTC); } - private static class ProtoAccessor implements ProtoCloudEventData { - - private final Message message; - - ProtoAccessor(CloudEvent proto){ - this.message = proto.getProtoData(); - } - - @Override - public Message getMessage() { - return message; - } - - @Override - public byte[] toBytes() { - return message.toByteArray(); - } - } } diff --git a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSerializer.java b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSerializer.java index bc240d84d..2f76d81b8 100644 --- a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSerializer.java +++ b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSerializer.java @@ -16,11 +16,8 @@ */ package io.cloudevents.protobuf; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; +import com.google.protobuf.*; import com.google.protobuf.Descriptors.FieldDescriptor; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Timestamp; import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; import io.cloudevents.core.CloudEventUtils; @@ -248,8 +245,14 @@ public CloudEvent end(CloudEventData data) throws CloudEventRWException { // If it's a proto message we can handle that directly. if (data instanceof ProtoCloudEventData) { final ProtoCloudEventData protoData = (ProtoCloudEventData) data; - if (protoData.getMessage() != null) { - protoBuilder.setProtoData(Any.pack(protoData.getMessage())); + final Message m = protoData.getMessage(); + if (m != null) { + // If it's already an 'Any' don't re-pack it. + if (m instanceof Any) { + protoBuilder.setProtoData((Any) m); + }else { + protoBuilder.setProtoData(Any.pack(m)); + } } } else { if (Objects.equals(dataContentType, PROTO_DATA_CONTENT_TYPE)) { diff --git a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSupport.java b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSupport.java index a2c079359..a4cf0ec1d 100644 --- a/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSupport.java +++ b/formats/protobuf/src/main/java/io/cloudevents/protobuf/ProtoSupport.java @@ -17,6 +17,8 @@ package io.cloudevents.protobuf; +import com.google.protobuf.Any; + /** * General support functions. */ @@ -44,4 +46,16 @@ static boolean isTextContent(String contentType) { || contentType.endsWith("+xml") ; } + + /** + * Extract the Protobuf message type from an 'Any' + * @param anyMessage + * @return + */ + static String extractMessageType(final Any anyMessage) { + final String typeUrl = anyMessage.getTypeUrl(); + final String[] parts = typeUrl.split("/"); + + return parts[parts.length -1]; + } } diff --git a/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtoDataWrapperTest.java b/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtoDataWrapperTest.java new file mode 100644 index 000000000..232af3209 --- /dev/null +++ b/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtoDataWrapperTest.java @@ -0,0 +1,98 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.protobuf; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +class ProtoDataWrapperTest { + + // == Closing Quotes for 2023/02/23 + private final Message quote1 = io.cloudevents.test.v1.proto.Test.Quote.newBuilder() + .setPrice(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(7519).build()) + .setHigh(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(7628).build()) + .setSymbol("PYPL") + .build(); + + private final Message quote2 = io.cloudevents.test.v1.proto.Test.Quote.newBuilder() + .setPrice(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(13097).build()) + .setHigh(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(13170).build()) + .setSymbol("IBM") + .build(); + + @Test + public void testBasic() { + + ProtoDataWrapper pdw = new ProtoDataWrapper(quote1); + + assertThat(pdw).isNotNull(); + assertThat(pdw.getMessage()).isNotNull(); + assertThat(pdw.toBytes()).withFailMessage("toBytes was NULL").isNotNull(); + assertThat(pdw.toBytes()).withFailMessage("toBytes[] returned empty array").hasSizeGreaterThan(0); + + // This is current behavior and will probably change in the next version. + assertThat(pdw.getMessage()).isInstanceOf(io.cloudevents.test.v1.proto.Test.Quote.class); + } + + @Test + public void testEquality() { + + ProtoDataWrapper pdw1 = new ProtoDataWrapper(quote1); + ProtoDataWrapper pdw2 = new ProtoDataWrapper(quote1); + + ProtoDataWrapper pdw3 = new ProtoDataWrapper(quote2); + + assertThat(pdw1).withFailMessage("Self Equality Failed - 1").isEqualTo(pdw1); + assertThat(pdw2).withFailMessage("Self Equality Failed - 2").isEqualTo(pdw2); + assertThat(pdw1).withFailMessage("Self Equality Failed - 3").isEqualTo(pdw2); + assertThat(pdw2).withFailMessage("Self Equality Failed - 4").isEqualTo(pdw1); + + assertThat(pdw1).withFailMessage("Non-Equality Failed - 1").isNotEqualTo(null); + assertThat(pdw1).withFailMessage("Non-Equality Failed - 2").isNotEqualTo(pdw3); + assertThat(pdw3).withFailMessage("Non-Equality Failed - 3").isNotEqualTo(pdw2); + + } + + /** + * Verify the generated bytes[] is correct + */ + @Test + public void testBytes() { + + // Our expected 'Any' + final Any expAny = Any.pack(quote1); + + // Our expected 'data' + final byte[] expData = expAny.toByteArray(); + + // Build the wrapper + final ProtoDataWrapper pdw = new ProtoDataWrapper(quote1); + + // Get the actual data + final byte[] actData = pdw.toBytes(); + + // Verify + Arrays.equals(expData, actData); + + } + +} diff --git a/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtobufFormatTest.java b/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtobufFormatTest.java index 539811e51..c611ce2bb 100644 --- a/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtobufFormatTest.java +++ b/formats/protobuf/src/test/java/io/cloudevents/protobuf/ProtobufFormatTest.java @@ -22,19 +22,20 @@ import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.v1.proto.CloudEvent; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.Reader; import java.net.URI; +import java.net.URL; import java.time.OffsetDateTime; import java.time.ZoneId; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.net.URL; +import java.util.UUID; import java.util.stream.Stream; import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat; @@ -45,6 +46,46 @@ class ProtobufFormatTest { EventFormat format = new ProtobufFormat(); + public static Stream serializeTestArgumentsDefault() { + return Stream.of( + Arguments.of(V1_MIN, "v1/min.proto.json"), + Arguments.of(V1_WITH_JSON_DATA, "v1/json_data.proto.json"), + Arguments.of(V1_WITH_TEXT_DATA, "v1/text_data.proto.json"), + Arguments.of(V1_WITH_JSON_DATA_WITH_EXT, "v1/json_data_with_ext.proto.json"), + Arguments.of(V1_WITH_XML_DATA, "v1/xml_data.proto.json"), + Arguments.of(V1_WITH_BINARY_EXT, "v1/binary_ext.proto.json"), + + Arguments.of(V03_MIN, "v03/min.proto.json") + + ); + } + + public static Stream roundTripTestArguments() { + return Stream.of( + "v1/min.proto.json", + "v1/json_data.proto.json", + "v1/text_data.proto.json", + "v1/json_data_with_ext.proto.json", + "v1/xml_data.proto.json", + "v1/binary_ext.proto.json", + + "v03/min.proto.json" + ); + } + + private static Message loadProto(String filename) throws IOException { + CloudEvent.Builder b = CloudEvent.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(getReader(filename), b); + return b.build(); + } + + private static Reader getReader(String filename) throws IOException { + URL file = Thread.currentThread().getContextClassLoader().getResource(filename); + assertThat(file).isNotNull(); + File dataFile = new File(file.getFile()); + return new FileReader(dataFile); + } + @Test public void testRegistration() { EventFormat act = EventFormatProvider.getInstance().resolveFormat("application/cloudevents+protobuf"); @@ -90,6 +131,8 @@ public void serialize(io.cloudevents.CloudEvent input, String jsonFile) throws I } + // ---------------------------------------------------------------- + /** * RoundTrip Test *

@@ -131,46 +174,40 @@ public void roundTripTest(String filename) throws IOException { } - public static Stream serializeTestArgumentsDefault() { - return Stream.of( - Arguments.of(V1_MIN, "v1/min.proto.json"), - Arguments.of(V1_WITH_JSON_DATA, "v1/json_data.proto.json"), - Arguments.of(V1_WITH_TEXT_DATA, "v1/text_data.proto.json"), - Arguments.of(V1_WITH_JSON_DATA_WITH_EXT, "v1/json_data_with_ext.proto.json"), - Arguments.of(V1_WITH_XML_DATA, "v1/xml_data.proto.json"), - Arguments.of(V1_WITH_BINARY_EXT, "v1/binary_ext.proto.json"), - - Arguments.of(V03_MIN, "v03/min.proto.json") - - ); - } - - public static Stream roundTripTestArguments() { - return Stream.of( - "v1/min.proto.json", - "v1/json_data.proto.json", - "v1/text_data.proto.json", - "v1/json_data_with_ext.proto.json", - "v1/xml_data.proto.json", - "v1/binary_ext.proto.json", + /** + * This test verifies the fix for Issue #523 that reported the Data + * portion was being corrupted during multi-step round trips if the + * data was a Protobuf Message. + */ + @Test + public void verifyMultiRoundTrip() { + + io.cloudevents.CloudEvent event = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(SOURCE) + .withType(TYPE) + .withData(ProtoCloudEventData.wrap(io.cloudevents.test.v1.proto.Test.Quote.newBuilder() + .setHigh(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(4200).build()) + .setPrice(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(1000).build()) + .setSymbol("PYPL") + .build())) + .build(); - "v03/min.proto.json" - ); - } + // 1st Round Trip + byte[] raw1 = format.serialize(event); + io.cloudevents.CloudEvent act1 = format.deserialize(raw1); + assertThat(event).withFailMessage("Mismatch on 1st round trip").isEqualTo(act1); - // ---------------------------------------------------------------- + // 2nd Round Trip + byte[] raw2 = format.serialize(act1); + io.cloudevents.CloudEvent act2 = format.deserialize(raw2); + assertThat(event).withFailMessage("Mismatch on 2nd round trip").isEqualTo(act2); - private static Message loadProto(String filename) throws IOException { - CloudEvent.Builder b = CloudEvent.newBuilder(); - JsonFormat.parser().ignoringUnknownFields().merge(getReader(filename), b); - return b.build(); - } + // 3rd Time's a charm + byte[] raw3 = format.serialize(act2); + io.cloudevents.CloudEvent act3 = format.deserialize(raw3); + assertThat(event).withFailMessage("Mismatch on 3rd round trip").isEqualTo(act3); - private static Reader getReader(String filename) throws IOException { - URL file = Thread.currentThread().getContextClassLoader().getResource(filename); - assertThat(file).isNotNull(); - File dataFile = new File(file.getFile()); - return new FileReader(dataFile); } private byte[] getProtoData(String filename) throws IOException {