Skip to content

Commit

Permalink
140 spring cloud registry extension (#146)
Browse files Browse the repository at this point in the history
Adds the ability to deserialize messages that have been sent using a schema like Avro which is registered in the Spring Cloud Schema Registry.
  • Loading branch information
JaidenAshmore committed May 4, 2020
1 parent 68fb92a commit df64a93
Show file tree
Hide file tree
Showing 65 changed files with 2,586 additions and 0 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ If the Spring Boot application needs to connect to SQS queues across multiple AW
which will be able to obtain a specific `SqsAsyncClient` based on an identifier. For more information on how to do this, take a look at the documentation
at [How To Connect to Multiple AWS Accounts](doc/how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md)

### Versioning Message Payloads using Apache Avro Schemas
As the application grows, it may be beneficial to allow for versioning of the schema so that the consumer can still serialize messages from producers sending
different versions of the schema. To allow for this the [spring-cloud-schema-registry-extension](extensions/spring-cloud-schema-registry-extension) was written
to support this functionality. See the [README.md](extensions/spring-cloud-schema-registry-extension/README.md) for this extension for more details.

### Comparing Libraries
If you want to see the difference between this library and others like the
[Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) and
Expand Down
3 changes: 3 additions & 0 deletions configuration/spotbugs/bugsExcludeFilter.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
<Match>
<Bug pattern="SE_NO_SERIALVERSIONID" />
</Match>
<Match>
<Source name="~.*generated-sources.*" />
</Match>
</FindBugsFilter>
2 changes: 2 additions & 0 deletions doc/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ more in depth understanding take a look at the JavaDoc for the [java-dynamic-sqs
processing of messages for specific queue listeners
1. [How to connect to multiple AWS Accounts](how-to-guides/spring/spring-how-to-connect-to-multiple-aws-accounts.md): guide for listening to queues
across multiple AWS Accounts
1. [How to version message payload schemas](how-to-guides/spring/spring-how-to-version-payload-schemas-using-spring-cloud-schema-registry.md): guide
for versioning payloads using Avro and the Spring Cloud Schema Registry.
1. Local Development:
1. [Setting up IntelliJ](local-development/setting-up-intellij.md): steps for setting IntelliJ up for development,
e.g. configuring checkstyle, Lombok, etc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Spring - How to version message payload Schemas using Spring Cloud Schema Registry
As your application grows over time the format of the data that needs to be sent in the SQS messages may change as well. To allow for
these changes, the [Spring Cloud Schema Registry](https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/1.0.0.RC1/reference/html/spring-cloud-schema-registry.html)
can be used to track the version of your schemas, allowing the SQS consumer to be able to interpret multiple versions of your payload.

## Full reference
For a full working solution of this feature, take a look at the [Spring Cloud Schema Registry Example](../../../examples/spring-cloud-schema-registry-example).

## Steps to consume messages serialized using Apache Avro
1. Include the `Spring Cloud Schema Registry Extension` dependency
```xml
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>avro-spring-cloud-schema-registry-extension</artifactId>
<version>${project.version}</version>
</dependency>
```
1. Define your schemas and map this in your spring `application.yml`
```yml
spring:
cloud:
schema-registry-client:
endpoint: http://localhost:8990
schema:
avro:
schema-imports:
- classpath:avro/author.avsc
schema-locations:
- classpath:avro/book.avsc
```
In this example above we have a book schema which is dependent on the author schema. We have also hardcoded the Schema Registry
to be at [http://localhost:8990](http://localhost:8990).
1. Create your schemas and place them in your `resources` directory. For example this is an example schema for the Book.
```json
{
"namespace" : "com.jashmore.sqs.extensions.registry.model",
"type" : "record",
"name" : "Book",
"fields" : [
{ "name":"id","type":"string" },
{ "name":"name","type":"string" },
{ "name":"author","type":"Author" }
]
}
```
1. Enable the extension by annotating the Spring Application
```java
@EnableSchemaRegistrySqsExtension
@SpringBootApplication
class Application {
// normal code
}
```
1. Define your queue listener using the `@SpringCloudSchemaRegistryPayload` to represent the payload that needs to be deserialized from
the message payload.
```java
@QueueListener(value = "queueName")
public void listen(@SpringCloudSchemaRegistryPayload Book payload) {
log.info("Payload: {}", payload);
}
```

## Steps to produce messages using Avro
You can wrap your `SqsAsyncClient` with the
[AvroSchemaRegistrySqsAsyncClient](../../../util/proxy-method-interceptor/src/main/java/com/jashmore/sqs/registry/AvroSchemaRegistrySqsAsyncClient.java)
to be able to more easily send a message that will be serialized using the Avro Schema. This Avro SQS Client was built for testing purposes and therefore it is
recommended to developer your own logic for sending these messages.

For a full example of building this client, take a look at the
[Producer Example](../../../examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer).
1 change: 1 addition & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<modules>
<module>core-examples</module>
<module>spring-aws-example</module>
<module>spring-cloud-schema-registry-example</module>
<module>spring-integration-test-example</module>
<module>spring-multiple-aws-account-example</module>
<module>spring-starter-examples</module>
Expand Down
36 changes: 36 additions & 0 deletions examples/spring-cloud-schema-registry-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Spring Cloud Schema Registry Extension Example
This example shows how you can consume messages which have been defined using an [Avro](https://avro.apache.org/docs/1.9.2/gettingstartedjava.html)
Schema and the [Spring Cloud Schema Registry](https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/1.0.0.RC1/reference/html/spring-cloud-schema-registry.html).

To find the corresponding code look in the [Spring Cloud Schema Registry Extension](../../extensions/spring-cloud-schema-registry-extension) module.

## Steps
Start each of these applications in new terminals/your IDE:
1. A Spring Cloud Schema Registry server
```bash
wget -O /tmp/schema-registry-server.jar https://repo.spring.io/libs-release-ossrh-cache/org/springframework/cloud/spring-cloud-schema-registry-server/1.0.3.RELEASE/spring-cloud-schema-registry-server-1.0.3.RELEASE.jar
cd /tmp
java -jar schema-registry-server.jar
```
1. A local SQS server using ElasticMQ
```bash
docker run -p 9324:9324 softwaremill/elasticmq
```
1. The SQS consumer service
```bash
cd java-dynamic-sqs-listener/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-consumer
mvn spring-boot:run
```
1. The first SQS producer service
```bash
cd java-dynamic-sqs-listener/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer
mvn spring-boot:run
```
1. The second SQS producer service
```bash
cd java-dynamic-sqs-listener/examples/spring-cloud-schema-registry-example/spring-cloud-schema-registry-producer-2
mvn spring-boot:run
```

You should now see the consumer receiving messages from both producers and even though the producers are sending
the payload in different schema versions the consumer is still able to process the message.
24 changes: 24 additions & 0 deletions examples/spring-cloud-schema-registry-example/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>examples</artifactId>
<groupId>com.jashmore</groupId>
<version>3.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-schema-registry-example</artifactId>
<packaging>pom</packaging>

<name>Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example</name>
<description>Contains examples for serializing messages using Avro and storing these Schema Definitions in the Spring Cloud Schema Registry</description>

<modules>
<module>spring-cloud-schema-registry-consumer</module>
<module>spring-cloud-schema-registry-producer</module>
<module>spring-cloud-schema-registry-producer-two</module>
</modules>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-schema-registry-example</artifactId>
<groupId>com.jashmore</groupId>
<version>3.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-schema-registry-consumer</artifactId>

<name>Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example - Consumer</name>
<description>Contains an example of a consumer deserializing a message payload that is in a schema registered in the Spring Cloud Schema Registry.</description>

<properties>
<spotbugs.config.location>../../../configuration/spotbugs/bugsExcludeFilter.xml</spotbugs.config.location>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure-processor</artifactId>
</dependency>

<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.jashmore</groupId>
<artifactId>avro-spring-cloud-schema-registry-extension</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.jashmore</groupId>
<artifactId>local-amazon-sqs</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/avro</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.jashmore.sqs.examples.schemaregistry.ConsumerApplication</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.jashmore.sqs.examples.schemaregistry;

import com.example.Sensor;
import com.jashmore.sqs.extensions.registry.SpringCloudSchemaRegistryPayload;
import com.jashmore.sqs.extensions.registry.avro.EnableSchemaRegistrySqsExtension;
import com.jashmore.sqs.spring.container.basic.QueueListener;
import com.jashmore.sqs.util.LocalSqsAsyncClient;
import com.jashmore.sqs.util.SqsQueuesConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Slf4j
@SpringBootApplication
@EnableSchemaRegistrySqsExtension
@SuppressWarnings("checkstyle:javadocmethod")
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}

@Bean
public SqsAsyncClient sqsAsyncClient() {
return new LocalSqsAsyncClient(SqsQueuesConfig.builder()
.sqsServerUrl("http://localhost:9324")
.queue(SqsQueuesConfig.QueueConfig.builder()
.queueName("test")
.deadLetterQueueName("test-dlq")
.maxReceiveCount(3)
.build())
.build());
}

@QueueListener(value = "test", identifier = "message-listener")
public void listen(@SpringCloudSchemaRegistryPayload Sensor payload) {
log.info("Payload: {}", payload);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
spring:
cloud:
schema-registry-client:
endpoint: http://localhost:8990
schema:
avro:
schema-locations:
- classpath:avro/sensor.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"namespace" : "com.example",
"type" : "record",
"name" : "Sensor",
"fields" : [
{"name":"id","type":"string"},
{"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]},
{"name":"externalTemperature", "type":"float", "default":0.0},
{"name":"acceleration", "type":"float","default":0.0},
{"name":"velocity","type":"float","default":0.0}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<!-- We reduce the log messages from ElasticMQ, Akka and Netty to reduce the amount of unnecessary log messages being published -->
<logger name="akka" level="OFF" />
<logger name="io.netty" level="ERROR" />

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit df64a93

Please sign in to comment.