From 90bb213612c24ebf29b8cc8891cb283d7aa0225a Mon Sep 17 00:00:00 2001 From: Dmitry Volodin Date: Tue, 23 May 2017 18:45:00 +0300 Subject: [PATCH] CAMEL-11237: Changes based on @nicolaferraro code review comments --- .../src/main/docs/grpc-component.adoc | 6 ++-- .../component/grpc/GrpcConfiguration.java | 35 +++++++++++++++++++ .../camel/component/grpc/GrpcConsumer.java | 18 ++++++---- .../GrpcRequestAggregationStreamObserver.java | 35 +++++++++++++------ .../GrpcRequestPropagationStreamObserver.java | 34 +++++++++++++++--- .../grpc/GrpcConsumerPropagationTest.java | 35 ++++++++++++------- 6 files changed, 127 insertions(+), 36 deletions(-) diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc index d12bf85b59ce8..a74c6dfc5228a 100644 --- a/components/camel-grpc/src/main/docs/grpc-component.adoc +++ b/components/camel-grpc/src/main/docs/grpc-component.adoc @@ -47,7 +47,7 @@ with the following path and query parameters: | **service** | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) | | String |======================================================================= -#### Query Parameters (10 parameters): +#### Query Parameters (12 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -55,6 +55,8 @@ with the following path and query parameters: | **host** (common) | The gRPC server host name | | String | **port** (common) | The gRPC server port | | int | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **forwardOnCompleted** (consumer) | Determines if onCompleted events should be pushed to the Camel route. | false | boolean +| **forwardOnError** (consumer) | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body. | false | boolean | **processingStrategy** (consumer) | This option specifies the top-level strategy for processing service requests and responses in streaming mode. If an aggregation strategy is selected all requests will be accumulated in the list then transferred to the flow and the accumulated responses will be sent to the sender. If a propagation strategy is selected request is sent to the stream and the response will be immediately sent back to the sender. | | GrpcProcessing Strategies | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern @@ -92,7 +94,7 @@ The table below shows the types of objects in the message body, depending on the |Header name |Description|Possible values |*CamelGrpcMethodName*|Method name handled by the consumer service| -|*CamelGrpcEventType*|Received event type from the sended request|onNext, onCompleted or onError +|*CamelGrpcEventType*|Received event type from the sent request|onNext, onCompleted or onError |*CamelGrpcUserAgent*|If provided, the given agent will prepend the gRPC library's user agent information| |======================================================================= diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java index c298c021af66e..123de614352ef 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java @@ -27,18 +27,30 @@ public class GrpcConfiguration { @UriPath @Metadata(required = "true") private String service; + @UriParam(label = "producer") private String method; + @UriParam private String host; + @UriParam private int port; + @UriParam(label = "producer") private String target; + @UriParam(label = "producer", defaultValue = "true") private Boolean usePlainText = true; + @UriParam(label = "consumer") private GrpcProcessingStrategies processingStrategy = GrpcProcessingStrategies.PROPAGATION; + + @UriParam(label = "consumer", defaultValue = "false") + private boolean forwardOnCompleted; + + @UriParam(label = "consumer", defaultValue = "false") + private boolean forwardOnError; private String serviceName; private String servicePackage; @@ -126,6 +138,29 @@ public void setProcessingStrategy(GrpcProcessingStrategies processingStrategy) { this.processingStrategy = processingStrategy; } + /** + * Determines if onCompleted events should be pushed to the Camel route. + */ + public void setForwardOnCompleted(boolean forwardOnCompleted) { + this.forwardOnCompleted = forwardOnCompleted; + } + + public boolean isForwardOnCompleted() { + return forwardOnCompleted; + } + + /** + * Determines if onError events should be pushed to the Camel route. + * Exceptions will be set as message body. + */ + public void setForwardOnError(boolean forwardOnError) { + this.forwardOnError = forwardOnError; + } + + public boolean isForwardOnError() { + return forwardOnError; + } + /** * The service name extracted from the full service name */ diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java index 3bcdec009bdaf..27a7d4aaf9839 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java @@ -111,17 +111,21 @@ public boolean process(Exchange exchange, AsyncCallback callback) { } public void onCompleted(Exchange exchange) { - exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); - doSend(exchange, done -> { - }); + if (configuration.isForwardOnCompleted()) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); + doSend(exchange, done -> { + }); + } } public void onError(Exchange exchange, Throwable error) { - exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR); - exchange.getIn().setBody(error); + if (configuration.isForwardOnError()) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR); + exchange.getIn().setBody(error); - doSend(exchange, done -> { - }); + doSend(exchange, done -> { + }); + } } private boolean doSend(Exchange exchange, AsyncCallback callback) { diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java index 145029e29e877..9f79b22bd7b14 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java @@ -19,6 +19,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import io.grpc.stub.StreamObserver; import org.apache.camel.component.grpc.GrpcConsumer; @@ -47,23 +48,37 @@ public void onError(Throwable t) { } @Override - @SuppressWarnings("unchecked") public void onCompleted() { + CountDownLatch latch = new CountDownLatch(1); + Object responseBody = null; + exchange.getIn().setBody(requestList); exchange.getIn().setHeaders(headers); consumer.process(exchange, doneSync -> { + latch.countDown(); }); + + try { + latch.await(); + + if (exchange.hasOut()) { + responseBody = exchange.getOut().getBody(); + } else { + responseBody = exchange.getIn().getBody(); + } - Object responseBody = exchange.getIn().getBody(); - if (responseBody instanceof List) { - List responseList = (List)responseBody; - responseList.forEach((responseItem) -> { - responseObserver.onNext(responseItem); - }); - } else { - responseObserver.onNext(responseBody); + if (responseBody instanceof List) { + List responseList = (List)responseBody; + responseList.forEach((responseItem) -> { + responseObserver.onNext(responseItem); + }); + } else { + responseObserver.onNext(responseBody); + } + responseObserver.onCompleted(); + } catch (InterruptedException e) { + responseObserver.onError(e); } - responseObserver.onCompleted(); } } diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java index ae51100ff23fc..632ff2d9d97bd 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.grpc.server; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import io.grpc.stub.StreamObserver; import org.apache.camel.component.grpc.GrpcConsumer; @@ -34,17 +36,39 @@ public GrpcRequestPropagationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer @Override public void onNext(Object request) { + CountDownLatch latch = new CountDownLatch(1); + Object responseBody = null; + exchange = endpoint.createExchange(); exchange.getIn().setBody(request); exchange.getIn().setHeaders(headers); + consumer.process(exchange, doneSync -> { + latch.countDown(); }); - if (exchange.hasOut()) { - responseObserver.onNext(exchange.getOut().getBody()); - } else { - responseObserver.onNext(exchange.getIn().getBody()); + + try { + latch.await(); + + if (exchange.hasOut()) { + responseBody = exchange.getOut().getBody(); + } else { + responseBody = exchange.getIn().getBody(); + } + + if (responseBody instanceof List) { + List responseList = (List)responseBody; + responseList.forEach((responseItem) -> { + responseObserver.onNext(responseItem); + }); + } else { + responseObserver.onNext(responseBody); + } + responseObserver.onCompleted(); + + } catch (InterruptedException e) { + responseObserver.onError(e); } - responseObserver.onCompleted(); } @Override diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java index d4a06418e5bd5..e7cb8c7ffab1b 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java @@ -35,23 +35,28 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class); - private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_ASYNC_NEXT_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; - private ManagedChannel asyncRequestChannel; - private PingPongGrpc.PingPongStub asyncNonBlockingStub; + private ManagedChannel asyncOnNextChannel; + private ManagedChannel asyncOnCompletedChannel; + private PingPongGrpc.PingPongStub asyncOnNextStub; + private PingPongGrpc.PingPongStub asyncOnCompletedStub; @Before public void startGrpcChannels() { - asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); - asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + asyncOnNextChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_NEXT_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncOnCompletedChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel); + asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel); } @After public void stopGrpcChannels() throws Exception { - asyncRequestChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + asyncOnNextChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } @Test @@ -62,14 +67,15 @@ public void testOnNextPropagation() throws Exception { PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver); + StreamObserver requestObserver = asyncOnNextStub.pingAsyncSync(responseObserver); requestObserver.onNext(pingRequest); latch.await(5, TimeUnit.SECONDS); - MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation"); + MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-next-propagation"); mockEndpoint.expectedMessageCount(1); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncSync"); + mockEndpoint.assertIsSatisfied(); PongResponse pongResponse = responseObserver.getPongResponse(); assertNotNull(pongResponse); @@ -85,14 +91,15 @@ public void testOnCompletedPropagation() throws Exception { PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + StreamObserver requestObserver = asyncOnCompletedStub.pingAsyncAsync(responseObserver); requestObserver.onCompleted(); latch.await(5, TimeUnit.SECONDS); - MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation"); + MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-completed-propagation"); mockEndpoint.expectedMessageCount(1); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncAsync"); + mockEndpoint.assertIsSatisfied(); } @Override @@ -101,8 +108,12 @@ protected RouteBuilder createRouteBuilder() throws Exception { @Override public void configure() { - from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT) - .to("mock:async-propagation") + from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_NEXT_REQUEST_TEST_PORT) + .to("mock:async-on-next-propagation") + .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + + from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&forwardOnCompleted=true&host=localhost&port=" + GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT) + .to("mock:async-on-completed-propagation") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); } };