Skip to content
Permalink
Browse files
Add attachment test
  • Loading branch information
guohao committed Sep 2, 2021
1 parent 07a1e8f commit 2e932cae756aba92309d37380fe7fd5b6c7e4b1f
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 19 deletions.
@@ -0,0 +1,38 @@
package org.apache.dubbo.sample.tri;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class GrpcClient {
private static PbGreeterGrpc.PbGreeterStub stub;

@BeforeClass
public static void init() {
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 50051)
.usePlaintext()
.build();
stub = PbGreeterGrpc.newStub(channel);
}

@Test
public void clientSendLargeSizeHeader() throws InterruptedException {
final Metadata.Key<String> key = Metadata.Key.of("large_size", Metadata.ASCII_STRING_MARSHALLER);
StringBuilder sb = new StringBuilder("a");
for (int j = 0; j < 15; j++) {
sb.append(sb);
}
Metadata meta = new Metadata();
meta.put(key, sb.toString());
final PbGreeterGrpc.PbGreeterStub stub = MetadataUtils.attachHeaders(GrpcClient.stub, meta);
stub.greet(GreeterRequest.newBuilder().setName("metadata").build(),
new GrpcStreamObserverAdapter<>(new StdoutStreamObserver<>("meta")));
TimeUnit.SECONDS.sleep(1);
}
}

@@ -12,7 +12,7 @@ public GrpcPbGreeterImpl(PbGreeter delegate) {
@Override
public void greet(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.Greet(request);
final GreeterReply response = delegate.greet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
@@ -23,7 +23,7 @@ public void greet(GreeterRequest request, StreamObserver<GreeterReply> responseO
@Override
public void greetException(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
try {
final GreeterReply response = delegate.GreetException(request);
final GreeterReply response = delegate.greetException(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable t) {
@@ -33,11 +33,11 @@ public void greetException(GreeterRequest request, StreamObserver<GreeterReply>

@Override
public StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> responseObserver) {
return new GrpcStreamObserverAdapter<>(delegate.GreetStream(new StreamObserverAdapter<>(responseObserver)));
return new GrpcStreamObserverAdapter<>(delegate.greetStream(new StreamObserverAdapter<>(responseObserver)));
}

@Override
public void greetServerStream(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
delegate.GreetServerStream(request, new StreamObserverAdapter<>(responseObserver));
delegate.greetServerStream(request, new StreamObserverAdapter<>(responseObserver));
}
}
@@ -3,11 +3,13 @@
import org.apache.dubbo.common.stream.StreamObserver;

public interface PbGreeter {
GreeterReply Greet(GreeterRequest request);
GreeterReply greetWithAttachment(GreeterRequest request);

GreeterReply GreetException(GreeterRequest request);
GreeterReply greet(GreeterRequest request);

StreamObserver<GreeterRequest> GreetStream(StreamObserver<GreeterReply> replyStream);
GreeterReply greetException(GreeterRequest request);

void GreetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);
StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream);

void greetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);
}
@@ -5,22 +5,30 @@

public class PbGreeterImpl implements PbGreeter {
@Override
public GreeterReply Greet(GreeterRequest request) {
public GreeterReply greetWithAttachment(GreeterRequest request) {
final String key = "user-attachment";
final String value = RpcContext.getServerAttachment().getAttachment(key);
RpcContext.getServerContext().setObjectAttachment(key, value);
return GreeterReply.newBuilder().setMessage("hello," + request.getName()).build();
}

@Override
public GreeterReply greet(GreeterRequest request) {

return GreeterReply.newBuilder()
.setMessage(request.getName())
.build();
}

public GreeterReply GreetException(GreeterRequest request) {
public GreeterReply greetException(GreeterRequest request) {
RpcContext.getServerContext().setAttachment("str", "str")
.setAttachment("integer", 1)
.setAttachment("raw", new byte[]{1, 2, 3, 4});
throw new RuntimeException("Biz Exception");
}

@Override
public StreamObserver<GreeterRequest> GreetStream(StreamObserver<GreeterReply> replyStream) {
public StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream) {
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
@@ -43,7 +51,7 @@ public void onCompleted() {
}

@Override
public void GreetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream) {
public void greetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream) {
for (int i = 0; i < 10; i++) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(request.getName())
@@ -5,6 +5,8 @@
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

import org.junit.Assert;
import org.junit.BeforeClass;
@@ -44,7 +46,7 @@ public void serverStream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
delegate.GreetServerStream(request, new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
delegate.greetServerStream(request, new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
@@ -61,7 +63,7 @@ public void stream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
final StreamObserver<GreeterRequest> requestObserver = delegate.GreetStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
final StreamObserver<GreeterRequest> requestObserver = delegate.greetStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
@@ -77,10 +79,31 @@ public void onNext(GreeterReply data) {

@Test
public void unaryGreeter() {
final GreeterReply reply = delegate.Greet(GreeterRequest.newBuilder()
final GreeterReply reply = delegate.greet(GreeterRequest.newBuilder()
.setName("name")
.build());
Assert.assertNotNull(reply);
}


@Test(expected = RpcException.class)
public void clientSendLargeSizeHeader() throws InterruptedException {
StringBuilder sb = new StringBuilder("a");
for (int j = 0; j < 15; j++) {
sb.append(sb);
}
sb.setLength(8191);
RpcContext.getClientAttachment().setObjectAttachment("large-size-meta", sb.toString());
delegate.greet(GreeterRequest.newBuilder().setName("meta").build());
}

@Test
public void attachmentTest() {
final String key = "user-attachment";
final String value = "attachment-value";
RpcContext.getClientAttachment().setAttachment(key, value);
delegate.greetWithAttachment(GreeterRequest.newBuilder().setName("meta").build());
final String returned = (String) RpcContext.getServiceContext().getObjectAttachment(key);
Assert.assertEquals(value, returned);
}
}
@@ -16,11 +16,11 @@ message GreeterReply {
}

service PbGreeter{
rpc Greet(GreeterRequest) returns (GreeterReply);
rpc greet(GreeterRequest) returns (GreeterReply);

rpc GreetException(GreeterRequest) returns (GreeterReply);
rpc greetException(GreeterRequest) returns (GreeterReply);

rpc GreetStream(stream GreeterRequest) returns (stream GreeterReply);
rpc greetStream(stream GreeterRequest) returns (stream GreeterReply);

rpc GreetServerStream(GreeterRequest) returns (stream GreeterReply);
rpc greetServerStream(GreeterRequest) returns (stream GreeterReply);
}

0 comments on commit 2e932ca

Please sign in to comment.