From 05bed21df8fe615a4331bdf7a140b958f03f6765 Mon Sep 17 00:00:00 2001 From: Guilherme Biff Zarelli Date: Sun, 12 Apr 2026 18:52:10 -0300 Subject: [PATCH] Implement CloudEvents specification Adopt CloudEvents structured JSON for Kafka user events, update the event adapters and tests, and document the contract decision.\n\nCloses #3\n\nCo-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- README.md | 8 +- README.pt.md | 6 +- application/pom.xml | 13 +++ .../input/kafka/UserEventListener.java | 33 +++++-- .../input/kafka/dto/UserEventDataDto.java | 8 ++ .../input/kafka/dto/UserEventDto.java | 13 ++- .../adapters/output/kafka/UserEvent.java | 29 ++---- .../adapters/output/kafka/UserEventData.java | 8 ++ .../output/kafka/UserEventDispatcher.java | 81 +++++++++++++---- .../input/kafka/UserEventListenerTest.java | 79 ++++++++++++---- .../output/kafka/UserEventDispatcherTest.java | 91 +++++++++++++++++-- ...cloudevents-as-the-kafka-event-contract.md | 51 +++++++++++ docs/adr/README.md | 1 + docs/adr/README.pt.md | 2 +- 14 files changed, 340 insertions(+), 83 deletions(-) create mode 100644 application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java create mode 100644 application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java create mode 100644 docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md diff --git a/README.md b/README.md index d80b889..061de1d 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,10 @@ Here you will describe this project, what it does, and its goals, making it clear to everyone. Example: -The **Java Architecture Template** is a project designed to serve as a template for creating applications, aiming for development with exceptional -technical quality to ensure long-term maintainability. -In this template, we provide a user registration endpoint that triggers an event in the broker when a user is registered. A listener will receive these creation events and enrich them with address data. +The **Java Architecture Template** is a project designed to serve as a template for creating applications, aiming for development with exceptional +technical quality to ensure long-term maintainability. +In this template, we provide a user registration endpoint that publishes a **CloudEvent** to Kafka when a user is registered. A listener consumes +CloudEvents of type `br.com.helpdev.sample.user.created` and enriches the user with address data. 📖 Read this in: - 🇧🇷 [Português](README.pt.md) @@ -171,6 +172,7 @@ After starting the application, access: ### **AsyncAPI** This project uses **Springwolf** to document asynchronous events (Kafka, RabbitMQ, etc.) with **AsyncAPI**. +Kafka messages on the `user-events` topic follow the **CloudEvents structured JSON** format (`application/cloudevents+json`). 🔗 [Official AsyncAPI site](https://www.asyncapi.com/) diff --git a/README.pt.md b/README.pt.md index beb5eb9..0be271a 100644 --- a/README.pt.md +++ b/README.pt.md @@ -6,8 +6,9 @@ Aqui você deve descrever seu projeto, seu funcionamento e seus objetivos, tornando-o claro para todos. Exemplo: -O **Java Architecture Template** é um projeto criado para servir como modelo na criação de aplicações, visando um desenvolvimento com **qualidade técnica excepcional** para garantir **manutenção a longo prazo**. -Neste template, fornecemos um **endpoint de cadastro de usuário**, que **dispara um evento no broker** quando um usuário é registrado. Um **listener recebe esses eventos** de criação e os enriquece com dados de endereço. +O **Java Architecture Template** é um projeto criado para servir como modelo na criação de aplicações, visando um desenvolvimento com **qualidade técnica excepcional** para garantir **manutenção a longo prazo**. +Neste template, fornecemos um **endpoint de cadastro de usuário** que publica um **CloudEvent** no Kafka quando um usuário é registrado. Um +**listener consome CloudEvents** do tipo `br.com.helpdev.sample.user.created` e enriquece o usuário com dados de endereço. 📚 Leia em: - 🇬🇧 [English](README.md) @@ -159,6 +160,7 @@ Após iniciar a aplicação, acesse: ### **AsyncAPI** Este projeto utiliza o **Springwolf** para documentar eventos assíncronos (Kafka, RabbitMQ, etc.) com **AsyncAPI**. +As mensagens Kafka no tópico `user-events` seguem o formato **CloudEvents structured JSON** (`application/cloudevents+json`). 🔗 [Site oficial da AsyncAPI](https://www.asyncapi.com/) diff --git a/application/pom.xml b/application/pom.xml index fc3268c..095f721 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -17,6 +17,7 @@ 2024.0.0 3.4.1 1.3.0 + 4.0.2 8.4.0 2.8.1 1.9.0 @@ -52,6 +53,18 @@ ${springwolf.version} + + + io.cloudevents + cloudevents-core + ${cloudevents.version} + + + io.cloudevents + cloudevents-json-jackson + ${cloudevents.version} + + org.springframework.cloud diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java index b6513d7..8cd6a48 100644 --- a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java +++ b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java @@ -1,11 +1,17 @@ package br.com.helpdev.sample.adapters.input.kafka; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; import java.util.UUID; import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncOperationBinding; import io.github.springwolf.core.asyncapi.annotations.AsyncListener; import io.github.springwolf.core.asyncapi.annotations.AsyncMessage; import io.github.springwolf.core.asyncapi.annotations.AsyncOperation; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,9 +20,9 @@ import org.springframework.retry.annotation.Backoff; import org.springframework.stereotype.Controller; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDataDto; import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDto; import br.com.helpdev.sample.core.ports.input.UserEnricherPort; @@ -25,6 +31,9 @@ class UserEventListener { private static final String TOPIC_NAME = "user-events"; + private static final EventFormat EVENT_FORMAT = Objects.requireNonNull( + EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)); + private final Logger logger = LoggerFactory.getLogger(UserEventListener.class); private final ObjectMapper objectMapper; @@ -41,8 +50,8 @@ class UserEventListener { description = "Listen for user events", message = @AsyncMessage( name = "UserEventDto", - contentType = "application/json", - messageId = "uuid" + contentType = JsonFormat.CONTENT_TYPE, + messageId = "id" ), headers = @AsyncOperation.Headers( notUsed = true @@ -52,16 +61,22 @@ class UserEventListener { @KafkaAsyncOperationBinding(bindingVersion = "1.0.0") @KafkaListener(topics = TOPIC_NAME) @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, maxDelay = 10000, multiplier = 2), autoCreateTopics = "true") - public void listen(final String message) throws JsonProcessingException { - final var userEventDto = objectMapper.readValue(message, UserEventDto.class); + public void listen(final String message) throws IOException { + final var cloudEvent = EVENT_FORMAT.deserialize(message.getBytes(StandardCharsets.UTF_8)); + + if (UserEventDto.EVENT_TYPE_CREATED.equals(cloudEvent.getType())) { + final var eventData = cloudEvent.getData(); + if (eventData == null) { + throw new IllegalArgumentException("CloudEvent data is required for user events"); + } + final var userEventData = objectMapper.readValue(eventData.toBytes(), UserEventDataDto.class); - if (UserEventDto.EVENT_CREATED.equals(userEventDto.event())) { - userEnricherPort.enrichUser(UUID.fromString(userEventDto.uuid())); - logger.info("User enriched: {}", userEventDto.uuid()); + userEnricherPort.enrichUser(UUID.fromString(userEventData.userUuid())); + logger.info("User enriched: {}", userEventData.userUuid()); return; } - logger.info("User event ignored to enrich process: {}", userEventDto.event()); + logger.info("User event ignored to enrich process: {}", cloudEvent.getType()); } } diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java new file mode 100644 index 0000000..aabcfe4 --- /dev/null +++ b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java @@ -0,0 +1,8 @@ +package br.com.helpdev.sample.adapters.input.kafka.dto; + +import io.swagger.v3.oas.annotations.media.Schema; + +public record UserEventDataDto( + @Schema(title = "User UUID", example = "f0f8cf3e-e856-4d61-a613-44f5df7742ca") String userUuid) +{ +} diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java index ce01235..eace819 100644 --- a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java +++ b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java @@ -1,12 +1,19 @@ package br.com.helpdev.sample.adapters.input.kafka.dto; +import java.time.OffsetDateTime; + import io.swagger.v3.oas.annotations.media.Schema; public record UserEventDto( - @Schema(title = "Event", example = "CREATED|UPDATED") String event, - @Schema(title = "UUID", example = "uuid") String uuid) + @Schema(title = "CloudEvent Spec Version", example = "1.0") String specversion, + @Schema(title = "CloudEvent Identifier", example = "ef5f318c-4b7c-4fd7-a661-56293d8b8a91") String id, + @Schema(title = "CloudEvent Source", example = "urn:helpdev:sample:user") String source, + @Schema(title = "CloudEvent Type", example = "br.com.helpdev.sample.user.created") String type, + @Schema(title = "CloudEvent Time", example = "2026-04-12T21:33:04Z") OffsetDateTime time, + @Schema(title = "CloudEvent Data Content Type", example = "application/json") String datacontenttype, + @Schema(title = "CloudEvent Data") UserEventDataDto data) { - public static final String EVENT_CREATED = "CREATED"; + public static final String EVENT_TYPE_CREATED = "br.com.helpdev.sample.user.created"; } diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java index 72dfb73..5a071ed 100644 --- a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java +++ b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java @@ -1,27 +1,16 @@ package br.com.helpdev.sample.adapters.output.kafka; -import java.util.UUID; +import java.time.OffsetDateTime; import io.swagger.v3.oas.annotations.media.Schema; public record UserEvent( - @Schema(title = "Event", example = "CREATED|UPDATED") String event, - @Schema(title = "UUID", example = "uuid") String uuid) { - - public static final String EVENT_CREATED = "CREATED"; - - public static final String EVENT_UPDATED = "UPDATED"; - - public static UserEvent ofCreated(UUID uuid) { - return new UserEvent(EVENT_CREATED, uuid.toString()); - } - - public static UserEvent ofUpdated(UUID uuid) { - return new UserEvent(EVENT_UPDATED, uuid.toString()); - } - - public String toJson() { - return String.format("{\"event\":\"%s\",\"uuid\":\"%s\"}", event, uuid); - } - + @Schema(title = "CloudEvent Spec Version", example = "1.0") String specversion, + @Schema(title = "CloudEvent Identifier", example = "ef5f318c-4b7c-4fd7-a661-56293d8b8a91") String id, + @Schema(title = "CloudEvent Source", example = "urn:helpdev:sample:user") String source, + @Schema(title = "CloudEvent Type", example = "br.com.helpdev.sample.user.created") String type, + @Schema(title = "CloudEvent Time", example = "2026-04-12T21:33:04Z") OffsetDateTime time, + @Schema(title = "CloudEvent Data Content Type", example = "application/json") String datacontenttype, + @Schema(title = "CloudEvent Data") UserEventData data) +{ } diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java new file mode 100644 index 0000000..f0524d4 --- /dev/null +++ b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java @@ -0,0 +1,8 @@ +package br.com.helpdev.sample.adapters.output.kafka; + +import io.swagger.v3.oas.annotations.media.Schema; + +public record UserEventData( + @Schema(title = "User UUID", example = "f0f8cf3e-e856-4d61-a613-44f5df7742ca") String userUuid) +{ +} diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java index d32820d..c37f9bf 100644 --- a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java +++ b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java @@ -1,13 +1,28 @@ package br.com.helpdev.sample.adapters.output.kafka; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Objects; +import java.util.UUID; + import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncOperationBinding; import io.github.springwolf.core.asyncapi.annotations.AsyncMessage; import io.github.springwolf.core.asyncapi.annotations.AsyncOperation; import io.github.springwolf.core.asyncapi.annotations.AsyncPublisher; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import br.com.helpdev.sample.core.domain.entities.User; import br.com.helpdev.sample.core.ports.output.UserEventDispatcherPort; @@ -16,37 +31,67 @@ class UserEventDispatcher implements UserEventDispatcherPort { private static final String USER_EVENTS_TOPIC = "user-events"; + private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user"; + + private static final String USER_CREATED_EVENT_TYPE = "br.com.helpdev.sample.user.created"; + + private static final String USER_ADDRESS_UPDATED_EVENT_TYPE = "br.com.helpdev.sample.user.address.updated"; + + private static final EventFormat EVENT_FORMAT = Objects.requireNonNull( + EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)); + private final KafkaTemplate kafkaProducer; - UserEventDispatcher(final KafkaTemplate kafkaProducer) { + private final ObjectMapper objectMapper; + + UserEventDispatcher(final KafkaTemplate kafkaProducer, final ObjectMapper objectMapper) { this.kafkaProducer = kafkaProducer; + this.objectMapper = objectMapper; } @Override public void sendUserCreatedEvent(final User user) { - publish(user, UserEvent.ofCreated(user.uuid())); + publish(user, USER_CREATED_EVENT_TYPE); } @Override public void sendUserAddressUpdatedEvent(final User user) { - publish(user, UserEvent.ofUpdated(user.uuid())); + publish(user, USER_ADDRESS_UPDATED_EVENT_TYPE); } - @AsyncPublisher( - operation = @AsyncOperation( - channelName = USER_EVENTS_TOPIC, - description = "Publish user events", - message = @AsyncMessage( - name = "UserEvent", - contentType = "application/json", - messageId = "uuid" - ), - payloadType = UserEvent.class - ) - ) - @KafkaAsyncOperationBinding(bindingVersion = "1.0.0") - void publish(User user, UserEvent userEvent) { - kafkaProducer.send(USER_EVENTS_TOPIC, user.uuid().toString(), userEvent.toJson()); + @AsyncPublisher( + operation = @AsyncOperation( + channelName = USER_EVENTS_TOPIC, + description = "Publish user events", + message = @AsyncMessage( + name = "UserEvent", + contentType = JsonFormat.CONTENT_TYPE, + messageId = "id" + ), + payloadType = UserEvent.class + ) + ) + @KafkaAsyncOperationBinding(bindingVersion = "1.0.0") + void publish(final User user, final String eventType) { + final var cloudEvent = CloudEventBuilder + .v1() + .withId(UUID.randomUUID().toString()) + .withType(eventType) + .withSource(URI.create(CLOUD_EVENT_SOURCE)) + .withTime(OffsetDateTime.now(ZoneOffset.UTC)) + .withData("application/json", serializeData(new UserEventData(user.uuid().toString()))) + .build(); + + kafkaProducer.send(USER_EVENTS_TOPIC, user.uuid().toString(), + new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8)); + } + + private byte[] serializeData(final UserEventData userEventData) { + try { + return objectMapper.writeValueAsBytes(userEventData); + } catch (JsonProcessingException exception) { + throw new KafkaException("Cannot serialize CloudEvent user payload", exception); + } } } diff --git a/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java b/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java index 5e7d49c..b243ba5 100644 --- a/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java +++ b/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java @@ -1,58 +1,99 @@ package br.com.helpdev.sample.adapters.input.kafka; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.Objects; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDto; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; + +import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDataDto; import br.com.helpdev.sample.core.ports.input.UserEnricherPort; @ExtendWith(MockitoExtension.class) class UserEventListenerTest { - @Mock - private ObjectMapper objectMapper; + private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user"; + + private static final EventFormat EVENT_FORMAT = Objects.requireNonNull( + EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)); @Mock private UserEnricherPort userEnricherPort; - @InjectMocks private UserEventListener userEventListener; - @Test - void testListen_UserCreatedEvent() throws JsonProcessingException { - UserEventDto userEventDto = new UserEventDto(UserEventDto.EVENT_CREATED, UUID.randomUUID().toString()); - String message = "{\"event\":\"" + UserEventDto.EVENT_CREATED + "\",\"uuid\":\"" + userEventDto.uuid() + "\"}"; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + userEventListener = new UserEventListener(objectMapper, userEnricherPort); + } - when(objectMapper.readValue(message, UserEventDto.class)).thenReturn(userEventDto); + @Test + void testListen_UserCreatedEvent() throws Exception { + final var userUuid = UUID.randomUUID(); + final var message = cloudEventMessage("br.com.helpdev.sample.user.created", userUuid.toString()); userEventListener.listen(message); - verify(userEnricherPort).enrichUser(UUID.fromString(userEventDto.uuid())); + verify(userEnricherPort).enrichUser(userUuid); verifyNoMoreInteractions(userEnricherPort); } @Test - void testListen_UserEventIgnored() throws JsonProcessingException { - UserEventDto userEventDto = new UserEventDto("EVENT_UPDATED", UUID.randomUUID().toString()); - String message = "{\"event\":\"OTHER_EVENT\",\"uuid\":\"" + userEventDto.uuid() + "\"}"; - - when(objectMapper.readValue(message, UserEventDto.class)).thenReturn(userEventDto); + void testListen_UserEventIgnored() throws Exception { + final var message = cloudEventMessage("br.com.helpdev.sample.user.address.updated", UUID.randomUUID().toString()); userEventListener.listen(message); verifyNoInteractions(userEnricherPort); } -} \ No newline at end of file + + @Test + void testListen_UserCreatedEventWithoutDataShouldThrowException() { + final var cloudEvent = CloudEventBuilder + .v1() + .withId(UUID.randomUUID().toString()) + .withType("br.com.helpdev.sample.user.created") + .withSource(URI.create(CLOUD_EVENT_SOURCE)) + .withTime(OffsetDateTime.now()) + .build(); + + final var message = new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8); + + assertThrows(IllegalArgumentException.class, () -> userEventListener.listen(message)); + verifyNoInteractions(userEnricherPort); + } + + private String cloudEventMessage(final String eventType, final String userUuid) throws Exception { + final var cloudEvent = CloudEventBuilder + .v1() + .withId(UUID.randomUUID().toString()) + .withType(eventType) + .withSource(URI.create(CLOUD_EVENT_SOURCE)) + .withTime(OffsetDateTime.now()) + .withData("application/json", objectMapper.writeValueAsBytes(new UserEventDataDto(userUuid))) + .build(); + + return new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8); + } +} diff --git a/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java b/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java index d48d9d3..e49d317 100644 --- a/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java +++ b/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java @@ -1,52 +1,127 @@ package br.com.helpdev.sample.adapters.output.kafka; +import java.nio.charset.StandardCharsets; import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Objects; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.KafkaTemplate; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; + import br.com.helpdev.sample.core.domain.entities.User; import br.com.helpdev.sample.core.domain.vo.Email; @ExtendWith(MockitoExtension.class) class UserEventDispatcherTest { + private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user"; + + private static final EventFormat EVENT_FORMAT = Objects.requireNonNull( + EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)); + @Mock private KafkaTemplate kafkaProducer; - @InjectMocks private UserEventDispatcher userEventDispatcher; + private ObjectMapper objectMapper; + private User user; @BeforeEach void setUp() { final var userUuid = UUID.randomUUID(); + objectMapper = new ObjectMapper(); + userEventDispatcher = new UserEventDispatcher(kafkaProducer, objectMapper); user = new User(1L, userUuid, "John Doe", Email.of("john.doe@example.com"), LocalDate.of(2000, 1, 1), null); } @Test - void testSendUserCreatedEvent() { + void testSendUserCreatedEvent() throws Exception { userEventDispatcher.sendUserCreatedEvent(user); - verify(kafkaProducer).send("user-events", user.uuid().toString(), UserEvent.ofCreated(user.uuid()).toJson()); - verifyNoMoreInteractions(kafkaProducer); + assertPublishedEvent("br.com.helpdev.sample.user.created"); } @Test - void testSendUserAddressUpdatedEvent() { + void testSendUserAddressUpdatedEvent() throws Exception { userEventDispatcher.sendUserAddressUpdatedEvent(user); - verify(kafkaProducer).send("user-events", user.uuid().toString(), UserEvent.ofUpdated(user.uuid()).toJson()); + assertPublishedEvent("br.com.helpdev.sample.user.address.updated"); + } + + @Test + void testSendUserCreatedEvent_WhenSerializationFailsShouldThrowKafkaException() throws Exception { + final var failingObjectMapper = mock(ObjectMapper.class); + userEventDispatcher = new UserEventDispatcher(kafkaProducer, failingObjectMapper); + + when(failingObjectMapper.writeValueAsBytes(any(UserEventData.class))).thenThrow(new JsonProcessingException("boom") { + private static final long serialVersionUID = 1L; + }); + + assertThrows(KafkaException.class, () -> userEventDispatcher.sendUserCreatedEvent(user)); + verifyNoInteractions(kafkaProducer); + } + + @Test + void testUserEventRecordShouldExposeCloudEventFields() { + final var time = OffsetDateTime.now(); + final var userEventData = new UserEventData(user.uuid().toString()); + final var userEvent = new UserEvent("1.0", "event-id", CLOUD_EVENT_SOURCE, "br.com.helpdev.sample.user.created", time, + "application/json", userEventData); + + assertEquals("1.0", userEvent.specversion()); + assertEquals("event-id", userEvent.id()); + assertEquals(CLOUD_EVENT_SOURCE, userEvent.source()); + assertEquals("br.com.helpdev.sample.user.created", userEvent.type()); + assertEquals(time, userEvent.time()); + assertEquals("application/json", userEvent.datacontenttype()); + assertEquals(user.uuid().toString(), userEvent.data().userUuid()); + } + + private void assertPublishedEvent(final String expectedType) throws Exception { + final var payloadCaptor = ArgumentCaptor.forClass(String.class); + + verify(kafkaProducer).send(eq("user-events"), eq(user.uuid().toString()), payloadCaptor.capture()); verifyNoMoreInteractions(kafkaProducer); + + final var cloudEvent = EVENT_FORMAT.deserialize(payloadCaptor.getValue().getBytes(StandardCharsets.UTF_8)); + + assertEquals(SpecVersion.V1, cloudEvent.getSpecVersion()); + assertNotNull(cloudEvent.getId()); + assertEquals(expectedType, cloudEvent.getType()); + assertEquals(CLOUD_EVENT_SOURCE, cloudEvent.getSource().toString()); + assertEquals("application/json", cloudEvent.getDataContentType()); + assertNotNull(cloudEvent.getTime()); + assertNotNull(cloudEvent.getData()); + + final var data = objectMapper.readValue(cloudEvent.getData().toBytes(), UserEventData.class); + assertEquals(user.uuid().toString(), data.userUuid()); } -} \ No newline at end of file +} diff --git a/docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md b/docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md new file mode 100644 index 0000000..fce28f7 --- /dev/null +++ b/docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md @@ -0,0 +1,51 @@ +# 4. Use CloudEvents as the Kafka Event Contract + +## Context + +The application already uses Kafka to publish domain events on the `user-events` topic, but the current payload is a custom JSON shape that mixes transport metadata and business data. That makes the contract ad hoc, limits interoperability with external consumers, and leaves event metadata such as source, type, identifier, and content type without a standard representation. + +Issue #3 asks the project to implement the CloudEvents specification using the Java SDK, so the team needs a durable decision for how Kafka events should be represented from now on. + +## Decision + +We will publish and consume Kafka events as **CloudEvents v1.0 in structured JSON format**. + +The event contract will follow these rules: + +- use the Java CloudEvents SDK to build and parse events +- serialize messages as `application/cloudevents+json` +- use the CloudEvent `type` attribute to represent the domain event kind +- use the CloudEvent `data` attribute for the business payload +- keep the Kafka record key tied to the user identifier so ordering semantics stay unchanged for a given user + +## Consequences + +- Kafka events become self-describing and interoperable with CloudEvents-aware tooling and consumers. +- Event metadata is normalized through standard attributes instead of custom JSON conventions. +- The async contract becomes easier to document consistently in AsyncAPI because the envelope is explicit. +- Publishers and listeners must map between domain payloads and the CloudEvents envelope. +- Consumers of the existing custom JSON payload need to understand the new CloudEvents envelope. + +## Alternatives Considered + +- **Keep the current custom JSON payload**: simpler short term, but it keeps the contract proprietary and pushes metadata conventions into custom code. +- **Use CloudEvents binary mode over Kafka headers**: valid and efficient, but it spreads the contract across headers and payload, which makes the sample harder to inspect, test, and document than a self-contained structured JSON message. + +## Rationale + +Structured JSON CloudEvents gives the template a standards-based event contract without changing the existing topic topology or ordering behavior. It keeps each Kafka message self-contained, which fits the template's goals of clarity, learnability, and generated documentation while still aligning with the CloudEvents specification and SDK. + +## Date + +2026-04-12 + +## Status + +Accepted + +## Links + +- [ADR 0003: Use Kafka for Event Streaming](0003-use-kafka-for-event-streaming.md) +- [Issue #3](https://github.com/helpdeveloper/java-architecture-template/issues/3) +- [CloudEvents Specification](https://cloudevents.io/) +- [CloudEvents Java SDK](https://github.com/cloudevents/sdk-java) diff --git a/docs/adr/README.md b/docs/adr/README.md index 8701be6..d30f912 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -42,6 +42,7 @@ Follow these steps when creating a new ADR: | 0001 | [Adopt REST for API Communication](0001-adopt-rest-for-api-communication.md) | Accepted | 2024-12-31 | | 0002 | [Use MySQL as the Primary Database](0002-use-mysql.md) | Accepted | 2024-12-31 | | 0003 | [Use Kafka for Event Streaming](0003-use-kafka-for-event-streaming.md) | Accepted | 2024-12-31 | +| 0004 | [Use CloudEvents as the Kafka Event Contract](0004-use-cloudevents-as-the-kafka-event-contract.md) | Accepted | 2026-04-12 | --- diff --git a/docs/adr/README.pt.md b/docs/adr/README.pt.md index 0d0a028..057beec 100644 --- a/docs/adr/README.pt.md +++ b/docs/adr/README.pt.md @@ -40,6 +40,7 @@ Siga estes passos ao criar um novo ADR: | 0001 | [Adotar REST para Comunicação de API](0001-adopt-rest-for-api-communication.md) | Aceito | 2024-12-31 | | 0002 | [Usar MySQL como Banco de Dados Primário](0002-use-mysql.md) | Aceito | 2024-12-31 | | 0003 | [Usar Kafka para Streaming de Eventos](0003-use-kafka-for-event-streaming.md) | Aceito | 2024-12-31 | +| 0004 | [Usar CloudEvents como Contrato de Eventos do Kafka](0004-use-cloudevents-as-the-kafka-event-contract.md) | Aceito | 2026-04-12 | --- @@ -53,4 +54,3 @@ Consulte o arquivo [Modelo de ADR](template.md) para criar novos ADRs. - [O que é um ADR?](https://adr.github.io/) - [Modelos e Exemplos de ADR](https://github.com/joelparkerhenderson/architecture_decision_record) -