Skip to content

Commit

Permalink
Return false instead of throwing InvalidOperationException on read of…
Browse files Browse the repository at this point in the history
… a gRPC server stream after the connection was closed
  • Loading branch information
erikmav committed Sep 30, 2021
1 parent ac3b3f4 commit 0c5f7d5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ async Task<bool> MoveNextAsync(ValueTask<TRequest?> readStreamTask)

if (_completed || _serverCallContext.CancellationToken.IsCancellationRequested)
{
return Task.FromException<bool>(new InvalidOperationException("Can't read messages after the request is complete."));
// gRPC specification indicates that MoveNext() should not throw. Simply return false.
return CommonGrpcProtocolHelpers.FalseTask;
}

var request = _serverCallContext.HttpContext.Request.BodyReader.ReadStreamMessageAsync(_serverCallContext, _deserializer, cancellationToken);
Expand Down
22 changes: 11 additions & 11 deletions test/FunctionalTests/Client/StreamingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace Grpc.AspNetCore.FunctionalTests.Client
public class StreamingTests : FunctionalTestBase
{
[Test]
public async Task DuplexStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Success()
public async Task DuplexStream_SendLargeFileBatchedAndReceiveLargeFileBatched_Success()
{
// Arrange
var data = CreateTestData(1024 * 1024 * 1); // 1 MB
Expand Down Expand Up @@ -306,7 +306,7 @@ await foreach (var message in call.ResponseStream.ReadAllAsync().DefaultTimeout(
[TestCase(1)]
[TestCase(5)]
[TestCase(20)]
public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(int tasks)
public async Task DuplexStreaming_SimultaneousSendAndReceiveInParallel_Success(int tasks)
{
// Arrange
const int total = 1024 * 1024 * 1;
Expand All @@ -316,7 +316,7 @@ public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(i

var client = new StreamService.StreamServiceClient(Channel);

await TestHelpers.RunParallel(tasks, async taskIndex =>
await TestHelpers.RunParallel(tasks, async _ =>
{
var (sent, received) = await EchoData(total, data, client).DefaultTimeout();
Expand Down Expand Up @@ -421,7 +421,7 @@ await foreach (var message in requestStream.ReadAllAsync())
[Test]
public async Task DuplexStreaming_ParallelCallsFromOneChannel_Success()
{
async Task UnaryDeadlineExceeded(IAsyncStreamReader<DataMessage> requestStream, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
static async Task UnaryDeadlineExceeded(IAsyncStreamReader<DataMessage> requestStream, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
Expand Down Expand Up @@ -460,7 +460,7 @@ await foreach (var message in requestStream.ReadAllAsync())
[Test]
public async Task ServerStreaming_GetTrailersAndStatus_Success()
{
async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
static async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
{
await responseStream.WriteAsync(new DataMessage());
context.ResponseTrailers.Add("my-trailer", "value");
Expand Down Expand Up @@ -625,7 +625,7 @@ async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter<

[TestCase(true)]
[TestCase(false)]
public async Task ClientStreaming_ReadAfterMethodComplete_Error(bool readBeforeExit)
public async Task ClientStreaming_ReadAfterMethodComplete_False(bool readBeforeExit)
{
SetExpectedErrorsFilter(writeContext =>
{
Expand All @@ -641,7 +641,7 @@ public async Task ClientStreaming_ReadAfterMethodComplete_Error(bool readBeforeE
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
var readTcs = new TaskCompletionSource<Task>(TaskCreationOptions.RunContinuationsAsynchronously);
var syncPoint = new SyncPoint(runContinuationsAsynchronously: true);
async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessage> requestStream, ServerCallContext context)
async Task<DataMessage> ClientStreamingWithTrailersAsync(IAsyncStreamReader<DataMessage> requestStream, ServerCallContext context)
{
var readTask = Task.Run(async () =>
{
Expand All @@ -661,7 +661,7 @@ async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessa
}

// Arrange
var method = Fixture.DynamicGrpc.AddClientStreamingMethod<DataMessage, DataMessage>(ClientStreamingWithTrailers);
var method = Fixture.DynamicGrpc.AddClientStreamingMethod<DataMessage, DataMessage>(ClientStreamingWithTrailersAsync);

var channel = CreateChannel();

Expand All @@ -680,13 +680,13 @@ async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessa

tcs.SetResult(null);

var response = await call;
DataMessage response = await call;
Assert.IsNotNull(response);

syncPoint.Continue();

var readTask = await readTcs.Task.DefaultTimeout();
var ex = await ExceptionAssert.ThrowsAsync<InvalidOperationException>(() => readTask).DefaultTimeout();
Assert.AreEqual("Can't read messages after the request is complete.", ex.Message);
await readTask.DefaultTimeout();

var clientException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new DataMessage())).DefaultTimeout();
Assert.AreEqual(StatusCode.OK, clientException.StatusCode);
Expand Down

0 comments on commit 0c5f7d5

Please sign in to comment.