Skip to content
Permalink
Browse files
Merge pull request #372 from EarthChen/test-observer-cancel
[Triple] Add stream cancel case
  • Loading branch information
guohao committed Oct 25, 2021
2 parents 2785743 + 52983f4 commit d9e631093d56a2d72120cc356ba978479127119c
Showing 6 changed files with 251 additions and 59 deletions.
@@ -31,7 +31,7 @@
<properties>
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>3.0.2.1</dubbo.version>
<dubbo.version>3.0.4</dubbo.version>
<grpc.version>1.40.1</grpc.version>
<junit.version>4.12</junit.version>
<spring-test.version>4.3.16.RELEASE</spring-test.version>
@@ -153,4 +153,4 @@
</plugins>
</build>

</project>
</project>
@@ -1,5 +1,6 @@
package org.apache.dubbo.sample.tri.service;

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.sample.tri.GreeterReply;
import org.apache.dubbo.sample.tri.GreeterRequest;

@@ -11,6 +12,23 @@ public interface PbGreeterManual {
GreeterReply greetWithAttachment(GreeterRequest request);

GreeterReply greetReturnBigAttachment(GreeterRequest request);

void cancelServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);

StreamObserver<GreeterRequest> cancelBiStream(StreamObserver<GreeterReply> replyStream);


StreamObserver<GreeterRequest> cancelBiStream2(StreamObserver<GreeterReply> replyStream);

/**
* only use by query cancel result
*
* @param request
* @return
*/
GreeterReply queryCancelResult(GreeterRequest request);


//
// GreeterReply greet(GreeterRequest request);

@@ -20,5 +38,4 @@ public interface PbGreeterManual {
//
// StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream);
//
// void greetServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream);
}
@@ -7,8 +7,13 @@
import org.apache.dubbo.sample.tri.PbGreeter;
import org.apache.dubbo.sample.tri.service.PbGreeterManual;

import java.util.HashMap;
import java.util.Map;

public class PbGreeterImpl implements PbGreeter, PbGreeterManual {

public static final Map<String, Boolean> cancelResultMap = new HashMap<>();

@Override
public GreeterReply greetWithAttachment(GreeterRequest request) {
final String key = "user-attachment";
@@ -30,8 +35,86 @@ public GreeterReply greetReturnBigAttachment(GreeterRequest request) {
}

@Override
public GreeterReply greet(GreeterRequest request) {
public void cancelServerStream(GreeterRequest request, StreamObserver<GreeterReply> replyStream) {
RpcContext.getCancellationContext().addListener(context -> {
System.out.println("cancel--cancelServerStream");
cancelResultMap.put("cancelServerStream", true);
});
for (int i = 0; i < 10; i++) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(request.getName() + "--" + i)
.build());
}
// replyStream.onCompleted();
}

@Override
public StreamObserver<GreeterRequest> cancelBiStream(StreamObserver<GreeterReply> replyStream) {
System.out.println("-----cancelBiStream thread=" + Thread.currentThread().getName());
RpcContext.getCancellationContext()
.addListener(context -> {
System.out.println("cancel--cancelBiStream");
cancelResultMap.put("cancelBiStream", true);
});
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
replyStream.onError(new IllegalStateException("Stream err"));
}

@Override
public void onCompleted() {
// replyStream.onCompleted();
}
};
}

@Override
public StreamObserver<GreeterRequest> cancelBiStream2(StreamObserver<GreeterReply> replyStream) {
RpcContext.getCancellationContext()
.addListener(context -> {
System.out.println("cancel--cancelBiStream2");
cancelResultMap.put("cancelBiStream2", true);
});
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
replyStream.onError(new IllegalStateException("Stream err"));
}

@Override
public void onCompleted() {
// replyStream.onCompleted();
}
};
}

@Override
public GreeterReply queryCancelResult(GreeterRequest request) {
boolean canceled = cancelResultMap.getOrDefault(request.getName(), false);
return GreeterReply.newBuilder()
.setMessage(String.valueOf(canceled))
.build();
}

@Override
public GreeterReply greet(GreeterRequest request) {
return GreeterReply.newBuilder()
.setMessage(request.getName())
.build();
@@ -52,10 +135,14 @@ public GreeterReply greetException(GreeterRequest request) {
@Override
public StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream) {
return new StreamObserver<GreeterRequest>() {
int n = 0;

@Override
public void onNext(GreeterRequest data) {
n++;
System.out.println(data.getName() + " " + n);
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.setMessage(data.getName() + " " + n)
.build());
}

@@ -67,6 +154,7 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
System.out.println("[greetStream] onCompleted");
replyStream.onCompleted();
}
};
@@ -4,6 +4,7 @@
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.sample.tri.helper.StdoutStreamObserver;
import org.apache.dubbo.sample.tri.service.PbGreeterManual;
import org.junit.AfterClass;
@@ -33,35 +34,157 @@ public void serverStream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
delegate.greetServerStream(request, new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
StreamObserver<GreeterReply> observer = new StdoutStreamObserver<GreeterReply>("sayGreeterServerStream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
RpcContext.getCancellationContext().cancel(null);
latch.countDown();
}
});
};

delegate.greetServerStream(request, observer);
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}


@Test
public void cancelServerStream() throws InterruptedException {
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("request")
.build();
int n = 10;
StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
@Override
public void onNext(GreeterReply data) {
System.out.println(data);
cancel(null);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
delegateManual.cancelServerStream(request, observer);
Thread.sleep(2000);
GreeterReply reply = delegateManual.queryCancelResult(
GreeterRequest.newBuilder()
.setName("cancelServerStream")
.build()
);
Assert.assertEquals("true", reply.getMessage());
}


@Test
public void cancelBiStream() throws InterruptedException {
int n = 10;
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
@Override
public void onNext(GreeterReply data) {
System.out.println(data);
cancel(null);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
final StreamObserver<GreeterRequest> requestObserver =
delegateManual.cancelBiStream(observer);
CancelableStreamObserver<GreeterRequest> streamObserver =
(CancelableStreamObserver<GreeterRequest>) requestObserver;
for (int i = 0; i < n; i++) {
streamObserver.onNext(request);
}
streamObserver.onCompleted();
Thread.sleep(2000);
GreeterReply reply = delegateManual.queryCancelResult(
GreeterRequest.newBuilder()
.setName("cancelBiStream")
.build()
);
Assert.assertEquals("true", reply.getMessage());
}


@Test
public void cancelBiStream2() throws InterruptedException {
int n = 10;
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
StreamObserver<GreeterReply> observer = new CancelableStreamObserver<GreeterReply>() {
@Override
public void onNext(GreeterReply data) {
System.out.println(data);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
final StreamObserver<GreeterRequest> requestObserver =
delegateManual.cancelBiStream2(observer);
CancelableStreamObserver<GreeterRequest> streamObserver =
(CancelableStreamObserver<GreeterRequest>) requestObserver;
for (int i = 0; i < n; i++) {
streamObserver.onNext(request);
}
streamObserver.cancel(null);
// streamObserver.onCompleted();
Thread.sleep(2000);
GreeterReply reply = delegateManual.queryCancelResult(
GreeterRequest.newBuilder()
.setName("cancelBiStream2")
.build()
);
Assert.assertEquals("true", reply.getMessage());
}


@Test
public void stream() throws InterruptedException {
int n = 10;
CountDownLatch latch = new CountDownLatch(n);
final GreeterRequest request = GreeterRequest.newBuilder()
.setName("stream request")
.build();
final StreamObserver<GreeterRequest> requestObserver = delegate.greetStream(new StdoutStreamObserver<GreeterReply>("sayGreeterStream") {
StreamObserver<GreeterReply> observer = new StdoutStreamObserver<GreeterReply>("tri pb stream") {
@Override
public void onNext(GreeterReply data) {
super.onNext(data);
latch.countDown();
}
});
};
final StreamObserver<GreeterRequest> requestObserver =
delegate.greetStream(observer);
for (int i = 0; i < n; i++) {
requestObserver.onNext(request);
}
requestObserver.onCompleted();
Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
@@ -86,11 +209,13 @@ public void clientSendLargeSizeHeader() {
}


@Test(expected = RpcException.class)
// @Test(expected = RpcException.class)
@Test
@Ignore
public void serverSendLargeSizeHeader() {
final String key = "user-attachment";
GreeterReply reply = delegateManual.greetReturnBigAttachment(GreeterRequest.newBuilder().setName("meta").build());
GreeterReply reply =
delegateManual.greetReturnBigAttachment(GreeterRequest.newBuilder().setName("meta").build());
final String returned = (String) RpcContext.getServerContext().getObjectAttachment(key);
Assert.assertNotNull(returned);
}
@@ -102,7 +227,7 @@ public void attachmentTest() {
RpcContext.removeClientAttachment();
RpcContext.getClientAttachment().setAttachment(key, value);
GreeterReply reply = delegate.greetWithAttachment(GreeterRequest.newBuilder().setName("meta").build());
Assert.assertEquals("hello,meta",reply.getMessage());
Assert.assertEquals("hello,meta", reply.getMessage());
final String returned = (String) RpcContext.getServerContext().getObjectAttachment(key);
Assert.assertEquals("hello," + value, returned);
}
@@ -114,7 +239,7 @@ public void attachmentTest2() {
RpcContext.removeClientAttachment();
RpcContext.getClientAttachment().setAttachment(key, value);
GreeterReply reply = delegateManual.greetWithAttachment(GreeterRequest.newBuilder().setName("meta").build());
Assert.assertEquals("hello,meta",reply.getMessage());
Assert.assertEquals("hello,meta", reply.getMessage());
final String returned = (String) RpcContext.getServerContext().getObjectAttachment(key);
Assert.assertEquals("hello," + value, returned);
}
@@ -131,7 +256,7 @@ public void methodNotFound() {

@AfterClass
public static void alterTest() {
appDubboBootstrap.stop();
appDubboBootstrap.destroy();
DubboBootstrap.reset();
}

0 comments on commit d9e6310

Please sign in to comment.