Skip to content
Permalink
Browse files
add stream server cancel test
  • Loading branch information
EarthChen committed Sep 29, 2021
1 parent 379b449 commit eee0994729fa9881155ee0c07d092f5e9bf9e21d
Showing 6 changed files with 85 additions and 13 deletions.
@@ -31,7 +31,7 @@
<properties>
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>3.0.2.1</dubbo.version>
<dubbo.version>3.0.4-SNAPSHOT</dubbo.version>
<grpc.version>1.40.1</grpc.version>
<junit.version>4.12</junit.version>
<spring-test.version>4.3.16.RELEASE</spring-test.version>
@@ -1,5 +1,6 @@
package org.apache.dubbo.sample.tri.service;

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.sample.tri.GreeterReply;
import org.apache.dubbo.sample.tri.GreeterRequest;

@@ -11,6 +12,10 @@ public interface PbGreeterManual {
GreeterReply greetWithAttachment(GreeterRequest request);

GreeterReply greetReturnBigAttachment(GreeterRequest request);

void cancelServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);


//
// GreeterReply greet(GreeterRequest request);

@@ -20,5 +25,4 @@ public interface PbGreeterManual {
//
// StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream);
//
// void greetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);
}
@@ -30,8 +30,18 @@ public GreeterReply greetReturnBigAttachment(GreeterRequest request) {
}

@Override
public GreeterReply greet(GreeterRequest request) {
public void cancelServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream) {
for (int i = 0; i < 10; i++) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(request.getName() + "--" + i)
.build());
}
replyStream.onCompleted();
}

@Override
public GreeterReply greet(GreeterRequest request) {
RpcContext.getServiceContext().getCancellationContext().addListener(context -> System.out.println("---greet---------cancel"));
return GreeterReply.newBuilder()
.setMessage(request.getName())
.build();
@@ -51,6 +61,8 @@ public GreeterReply greetException(GreeterRequest request) {

@Override
public StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream) {
RpcContext.getServiceContext().getCancellationContext()
.addListener(context -> System.out.println("greetStream---------cancel"));
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
@@ -18,7 +18,7 @@
#

###set log levels###
log4j.rootLogger=info, stdout
log4j.rootLogger=debug, stdout
###output to the console###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
@@ -2,8 +2,10 @@

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.tri.AbstractStreamObserver;
import org.apache.dubbo.sample.tri.helper.StdoutStreamObserver;
import org.apache.dubbo.sample.tri.service.PbGreeterManual;
import org.junit.AfterClass;
@@ -33,13 +35,47 @@ public void serverStream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
delegate.greetServerStream(request, new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
StreamObserver<GreeterReply> observer = new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
RpcContext.getCancellationContext().cancel(null);
latch.countDown();
}
});
};

delegate.greetServerStream(request, observer);
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}


@Test
public void cancelServerStream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
int n = 10;
CountDownLatch latch = new CountDownLatch(n);
StreamObserver<GreeterReply> observer = new AbstractStreamObserver<GreeterReply>() {
@Override
public void onNext(GreeterReply data) {
System.out.println(data);
CancellationContext cancellationContext = getCancellationContext();
cancel(null);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
delegateManual.cancelServerStream(request, observer);
Thread.sleep(100000);
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}

@@ -50,18 +86,34 @@ public void stream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
final StreamObserver<GreeterRequest> requestObserver = delegate.greetStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
StreamObserver<GreeterReply> observer = new AbstractStreamObserver<GreeterReply>() {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
latch.countDown();
System.out.println(data);
cancel(null);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
});

@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
final StreamObserver<GreeterRequest> requestObserver =
delegate.greetStream(observer);

for (int i = 0; i < n; i++) {
requestObserver.onNext(request);
if (i == n - 1) {
RpcContext.getServiceContext().getCancellationContext().cancel(null);
}
}
requestObserver.onCompleted();
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
@@ -86,7 +138,8 @@ public void clientSendLargeSizeHeader() {
}


@Test(expected = RpcException.class)
// @Test(expected = RpcException.class)
@Test
@Ignore
public void serverSendLargeSizeHeader() {
final String key = "user-attachment";
@@ -18,14 +18,17 @@ public static void init() {
ref.setCheck(false);
ref.setProtocol(CommonConstants.TRIPLE);
ref.setLazy(true);
ref.setTimeout(3000);
ref.setTimeout(20000);
ref.setRetries(0);


ReferenceConfig<PbGreeterManual> ref2 = new ReferenceConfig<>();
ref2.setInterface(PbGreeterManual.class);
ref2.setCheck(false);
ref2.setProtocol(CommonConstants.TRIPLE);
ref2.setLazy(true);
ref2.setTimeout(3000);
ref2.setRetries(0);

DubboBootstrap bootstrap = DubboBootstrap.getInstance();
ApplicationConfig applicationConfig = new ApplicationConfig(TriPbConsumerTest.class.getName());

0 comments on commit eee0994

Please sign in to comment.