Skip to content

Commit

Permalink
Clean up the EventService class used for gRPC tests and add comments …
Browse files Browse the repository at this point in the history
…to explain what it does.
  • Loading branch information
thegridman committed Aug 11, 2023
1 parent f5a1676 commit 53f0e2f
Showing 1 changed file with 65 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
Expand All @@ -32,10 +33,23 @@

import static io.helidon.nima.grpc.webserver.ResponseHelper.complete;

/**
* A simple service to send events to subscriber listeners.
* <p/>
* The main purpose of this service it so test long-running bidirectional
* gRPC requests alongside concurrent unary requests so that there are
* multiple in-flight requests at the same time on the same gRPC channel.
*/
public class EventService implements GrpcService {

/**
* The registered listeners.
*/
private final List<Listener> listeners = new ArrayList<>();

/**
* A lock to control mutating the listeners list.
*/
private final Lock lock = new ReentrantLock();

@Override
Expand All @@ -49,6 +63,16 @@ public void update(Routing router) {
.bidi("Events", this::events);
}

/**
* The events bidirectional request.
* <p/>
* Clients send subscribe or un-subscribe requests up the channel, and the server
* sends events back down the channel.
*
* @param responses the {@link StreamObserver} to receive event responses
*
* @return the {@link StreamObserver} to receive subscription requests
*/
private StreamObserver<Events.EventRequest> events(StreamObserver<Events.EventResponse> responses) {
lock.lock();
try {
Expand All @@ -60,27 +84,58 @@ private StreamObserver<Events.EventRequest> events(StreamObserver<Events.EventRe
}
}

/**
* Send an event to all subscribed listeners.
*
* @param message the message to send
* @param observer the observer that is completed when events have been sent
*/
private void send(Events.Message message, StreamObserver<Empty> observer) {
String text = message.getText();
for (Listener listener : listeners) {
Iterator<Listener> it = listeners.iterator();
while (it.hasNext()) {
Listener listener = it.next();
listener.send(text);
}
complete(observer, Empty.getDefaultInstance());
}

/**
* An implementation of a {@link StreamObserver} used to
* subscribe or unsubscribe listeners for a specific
* bidirectional channel.
*/
private class Listener
implements StreamObserver<Events.EventRequest> {

/**
* The {@link StreamObserver} to send events to.
*/
private final StreamObserver<Events.EventResponse> responses;

/**
* The set of subscriber identifiers.
*/
private final Set<Long> subscribers = new HashSet<>();

/**
* Create a Listener.
*
* @param responses {@link StreamObserver} to send events to
*/
public Listener(StreamObserver<Events.EventResponse> responses) {
this.responses = responses;
}

/**
* Send an event to all subscribers.
*
* @param text the message to send
*/
public void send(String text) {
for (long id : subscribers) {
Iterator<Long> it = subscribers.iterator();
while(it.hasNext()) {
long id = it.next();
responses.onNext(Events.EventResponse.newBuilder()
.setEvent(Events.Event.newBuilder()
.setId(id)
Expand All @@ -89,6 +144,14 @@ public void send(String text) {
}
}

/**
* Handle messages from the client to subscribe or
* unsubscribe. Listeners are just registered by
* keeping an ID and events are sent to for each
* subscribed ID.
*
* @param request a request to subscribe or unsubscribe
*/
@Override
public void onNext(Events.EventRequest request) {
long id = request.getId();
Expand Down

0 comments on commit 53f0e2f

Please sign in to comment.