diff --git a/README.md b/README.md index 4f9fa58e4..c786ce0e6 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ Javadocs are available on [javadoc.io](https://www.javadoc.io): - [cloudevents-api](https://www.javadoc.io/doc/io.cloudevents/cloudevents-api) - [cloudevents-core](https://www.javadoc.io/doc/io.cloudevents/cloudevents-core) +- [cloudevents-avro-compact](https://www.javadoc.io/doc/io.cloudevents/cloudevents-avro-compact) - [cloudevents-json-jackson](https://www.javadoc.io/doc/io.cloudevents/cloudevents-json-jackson) - [cloudevents-protobuf](https://www.javadoc.io/doc/io.cloudevents/cloudevents-protobuf) - [cloudevents-xml](https://www.javadoc.io/doc/io.cloudevents/cloudevents-xml) diff --git a/core/src/main/java/io/cloudevents/core/format/ContentType.java b/core/src/main/java/io/cloudevents/core/format/ContentType.java index 1d8656393..c5b19f883 100644 --- a/core/src/main/java/io/cloudevents/core/format/ContentType.java +++ b/core/src/main/java/io/cloudevents/core/format/ContentType.java @@ -45,6 +45,10 @@ public enum ContentType { * The content type for transports sending cloudevents in the protocol buffer format. */ PROTO("application/cloudevents+protobuf"), + /** + * The content type for transports sending cloudevents in the compact Avro format. + */ + AVRO_COMPACT("application/cloudevents+avrocompact"), /** * The content type for transports sending cloudevents in XML format. */ diff --git a/docs/avro.md b/docs/avro.md new file mode 100644 index 000000000..0b98fa8f0 --- /dev/null +++ b/docs/avro.md @@ -0,0 +1,49 @@ +--- +title: CloudEvents Avro Compact +nav_order: 4 +--- + +# CloudEvents Avro Compact + +[![Javadocs](http://www.javadoc.io/badge/io.cloudevents/cloudevents-avro-compact.svg?color=green)](http://www.javadoc.io/doc/io.cloudevents/cloudevents-avro-compact) + +This module provides the Avro Compact `EventFormat` implementation. + +# Setup +For Maven based projects, use the following dependency: + +```xml + + io.cloudevents + cloudevents-avro-compact + x.y.z + +``` + +No further configuration is required is use the module. + +## Using the Avro Compact Event Format + +### Event serialization + +```java +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormatProvider; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.avro.avro.compact.AvroCompactFormat; + +CloudEvent event = CloudEventBuilder.v1() + .withId("hello") + .withType("example.vertx") + .withSource(URI.create("http://localhost")) + .build(); + +byte[] serialized = EventFormatProvider + .getInstance() + .resolveFormat(AvroCompactFormat.CONTENT_TYPE) + .serialize(event); +``` + +The `EventFormatProvider` will automatically resolve the format using the +`ServiceLoader` APIs. + diff --git a/formats/avro-compact/pom.xml b/formats/avro-compact/pom.xml new file mode 100644 index 000000000..a258c7b62 --- /dev/null +++ b/formats/avro-compact/pom.xml @@ -0,0 +1,101 @@ + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 3.0.0-SNAPSHOT + ../../pom.xml + + + cloudevents-avro-compact + CloudEvents - Avro Compact + + + + io.cloudevents.formats.avro.compact + + + + + + org.apache.avro + avro-maven-plugin + 1.11.2 + + String + + + + generate-sources + + schema + + + + + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + org.apache.avro + avro + 1.11.2 + + + + + org.slf4j + slf4j-simple + 1.7.36 + test + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + + diff --git a/formats/avro-compact/src/main/avro/cloudevents-compact.avsc b/formats/avro-compact/src/main/avro/cloudevents-compact.avsc new file mode 100644 index 000000000..f875781dc --- /dev/null +++ b/formats/avro-compact/src/main/avro/cloudevents-compact.avsc @@ -0,0 +1,81 @@ +{ + "namespace": "io.cloudevents.v1.avro.compact", + "type": "record", + "name": "CloudEvent", + "version": "1.0", + "doc": "Avro Compact Event Format for CloudEvents", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "source", + "type": "string" + }, + { + "name": "type", + "type": "string" + }, + { + "name": "datacontenttype", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "dataschema", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "subject", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "time", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-micros" + } + ], + "default": null + }, + { + "name": "extensions", + "type": { + "type": "map", + "values": [ + "boolean", + "int", + { + "type": "long", + "logicalType" : "timestamp-micros" + }, + "string", + "bytes" + ] + }, + "default": {} + }, + { + "name": "data", + "type": [ + "bytes", + "null" + ], + "default": "null" + } + ] +} diff --git a/formats/avro-compact/src/main/java/io/cloudevents/avro/compact/AvroCompactFormat.java b/formats/avro-compact/src/main/java/io/cloudevents/avro/compact/AvroCompactFormat.java new file mode 100644 index 000000000..066776414 --- /dev/null +++ b/formats/avro-compact/src/main/java/io/cloudevents/avro/compact/AvroCompactFormat.java @@ -0,0 +1,134 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.avro.compact; + + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.format.EventDeserializationException; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.v1.avro.compact.CloudEvent.Builder; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; + +/** + * An implementation of {@link EventFormat} for the Avro Compact format. + * This format is resolvable with {@link io.cloudevents.core.provider.EventFormatProvider} using the content type {@link #AVRO_COMPACT_CONTENT_TYPE}. + */ +public class AvroCompactFormat implements EventFormat { + + public static final String AVRO_COMPACT_CONTENT_TYPE = "application/cloudevents+avrocompact"; + + @Override + public byte[] serialize(CloudEvent from) throws EventSerializationException { + try { + Builder to = io.cloudevents.v1.avro.compact.CloudEvent.newBuilder(); + + // extensions + Map extensions = new HashMap<>(); + for (String name : from.getExtensionNames()) { + Object value = from.getExtension(name); + if (value instanceof byte[]) + value = ByteBuffer.wrap((byte[]) value); + else if (value instanceof OffsetDateTime) + value = ((OffsetDateTime) value).toInstant(); + extensions.put(name, value); + } + + to.setSource(from.getSource().toString()) + .setType(from.getType()) + .setId(from.getId()) + .setSubject(from.getSubject()) + .setDatacontenttype(from.getDataContentType()) + .setExtensions(extensions); + + if (from.getTime() != null) + to.setTime(from.getTime().toInstant()); + if (from.getDataSchema() != null) + to.setDataschema(from.getDataSchema().toString()); + + CloudEventData data = from.getData(); + if (data != null) + to.setData(ByteBuffer.wrap(data.toBytes())); + return to.build().toByteBuffer().array(); + } catch (Exception e) { + throw new EventSerializationException(e); + } + } + + @Override + public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException { + try { + io.cloudevents.v1.avro.compact.CloudEvent from = io.cloudevents.v1.avro.compact.CloudEvent.fromByteBuffer(ByteBuffer.wrap(bytes)); + CloudEventBuilder to = CloudEventBuilder.v1() + .withSource(URI.create(from.getSource())) + .withType(from.getType()) + .withId(from.getType()) + .withSubject(from.getSubject()) + .withDataContentType(from.getDatacontenttype()); + + if (from.getTime() != null) + to.withTime(from.getTime().atOffset(ZoneOffset.UTC)); + if (from.getDataschema() != null) + to.withDataSchema(URI.create(from.getDataschema())); + + // extensions + for (Map.Entry entry : from.getExtensions().entrySet()) { + String name = entry.getKey(); + Object value = entry.getValue(); + // Avro supports boolean, int, string, bytes + if (value instanceof Boolean) + to.withExtension(name, (boolean) value); + else if (value instanceof Integer) + to.withExtension(name, (int) value); + else if (value instanceof Instant) + to.withExtension(name, ((Instant) value).atOffset(ZoneOffset.UTC)); + else if (value instanceof String) + to.withExtension(name, (String) value); + else if (value instanceof ByteBuffer) + to.withExtension(name, ((ByteBuffer) value).array()); + else + // this cannot happen, if ever seen, must be bug in this library + throw new AssertionError(String.format("invalid extension %s unsupported type %s", name, value.getClass())); + } + + if (from.getData() == null) + return to.end(); + else { + CloudEventData data = BytesCloudEventData.wrap(from.getData().array()); + return to.end(mapper.map(data)); + } + } catch (Exception e) { + throw new EventDeserializationException(e); + } + } + + @Override + public String serializedContentType() { + return AVRO_COMPACT_CONTENT_TYPE; + } +} diff --git a/formats/avro-compact/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat b/formats/avro-compact/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat new file mode 100644 index 000000000..ae558f090 --- /dev/null +++ b/formats/avro-compact/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat @@ -0,0 +1 @@ +io.cloudevents.avro.compact.AvroCompactFormat diff --git a/formats/avro-compact/src/test/java/io/cloudevents/avro/compact/AvroCompactFormatTest.java b/formats/avro-compact/src/test/java/io/cloudevents/avro/compact/AvroCompactFormatTest.java new file mode 100644 index 000000000..484ab15f9 --- /dev/null +++ b/formats/avro-compact/src/test/java/io/cloudevents/avro/compact/AvroCompactFormatTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.avro.compact; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class AvroCompactFormatTest { + + private final EventFormat format = EventFormatProvider.getInstance().resolveFormat(AvroCompactFormat.AVRO_COMPACT_CONTENT_TYPE); + + @Test + void format() { + assertNotNull(format); + assertEquals(Collections.singleton("application/cloudevents+avrocompact"), format.deserializableContentTypes()); + + CloudEvent event = CloudEventBuilder.v1() + // mandatory + .withId("") + .withSource(URI.create("")) + .withType("") + // optional + .withTime(Instant.EPOCH.atOffset(ZoneOffset.UTC)) + .withSubject("") + .withDataSchema(URI.create("")) + // extension + // support boolean, int, long, string, bytes + .withExtension("boolean", false) + .withExtension("int", 0) + .withExtension("time", Instant.EPOCH.atOffset(ZoneOffset.UTC)) + .withExtension("string", "") + // omitting bytes, because it is not supported by CloudEvent.equals + .withData("", BytesCloudEventData.wrap(new byte[0])) + .build(); + + byte[] serialized = format.serialize(event); + + assertNotNull(serialized); + + CloudEvent deserialized = format.deserialize(serialized); + + assertEquals(event, deserialized); + + byte[] reserialized = format.serialize(deserialized); + + assertArrayEquals(serialized, reserialized); + + + } +} diff --git a/pom.xml b/pom.xml index 15e307b74..252f9d4c3 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ api core + formats/avro-compact formats/json-jackson formats/protobuf formats/xml