Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions components/camel-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,6 @@
<version>${mockito-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.googlecode.junit-toolbox</groupId>
<artifactId>junit-toolbox</artifactId>
<version>${junit-toolbox-version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- GRPC does not support the jakarta annotation, see https://github.com/grpc/grpc-java/issues/9179 -->
<dependency>
<groupId>javax.annotation</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.camel.component.grpc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.googlecode.junittoolbox.MultithreadingTester;
import com.googlecode.junittoolbox.RunnableAssert;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -56,87 +59,95 @@ public static Integer getId() {
}

@Test
public void testAsyncWithConcurrentThreads() {
public void testAsyncWithConcurrentThreads() throws Exception {
int asyncPort = getRoutePort("grpc-async");
RunnableAssert ra = new RunnableAssert("foo") {

@Override
public void run() {
final CountDownLatch latch = new CountDownLatch(1);
ManagedChannel asyncRequestChannel
= NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext()
.build();
PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);

PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
int instanceId = createId();

final PingRequest pingRequest
= PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
requestObserver.onNext(pingRequest);
requestObserver.onNext(pingRequest);
requestObserver.onCompleted();
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
LOG.debug("Unhandled exception (probably safe to ignore): {}", e.getMessage(), e);
}

PongResponse pongResponse = responseObserver.getPongResponse();

assertNotNull(pongResponse, "instanceId = " + instanceId);
assertEquals(instanceId, pongResponse.getPongId());
assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());

asyncRequestChannel.shutdown().shutdownNow();
runConcurrent(() -> {
final CountDownLatch latch = new CountDownLatch(1);
ManagedChannel asyncRequestChannel
= NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext()
.build();
PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);

PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
int instanceId = createId();

final PingRequest pingRequest
= PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
requestObserver.onNext(pingRequest);
requestObserver.onNext(pingRequest);
requestObserver.onCompleted();
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
LOG.debug("Unhandled exception (probably safe to ignore): {}", e.getMessage(), e);
}
};

new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
.run();
PongResponse pongResponse = responseObserver.getPongResponse();

assertNotNull(pongResponse, "instanceId = " + instanceId);
assertEquals(instanceId, pongResponse.getPongId());
assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());

asyncRequestChannel.shutdown().shutdownNow();
return null;
});
}

@Test
public void testHeadersWithConcurrentThreads() {
public void testHeadersWithConcurrentThreads() throws Exception {
int headersPort = getRoutePort("grpc-headers");
RunnableAssert ra = new RunnableAssert("foo") {

@Override
public void run() {
int instanceId = createId();
final CountDownLatch latch = new CountDownLatch(1);
ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort)
.userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
.usePlaintext().build();
PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);

PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);

final PingRequest pingRequest
= PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
requestObserver.onNext(pingRequest);
requestObserver.onNext(pingRequest);
requestObserver.onCompleted();
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for the response", e);
}

PongResponse pongResponse = responseObserver.getPongResponse();

assertNotNull(pongResponse, "instanceId = " + instanceId);
assertEquals(instanceId, pongResponse.getPongId());
assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName());

asyncRequestChannel.shutdown().shutdownNow();
runConcurrent(() -> {
int instanceId = createId();
final CountDownLatch latch = new CountDownLatch(1);
ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort)
.userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
.usePlaintext().build();
PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);

PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);

final PingRequest pingRequest
= PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
requestObserver.onNext(pingRequest);
requestObserver.onNext(pingRequest);
requestObserver.onCompleted();
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for the response", e);
}
};

new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
.run();
PongResponse pongResponse = responseObserver.getPongResponse();

assertNotNull(pongResponse, "instanceId = " + instanceId);
assertEquals(instanceId, pongResponse.getPongId());
assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName());

asyncRequestChannel.shutdown().shutdownNow();
return null;
});
}

private void runConcurrent(Callable<?> task) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
try {
List<Future<?>> futures = new ArrayList<>();
for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) {
futures.add(executor.submit(() -> {
for (int round = 0; round < ROUNDS_PER_THREAD_COUNT; round++) {
task.call();
}
return null;
}));
}
for (Future<?> future : futures) {
future.get(1, TimeUnit.MINUTES);
}
} finally {
executor.shutdownNow();
}
}

@Override
Expand Down
12 changes: 0 additions & 12 deletions components/camel-thrift/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,6 @@
<artifactId>gson</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.googlecode.junit-toolbox</groupId>
<artifactId>junit-toolbox</artifactId>
<version>${junit-toolbox-version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.camel.component.thrift;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.googlecode.junittoolbox.MultithreadingTester;
import com.googlecode.junittoolbox.RunnableAssert;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
Expand All @@ -37,7 +40,6 @@
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -68,68 +70,76 @@ public static Integer getId() {
}

@Test
public void testSyncWithConcurrentThreads() {
RunnableAssert ra = new RunnableAssert("testSyncWithConcurrentThreads") {

@Override
public void run() throws TTransportException {
TTransport transport = new TSocket("localhost", getPortForRoute(0));
transport.open();
TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport));
Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol);

int instanceId = createId();
public void testSyncWithConcurrentThreads() throws Exception {
runConcurrent(() -> {
TTransport transport = new TSocket("localhost", getPortForRoute(0));
transport.open();
TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport));
Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol);

int instanceId = createId();

int calculateResponse = 0;
try {
calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY));
} catch (TException e) {
LOG.info("Exception", e);
}

int calculateResponse = 0;
try {
calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY));
} catch (TException e) {
LOG.info("Exception", e);
}
assertNotEquals(0, calculateResponse, "instanceId = " + instanceId);
assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);

assertNotEquals(0, calculateResponse, "instanceId = " + instanceId);
assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
transport.close();
return null;
});
}

transport.close();
@Test
public void testAsyncWithConcurrentThreads() throws Exception {
runConcurrent(() -> {
final CountDownLatch latch = new CountDownLatch(1);

TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1));
Calculator.AsyncClient client
= (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory()))
.getAsyncClient(transport);

int instanceId = createId();
CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch);
try {
client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback);
} catch (TException e) {
LOG.info("Exception", e);
}
};
latch.await(5, TimeUnit.SECONDS);

new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
.run();
}
int calculateResponse = calculateCallback.getCalculateResponse();
LOG.debug("instanceId = {}", instanceId);
assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);

@Test
public void testAsyncWithConcurrentThreads() {
RunnableAssert ra = new RunnableAssert("testAsyncWithConcurrentThreads") {
transport.close();
return null;
});
}

@Override
public void run() throws TTransportException, IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1));
Calculator.AsyncClient client
= (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory()))
.getAsyncClient(transport);

int instanceId = createId();
CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch);
try {
client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback);
} catch (TException e) {
LOG.info("Exception", e);
}
latch.await(5, TimeUnit.SECONDS);

int calculateResponse = calculateCallback.getCalculateResponse();
LOG.debug("instanceId = {}", instanceId);
assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);

transport.close();
private void runConcurrent(Callable<?> task) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
try {
List<Future<?>> futures = new ArrayList<>();
for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) {
futures.add(executor.submit(() -> {
for (int round = 0; round < ROUNDS_PER_THREAD_COUNT; round++) {
task.call();
}
return null;
}));
}
};

new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
.run();
for (Future<?> future : futures) {
future.get(1, TimeUnit.MINUTES);
}
} finally {
executor.shutdownNow();
}
}

public class CalculateAsyncMethodCallback implements AsyncMethodCallback<Integer> {
Expand Down
1 change: 0 additions & 1 deletion parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@
<nullaway-version>0.13.4</nullaway-version>
<jt400-version>21.0.6</jt400-version>
<jte-version>3.2.4</jte-version>
<junit-toolbox-version>2.4</junit-toolbox-version>
<junit-jupiter-version>5.13.4</junit-jupiter-version>
<junit6-jupiter-version>6.1.0</junit6-jupiter-version>
<junit-pioneer-version>2.3.0</junit-pioneer-version>
Expand Down