Skip to content

Commit

Permalink
Add issue quarkusio#31422 reproducer.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Mar 9, 2023
1 parent a0b71ff commit dcb8e62
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 1 deletion.
@@ -1,10 +1,13 @@
package io.quarkus.grpc.example.streaming;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.MutinyStreamingGrpc;
import io.grpc.examples.streaming.StringReply;
import io.grpc.examples.streaming.StringRequest;
import io.quarkus.grpc.GrpcService;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -36,4 +39,25 @@ public Multi<Item> pipe(Multi<Item> request) {
.onItem().scan(() -> 0L, Long::sum)
.onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}

@Override
public Uni<StringReply> stringStream(Multi<StringRequest> request) {
AtomicInteger atomicInteger = new AtomicInteger(0);
return request
// .call(() -> Uni.createFrom().failure(new RuntimeException("Any error")))
.map(x -> {
if (atomicInteger.get() == 30) {
throw new RuntimeException("We reached 30, error here");
}
return StringReply.newBuilder()
.setMessage(x.toString())
.build();
})
.collect().asList()
.replaceWith(StringReply.newBuilder()
.setMessage("DONE")
.build())
.onFailure()
.invoke(th -> System.err.println(th.getMessage()));
}
}
11 changes: 10 additions & 1 deletion integration-tests/grpc-streaming/src/main/proto/streaming.proto
Expand Up @@ -12,11 +12,20 @@ service Streaming {
rpc Source(Empty) returns (stream Item) {}
rpc Sink(stream Item) returns (Empty) {}
rpc Pipe(stream Item) returns (stream Item) {}
rpc StringStream (stream StringRequest) returns (StringReply) {}
}

message Item {
string value = 1;
}

message Empty {
}
}

message StringRequest {
string anyValue = 1;
}

message StringReply {
string message = 1;
}
Expand Up @@ -4,3 +4,12 @@ quarkus.grpc.clients.streaming.port=9001
%vertx.quarkus.grpc.clients.streaming.port=8081
%vertx.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true
%vertx.quarkus.grpc.server.use-separate-server=false

%n2o.quarkus.grpc.server.use-separate-server=true
%o2n.quarkus.grpc.server.use-separate-server=false

%n2o.quarkus.grpc.clients.streaming.port=9001
%n2o.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true

%o2n.quarkus.grpc.clients.streaming.port=8081
%o2n.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=false
@@ -0,0 +1,7 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class LongStreamTest extends LongStreamTestBase {
}
@@ -0,0 +1,37 @@
package io.quarkus.grpc.example.streaming;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.examples.streaming.Streaming;
import io.grpc.examples.streaming.StringRequest;
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@SuppressWarnings("NewClassNamingConvention")
public class LongStreamTestBase {
private final Logger log = LoggerFactory.getLogger(getClass());

@GrpcClient("streaming")
Streaming streamSvc;

@Test
public void testFailure() {
Multi<StringRequest> multi = Multi.createFrom().range(1, 10000)
// delaying stream to make it a bit longer
.call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS)))
.map(x -> StringRequest.newBuilder()
.setAnyValue(x.toString())
.build())
// .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue()));

streamSvc.stringStream(multi)
.subscribe().with(ok -> log.info("All is ok: {}", ok), th -> log.error(th.getMessage(), th));
}

}
@@ -0,0 +1,10 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.grpc.test.utils.N2OGRPCTestProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@TestProfile(N2OGRPCTestProfile.class)
public class N2OLongStreamTest extends LongStreamTestBase {
}
@@ -0,0 +1,10 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.grpc.test.utils.O2NGRPCTestProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@TestProfile(O2NGRPCTestProfile.class)
public class O2NLongStreamTest extends LongStreamTestBase {
}
@@ -0,0 +1,10 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.grpc.test.utils.VertxGRPCTestProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@TestProfile(VertxGRPCTestProfile.class)
public class VertxLongStreamTest extends LongStreamTestBase {
}

0 comments on commit dcb8e62

Please sign in to comment.