Skip to content

Commit

Permalink
[working-draft] Support Avro Compact Format (#578)
Browse files Browse the repository at this point in the history
Add support for working-draft spec Avro compact format: cloudevents/spec@777d0c0

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Jul 20, 2023
1 parent 582feed commit 4ef3041
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Javadocs are available on [javadoc.io](https://www.javadoc.io):

- [cloudevents-api](https://www.javadoc.io/doc/io.cloudevents/cloudevents-api)
- [cloudevents-core](https://www.javadoc.io/doc/io.cloudevents/cloudevents-core)
- [cloudevents-avro-compact](https://www.javadoc.io/doc/io.cloudevents/cloudevents-avro-compact)
- [cloudevents-json-jackson](https://www.javadoc.io/doc/io.cloudevents/cloudevents-json-jackson)
- [cloudevents-protobuf](https://www.javadoc.io/doc/io.cloudevents/cloudevents-protobuf)
- [cloudevents-xml](https://www.javadoc.io/doc/io.cloudevents/cloudevents-xml)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public enum ContentType {
* The content type for transports sending cloudevents in the protocol buffer format.
*/
PROTO("application/cloudevents+protobuf"),
/**
* The content type for transports sending cloudevents in the compact Avro format.
*/
AVRO_COMPACT("application/cloudevents+avrocompact"),
/**
* The content type for transports sending cloudevents in XML format.
*/
Expand Down
49 changes: 49 additions & 0 deletions docs/avro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
title: CloudEvents Avro Compact
nav_order: 4
---

# CloudEvents Avro Compact

[![Javadocs](http://www.javadoc.io/badge/io.cloudevents/cloudevents-avro-compact.svg?color=green)](http://www.javadoc.io/doc/io.cloudevents/cloudevents-avro-compact)

This module provides the Avro Compact `EventFormat` implementation.

# Setup
For Maven based projects, use the following dependency:

```xml
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-avro-compact</artifactId>
<version>x.y.z</version>
</dependency>
```

No further configuration is required is use the module.

## Using the Avro Compact Event Format

### Event serialization

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormatProvider;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.avro.avro.compact.AvroCompactFormat;

CloudEvent event = CloudEventBuilder.v1()
.withId("hello")
.withType("example.vertx")
.withSource(URI.create("http://localhost"))
.build();

byte[] serialized = EventFormatProvider
.getInstance()
.resolveFormat(AvroCompactFormat.CONTENT_TYPE)
.serialize(event);
```

The `EventFormatProvider` will automatically resolve the format using the
`ServiceLoader` APIs.

101 changes: 101 additions & 0 deletions formats/avro-compact/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2021-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>cloudevents-avro-compact</artifactId>
<name>CloudEvents - Avro Compact</name>


<properties>
<module-name>io.cloudevents.formats.avro.compact</module-name>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.2</version>
<configuration>
<stringType>String</stringType>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.2</version>
</dependency>

<!-- Test deps -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
81 changes: 81 additions & 0 deletions formats/avro-compact/src/main/avro/cloudevents-compact.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"namespace": "io.cloudevents.v1.avro.compact",
"type": "record",
"name": "CloudEvent",
"version": "1.0",
"doc": "Avro Compact Event Format for CloudEvents",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "source",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "datacontenttype",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "dataschema",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "subject",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "time",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
],
"default": null
},
{
"name": "extensions",
"type": {
"type": "map",
"values": [
"boolean",
"int",
{
"type": "long",
"logicalType" : "timestamp-micros"
},
"string",
"bytes"
]
},
"default": {}
},
{
"name": "data",
"type": [
"bytes",
"null"
],
"default": "null"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.avro.compact;


import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.format.EventSerializationException;
import io.cloudevents.rw.CloudEventDataMapper;
import io.cloudevents.v1.avro.compact.CloudEvent.Builder;

import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

/**
* An implementation of {@link EventFormat} for the Avro Compact format.
* This format is resolvable with {@link io.cloudevents.core.provider.EventFormatProvider} using the content type {@link #AVRO_COMPACT_CONTENT_TYPE}.
*/
public class AvroCompactFormat implements EventFormat {

public static final String AVRO_COMPACT_CONTENT_TYPE = "application/cloudevents+avrocompact";

@Override
public byte[] serialize(CloudEvent from) throws EventSerializationException {
try {
Builder to = io.cloudevents.v1.avro.compact.CloudEvent.newBuilder();

// extensions
Map<String, Object> extensions = new HashMap<>();
for (String name : from.getExtensionNames()) {
Object value = from.getExtension(name);
if (value instanceof byte[])
value = ByteBuffer.wrap((byte[]) value);
else if (value instanceof OffsetDateTime)
value = ((OffsetDateTime) value).toInstant();
extensions.put(name, value);
}

to.setSource(from.getSource().toString())
.setType(from.getType())
.setId(from.getId())
.setSubject(from.getSubject())
.setDatacontenttype(from.getDataContentType())
.setExtensions(extensions);

if (from.getTime() != null)
to.setTime(from.getTime().toInstant());
if (from.getDataSchema() != null)
to.setDataschema(from.getDataSchema().toString());

CloudEventData data = from.getData();
if (data != null)
to.setData(ByteBuffer.wrap(data.toBytes()));
return to.build().toByteBuffer().array();
} catch (Exception e) {
throw new EventSerializationException(e);
}
}

@Override
public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends CloudEventData> mapper) throws EventDeserializationException {
try {
io.cloudevents.v1.avro.compact.CloudEvent from = io.cloudevents.v1.avro.compact.CloudEvent.fromByteBuffer(ByteBuffer.wrap(bytes));
CloudEventBuilder to = CloudEventBuilder.v1()
.withSource(URI.create(from.getSource()))
.withType(from.getType())
.withId(from.getType())
.withSubject(from.getSubject())
.withDataContentType(from.getDatacontenttype());

if (from.getTime() != null)
to.withTime(from.getTime().atOffset(ZoneOffset.UTC));
if (from.getDataschema() != null)
to.withDataSchema(URI.create(from.getDataschema()));

// extensions
for (Map.Entry<String, Object> entry : from.getExtensions().entrySet()) {
String name = entry.getKey();
Object value = entry.getValue();
// Avro supports boolean, int, string, bytes
if (value instanceof Boolean)
to.withExtension(name, (boolean) value);
else if (value instanceof Integer)
to.withExtension(name, (int) value);
else if (value instanceof Instant)
to.withExtension(name, ((Instant) value).atOffset(ZoneOffset.UTC));
else if (value instanceof String)
to.withExtension(name, (String) value);
else if (value instanceof ByteBuffer)
to.withExtension(name, ((ByteBuffer) value).array());
else
// this cannot happen, if ever seen, must be bug in this library
throw new AssertionError(String.format("invalid extension %s unsupported type %s", name, value.getClass()));
}

if (from.getData() == null)
return to.end();
else {
CloudEventData data = BytesCloudEventData.wrap(from.getData().array());
return to.end(mapper.map(data));
}
} catch (Exception e) {
throw new EventDeserializationException(e);
}
}

@Override
public String serializedContentType() {
return AVRO_COMPACT_CONTENT_TYPE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.cloudevents.avro.compact.AvroCompactFormat
Loading

0 comments on commit 4ef3041

Please sign in to comment.