From 53f0e2f96edc98b07fbcbb944f57efba7cbec8af Mon Sep 17 00:00:00 2001 From: Jonathan Knight Date: Fri, 11 Aug 2023 16:49:56 +0300 Subject: [PATCH] Clean up the EventService class used for gRPC tests and add comments to explain what it does. --- .../grpc/webserver/EventService.java | 67 ++++++++++++++++++- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java b/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java index 19c6351204b..28da3326fa3 100644 --- a/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java +++ b/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; @@ -32,10 +33,23 @@ import static io.helidon.nima.grpc.webserver.ResponseHelper.complete; +/** + * A simple service to send events to subscriber listeners. + *

+ * The main purpose of this service it so test long-running bidirectional + * gRPC requests alongside concurrent unary requests so that there are + * multiple in-flight requests at the same time on the same gRPC channel. + */ public class EventService implements GrpcService { + /** + * The registered listeners. + */ private final List listeners = new ArrayList<>(); + /** + * A lock to control mutating the listeners list. + */ private final Lock lock = new ReentrantLock(); @Override @@ -49,6 +63,16 @@ public void update(Routing router) { .bidi("Events", this::events); } + /** + * The events bidirectional request. + *

+ * Clients send subscribe or un-subscribe requests up the channel, and the server + * sends events back down the channel. + * + * @param responses the {@link StreamObserver} to receive event responses + * + * @return the {@link StreamObserver} to receive subscription requests + */ private StreamObserver events(StreamObserver responses) { lock.lock(); try { @@ -60,27 +84,58 @@ private StreamObserver events(StreamObserver observer) { String text = message.getText(); - for (Listener listener : listeners) { + Iterator it = listeners.iterator(); + while (it.hasNext()) { + Listener listener = it.next(); listener.send(text); } complete(observer, Empty.getDefaultInstance()); } + /** + * An implementation of a {@link StreamObserver} used to + * subscribe or unsubscribe listeners for a specific + * bidirectional channel. + */ private class Listener implements StreamObserver { + /** + * The {@link StreamObserver} to send events to. + */ private final StreamObserver responses; + /** + * The set of subscriber identifiers. + */ private final Set subscribers = new HashSet<>(); + /** + * Create a Listener. + * + * @param responses {@link StreamObserver} to send events to + */ public Listener(StreamObserver responses) { this.responses = responses; } + /** + * Send an event to all subscribers. + * + * @param text the message to send + */ public void send(String text) { - for (long id : subscribers) { + Iterator it = subscribers.iterator(); + while(it.hasNext()) { + long id = it.next(); responses.onNext(Events.EventResponse.newBuilder() .setEvent(Events.Event.newBuilder() .setId(id) @@ -89,6 +144,14 @@ public void send(String text) { } } + /** + * Handle messages from the client to subscribe or + * unsubscribe. Listeners are just registered by + * keeping an ID and events are sent to for each + * subscribed ID. + * + * @param request a request to subscribe or unsubscribe + */ @Override public void onNext(Events.EventRequest request) { long id = request.getId();