Skip to content

Commit

Permalink
GROUP-89 Updated event handling logic and related tests
Browse files Browse the repository at this point in the history
  • Loading branch information
makmn1 committed Mar 18, 2024
1 parent 5780eb6 commit bab636e
Show file tree
Hide file tree
Showing 30 changed files with 273 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.grouphq.groupsync.groupservice.domain.outbox.enums.AggregateType;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventType;
import reactor.core.publisher.Mono;

/**
* A data-access-object representing an outbox event containing
Expand Down Expand Up @@ -108,4 +109,16 @@ public PublicOutboxEvent withNewEventData(EventDataModel eventDataModel) {
this.createdDate
);
}

public static Mono<PublicOutboxEvent> getEmptyEvent() {
return Mono.just(new PublicOutboxEvent(
UUID.randomUUID(),
0L,
AggregateType.NONE,
EventType.NOTHING,
null,
EventStatus.SUCCESSFUL,
Instant.now()
)).share();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Consumer<Flux<OutboxEvent>> processedEvents() {
return outboxEvents ->
outboxEvents.flatMap(this::forwardUpdate)
.doOnError(throwable -> log.error("Error while forwarding events. "
+ "Attempting to resume.", throwable))
+ "Attempting to resume. Error: {}", throwable.getMessage()))
.onErrorResume(throwable -> Flux.empty())
.subscribe();
}
Expand All @@ -38,9 +38,10 @@ private Mono<Void> forwardUpdate(OutboxEvent outboxEvent) {
case SUCCESSFUL -> {
groupUpdateService.sendPublicOutboxEventToAll(
PublicOutboxEvent.convertOutboxEvent(outboxEvent));
groupUpdateService.sendOutboxEventToEventOwner(outboxEvent);
groupUpdateService.sendOutboxEventToEventOwner(OutboxEvent.convertEventDataToPublic(outboxEvent));
}
case FAILED -> groupUpdateService.sendOutboxEventToEventOwner(outboxEvent);
case FAILED -> groupUpdateService
.sendOutboxEventToEventOwner(OutboxEvent.convertEventDataToPublic(outboxEvent));
default -> log.error("Unknown event status: {}", outboxEvent.getEventStatus());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public Flux<Group> getGroups() {
.retryWhen(
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds())))
.doOnError(throwable -> log.error("Group service failed on request to get groups", throwable))
.doOnError(throwable -> log.error("Group service failed on request to get groups. Error: {}",
throwable.getMessage()))
.onErrorMap(throwable -> new GroupServiceUnavailableException(
"Group service failed on request to get groups"));
}
Expand All @@ -58,7 +59,8 @@ public Flux<PublicOutboxEvent> getGroupsAsEvents() {
.retryWhen(
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds())))
.doOnError(throwable -> log.error("Group service failed on request to get groups as events", throwable))
.doOnError(throwable -> log.error("Group service failed on request to get groups as events: {}",
throwable.getMessage()))
.onErrorMap(throwable -> new GroupServiceUnavailableException(
"Group service failed on request to get groups as events"));
}
Expand All @@ -78,7 +80,7 @@ public Mono<Member> getMyMember(String websocketId) {
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds())))
.doOnError(throwable -> log.error("Error while getting my member from group service: {}",
throwable.getMessage(), throwable))
throwable.getMessage()))
.onErrorMap(throwable -> new GroupServiceUnavailableException(
"Group service failed on request to get my member"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.grouphq.groupsync.group.sync.state;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.config.ClientProperties;
Expand Down Expand Up @@ -33,9 +32,6 @@ public DormantState(GroupInitialStateService groupInitialStateService, ClientPro
public Mono<Void> onRequest() {
initialRequest.compareAndSet(null,
groupInitialStateService.initializeGroupState()
.timeout(Duration.ofMillis(clientProperties.getGroupsTimeoutMilliseconds()))
.doOnError(throwable -> log.error("Error initializing group state", throwable))
.doOnSuccess(empty -> log.info("Successfully initialized group state"))
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
groupInitialStateService.setState(new ReadyState(groupInitialStateService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public GroupInitialStateService(GroupFetchService groupFetchService, GroupUpdate

public Flux<PublicOutboxEvent> requestCurrentEvents() {
return state.onRequest()
.thenMany(buildState());
.thenMany(buildState())
.doOnError(throwable -> log.error("Error initializing group state: {}", throwable.getMessage()));
}

private Flux<PublicOutboxEvent> buildState() {
Expand All @@ -78,9 +79,11 @@ protected Mono<Void> initializeGroupState() {
Retry.backoff(clientProperties.getGroupsRetryAttempts(),
Duration.ofMillis(clientProperties.getGroupsRetryBackoffMilliseconds()))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry(retrySignal -> log.warn("Retrying due to error", retrySignal.failure())))
.doBeforeRetry(retrySignal -> log.warn("Retrying due to error: {}",
retrySignal.failure().getMessage())))
.doOnComplete(this::createUpdateSubscription)
.doOnError(error -> log.error("Error getting initial state of events", error))
.doOnComplete(() -> log.info("Successfully initialized group state"))
.doOnError(error -> log.error("Error getting initial state of events: {}", error.getMessage()))
.then(Mono.empty())
);
}
Expand Down Expand Up @@ -110,7 +113,8 @@ public void cleanUp() {
protected Flux<PublicOutboxEvent> keepGroupsAndMembersUpToDate() {
return groupUpdateService.publicUpdatesStream()
.flatMap(groupStateService::handleEventUpdate)
.doOnError(throwable -> log.error("Error keeping groups and members up to date", throwable));
.doOnError(throwable -> log.error("Error keeping groups and members up to date. Error: {}",
throwable.getMessage()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.grouphq.groupsync.groupservice.domain.exceptions.InternalServerError;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.web.objects.egress.PublicMember;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext;
Expand Down Expand Up @@ -42,38 +43,48 @@ public Mono<Boolean> ping() {
@MessageMapping("groups.updates.all")
public Flux<PublicOutboxEvent> getPublicUpdates() {
log.info("Subscribing connection to public updates stream.");
return groupInitialStateService.requestCurrentEvents().concatWith(groupUpdateService.publicUpdatesStream())
return concatPublicUpdates()
.doOnNext(outboxEvent -> log.info("Sending public outbox event: {}", outboxEvent))
.doOnCancel(() -> log.info("User cancelled streaming outbox events."))
.doOnComplete(() -> log.info("Stopped streaming outbox events."))
.doOnError(throwable -> log.error("Error while streaming outbox events. "
+ "Stream will be terminated.", throwable))
+ "Stream will be terminated. Error: {}", throwable.getMessage()))
.onErrorMap(unusedThrowable -> new InternalServerError("Update stream closed"));
}

private Flux<PublicOutboxEvent> concatPublicUpdates() {
final Flux<PublicOutboxEvent> currentEvents = groupInitialStateService.requestCurrentEvents();
final Mono<PublicOutboxEvent> emptyEvent = PublicOutboxEvent.getEmptyEvent();
final Flux<PublicOutboxEvent> publicUpdates = groupUpdateService.publicUpdatesStream();

return Flux.concat(currentEvents, emptyEvent, publicUpdates);
}

@MessageMapping("groups.updates.user")
public Flux<OutboxEvent> getEventOwnerUpdates() {
return groupUpdateService.eventOwnerUpdateStream()
.flatMap(outboxEvent -> monoIsUserEventOwner(outboxEvent)
.flatMap(isOwner -> isOwner ? Mono.just(outboxEvent) : Mono.empty())
.doOnError(throwable -> log.error(
"Error while verifying user ownership on event: {}", outboxEvent, throwable))
"Error while verifying user ownership. Error: {} with event: {}",
throwable.getMessage(), outboxEvent))
.onErrorResume(throwable -> Mono.empty())
)
.doOnCancel(() -> log.info("User cancelled streaming user outbox events."))
.doOnComplete(() -> log.info("Stopped streaming outbox events to users."))
.doOnError(throwable -> log.error("Error while streaming user outbox events. "
+ "Stream will be terminated.", throwable))
+ "Stream will be terminated. Error: {}", throwable.getMessage()))
.onErrorMap(unusedThrowable -> new InternalServerError("User update stream closed"));
}

@MessageMapping("groups.user.member")
public Mono<Member> getMyMember() {
public Mono<PublicMember> getMyMember() {
return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.flatMap(authentication -> {
final String websocketId = authentication.getName();
return groupFetchService.getMyMember(websocketId);
return groupFetchService.getMyMember(websocketId)
.map(Member::toPublicMember);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.security.Principal;
import java.time.Instant;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.group.event.GroupEventPublisher;
Expand Down Expand Up @@ -32,7 +31,7 @@ public Mono<Void> createGroup(GroupCreateRequestEvent groupCreateRequestEvent) {
return createCreateRequestWithCurrentAuthorization(groupCreateRequestEvent)
.flatMap(eventWithAuth -> groupEventPublisher.publishGroupCreateRequest(eventWithAuth)
.doOnSuccess(unused -> log.debug("Sent create request: {}", eventWithAuth))
.doOnError(throwable -> log.error("Error while creating group.", throwable))
.doOnError(throwable -> log.error("Error while creating group: {}", throwable.getMessage()))
.onErrorMap(unusedThrowable -> new InternalServerError("Cannot create group"))
);
}
Expand All @@ -42,7 +41,7 @@ private Mono<GroupCreateRequestEvent> createCreateRequestWithCurrentAuthorizatio
return userService.getUserAuthentication()
.map(Principal::getName)
.flatMap(websocketId -> Mono.just(new GroupCreateRequestEvent(
UUID.randomUUID(),
groupCreateRequestEvent.getEventId(),
groupCreateRequestEvent.getTitle(),
groupCreateRequestEvent.getDescription(),
groupCreateRequestEvent.getMaxGroupSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.security.Principal;
import java.time.Instant;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.group.event.GroupEventPublisher;
Expand Down Expand Up @@ -32,7 +31,7 @@ public Mono<Void> joinGroup(GroupJoinRequestEvent groupJoinRequestEvent) {
return createJoinRequestWithCurrentAuthorization(groupJoinRequestEvent)
.flatMap(eventWithAuth -> groupEventPublisher.publishGroupJoinRequest(eventWithAuth)
.doOnSuccess(unused -> log.debug("Sent join request: {}", eventWithAuth))
.doOnError(throwable -> log.error("Error while joining group.", throwable))
.doOnError(throwable -> log.error("Error while joining group: {}", throwable.getMessage()))
.onErrorMap(unusedThrowable -> new InternalServerError("Cannot join group"))
);
}
Expand All @@ -42,7 +41,7 @@ private Mono<GroupJoinRequestEvent> createJoinRequestWithCurrentAuthorization(
return userService.getUserAuthentication()
.map(Principal::getName)
.flatMap(websocketId -> Mono.just(new GroupJoinRequestEvent(
UUID.randomUUID(),
groupJoinRequestEvent.getEventId(),
groupJoinRequestEvent.getAggregateId(),
groupJoinRequestEvent.getUsername(),
websocketId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.security.Principal;
import java.time.Instant;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.group.event.GroupEventPublisher;
Expand Down Expand Up @@ -32,7 +31,7 @@ public Mono<Void> leaveGroup(GroupLeaveRequestEvent groupLeaveRequestEvent) {
return createLeaveRequestWithCurrentAuthorization(groupLeaveRequestEvent)
.flatMap(eventWithAuth -> groupEventPublisher.publishGroupLeaveRequest(eventWithAuth)
.doOnSuccess(unused -> log.debug("Sent leave request: {}", eventWithAuth))
.doOnError(throwable -> log.error("Error while leaving group.", throwable))
.doOnError(throwable -> log.error("Error while leaving group: {}", throwable.getMessage()))
.onErrorMap(unusedThrowable -> new InternalServerError("Cannot leave group"))
);
}
Expand All @@ -42,7 +41,7 @@ private Mono<GroupLeaveRequestEvent> createLeaveRequestWithCurrentAuthorization(
return userService.getUserAuthentication()
.map(Principal::getName)
.flatMap(websocketId -> Mono.just(new GroupLeaveRequestEvent(
UUID.randomUUID(),
groupLeaveRequestEvent.getEventId(),
groupLeaveRequestEvent.getAggregateId(),
groupLeaveRequestEvent.getMemberId(),
websocketId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.security.Principal;
import java.time.Instant;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.group.event.GroupEventPublisher;
Expand Down Expand Up @@ -32,7 +31,7 @@ public Mono<Void> updateGroupStatus(GroupStatusRequestEvent groupStatusRequestEv
return createUpdateStatusRequestWithCurrentAuthorization(groupStatusRequestEvent)
.flatMap(eventWithAuth -> groupEventPublisher.publishGroupUpdateStatusRequest(eventWithAuth)
.doOnSuccess(unused -> log.debug("Sent update group status request: {}", eventWithAuth))
.doOnError(throwable -> log.error("Error while updating group status.", throwable))
.doOnError(throwable -> log.error("Error while updating group status: {}", throwable.getMessage()))
.onErrorMap(unusedThrowable -> new InternalServerError("Cannot update group status"))
);
}
Expand All @@ -42,7 +41,7 @@ private Mono<GroupStatusRequestEvent> createUpdateStatusRequestWithCurrentAuthor
return userService.getUserAuthentication()
.map(Principal::getName)
.flatMap(websocketId -> Mono.just(new GroupStatusRequestEvent(
UUID.randomUUID(),
groupStatusRequestEvent.getEventId(),
groupStatusRequestEvent.getAggregateId(),
groupStatusRequestEvent.getNewStatus(),
websocketId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ public record Member(

int version
) implements EventDataModel {
public static Member of(String username, Long groupId) {
return new Member(null, UUID.randomUUID(), username, groupId, MemberStatus.ACTIVE, null,
null, null, null, null, 0);
}

public static Member of(UUID websocketId, String username, Long groupId) {
return new Member(null, websocketId, username, groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.AggregateType;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventType;
Expand Down Expand Up @@ -47,4 +48,24 @@ public static OutboxEvent of(UUID eventId, Long aggregateId, AggregateType aggre
eventType, eventData, eventStatus, Instant.now());
}

public static OutboxEvent convertEventDataToPublic(OutboxEvent outboxEvent) {
return switch (outboxEvent.getEventType()) {
case MEMBER_JOINED, MEMBER_LEFT -> convertMember(outboxEvent);
default -> outboxEvent;
};
}

private static OutboxEvent convertMember(OutboxEvent outboxEvent) {
return new OutboxEventBuilder()
.eventId(outboxEvent.getEventId())
.aggregateId(outboxEvent.getAggregateId())
.websocketId(outboxEvent.getWebsocketId())
.aggregateType(outboxEvent.getAggregateType())
.eventType(outboxEvent.getEventType())
.eventData(Member.toPublicMember((Member) outboxEvent.getEventData()))
.eventStatus(outboxEvent.getEventStatus())
.createdDate(outboxEvent.getCreatedDate())
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
*/
public enum AggregateType {
GROUP,
MEMBER
MEMBER,
NONE
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ public enum EventType {
GROUP_UPDATED,
GROUP_DISBANDED,
MEMBER_JOINED,
MEMBER_LEFT
MEMBER_LEFT,
NOTHING
}
Loading

0 comments on commit bab636e

Please sign in to comment.