Skip to content
Permalink
Browse files
Fix MockEndpoint usage in gRPC tests
  • Loading branch information
jamesnetherton committed Apr 28, 2022
1 parent 44627fc commit 7288ad870e2673360ec5dff7b481f045f6d03581
Showing 5 changed files with 202 additions and 75 deletions.
@@ -55,6 +55,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
@@ -72,6 +76,10 @@
<artifactId>camel-quarkus-integration-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>

<build>
@@ -16,7 +16,9 @@
*/
package org.apache.camel.quarkus.component.grpc.it;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@@ -35,11 +37,6 @@
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;

import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_HEADER;
import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED;
import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR;
import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.PING_PONG_SERVICE;

@Path("/grpc")
@@ -69,96 +66,107 @@ public String producer(String pingName, @QueryParam("pingId") int pingId) throws

@Path("/forwardOnCompleted")
@GET
public void forwardOnCompleted() throws InterruptedException {
MockEndpoint endpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
endpoint.expectedMessageCount(1);
endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_COMPLETED);
endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
endpoint.assertIsSatisfied(5000L);
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Object> forwardOnCompleted() throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
List<Exchange> exchanges = mockEndpoint.getExchanges();
if (!exchanges.isEmpty()) {
Exchange exchange = exchanges.get(0);
return exchange.getMessage().getHeaders();
}
return Collections.emptyMap();
}

@Path("/forwardOnError")
@GET
public String forwardOnError() throws InterruptedException {
MockEndpoint endpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
endpoint.expectedMessageCount(1);
endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_ERROR);
endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
endpoint.assertIsSatisfied(5000L);

List<Exchange> exchanges = endpoint.getExchanges();
Exchange exchange = exchanges.get(0);
Throwable throwable = exchange.getMessage().getBody(Throwable.class);
return throwable.getClass().getName();
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Object> forwardOnError() throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
List<Exchange> exchanges = mockEndpoint.getExchanges();
if (!exchanges.isEmpty()) {
Exchange exchange = exchanges.get(0);
Throwable throwable = exchange.getMessage().getBody(Throwable.class);
Map<String, Object> results = exchange.getMessage().getHeaders();
results.put("error", throwable.getClass().getName());
return results;
}
return Collections.emptyMap();
}

@Path("/grpcStreamReplies")
@GET
public void grpcStreamReplies() throws InterruptedException {
int messageCount = 10;
MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
endpoint.expectedMessageCount(messageCount);

for (int i = 1; i <= messageCount; i++) {
PingRequest request = PingRequest.newBuilder().setPingName(String.valueOf(i)).build();
producerTemplate.sendBody("direct:grpcStream", request);
}

MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
endpoint.expectedMessageCount(messageCount);
endpoint.assertIsSatisfied();
}

@Path("/tls")
@GET
public void tlsConsumer() throws InterruptedException {
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Object> tlsConsumer() throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
mockEndpoint.assertIsSatisfied();
List<Exchange> exchanges = mockEndpoint.getExchanges();
if (!exchanges.isEmpty()) {
Exchange exchange = exchanges.get(0);
return exchange.getMessage().getHeaders();
}
return Collections.emptyMap();
}

@Path("/tls")
@POST
@Produces(MediaType.TEXT_PLAIN)
public String tlsProducer(String message) {
MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
try {
PingRequest pingRequest = PingRequest.newBuilder()
.setPingName(message)
.setPingId(12345)
.build();

PongResponse response = producerTemplate.requestBody("direct:sendTls", pingRequest, PongResponse.class);
return response.getPongName();
} finally {
mockEndpoint.reset();
}
PingRequest pingRequest = PingRequest.newBuilder()
.setPingName(message)
.setPingId(12345)
.build();

PongResponse response = producerTemplate.requestBodyAndHeader(
"direct:sendTls",
pingRequest,
"origin",
"producer",
PongResponse.class);
return response.getPongName();
}

@Path("/jwt")
@GET
public void jwtConsumer() throws InterruptedException {
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Object> jwtConsumer() throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
mockEndpoint.assertIsSatisfied();
List<Exchange> exchanges = mockEndpoint.getExchanges();
if (!exchanges.isEmpty()) {
Exchange exchange = exchanges.get(0);
return exchange.getMessage().getHeaders();
}
return Collections.emptyMap();
}

@Path("/jwt")
@POST
@Produces(MediaType.TEXT_PLAIN)
public String jwtProducer(String message) {
MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
try {
PingRequest pingRequest = PingRequest.newBuilder()
.setPingName(message)
.setPingId(12345)
.build();

PongResponse response = producerTemplate.requestBody("direct:sendJwt", pingRequest, PongResponse.class);
return response.getPongName();
} finally {
mockEndpoint.reset();
}
PingRequest pingRequest = PingRequest.newBuilder()
.setPingName(message)
.setPingId(12345)
.build();

PongResponse response = producerTemplate.requestBodyAndHeader(
"direct:sendJwt",
pingRequest,
"origin",
"producer",
PongResponse.class);
return response.getPongName();
}
}
@@ -79,8 +79,14 @@ public void configure() throws Exception {
+ "/%s?consumerStrategy=PROPAGATION&"
+ "negotiationType=TLS&keyCertChainResource=certs/server.pem&"
+ "keyResource=certs/server.key&trustCertCollectionResource=certs/ca.pem", PING_PONG_SERVICE)
.process("messageOriginProcessor")
.choice()
.when(simple("${header.origin} == 'producer'"))
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse")
.endChoice()
.otherwise()
.to("mock:tls")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse").endChoice();

from("direct:sendTls")
.toF("grpc://localhost:{{camel.grpc.test.tls.server.port}}"
@@ -91,8 +97,14 @@ public void configure() throws Exception {
fromF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
+ "/%s?consumerStrategy=PROPAGATION&"
+ "authenticationType=JWT&jwtSecret=%s", PING_PONG_SERVICE, GRPC_JWT_SECRET)
.process("messageOriginProcessor")
.choice()
.when(simple("${header.origin} == 'producer'"))
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse")
.endChoice()
.otherwise()
.to("mock:jwt")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse").endChoice();

from("direct:sendJwt")
.toF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.grpc.it;

import javax.inject.Named;
import javax.inject.Singleton;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;

@Singleton
@Named
public class MessageOriginProcessor implements Processor {
@Override
public void process(Exchange exchange) {
Message message = exchange.getMessage();
PingRequest request = message.getBody(PingRequest.class);
if (request.getPingName().endsWith("producer")) {
message.setHeader("origin", "producer");
}
}
}

0 comments on commit 7288ad8

Please sign in to comment.