diff --git a/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java b/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java index 44abe02eb47..66bbf8dce80 100755 --- a/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java +++ b/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java @@ -242,23 +242,6 @@ public void clientSettings(Http2Settings http2Settings) { flowControl.resetMaxFrameSize(maxFrameSize.intValue()); } - - // Set server MAX_CONCURRENT_STREAMS limit when client sends number lower than hard limit - // from configuration. Refuse settings if client sends larger number than is configured. - this.clientSettings.presentValue(Http2Setting.MAX_CONCURRENT_STREAMS) - .ifPresent(it -> { - if (http2Config.maxConcurrentStreams() >= it) { - maxClientConcurrentStreams = it; - } else { - Http2GoAway frame = - new Http2GoAway(0, - Http2ErrorCode.PROTOCOL, - "Value of maximum concurrent streams limit " + it - + " exceeded hard limit value " - + http2Config.maxConcurrentStreams()); - connectionWriter.write(frame.toFrameData(clientSettings, 0, Http2Flag.NoFlags.create())); - } - }); } /** diff --git a/nima/tests/integration/grpc/server/pom.xml b/nima/tests/integration/grpc/server/pom.xml index 81da717c324..fd740a9243f 100644 --- a/nima/tests/integration/grpc/server/pom.xml +++ b/nima/tests/integration/grpc/server/pom.xml @@ -32,6 +32,11 @@ io.helidon.nima.grpc helidon-nima-grpc-webserver + + com.google.protobuf + protobuf-java + + javax.annotation diff --git a/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java b/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java new file mode 100644 index 00000000000..28da3326fa3 --- /dev/null +++ b/nima/tests/integration/grpc/server/src/main/java/io/helidon/nima/tests/integration/grpc/webserver/EventService.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.tests.integration.grpc.webserver; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import io.helidon.nima.grpc.events.Events; +import io.helidon.nima.grpc.webserver.GrpcService; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Empty; +import io.grpc.stub.StreamObserver; + +import static io.helidon.nima.grpc.webserver.ResponseHelper.complete; + +/** + * A simple service to send events to subscriber listeners. + *

+ * The main purpose of this service it so test long-running bidirectional + * gRPC requests alongside concurrent unary requests so that there are + * multiple in-flight requests at the same time on the same gRPC channel. + */ +public class EventService implements GrpcService { + + /** + * The registered listeners. + */ + private final List listeners = new ArrayList<>(); + + /** + * A lock to control mutating the listeners list. + */ + private final Lock lock = new ReentrantLock(); + + @Override + public Descriptors.FileDescriptor proto() { + return Events.getDescriptor(); + } + + @Override + public void update(Routing router) { + router.unary("Send", this::send) + .bidi("Events", this::events); + } + + /** + * The events bidirectional request. + *

+ * Clients send subscribe or un-subscribe requests up the channel, and the server + * sends events back down the channel. + * + * @param responses the {@link StreamObserver} to receive event responses + * + * @return the {@link StreamObserver} to receive subscription requests + */ + private StreamObserver events(StreamObserver responses) { + lock.lock(); + try { + Listener listener = new Listener(responses); + listeners.add(listener); + return listener; + } finally { + lock.unlock(); + } + } + + /** + * Send an event to all subscribed listeners. + * + * @param message the message to send + * @param observer the observer that is completed when events have been sent + */ + private void send(Events.Message message, StreamObserver observer) { + String text = message.getText(); + Iterator it = listeners.iterator(); + while (it.hasNext()) { + Listener listener = it.next(); + listener.send(text); + } + complete(observer, Empty.getDefaultInstance()); + } + + /** + * An implementation of a {@link StreamObserver} used to + * subscribe or unsubscribe listeners for a specific + * bidirectional channel. + */ + private class Listener + implements StreamObserver { + + /** + * The {@link StreamObserver} to send events to. + */ + private final StreamObserver responses; + + /** + * The set of subscriber identifiers. + */ + private final Set subscribers = new HashSet<>(); + + /** + * Create a Listener. + * + * @param responses {@link StreamObserver} to send events to + */ + public Listener(StreamObserver responses) { + this.responses = responses; + } + + /** + * Send an event to all subscribers. + * + * @param text the message to send + */ + public void send(String text) { + Iterator it = subscribers.iterator(); + while(it.hasNext()) { + long id = it.next(); + responses.onNext(Events.EventResponse.newBuilder() + .setEvent(Events.Event.newBuilder() + .setId(id) + .setText(text).build()) + .build()); + } + } + + /** + * Handle messages from the client to subscribe or + * unsubscribe. Listeners are just registered by + * keeping an ID and events are sent to for each + * subscribed ID. + * + * @param request a request to subscribe or unsubscribe + */ + @Override + public void onNext(Events.EventRequest request) { + long id = request.getId(); + if (request.getAction() == Events.EventRequest.Action.SUBSCRIBE) { + subscribers.add(id); + responses.onNext(Events.EventResponse.newBuilder() + .setSubscribed(Events.Subscribed.newBuilder().setId(id).build()) + .build()); + } else { + subscribers.remove(id); + } + } + + @Override + public void onError(Throwable throwable) { + close(); + } + + @Override + public void onCompleted() { + close(); + } + + private void close() { + lock.lock(); + try { + subscribers.clear(); + listeners.remove(this); + } finally { + lock.unlock(); + } + } + } +} diff --git a/nima/tests/integration/grpc/server/src/main/proto/events.proto b/nima/tests/integration/grpc/server/src/main/proto/events.proto new file mode 100644 index 00000000000..1b44447608f --- /dev/null +++ b/nima/tests/integration/grpc/server/src/main/proto/events.proto @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +syntax = "proto3"; +option java_package = "io.helidon.nima.grpc.events"; + +import "google/protobuf/empty.proto"; + +service EventService { + rpc Send (Message) returns (google.protobuf.Empty) {} + rpc Events (stream EventRequest) returns (stream EventResponse) {} +} + +message Message { + string text = 2; +} + +message EventRequest { + int64 id = 1; + enum Action { + SUBSCRIBE = 0; + UNSUBSCRIBE = 1; + } + Action action = 2; +} + +message EventResponse { + oneof response_type { + Subscribed subscribed = 1; + Unsubscribed unsubscribed = 2; + Event event = 3; + } +} + +message Subscribed { + int64 id = 1; +} + +message Unsubscribed { + int64 id = 1; +} + +message Event { + int64 id = 1; + string text = 2; +} diff --git a/nima/tests/integration/grpc/server/src/test/java/io/helidon/nima/tests/integration/grpc/webserver/EventsTest.java b/nima/tests/integration/grpc/server/src/test/java/io/helidon/nima/tests/integration/grpc/webserver/EventsTest.java new file mode 100644 index 00000000000..6b504a258a3 --- /dev/null +++ b/nima/tests/integration/grpc/server/src/test/java/io/helidon/nima/tests/integration/grpc/webserver/EventsTest.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.tests.integration.grpc.webserver; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.helidon.nima.grpc.events.EventServiceGrpc; +import io.helidon.nima.grpc.events.Events; +import io.helidon.nima.grpc.webserver.GrpcRouting; +import io.helidon.nima.testing.junit5.webserver.ServerTest; +import io.helidon.nima.testing.junit5.webserver.SetUpRoute; +import io.helidon.nima.webserver.Router; +import io.helidon.nima.webserver.WebServer; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@ServerTest +public class EventsTest { + private final int port; + + protected ManagedChannel channel; + + protected EventServiceGrpc.EventServiceBlockingStub blockingStub; + + protected EventServiceGrpc.EventServiceStub stub; + + EventsTest(WebServer server) { + this.port = server.port(); + } + + @SetUpRoute + static void routing(Router.RouterBuilder router) { + router.addRouting(GrpcRouting.builder().service(new EventService())); + } + + @BeforeEach + void beforeEach() { + channel = ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext() + .build(); + blockingStub = EventServiceGrpc.newBlockingStub(channel); + stub = EventServiceGrpc.newStub(channel); + } + + @AfterEach + void afterEach() throws InterruptedException { + blockingStub = null; + stub = null; + channel.shutdown(); + if (!channel.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Failed to terminate channel"); + } + if (!channel.isTerminated()) { + System.err.println("Channel is not terminated!!!"); + } + } + + @Test + public void shouldReceiveEvents() throws Exception { + TestObserver observer = new TestObserver(); + + StreamObserver requests = stub.events(observer); + + CountDownLatch latch = observer.setLatch(2); + requests.onNext(subscribe(19L)); + requests.onNext(subscribe(20L)); + + assertThat(latch.await(1, TimeUnit.MINUTES), is(true)); + + List responses = observer.getResponses(); + assertThat(responses.size(), is(2)); + + Events.EventResponse response = responses.get(0); + assertThat(response.getResponseTypeCase(), is(Events.EventResponse.ResponseTypeCase.SUBSCRIBED)); + Events.Subscribed subscribed = response.getSubscribed(); + assertThat(subscribed.getId(), is(19L)); + response = responses.get(1); + assertThat(response.getResponseTypeCase(), is(Events.EventResponse.ResponseTypeCase.SUBSCRIBED)); + subscribed = response.getSubscribed(); + assertThat(subscribed.getId(), is(20L)); + + observer.clear(); + latch = observer.setLatch(2); + blockingStub.send(message("foo")); + + assertThat(latch.await(1, TimeUnit.MINUTES), is(true)); + + responses = observer.getResponses(); + assertThat(responses.size(), is(2)); + + response = responses.get(0); + assertThat(response.getResponseTypeCase(), is(Events.EventResponse.ResponseTypeCase.EVENT)); + Events.Event event = response.getEvent(); + assertThat(event.getId(), is(19L)); + assertThat(event.getText(), is("foo")); + + response = responses.get(1); + assertThat(response.getResponseTypeCase(), is(Events.EventResponse.ResponseTypeCase.EVENT)); + event = response.getEvent(); + assertThat(event.getId(), is(20L)); + assertThat(event.getText(), is("foo")); + } + + private Events.EventRequest subscribe(long id) { + return Events.EventRequest.newBuilder() + .setAction(Events.EventRequest.Action.SUBSCRIBE) + .setId(id) + .build(); + } + + private Events.EventRequest unsubscribe(long id) { + return Events.EventRequest.newBuilder() + .setAction(Events.EventRequest.Action.UNSUBSCRIBE) + .setId(id) + .build(); + } + + private Events.Message message(String text) { + return Events.Message.newBuilder() + .setText(text) + .build(); + } + + private static class TestObserver + implements StreamObserver { + + private final List responses = new ArrayList<>(); + + private CountDownLatch latch; + + public CountDownLatch setLatch(int count) { + latch = new CountDownLatch(count); + return latch; + } + + public List getResponses() { + return responses; + } + + public void clear() { + responses.clear(); + } + + @Override + public void onNext(Events.EventResponse response) { + responses.add(response); + if (latch != null) { + latch.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + } +}