Skip to content

Commit

Permalink
CAMEL-11237: Changes based on @nicolaferraro code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvolod committed May 23, 2017
1 parent 3025f91 commit 90bb213
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 36 deletions.
6 changes: 4 additions & 2 deletions components/camel-grpc/src/main/docs/grpc-component.adoc
Expand Up @@ -47,14 +47,16 @@ with the following path and query parameters:
| **service** | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) | | String
|=======================================================================

#### Query Parameters (10 parameters):
#### Query Parameters (12 parameters):

[width="100%",cols="2,5,^1,2",options="header"]
|=======================================================================
| Name | Description | Default | Type
| **host** (common) | The gRPC server host name | | String
| **port** (common) | The gRPC server port | | int
| **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
| **forwardOnCompleted** (consumer) | Determines if onCompleted events should be pushed to the Camel route. | false | boolean
| **forwardOnError** (consumer) | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body. | false | boolean
| **processingStrategy** (consumer) | This option specifies the top-level strategy for processing service requests and responses in streaming mode. If an aggregation strategy is selected all requests will be accumulated in the list then transferred to the flow and the accumulated responses will be sent to the sender. If a propagation strategy is selected request is sent to the stream and the response will be immediately sent back to the sender. | | GrpcProcessing Strategies
| **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern
Expand Down Expand Up @@ -92,7 +94,7 @@ The table below shows the types of objects in the message body, depending on the
|Header name |Description|Possible values

|*CamelGrpcMethodName*|Method name handled by the consumer service|
|*CamelGrpcEventType*|Received event type from the sended request|onNext, onCompleted or onError
|*CamelGrpcEventType*|Received event type from the sent request|onNext, onCompleted or onError
|*CamelGrpcUserAgent*|If provided, the given agent will prepend the gRPC library's user agent information|

|=======================================================================
Expand Down
Expand Up @@ -27,18 +27,30 @@ public class GrpcConfiguration {
@UriPath
@Metadata(required = "true")
private String service;

@UriParam(label = "producer")
private String method;

@UriParam
private String host;

@UriParam
private int port;

@UriParam(label = "producer")
private String target;

@UriParam(label = "producer", defaultValue = "true")
private Boolean usePlainText = true;

@UriParam(label = "consumer")
private GrpcProcessingStrategies processingStrategy = GrpcProcessingStrategies.PROPAGATION;

@UriParam(label = "consumer", defaultValue = "false")
private boolean forwardOnCompleted;

@UriParam(label = "consumer", defaultValue = "false")
private boolean forwardOnError;

private String serviceName;
private String servicePackage;
Expand Down Expand Up @@ -126,6 +138,29 @@ public void setProcessingStrategy(GrpcProcessingStrategies processingStrategy) {
this.processingStrategy = processingStrategy;
}

/**
* Determines if onCompleted events should be pushed to the Camel route.
*/
public void setForwardOnCompleted(boolean forwardOnCompleted) {
this.forwardOnCompleted = forwardOnCompleted;
}

public boolean isForwardOnCompleted() {
return forwardOnCompleted;
}

/**
* Determines if onError events should be pushed to the Camel route.
* Exceptions will be set as message body.
*/
public void setForwardOnError(boolean forwardOnError) {
this.forwardOnError = forwardOnError;
}

public boolean isForwardOnError() {
return forwardOnError;
}

/**
* The service name extracted from the full service name
*/
Expand Down
Expand Up @@ -111,17 +111,21 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
}

public void onCompleted(Exchange exchange) {
exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
doSend(exchange, done -> {
});
if (configuration.isForwardOnCompleted()) {
exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
doSend(exchange, done -> {
});
}
}

public void onError(Exchange exchange, Throwable error) {
exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
exchange.getIn().setBody(error);
if (configuration.isForwardOnError()) {
exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
exchange.getIn().setBody(error);

doSend(exchange, done -> {
});
doSend(exchange, done -> {
});
}
}

private boolean doSend(Exchange exchange, AsyncCallback callback) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import io.grpc.stub.StreamObserver;
import org.apache.camel.component.grpc.GrpcConsumer;
Expand Down Expand Up @@ -47,23 +48,37 @@ public void onError(Throwable t) {
}

@Override
@SuppressWarnings("unchecked")
public void onCompleted() {
CountDownLatch latch = new CountDownLatch(1);
Object responseBody = null;

exchange.getIn().setBody(requestList);
exchange.getIn().setHeaders(headers);

consumer.process(exchange, doneSync -> {
latch.countDown();
});

try {
latch.await();

if (exchange.hasOut()) {
responseBody = exchange.getOut().getBody();
} else {
responseBody = exchange.getIn().getBody();
}

Object responseBody = exchange.getIn().getBody();
if (responseBody instanceof List) {
List<Object> responseList = (List<Object>)responseBody;
responseList.forEach((responseItem) -> {
responseObserver.onNext(responseItem);
});
} else {
responseObserver.onNext(responseBody);
if (responseBody instanceof List) {
List<?> responseList = (List<?>)responseBody;
responseList.forEach((responseItem) -> {
responseObserver.onNext(responseItem);
});
} else {
responseObserver.onNext(responseBody);
}
responseObserver.onCompleted();
} catch (InterruptedException e) {
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
}
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.camel.component.grpc.server;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import io.grpc.stub.StreamObserver;
import org.apache.camel.component.grpc.GrpcConsumer;
Expand All @@ -34,17 +36,39 @@ public GrpcRequestPropagationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer

@Override
public void onNext(Object request) {
CountDownLatch latch = new CountDownLatch(1);
Object responseBody = null;

exchange = endpoint.createExchange();
exchange.getIn().setBody(request);
exchange.getIn().setHeaders(headers);

consumer.process(exchange, doneSync -> {
latch.countDown();
});
if (exchange.hasOut()) {
responseObserver.onNext(exchange.getOut().getBody());
} else {
responseObserver.onNext(exchange.getIn().getBody());

try {
latch.await();

if (exchange.hasOut()) {
responseBody = exchange.getOut().getBody();
} else {
responseBody = exchange.getIn().getBody();
}

if (responseBody instanceof List) {
List<?> responseList = (List<?>)responseBody;
responseList.forEach((responseItem) -> {
responseObserver.onNext(responseItem);
});
} else {
responseObserver.onNext(responseBody);
}
responseObserver.onCompleted();

} catch (InterruptedException e) {
responseObserver.onError(e);
}
responseObserver.onCompleted();
}

@Override
Expand Down
Expand Up @@ -35,23 +35,28 @@
public class GrpcConsumerPropagationTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class);

private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
private static final int GRPC_ASYNC_NEXT_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
private static final int GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
private static final int GRPC_TEST_PING_ID = 1;
private static final String GRPC_TEST_PING_VALUE = "PING";
private static final String GRPC_TEST_PONG_VALUE = "PONG";

private ManagedChannel asyncRequestChannel;
private PingPongGrpc.PingPongStub asyncNonBlockingStub;
private ManagedChannel asyncOnNextChannel;
private ManagedChannel asyncOnCompletedChannel;
private PingPongGrpc.PingPongStub asyncOnNextStub;
private PingPongGrpc.PingPongStub asyncOnCompletedStub;

@Before
public void startGrpcChannels() {
asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build();
asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
asyncOnNextChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_NEXT_REQUEST_TEST_PORT).usePlaintext(true).build();
asyncOnCompletedChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT).usePlaintext(true).build();
asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel);
asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel);
}

@After
public void stopGrpcChannels() throws Exception {
asyncRequestChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
asyncOnNextChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

@Test
Expand All @@ -62,14 +67,15 @@ public void testOnNextPropagation() throws Exception {
PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);

StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver);
StreamObserver<PingRequest> requestObserver = asyncOnNextStub.pingAsyncSync(responseObserver);
requestObserver.onNext(pingRequest);
latch.await(5, TimeUnit.SECONDS);

MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation");
MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-next-propagation");
mockEndpoint.expectedMessageCount(1);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
mockEndpoint.assertIsSatisfied();

PongResponse pongResponse = responseObserver.getPongResponse();
assertNotNull(pongResponse);
Expand All @@ -85,14 +91,15 @@ public void testOnCompletedPropagation() throws Exception {
PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);

StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
StreamObserver<PingRequest> requestObserver = asyncOnCompletedStub.pingAsyncAsync(responseObserver);
requestObserver.onCompleted();
latch.await(5, TimeUnit.SECONDS);

MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation");
MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-completed-propagation");
mockEndpoint.expectedMessageCount(1);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
mockEndpoint.assertIsSatisfied();
}

@Override
Expand All @@ -101,8 +108,12 @@ protected RouteBuilder createRouteBuilder() throws Exception {
@Override
public void configure() {

from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT)
.to("mock:async-propagation")
from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_NEXT_REQUEST_TEST_PORT)
.to("mock:async-on-next-propagation")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");

from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&forwardOnCompleted=true&host=localhost&port=" + GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT)
.to("mock:async-on-completed-propagation")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
}
};
Expand Down

0 comments on commit 90bb213

Please sign in to comment.