diff --git a/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java b/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java
index 19c6351204b..28da3326fa3 100644
--- a/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java
+++ b/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
@@ -32,10 +33,23 @@
import static io.helidon.nima.grpc.webserver.ResponseHelper.complete;
+/**
+ * A simple service to send events to subscriber listeners.
+ *
+ * The main purpose of this service it so test long-running bidirectional
+ * gRPC requests alongside concurrent unary requests so that there are
+ * multiple in-flight requests at the same time on the same gRPC channel.
+ */
public class EventService implements GrpcService {
+ /**
+ * The registered listeners.
+ */
private final List listeners = new ArrayList<>();
+ /**
+ * A lock to control mutating the listeners list.
+ */
private final Lock lock = new ReentrantLock();
@Override
@@ -49,6 +63,16 @@ public void update(Routing router) {
.bidi("Events", this::events);
}
+ /**
+ * The events bidirectional request.
+ *
+ * Clients send subscribe or un-subscribe requests up the channel, and the server
+ * sends events back down the channel.
+ *
+ * @param responses the {@link StreamObserver} to receive event responses
+ *
+ * @return the {@link StreamObserver} to receive subscription requests
+ */
private StreamObserver events(StreamObserver responses) {
lock.lock();
try {
@@ -60,27 +84,58 @@ private StreamObserver events(StreamObserver observer) {
String text = message.getText();
- for (Listener listener : listeners) {
+ Iterator it = listeners.iterator();
+ while (it.hasNext()) {
+ Listener listener = it.next();
listener.send(text);
}
complete(observer, Empty.getDefaultInstance());
}
+ /**
+ * An implementation of a {@link StreamObserver} used to
+ * subscribe or unsubscribe listeners for a specific
+ * bidirectional channel.
+ */
private class Listener
implements StreamObserver {
+ /**
+ * The {@link StreamObserver} to send events to.
+ */
private final StreamObserver responses;
+ /**
+ * The set of subscriber identifiers.
+ */
private final Set subscribers = new HashSet<>();
+ /**
+ * Create a Listener.
+ *
+ * @param responses {@link StreamObserver} to send events to
+ */
public Listener(StreamObserver responses) {
this.responses = responses;
}
+ /**
+ * Send an event to all subscribers.
+ *
+ * @param text the message to send
+ */
public void send(String text) {
- for (long id : subscribers) {
+ Iterator it = subscribers.iterator();
+ while(it.hasNext()) {
+ long id = it.next();
responses.onNext(Events.EventResponse.newBuilder()
.setEvent(Events.Event.newBuilder()
.setId(id)
@@ -89,6 +144,14 @@ public void send(String text) {
}
}
+ /**
+ * Handle messages from the client to subscribe or
+ * unsubscribe. Listeners are just registered by
+ * keeping an ID and events are sent to for each
+ * subscribed ID.
+ *
+ * @param request a request to subscribe or unsubscribe
+ */
@Override
public void onNext(Events.EventRequest request) {
long id = request.getId();