Merged
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -6,9 +6,10 @@

Here you will describe this project, what it does, and its goals, making it clear to everyone. Example:

The **Java Architecture Template** is a project designed to serve as a template for creating applications, aiming for development with exceptional
technical quality to ensure long-term maintainability.
In this template, we provide a user registration endpoint that triggers an event in the broker when a user is registered. A listener will receive these creation events and enrich them with address data.
The **Java Architecture Template** is a project designed to serve as a template for creating applications, aiming for development with exceptional
technical quality to ensure long-term maintainability.
In this template, we provide a user registration endpoint that publishes a **CloudEvent** to Kafka when a user is registered. A listener consumes
CloudEvents of type `br.com.helpdev.sample.user.created` and enriches the user with address data.

📖 Read this in:
- 🇧🇷 [Português](README.pt.md)
@@ -171,6 +172,7 @@ After starting the application, access:

### **AsyncAPI**
This project uses **Springwolf** to document asynchronous events (Kafka, RabbitMQ, etc.) with **AsyncAPI**.
Kafka messages on the `user-events` topic follow the **CloudEvents structured JSON** format (`application/cloudevents+json`).

🔗 [Official AsyncAPI site](https://www.asyncapi.com/)
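In structured mode, the whole CloudEvent — context attributes and data — travels as a single JSON document. A minimal, dependency-free sketch of the envelope published to `user-events` (field values are illustrative; the application builds and serializes the real event with the CloudEvents Java SDK, not by hand):

```java
// Sketch: a structured-mode application/cloudevents+json envelope assembled
// by hand for illustration. The real producer uses io.cloudevents' JsonFormat.
public class CloudEventEnvelopeSketch {

    // Builds the structured JSON payload for a user-created event.
    static String structuredEnvelope(String eventId, String userUuid, String time) {
        return "{"
                + "\"specversion\":\"1.0\","
                + "\"id\":\"" + eventId + "\","
                + "\"source\":\"urn:helpdev:sample:user\","
                + "\"type\":\"br.com.helpdev.sample.user.created\","
                + "\"time\":\"" + time + "\","
                + "\"datacontenttype\":\"application/json\","
                + "\"data\":{\"userUuid\":\"" + userUuid + "\"}"
                + "}";
    }

    public static void main(String[] args) {
        System.out.println(structuredEnvelope(
                "ef5f318c-4b7c-4fd7-a661-56293d8b8a91",
                "f0f8cf3e-e856-4d61-a613-44f5df7742ca",
                "2026-04-12T21:33:04Z"));
    }
}
```

Because the content type is `application/cloudevents+json` rather than plain `application/json`, consumers must parse the envelope first and only then decode the `data` member.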

6 changes: 4 additions & 2 deletions README.pt.md
@@ -6,8 +6,9 @@

Aqui você deve descrever seu projeto, seu funcionamento e seus objetivos, tornando-o claro para todos. Exemplo:

O **Java Architecture Template** é um projeto criado para servir como modelo na criação de aplicações, visando um desenvolvimento com **qualidade técnica excepcional** para garantir **manutenção a longo prazo**.
Neste template, fornecemos um **endpoint de cadastro de usuário**, que **dispara um evento no broker** quando um usuário é registrado. Um **listener recebe esses eventos** de criação e os enriquece com dados de endereço.
O **Java Architecture Template** é um projeto criado para servir como modelo na criação de aplicações, visando um desenvolvimento com **qualidade técnica excepcional** para garantir **manutenção a longo prazo**.
Neste template, fornecemos um **endpoint de cadastro de usuário** que publica um **CloudEvent** no Kafka quando um usuário é registrado. Um
**listener consome CloudEvents** do tipo `br.com.helpdev.sample.user.created` e enriquece o usuário com dados de endereço.

📚 Leia em:
- 🇬🇧 [English](README.md)
@@ -159,6 +160,7 @@ Após iniciar a aplicação, acesse:

### **AsyncAPI**
Este projeto utiliza o **Springwolf** para documentar eventos assíncronos (Kafka, RabbitMQ, etc.) com **AsyncAPI**.
As mensagens Kafka no tópico `user-events` seguem o formato **CloudEvents structured JSON** (`application/cloudevents+json`).

🔗 [Site oficial da AsyncAPI](https://www.asyncapi.com/)

13 changes: 13 additions & 0 deletions application/pom.xml
@@ -17,6 +17,7 @@
<spring-cloud.version>2024.0.0</spring-cloud.version>
<spring-boot-dependencies.version>3.4.1</spring-boot-dependencies.version>
<archunit.version>1.3.0</archunit.version>
<cloudevents.version>4.0.2</cloudevents.version>
<mysql-connector-j.version>8.4.0</mysql-connector-j.version>
<springdoc-openapi-starter-webmvc-ui.version>2.8.1</springdoc-openapi-starter-webmvc-ui.version>
<springwolf.version>1.9.0</springwolf.version>
@@ -52,6 +53,18 @@
<version>${springwolf.version}</version>
</dependency>

<!-- CloudEvents Dependencies -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${cloudevents.version}</version>
</dependency>

<!-- Resilience Dependencies -->
<dependency>
<groupId>org.springframework.cloud</groupId>
@@ -1,11 +1,17 @@
package br.com.helpdev.sample.adapters.input.kafka;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.UUID;

import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncOperationBinding;
import io.github.springwolf.core.asyncapi.annotations.AsyncListener;
import io.github.springwolf.core.asyncapi.annotations.AsyncMessage;
import io.github.springwolf.core.asyncapi.annotations.AsyncOperation;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,9 +20,9 @@
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDataDto;
import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDto;
import br.com.helpdev.sample.core.ports.input.UserEnricherPort;

@@ -25,6 +31,9 @@ class UserEventListener {

private static final String TOPIC_NAME = "user-events";

private static final EventFormat EVENT_FORMAT = Objects.requireNonNull(
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));

private final Logger logger = LoggerFactory.getLogger(UserEventListener.class);

private final ObjectMapper objectMapper;
@@ -41,8 +50,8 @@ class UserEventListener {
description = "Listen for user events",
message = @AsyncMessage(
name = "UserEventDto",
contentType = "application/json",
messageId = "uuid"
contentType = JsonFormat.CONTENT_TYPE,
messageId = "id"
),
headers = @AsyncOperation.Headers(
notUsed = true
@@ -52,16 +61,22 @@
@KafkaAsyncOperationBinding(bindingVersion = "1.0.0")
@KafkaListener(topics = TOPIC_NAME)
@RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, maxDelay = 10000, multiplier = 2), autoCreateTopics = "true")
public void listen(final String message) throws JsonProcessingException {
final var userEventDto = objectMapper.readValue(message, UserEventDto.class);
public void listen(final String message) throws IOException {
final var cloudEvent = EVENT_FORMAT.deserialize(message.getBytes(StandardCharsets.UTF_8));

if (UserEventDto.EVENT_TYPE_CREATED.equals(cloudEvent.getType())) {
final var eventData = cloudEvent.getData();
if (eventData == null) {
throw new IllegalArgumentException("CloudEvent data is required for user events");
}
final var userEventData = objectMapper.readValue(eventData.toBytes(), UserEventDataDto.class);

if (UserEventDto.EVENT_CREATED.equals(userEventDto.event())) {
userEnricherPort.enrichUser(UUID.fromString(userEventDto.uuid()));
logger.info("User enriched: {}", userEventDto.uuid());
userEnricherPort.enrichUser(UUID.fromString(userEventData.userUuid()));
logger.info("User enriched: {}", userEventData.userUuid());
return;
}

logger.info("User event ignored to enrich process: {}", userEventDto.event());
logger.info("User event ignored for enrichment: {}", cloudEvent.getType());
}

}
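The listener above resolves the JSON `EventFormat` once, deserializes the whole envelope, and then routes on the CloudEvent `type` attribute. A dependency-free sketch of that routing decision (the real code delegates parsing to the CloudEvents SDK; the regex extraction here is a simplified stand-in for illustration only):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: routing a structured-mode CloudEvent on its "type" attribute.
// The real listener uses io.cloudevents' EventFormat.deserialize; this
// regex is a simplified stand-in so the sketch needs no dependencies.
public class EventTypeRoutingSketch {

    static final String USER_CREATED = "br.com.helpdev.sample.user.created";

    private static final Pattern TYPE_ATTR =
            Pattern.compile("\"type\"\\s*:\\s*\"([^\"]+)\"");

    // Returns true when the envelope should trigger the enrichment flow.
    static boolean shouldEnrich(String envelopeJson) {
        Matcher matcher = TYPE_ATTR.matcher(envelopeJson);
        return matcher.find() && USER_CREATED.equals(matcher.group(1));
    }

    public static void main(String[] args) {
        String created =
                "{\"specversion\":\"1.0\",\"type\":\"br.com.helpdev.sample.user.created\"}";
        String updated =
                "{\"specversion\":\"1.0\",\"type\":\"br.com.helpdev.sample.user.address.updated\"}";
        System.out.println(shouldEnrich(created)); // true
        System.out.println(shouldEnrich(updated)); // false
    }
}
```

Routing on `type` is what lets the same `user-events` topic carry both created and address-updated events while only the created events feed the enricher.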
@@ -0,0 +1,8 @@
package br.com.helpdev.sample.adapters.input.kafka.dto;

import io.swagger.v3.oas.annotations.media.Schema;

public record UserEventDataDto(
@Schema(title = "User UUID", example = "f0f8cf3e-e856-4d61-a613-44f5df7742ca") String userUuid)
{
}
@@ -1,12 +1,19 @@
package br.com.helpdev.sample.adapters.input.kafka.dto;

import java.time.OffsetDateTime;

import io.swagger.v3.oas.annotations.media.Schema;

public record UserEventDto(
@Schema(title = "Event", example = "CREATED|UPDATED") String event,
@Schema(title = "UUID", example = "uuid") String uuid)
@Schema(title = "CloudEvent Spec Version", example = "1.0") String specversion,
@Schema(title = "CloudEvent Identifier", example = "ef5f318c-4b7c-4fd7-a661-56293d8b8a91") String id,
@Schema(title = "CloudEvent Source", example = "urn:helpdev:sample:user") String source,
@Schema(title = "CloudEvent Type", example = "br.com.helpdev.sample.user.created") String type,
@Schema(title = "CloudEvent Time", example = "2026-04-12T21:33:04Z") OffsetDateTime time,
@Schema(title = "CloudEvent Data Content Type", example = "application/json") String datacontenttype,
@Schema(title = "CloudEvent Data") UserEventDataDto data)
{

public static final String EVENT_CREATED = "CREATED";
public static final String EVENT_TYPE_CREATED = "br.com.helpdev.sample.user.created";

}
@@ -1,27 +1,16 @@
package br.com.helpdev.sample.adapters.output.kafka;

import java.util.UUID;
import java.time.OffsetDateTime;

import io.swagger.v3.oas.annotations.media.Schema;

public record UserEvent(
@Schema(title = "Event", example = "CREATED|UPDATED") String event,
@Schema(title = "UUID", example = "uuid") String uuid) {

public static final String EVENT_CREATED = "CREATED";

public static final String EVENT_UPDATED = "UPDATED";

public static UserEvent ofCreated(UUID uuid) {
return new UserEvent(EVENT_CREATED, uuid.toString());
}

public static UserEvent ofUpdated(UUID uuid) {
return new UserEvent(EVENT_UPDATED, uuid.toString());
}

public String toJson() {
return String.format("{\"event\":\"%s\",\"uuid\":\"%s\"}", event, uuid);
}

@Schema(title = "CloudEvent Spec Version", example = "1.0") String specversion,
@Schema(title = "CloudEvent Identifier", example = "ef5f318c-4b7c-4fd7-a661-56293d8b8a91") String id,
@Schema(title = "CloudEvent Source", example = "urn:helpdev:sample:user") String source,
@Schema(title = "CloudEvent Type", example = "br.com.helpdev.sample.user.created") String type,
@Schema(title = "CloudEvent Time", example = "2026-04-12T21:33:04Z") OffsetDateTime time,
@Schema(title = "CloudEvent Data Content Type", example = "application/json") String datacontenttype,
@Schema(title = "CloudEvent Data") UserEventData data)
{
}
@@ -0,0 +1,8 @@
package br.com.helpdev.sample.adapters.output.kafka;

import io.swagger.v3.oas.annotations.media.Schema;

public record UserEventData(
@Schema(title = "User UUID", example = "f0f8cf3e-e856-4d61-a613-44f5df7742ca") String userUuid)
{
}
@@ -1,13 +1,28 @@
package br.com.helpdev.sample.adapters.output.kafka;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.UUID;

import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncOperationBinding;
import io.github.springwolf.core.asyncapi.annotations.AsyncMessage;
import io.github.springwolf.core.asyncapi.annotations.AsyncOperation;
import io.github.springwolf.core.asyncapi.annotations.AsyncPublisher;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import br.com.helpdev.sample.core.domain.entities.User;
import br.com.helpdev.sample.core.ports.output.UserEventDispatcherPort;

@@ -16,37 +31,67 @@ class UserEventDispatcher implements UserEventDispatcherPort {

private static final String USER_EVENTS_TOPIC = "user-events";

private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user";

private static final String USER_CREATED_EVENT_TYPE = "br.com.helpdev.sample.user.created";

private static final String USER_ADDRESS_UPDATED_EVENT_TYPE = "br.com.helpdev.sample.user.address.updated";

private static final EventFormat EVENT_FORMAT = Objects.requireNonNull(
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));

private final KafkaTemplate<String, String> kafkaProducer;

UserEventDispatcher(final KafkaTemplate<String, String> kafkaProducer) {
private final ObjectMapper objectMapper;

UserEventDispatcher(final KafkaTemplate<String, String> kafkaProducer, final ObjectMapper objectMapper) {
this.kafkaProducer = kafkaProducer;
this.objectMapper = objectMapper;
}

@Override
public void sendUserCreatedEvent(final User user) {
publish(user, UserEvent.ofCreated(user.uuid()));
publish(user, USER_CREATED_EVENT_TYPE);
}

@Override
public void sendUserAddressUpdatedEvent(final User user) {
publish(user, UserEvent.ofUpdated(user.uuid()));
publish(user, USER_ADDRESS_UPDATED_EVENT_TYPE);
}

@AsyncPublisher(
operation = @AsyncOperation(
channelName = USER_EVENTS_TOPIC,
description = "Publish user events",
message = @AsyncMessage(
name = "UserEvent",
contentType = "application/json",
messageId = "uuid"
),
payloadType = UserEvent.class
)
)
@KafkaAsyncOperationBinding(bindingVersion = "1.0.0")
void publish(User user, UserEvent userEvent) {
kafkaProducer.send(USER_EVENTS_TOPIC, user.uuid().toString(), userEvent.toJson());
@AsyncPublisher(
operation = @AsyncOperation(
channelName = USER_EVENTS_TOPIC,
description = "Publish user events",
message = @AsyncMessage(
name = "UserEvent",
contentType = JsonFormat.CONTENT_TYPE,
messageId = "id"
),
payloadType = UserEvent.class
)
)
@KafkaAsyncOperationBinding(bindingVersion = "1.0.0")
void publish(final User user, final String eventType) {
final var cloudEvent = CloudEventBuilder
.v1()
.withId(UUID.randomUUID().toString())
.withType(eventType)
.withSource(URI.create(CLOUD_EVENT_SOURCE))
.withTime(OffsetDateTime.now(ZoneOffset.UTC))
.withData("application/json", serializeData(new UserEventData(user.uuid().toString())))
.build();

kafkaProducer.send(USER_EVENTS_TOPIC, user.uuid().toString(),
new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8));
}

private byte[] serializeData(final UserEventData userEventData) {
try {
return objectMapper.writeValueAsBytes(userEventData);
} catch (JsonProcessingException exception) {
throw new KafkaException("Cannot serialize CloudEvent user payload", exception);
}
}

}