Skip to content

Commit

Permalink
feat: add stream method for ServerStream (#1575)
Browse files Browse the repository at this point in the history
* feat: add stream method for ServerStream

* add integration test

* fix integration test

* fix format

* add javadoc

* fix format

* add exception

* remove unused exception in tests
  • Loading branch information
JoeWang1127 committed Apr 11, 2023
1 parent ffeb820 commit e38c8ec
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 0 deletions.
Expand Up @@ -31,6 +31,8 @@

import com.google.api.core.InternalApi;
import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -89,6 +91,15 @@ public Iterator<V> iterator() {
return iterator;
}

/**
* Returns a sequential {@code Stream} with server responses as its source.
*
* @return a sequential {@code Stream} over the elements in server responses
*/
public Stream<V> stream() {
return StreamSupport.stream(this.spliterator(), false);
}

/**
* Returns true if the next call to the iterator's hasNext() or next() is guaranteed to be
* nonblocking.
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -109,6 +110,31 @@ public List<Integer> call() {
Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4);
}

@Test
public void testMultipleItemStreamMethod() throws Exception {
Future<Void> producerFuture =
executor.submit(
() -> {
for (int i = 0; i < 5; i++) {
int requestCount = controller.popLastPull();

Truth.assertWithMessage("ServerStream should request one item at a time")
.that(requestCount)
.isEqualTo(1);

stream.observer().onResponse(i);
}
stream.observer().onComplete();
return null;
});
Future<List<Integer>> consumerFuture =
executor.submit(() -> stream.stream().collect(Collectors.toList()));

producerFuture.get(60, TimeUnit.SECONDS);
List<Integer> results = consumerFuture.get();
Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4);
}

@Test
public void testEarlyTermination() throws Exception {
Future<Void> taskFuture =
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
Expand Down Expand Up @@ -71,6 +72,18 @@ public void testGrpc_receiveStreamedContent() {
.inOrder();
}

@Test
public void testGrpc_receiveStreamedContentStreamAPI() {
String content = "The rain in Spain stays mainly on the plain!";
ServerStream<EchoResponse> responseStream =
grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build());
assertThat(responseStream.stream().map(EchoResponse::getContent).collect(Collectors.toList()))
.containsExactlyElementsIn(
ImmutableList.of(
"The", "rain", "in", "Spain", "stays", "mainly", "on", "the", "plain!"))
.inOrder();
}

@Test
public void testGrpc_serverError_receiveErrorAfterLastWordInStream() {
String content = "The rain in Spain";
Expand Down

0 comments on commit e38c8ec

Please sign in to comment.