Cloud events json schema #1105

Status: Open. Wants to merge 3 commits into base branch `7.1.1-post`.
1 change: 1 addition & 0 deletions clients/cloud/java-cloud-events/.gitignore
@@ -0,0 +1 @@

57 changes: 57 additions & 0 deletions clients/cloud/java-cloud-events/README.md
@@ -0,0 +1,57 @@
## Writing Cloud Events to Confluent Cloud

[Cloud Events](https://github.com/cloudevents/spec) is a well-known standard for attaching metadata to business-relevant events.

Cloud Events metadata includes, among others, the following attributes:
* the logical producer (event source)
* the subject (the person or thing) the event pertains to
* an event identifier
* the type of the event
* the time at which the event occurred

For the full set of attributes, see the [specification](https://github.com/cloudevents/spec).

This example uses the online order domain to demonstrate sending Cloud Events data to
Confluent Cloud and registering the schema with Confluent Schema Registry.

A Cloud Event in the order domain could be an event marking the creation of a new order.
Serialized as JSON, such an event could look like this:

```json
{
  "id": "614282a3-7818-48bf-8a74-85332e5579c7",
  "source": "/v1/orders",
  "specVersion": "1.0",
  "type": "io.confluent.samples.orders.created",
  "datacontenttype": "application/json",
  "dataschema": null,
  "subject": "60f9a967-077a-43ff-be2c-0c14c09bcb3a",
  "timestamp": 1653990696381,
  "data": {
    "productID": "21c2d736-56b4-4ddf-9dbf-5ebc3c79e126",
    "customerID": "68e5bde6-c5d5-488c-8469-8c9853d94589",
    "timestamp": 1653990696380
  }
}
```
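For reference, a JSON Schema describing this payload, roughly what the `KafkaJsonSchemaSerializer` derives from the producer's event class and registers, might look like the following. This is a hand-written sketch; the generated schema can differ in detail (field ordering, additional constraints):

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "OrderCloudEvent",
  "type": "object",
  "properties": {
    "id": { "type": "string" },
    "source": { "type": "string" },
    "specVersion": { "type": "string" },
    "type": { "type": "string" },
    "datacontenttype": { "type": "string" },
    "dataschema": { "type": ["string", "null"] },
    "subject": { "type": "string" },
    "timestamp": { "type": "integer" },
    "data": {
      "type": "object",
      "properties": {
        "productID": { "type": "string" },
        "customerID": { "type": "string" },
        "timestamp": { "type": "integer" }
      }
    }
  }
}
```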

#### Prerequisites

The following items are required to run this demo:

* access to a Confluent Cloud cluster
* a Confluent Cloud API key for Kafka
* a Confluent Cloud API key for Schema Registry
* a recent JDK (`java` and `javac`)
* a recent version of Apache Maven
* access to Maven Central for downloading the dependencies

#### Running the Demo

* Create a topic named `order-cloud-events`, either via the Confluent CLI or via the Confluent Cloud UI.
A single partition is sufficient.
* Copy `src/main/resources/producer.properties.template` to `src/main/resources/producer.properties`,
and fill in the bootstrap servers URL, the Schema Registry URL, and your API keys and secrets for both Kafka and Schema Registry.
* Compile the code: `mvn compile`
* Run the SampleProducer application: `./run-sample-producer.sh`
* Go to the Confluent Cloud UI of your cluster and inspect the messages produced to the topic, as well as the associated schema.
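The topic from the first step can be created with the Confluent CLI, for example (assuming you are logged in and have selected the target environment and cluster; flag names may vary between CLI versions):

```
# create the demo topic with a single partition
confluent kafka topic create order-cloud-events --partitions 1
```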
141 changes: 141 additions & 0 deletions clients/cloud/java-cloud-events/pom.xml
@@ -0,0 +1,141 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.confluent.examples</groupId>
<artifactId>kafka-clients-cloud-events-example</artifactId>
<packaging>jar</packaging>
<version>7.1.1</version>

<organization>
<name>Confluent, Inc.</name>
<url>http://confluent.io</url>
</organization>
<url>http://confluent.io</url>
<description>
Example for writing business events following the Cloud Events specification to Kafka and registering the event schema with Confluent Schema Registry.
</description>

<properties>
<kafka.version>3.1.0</kafka.version>
<io.confluent.schema-registry.version>7.0.1</io.confluent.schema-registry.version>
<java.version>17</java.version>
<slf4j-api.version>1.7.6</slf4j-api.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<gson.version>2.2.4</gson.version>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
<schemaRegistryBasicAuthUserInfo></schemaRegistryBasicAuthUserInfo>
<slf4j.version>1.7.6</slf4j.version>
</properties>

<licenses>
<license>
<name>Apache License 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
<distribution>repo</distribution>
</license>
</licenses>

<repositories>
<repository>
<id>confluent</id>
<name>Confluent</name>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<pluginRepositories>
<pluginRepository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</pluginRepository>
</pluginRepositories>


<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>${io.confluent.schema-registry.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!-- pin a concrete version instead of the deprecated RELEASE meta-version -->
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<inherited>true</inherited>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3 changes: 3 additions & 0 deletions clients/cloud/java-cloud-events/run-sample-producer.sh
@@ -0,0 +1,3 @@
#!/bin/bash
set -e

mvn exec:java -Dexec.mainClass="io.confluent.samples.cloud.cloudevents.SampleProducer"
@@ -0,0 +1,77 @@
package io.confluent.samples.cloud.cloudevents;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.InputStream;
import java.net.URI;
import java.util.Properties;
import java.util.UUID;

@Data
@Builder
@AllArgsConstructor
class Order {
    UUID productID;
    UUID customerID;
    long timestamp;
}

@Data
@AllArgsConstructor
@Builder
class OrderCloudEvent {
    private String id;
    private URI source;
    private String specVersion;
    private String type;
    private String datacontenttype;
    private URI dataschema;
    private String subject;
    private long timestamp;
    private Order data;
}

@Slf4j
public class SampleProducer {

    @SneakyThrows
    private static Properties producerProperties() {
        Properties prop = new Properties();
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // close the stream after loading to avoid a resource leak
        try (InputStream stream = loader.getResourceAsStream("producer.properties")) {
            prop.load(stream);
        }
        return prop;
    }

    public static void main(String[] args) {
        produceNonGenericCloudEvent();
    }

    @SneakyThrows
    private static void produceNonGenericCloudEvent() {
        // close the producer when done so buffered records are flushed
        try (KafkaProducer<String, OrderCloudEvent> kafkaProducer = new KafkaProducer<>(producerProperties())) {
            Order order = Order.builder()
                    .productID(UUID.randomUUID())
                    .customerID(UUID.randomUUID())
                    .timestamp(System.currentTimeMillis())
                    .build();
            OrderCloudEvent orderCloudEvent = OrderCloudEvent.builder()
                    .data(order)
                    .id(UUID.randomUUID().toString())
                    .specVersion("1.0")
                    .subject(UUID.randomUUID().toString())
                    .type("io.confluent.samples.orders.created")
                    .datacontenttype("application/json")
                    .timestamp(System.currentTimeMillis())
                    .source(new URI("/v1/orders"))
                    .build();
            log.info(new ObjectMapper().writeValueAsString(orderCloudEvent));
            // send returns a Future<RecordMetadata>; block here so failures surface in this demo
            RecordMetadata metadata = kafkaProducer.send(
                    new ProducerRecord<>("order-cloud-events", orderCloudEvent.getId(), orderCloudEvent)
            ).get();
            log.info("Produced record to {}", metadata);
        }
    }
}
@@ -0,0 +1 @@
producer.properties
@@ -0,0 +1,11 @@
bootstrap.servers=<...>.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";
sasl.mechanism=PLAIN
acks=all
value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
schema.registry.url=https://<...>.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
json.oneof.for.nullables=false
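The last setting, `json.oneof.for.nullables=false`, controls how the serializer renders nullable fields in the registered schema. With the default of `true`, a nullable field is emitted as a `oneOf` over `null` and the value type; a sketch of such a fragment for a nullable string field might be:

```json
{
  "dataschema": {
    "oneOf": [
      { "type": "null" },
      { "type": "string" }
    ]
  }
}
```

With `false`, the field is rendered with its plain value type instead, which keeps the registered schema flatter.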