Skip to content
Permalink
Browse files
Add grpc tri integration test
  • Loading branch information
EarthChen committed Sep 6, 2021
1 parent 6c1932e commit e0feb8a1fe6757450de69d620abec1a5039d454c
Showing 21 changed files with 280 additions and 134 deletions.
@@ -17,10 +17,21 @@ Dubbo3 will wrap request and response with a protobuf wrapper automatically, so
Code under `test` folder is a general interoperation test suite for both Dubbo/Dubbo and Dubbo/GRPC.

#### Dubbo/Dubbo test
1. Run `org.apache.dubbo.sample.tri.TestProvider`
2. Run `org.apache.dubbo.sample.tri.PbTest` , `org.apache.dubbo.sample.tri.WrapConsumerTest` and `org.apache.dubbo.sample.tri.GenericTest`
1. Run `org.apache.dubbo.sample.tri.TriProvider`
2. Run `org.apache.dubbo.sample.tri.TriPbConsumerTest` , `org.apache.dubbo.sample.tri.TriWrapConsumerTest` and `org.apache.dubbo.sample.tri.TriGenericTest`

#### DUbbo/GRPC test
TBD

##### GRPC --> Dubbo

1. Run `org.apache.dubbo.sample.tri.TriProvider`
2. Run `org.apache.dubbo.sample.tri.GrpcConsumerTest`

##### Dubbo --> GRPC

1. Run `org.apache.dubbo.sample.tri.GrpcProvider`
2. Run `org.apache.dubbo.sample.tri.TriPbConsumerTest`




@@ -39,6 +39,7 @@
<maven-failsafe-plugin.version>2.21.0</maven-failsafe-plugin.version>
<spring-boot.version>1.5.13.RELEASE</spring-boot.version>
<protoc.version>3.7.1</protoc.version>
<dubbo.compiler.version>0.0.4-SNAPSHOT</dubbo.compiler.version>
</properties>

<dependencies>
@@ -62,7 +63,7 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -119,6 +120,15 @@
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>dubbo</id>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compiler</artifactId>
<version>${dubbo.compiler.version}</version>
<mainClass>org.apache.dubbo.gen.dubbo.Dubbo3Generator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
@@ -0,0 +1,36 @@
package org.apache.dubbo.sample.tri;

import org.apache.dubbo.common.stream.StreamObserver;

import java.util.function.Function;

/**
* @author earthchen
* @date 2021/9/6
**/
public class EchoStreamObserver<T, R> implements StreamObserver<T> {

private final Function<T, R> echoFunc;
private final StreamObserver<R> responseObserver;

public EchoStreamObserver(Function<T, R> echoFunc, StreamObserver<R> responseObserver) {
this.echoFunc = echoFunc;
this.responseObserver = responseObserver;
}

@Override
public void onNext(T data) {
responseObserver.onNext(echoFunc.apply(data));
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
responseObserver.onError(new IllegalStateException("Stream err"));
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
}

This file was deleted.

@@ -0,0 +1,33 @@
package org.apache.dubbo.sample.tri;

import org.apache.dubbo.common.stream.StreamObserver;

/**
* @author earthchen
* @date 2021/9/6
**/
public class StdoutStreamObserver<T> implements StreamObserver<T>, io.grpc.stub.StreamObserver<T> {


private final String name;

StdoutStreamObserver(String name) {
this.name = name;
}

@Override
public void onNext(T data) {
System.out.println("[" + name + "] stream reply:" + data);
}

@Override
public void onError(Throwable throwable) {
System.err.println("[" + name + "] Error:");
throwable.printStackTrace();
}

@Override
public void onCompleted() {
System.out.println("[" + name + "] stream done");
}
}
@@ -25,5 +25,7 @@ public class TriSampleConstants {

public static final String ZK_ADDRESS = "zookeeper://127.0.0.1:2181";

public static final String DEFAULT_ADDRESS = "tri://127.0.0.1:" + SERVER_POINT;
public static final String HOST = "127.0.0.1";

public static final String DEFAULT_ADDRESS = "tri://" + HOST + ":" + SERVER_POINT;
}

This file was deleted.

@@ -0,0 +1,101 @@
package org.apache.dubbo.sample.tri;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GrpcConsumerTest {
private static PbGreeterGrpc.PbGreeterStub stub;
private static PbGreeterGrpc.PbGreeterBlockingStub blockingStub;

@BeforeClass
public static void init() {
final ManagedChannel channel = ManagedChannelBuilder.forAddress(TriSampleConstants.HOST, TriSampleConstants.SERVER_POINT)
.usePlaintext()
.build();
stub = PbGreeterGrpc.newStub(channel);
blockingStub = PbGreeterGrpc.newBlockingStub(channel);
}

@Test
public void clientSendLargeSizeHeader() throws InterruptedException {
final Metadata.Key<String> key = Metadata.Key.of("large_size", Metadata.ASCII_STRING_MARSHALLER);
StringBuilder sb = new StringBuilder("a");
for (int j = 0; j < 15; j++) {
sb.append(sb);
}
Metadata meta = new Metadata();
meta.put(key, sb.toString());
final PbGreeterGrpc.PbGreeterStub curStub = MetadataUtils.attachHeaders(GrpcConsumerTest.stub, meta);
curStub.greet(GreeterRequest.newBuilder().setName("metadata").build(), new StdoutStreamObserver<>("meta"));
TimeUnit.SECONDS.sleep(1);
}


@Test
public void serverStream() throws InterruptedException {
int n = 10;
CountDownLatch latch = new CountDownLatch(n);
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
stub.greetServerStream(request, new StdoutStreamObserver<GreeterReply>("grpc sayGreeterServerStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
latch.countDown();
}
});
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}


@Test
public void stream() throws InterruptedException {
int n = 10;
CountDownLatch latch = new CountDownLatch(n);
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
final io.grpc.stub.StreamObserver<GreeterRequest> requestObserver = stub.greetStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
latch.countDown();
}
});
for (int i = 0; i < n; i++) {
requestObserver.onNext(request);
}
requestObserver.onCompleted();
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}

@Test
public void unaryGreeter() {
final GreeterReply reply = blockingStub.greet(GreeterRequest.newBuilder()
.setName("name")
.build());
Assert.assertNotNull(reply);
}


@Test
public void attachmentTest() {
final Metadata.Key<String> key = Metadata.Key.of("large_size", Metadata.ASCII_STRING_MARSHALLER);
Metadata meta = new Metadata();
meta.put(key, "test");
final PbGreeterGrpc.PbGreeterBlockingStub curStub = MetadataUtils.attachHeaders(GrpcConsumerTest.blockingStub, meta);
GreeterReply reply = curStub.greetWithAttachment(GreeterRequest.newBuilder().setName("meta").build());
Assert.assertEquals("hello,meta", reply.getMessage());
}

}

@@ -1,5 +1,8 @@
package org.apache.dubbo.sample.tri;

import org.apache.dubbo.sample.tri.service.impl.GrpcPbGreeterImpl;
import org.apache.dubbo.sample.tri.service.impl.PbGreeterImpl;

import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.Server;
@@ -4,6 +4,7 @@
import io.grpc.stub.StreamObserver;

public class GrpcStreamObserverAdapter<T> implements StreamObserver<T> {

private final org.apache.dubbo.common.stream.StreamObserver<T> delegate;

public GrpcStreamObserverAdapter(org.apache.dubbo.common.stream.StreamObserver<T> delegate) {

This file was deleted.

@@ -12,13 +12,13 @@
import org.junit.BeforeClass;
import org.junit.Test;

public class GenericTest {
public class TriGenericTest {
private static GenericService generic;

@BeforeClass
public static void init() {
ReferenceConfig<GenericService> ref = new ReferenceConfig<>();
ref.setInterface("org.apache.dubbo.sample.tri.WrapGreeter");
ref.setInterface("org.apache.dubbo.sample.tri.service.WrapGreeter");
ref.setCheck(false);
ref.setTimeout(30000);
ref.setProtocol(CommonConstants.TRIPLE);

0 comments on commit e0feb8a

Please sign in to comment.