Skip to content

Commit

Permalink
Merge pull request #19 from GroupHQ/GROUP-89-Fix-Loading-Issues
Browse files Browse the repository at this point in the history
GROUP-89 Replaced OutboxEventJson references with OutboxEvent
  • Loading branch information
makmn1 committed Mar 1, 2024
2 parents dbd146d + 4dc246f commit 7b0fef0
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.outbox.EventDataModel;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.AggregateType;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventType;
Expand Down Expand Up @@ -34,7 +34,7 @@ public record PublicOutboxEvent(Long aggregateId, AggregateType aggregateType,
OBJECT_MAPPER.registerModule(new JavaTimeModule());
}

public static PublicOutboxEvent convertOutboxEvent(OutboxEventJson outboxEvent) {
public static PublicOutboxEvent convertOutboxEvent(OutboxEvent outboxEvent) {
return switch (outboxEvent.getEventType()) {
case GROUP_CREATED -> convertGroupCreated(outboxEvent);
case GROUP_UPDATED -> convertGroupStatusUpdated(outboxEvent);
Expand All @@ -44,15 +44,15 @@ public static PublicOutboxEvent convertOutboxEvent(OutboxEventJson outboxEvent)
};
}

private static PublicOutboxEvent convertGroupCreated(OutboxEventJson outboxEvent) {
private static PublicOutboxEvent convertGroupCreated(OutboxEvent outboxEvent) {
return convertDefault(outboxEvent);
}

private static PublicOutboxEvent convertGroupStatusUpdated(OutboxEventJson outboxEvent) {
private static PublicOutboxEvent convertGroupStatusUpdated(OutboxEvent outboxEvent) {
return convertDefault(outboxEvent);
}

private static PublicOutboxEvent convertMemberJoined(OutboxEventJson outboxEvent) {
private static PublicOutboxEvent convertMemberJoined(OutboxEvent outboxEvent) {
PublicOutboxEvent publicOutboxEvent;

publicOutboxEvent = new PublicOutboxEvent(
Expand All @@ -67,11 +67,11 @@ private static PublicOutboxEvent convertMemberJoined(OutboxEventJson outboxEvent
return publicOutboxEvent;
}

private static PublicOutboxEvent convertMemberLeft(OutboxEventJson outboxEvent) {
private static PublicOutboxEvent convertMemberLeft(OutboxEvent outboxEvent) {
return convertDefault(outboxEvent);
}

private static PublicOutboxEvent convertDefault(OutboxEventJson outboxEvent) {
private static PublicOutboxEvent convertDefault(OutboxEvent outboxEvent) {
return new PublicOutboxEvent(
outboxEvent.getAggregateId(),
outboxEvent.getAggregateType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.group.sync.GroupUpdateService;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
Expand All @@ -24,7 +24,7 @@ public GroupEventForwarder(GroupUpdateService groupUpdateService) {
}

@Bean
public Consumer<Flux<OutboxEventJson>> processedEvents() {
public Consumer<Flux<OutboxEvent>> processedEvents() {
return outboxEvents ->
outboxEvents.flatMap(this::forwardUpdate)
.doOnError(throwable -> log.error("Error while forwarding events. "
Expand All @@ -33,7 +33,7 @@ public Consumer<Flux<OutboxEventJson>> processedEvents() {
.subscribe();
}

private Mono<Void> forwardUpdate(OutboxEventJson outboxEvent) {
private Mono<Void> forwardUpdate(OutboxEvent outboxEvent) {
switch (outboxEvent.getEventStatus()) {
case SUCCESSFUL -> {
groupUpdateService.sendPublicOutboxEventToAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
Expand All @@ -18,7 +18,7 @@ public class GroupUpdateService {

private final Sinks.Many<PublicOutboxEvent> publicUpdatesSink;

private final Sinks.Many<OutboxEventJson> userUpdatesSink;
private final Sinks.Many<OutboxEvent> userUpdatesSink;

public GroupUpdateService() {
publicUpdatesSink = Sinks.many().replay().limit(Duration.ofSeconds(5));
Expand All @@ -31,7 +31,7 @@ public Flux<PublicOutboxEvent> publicUpdatesStream() {
.onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST);
}

public Flux<OutboxEventJson> eventOwnerUpdateStream() {
public Flux<OutboxEvent> eventOwnerUpdateStream() {
return userUpdatesSink.asFlux()
.onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST);
}
Expand All @@ -41,7 +41,7 @@ public void sendPublicOutboxEventToAll(PublicOutboxEvent outboxEvent) {
emitResultLogger("PUBLIC", outboxEvent, result);
}

public void sendOutboxEventToEventOwner(OutboxEventJson outboxEvent) {
public void sendOutboxEventToEventOwner(OutboxEvent outboxEvent) {
final Sinks.EmitResult result = userUpdatesSink.tryEmitNext(outboxEvent);
emitResultLogger(outboxEvent.getEventStatus().toString(), outboxEvent, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.grouphq.groupsync.group.sync.GroupUpdateService;
import org.grouphq.groupsync.groupservice.domain.exceptions.InternalServerError;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext;
Expand Down Expand Up @@ -44,7 +44,7 @@ public Flux<PublicOutboxEvent> getPublicUpdates() {
}

@MessageMapping("groups.updates.user")
public Flux<OutboxEventJson> getEventOwnerUpdates() {
public Flux<OutboxEvent> getEventOwnerUpdates() {
return groupUpdateService.eventOwnerUpdateStream()
.flatMap(outboxEvent -> monoIsUserEventOwner(outboxEvent)
.flatMap(isOwner -> isOwner ? Mono.just(outboxEvent) : Mono.empty())
Expand All @@ -69,7 +69,7 @@ public Mono<Member> getMyMember() {
});
}

private Mono<Boolean> monoIsUserEventOwner(OutboxEventJson outboxEvent) {
private Mono<Boolean> monoIsUserEventOwner(OutboxEvent outboxEvent) {
return userService.getUserAuthentication()
.map(Principal::getName)
.flatMap(username -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public class OutboxEvent {
private final String websocketId;
private final AggregateType aggregateType;
private final EventType eventType;
private final String eventData;
private final EventDataModel eventData;
private final EventStatus eventStatus;
private final Instant createdDate;

public static OutboxEvent of(UUID eventId, Long aggregateId, AggregateType aggregateType,
EventType eventType, String eventData, EventStatus eventStatus,
EventType eventType, EventDataModel eventData, EventStatus eventStatus,
String websocketId) {
return new OutboxEvent(eventId, aggregateId, websocketId, aggregateType,
eventType, eventData, eventStatus, Instant.now());
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ spring:
destination: group-leave-requests
group: ${spring.application.name}
processedEvents-in-0:
destination: group-event-results-temp-group-89
destination: group-event-results
group: ${spring.application.name}
rabbitmq:
host: {$SPRING_RABBITMQ_HOST:localhost}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.grouphq.groupsync.GroupTestUtility;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.group.web.GroupSyncController;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -40,9 +40,9 @@ class GroupEventForwarderIntegrationTest {
@DisplayName("Forwards events to the outbox event update successful sink")
void forwardsEventsToTheOutboxEventUpdateSink() {
final PublicOutboxEvent[] publicEvents = {
PublicOutboxEvent.convertOutboxEvent(GroupTestUtility.generateOutboxEventJson()),
PublicOutboxEvent.convertOutboxEvent(GroupTestUtility.generateOutboxEventJson()),
PublicOutboxEvent.convertOutboxEvent(GroupTestUtility.generateOutboxEventJson())
PublicOutboxEvent.convertOutboxEvent(GroupTestUtility.generateOutboxEvent()),
PublicOutboxEvent.convertOutboxEvent(GroupTestUtility.generateOutboxEvent()),
PublicOutboxEvent.convertOutboxEvent(GroupTestUtility.generateOutboxEvent())
};

final Flux<PublicOutboxEvent> groupUpdatesStream =
Expand Down Expand Up @@ -70,13 +70,13 @@ void forwardsEventsToTheOutboxEventUpdateSink() {
@WithMockUser(username = USER)
@DisplayName("Forwards events to the outbox event update failed sink")
void forwardsEventsToTheOutboxEventUpdateFailedSink() {
final OutboxEventJson[] outboxEvents = {
GroupTestUtility.generateOutboxEventJson(USER, EventStatus.FAILED),
GroupTestUtility.generateOutboxEventJson(USER, EventStatus.FAILED),
GroupTestUtility.generateOutboxEventJson(USER, EventStatus.FAILED)
final OutboxEvent[] outboxEvents = {
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED)
};

final Flux<OutboxEventJson> groupUpdatesStream =
final Flux<OutboxEvent> groupUpdatesStream =
groupSyncController.getEventOwnerUpdates()
.doOnSubscribe(subscription -> {
inputDestination.send(new GenericMessage<>(outboxEvents[0]), eventDestination);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.grouphq.groupsync.GroupTestUtility;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.group.sync.GroupUpdateService;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
Expand All @@ -30,8 +30,8 @@ class GroupEventForwarderTest {
@Test
@DisplayName("Forwards successful events to the to all users sink and the event owner sink")
void forwardsSuccessfulEventsToTheUpdatesSink() {
final OutboxEventJson outboxEvent =
GroupTestUtility.generateOutboxEventJson("ID", EventStatus.SUCCESSFUL);
final OutboxEvent outboxEvent =
GroupTestUtility.generateOutboxEvent("ID", EventStatus.SUCCESSFUL);
final PublicOutboxEvent publicOutboxEvent =
PublicOutboxEvent.convertOutboxEvent(outboxEvent);

Expand All @@ -47,8 +47,8 @@ void forwardsSuccessfulEventsToTheUpdatesSink() {
@Test
@DisplayName("Forwards failed events to only the event owner sink")
void forwardsFailedEventsToTheUpdatesFailedSink() {
final OutboxEventJson outboxEvent =
GroupTestUtility.generateOutboxEventJson("ID", EventStatus.FAILED);
final OutboxEvent outboxEvent =
GroupTestUtility.generateOutboxEvent("ID", EventStatus.FAILED);

willDoNothing().given(groupUpdateService).sendOutboxEventToEventOwner(outboxEvent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.time.Duration;
import org.grouphq.groupsync.GroupTestUtility;
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
Expand All @@ -32,7 +32,7 @@ void returnsSinkForUpdates() {
@Test
@DisplayName("Updates sink with successful events and emits them")
void updatesSinkWithNewOutboxEventsAndEmitsThem() {
final OutboxEventJson outboxEvent = GroupTestUtility.generateOutboxEventJson();
final OutboxEvent outboxEvent = GroupTestUtility.generateOutboxEvent();
final PublicOutboxEvent publicOutboxEvent =
PublicOutboxEvent.convertOutboxEvent(outboxEvent);

Expand All @@ -53,8 +53,8 @@ void returnsSinkForFailedUpdates() {
@Test
@DisplayName("Updates sink with failed events and emits them")
void updatesSinkWithFailedOutboxEventsAndEmitsThem() {
final OutboxEventJson outboxEvent =
GroupTestUtility.generateOutboxEventJson("ID", EventStatus.FAILED);
final OutboxEvent outboxEvent =
GroupTestUtility.generateOutboxEvent("ID", EventStatus.FAILED);

StepVerifier.create(groupUpdateService.eventOwnerUpdateStream())
.then(() -> groupUpdateService.sendOutboxEventToEventOwner(outboxEvent))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.grouphq.groupsync.group.domain.PublicOutboxEvent;
import org.grouphq.groupsync.group.security.UserService;
import org.grouphq.groupsync.group.sync.GroupUpdateService;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEventJson;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -40,7 +40,7 @@ class GroupSyncControllerTest {
@Test
@DisplayName("Test streaming outbox events to all clients")
void testGetOutboxEventUpdates() {
final OutboxEventJson event = GroupTestUtility.generateOutboxEventJson();
final OutboxEvent event = GroupTestUtility.generateOutboxEvent();
final PublicOutboxEvent publicOutboxEvent = PublicOutboxEvent.convertOutboxEvent(event);

// Mimic a stream of events, followed by an error that should be ignored
Expand All @@ -64,14 +64,14 @@ void testGetOutboxEventUpdates() {
@WithMockUser(username = "Banana")
@DisplayName("Test streaming outbox events to the client who made the request")
void testGetOutboxEventUpdatesFailed() {
final OutboxEventJson[] events = {
GroupTestUtility.generateOutboxEventJson("Apricot", EventStatus.FAILED),
GroupTestUtility.generateOutboxEventJson("Banana", EventStatus.FAILED),
GroupTestUtility.generateOutboxEventJson("Cherry", EventStatus.FAILED)
final OutboxEvent[] events = {
GroupTestUtility.generateOutboxEvent("Apricot", EventStatus.FAILED),
GroupTestUtility.generateOutboxEvent("Banana", EventStatus.FAILED),
GroupTestUtility.generateOutboxEvent("Cherry", EventStatus.FAILED)
};

// Mimic a stream of events
final Sinks.Many<OutboxEventJson> sink = Sinks.many().replay().limit(100);
final Sinks.Many<OutboxEvent> sink = Sinks.many().replay().limit(100);
sink.tryEmitNext(events[0]);
sink.tryEmitNext(events[1]);
sink.tryEmitNext(events[2]);
Expand Down
Loading

0 comments on commit 7b0fef0

Please sign in to comment.