Skip to content

Commit

Permalink
Add grpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
guohao committed Sep 2, 2021
1 parent 912a856 commit 07a1e8f
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 36 deletions.
30 changes: 8 additions & 22 deletions dubbo-samples-triple/pom.xml
Expand Up @@ -32,12 +32,12 @@
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>3.0.3-SNAPSHOT</dubbo.version>
<grpc.version>1.40.1</grpc.version>
<junit.version>4.12</junit.version>
<spring-test.version>4.3.16.RELEASE</spring-test.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-failsafe-plugin.version>2.21.0</maven-failsafe-plugin.version>
<spring-boot.version>1.5.13.RELEASE</spring-boot.version>
<grpc.version>1.19.0</grpc.version>
<protoc.version>3.7.1</protoc.version>
</properties>

Expand All @@ -60,8 +60,8 @@
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.40.1</version>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -116,14 +116,17 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>triple-java</pluginId>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
<goal>compile-custom</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
Expand All @@ -137,23 +140,6 @@
<target>${target.level}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>build/generated/source/proto/main/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
@@ -0,0 +1,43 @@
package org.apache.dubbo.sample.tri;

import io.grpc.stub.StreamObserver;

public class GrpcPbGreeterImpl extends PbGreeterGrpc.PbGreeterImplBase {
private final PbGreeter delegate;

public GrpcPbGreeterImpl(PbGreeter delegate) {
this.delegate = delegate;
}

@Override
public void greet(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.Greet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
}

@Override
public void greetException(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.GreetException(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
}

@Override
public StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> responseObserver) {
return new GrpcStreamObserverAdapter<>(delegate.GreetStream(new StreamObserverAdapter<>(responseObserver)));
}

@Override
public void greetServerStream(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
delegate.GreetServerStream(request, new StreamObserverAdapter<>(responseObserver));
}
}
@@ -0,0 +1,40 @@
package org.apache.dubbo.sample.tri;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class GrpcProvider {
public static void main(String[] args) throws IOException, InterruptedException {
final Server server = ServerBuilder.forPort(50051)
.addService(new GrpcPbGreeterImpl(new PbGreeterImpl()))
.build();
server.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Start graceful shutdown
server.shutdown();
try {
// Wait for RPCs to complete processing
if (!server.awaitTermination(30, TimeUnit.SECONDS)) {
// That was plenty of time. Let's cancel the remaining RPCs
server.shutdownNow();
// shutdownNow isn't instantaneous, so give a bit of time to clean resources up
// gracefully. Normally this will be well under a second.
server.awaitTermination(5, TimeUnit.SECONDS);
}
} catch (InterruptedException ex) {
server.shutdownNow();
}
}
});
// This would normally be tied to the service's dependencies. For example, if HostnameGreeter
// used a Channel to contact a required service, then when 'channel.getState() ==
// TRANSIENT_FAILURE' we'd want to set NOT_SERVING. But HostnameGreeter has no dependencies, so
// hard-coding SERVING is appropriate.
server.awaitTermination();
}
}
@@ -0,0 +1,27 @@
package org.apache.dubbo.sample.tri;


import io.grpc.stub.StreamObserver;

public class GrpcStreamObserverAdapter<T> implements StreamObserver<T> {
private final org.apache.dubbo.common.stream.StreamObserver<T> delegate;

public GrpcStreamObserverAdapter(org.apache.dubbo.common.stream.StreamObserver<T> delegate) {
this.delegate = delegate;
}

@Override
public void onNext(T data) {
delegate.onNext(data);
}

@Override
public void onError(Throwable throwable) {
delegate.onError(throwable);
}

@Override
public void onCompleted() {
delegate.onCompleted();
}
}
Expand Up @@ -3,11 +3,11 @@
import org.apache.dubbo.common.stream.StreamObserver;

public interface PbGreeter {
GreeterReply sayGreeter(GreeterRequest request);
GreeterReply Greet(GreeterRequest request);

GreeterReply sayGreeterException(GreeterRequest request);
GreeterReply GreetException(GreeterRequest request);

StreamObserver<GreeterRequest> sayGreeterStream(StreamObserver<GreeterReply> replyStream);
StreamObserver<GreeterRequest> GreetStream(StreamObserver<GreeterReply> replyStream);

void sayGreeterServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);
void GreetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);
}
Expand Up @@ -5,22 +5,22 @@

public class PbGreeterImpl implements PbGreeter {
@Override
public GreeterReply sayGreeter(GreeterRequest request) {
public GreeterReply Greet(GreeterRequest request) {

return GreeterReply.newBuilder()
.setMessage(request.getName())
.build();
}

public GreeterReply sayGreeterException(GreeterRequest request) {
public GreeterReply GreetException(GreeterRequest request) {
RpcContext.getServerContext().setAttachment("str", "str")
.setAttachment("integer", 1)
.setAttachment("raw", new byte[]{1, 2, 3, 4});
throw new RuntimeException("Biz Exception");
}

@Override
public StreamObserver<GreeterRequest> sayGreeterStream(StreamObserver<GreeterReply> replyStream) {
public StreamObserver<GreeterRequest> GreetStream(StreamObserver<GreeterReply> replyStream) {
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
Expand All @@ -43,7 +43,7 @@ public void onCompleted() {
}

@Override
public void sayGreeterServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream) {
public void GreetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream) {
for (int i = 0; i < 10; i++) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(request.getName())
Expand Down
Expand Up @@ -4,7 +4,6 @@
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;

import org.junit.Assert;
Expand All @@ -24,13 +23,14 @@ public static void init() {
ref.setCheck(false);
ref.setInterface(PbGreeter.class);
ref.setCheck(false);
ref.setUrl("tri://127.0.0.1:50051");
ref.setProtocol(CommonConstants.TRIPLE);
ref.setLazy(true);
ref.setTimeout(10000);

DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(new ApplicationConfig("demo-consumer"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
// .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.reference(ref)
.start();

Expand All @@ -44,7 +44,7 @@ public void serverStream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
delegate.sayGreeterServerStream(request, new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
delegate.GreetServerStream(request, new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
Expand All @@ -61,7 +61,7 @@ public void stream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
final StreamObserver<GreeterRequest> requestObserver = delegate.sayGreeterStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
final StreamObserver<GreeterRequest> requestObserver = delegate.GreetStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
Expand All @@ -77,7 +77,7 @@ public void onNext(GreeterReply data) {

@Test
public void unaryGreeter() {
final GreeterReply reply = delegate.sayGreeter(GreeterRequest.newBuilder()
final GreeterReply reply = delegate.Greet(GreeterRequest.newBuilder()
.setName("name")
.build());
Assert.assertNotNull(reply);
Expand Down
@@ -0,0 +1,26 @@
package org.apache.dubbo.sample.tri;

import org.apache.dubbo.common.stream.StreamObserver;

public class StreamObserverAdapter<T> implements StreamObserver<T> {
private final io.grpc.stub.StreamObserver<T> delegate;

public StreamObserverAdapter(io.grpc.stub.StreamObserver<T> delegate) {
this.delegate = delegate;
}

@Override
public void onNext(T data) {
delegate.onNext(data);
}

@Override
public void onError(Throwable throwable) {
delegate.onError(throwable);
}

@Override
public void onCompleted() {
delegate.onCompleted();
}
}
8 changes: 7 additions & 1 deletion dubbo-samples-triple/src/test/proto/prototest.proto
Expand Up @@ -16,5 +16,11 @@ message GreeterReply {
}

service PbGreeter{
rpc SayHello(GreeterRequest) returns (GreeterReply);
rpc Greet(GreeterRequest) returns (GreeterReply);

rpc GreetException(GreeterRequest) returns (GreeterReply);

rpc GreetStream(stream GreeterRequest) returns (stream GreeterReply);

rpc GreetServerStream(GreeterRequest) returns (stream GreeterReply);
}

0 comments on commit 07a1e8f

Please sign in to comment.