Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement asynchronous support in ODataNotificationReader #2324

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 44 additions & 8 deletions src/Microsoft.OData.Core/ODataNotificationReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,42 @@

namespace Microsoft.OData
{
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

/// <summary>
/// Wrapper for TextReader to listen for dispose.
/// </summary>
#if NETSTANDARD2_0
internal sealed class ODataNotificationReader : TextReader, IAsyncDisposable
#else
internal sealed class ODataNotificationReader : TextReader
#endif
{
private readonly TextReader textReader;
private IODataStreamListener listener;
private readonly IODataStreamListener listener;
private readonly bool synchronous;
private bool disposed = false;

internal ODataNotificationReader(TextReader textReader, IODataStreamListener listener)
/// <summary>
/// Initializes a new instance of the <see cref="ODataNotificationReader"/> class.
/// </summary>
/// <param name="textReader">The wrapped text reader.</param>
/// <param name="listener">Listener to notify when the text reader is being disposed.</param>
/// <param name="synchronous">true if execution context is synchronous; otherwise false.</param>
/// <remarks>
/// When an instance of this class is disposed, it in turn disposes the wrapped text reader.
/// </remarks>
internal ODataNotificationReader(TextReader textReader, IODataStreamListener listener, bool synchronous = true)
{
Debug.Assert(textReader != null, "Creating a notification reader for a null textReader.");
Debug.Assert(listener != null, "Creating a notification reader with a null textReader.");
Debug.Assert(listener != null, "Creating a notification reader with a null listener.");

this.textReader = textReader;
this.listener = listener;
this.synchronous = synchronous;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -113,18 +130,37 @@ public override Task<string> ReadToEndAsync()
/// <param name="disposing">True if called from Dispose; false if called from the finalizer.</param>
protected override void Dispose(bool disposing)
{
if (disposing)
if (!this.disposed && disposing)
gathogojr marked this conversation as resolved.
Show resolved Hide resolved
{
if (this.listener != null)
// Tell the listener that the stream is being disposed.
if (synchronous)
{
// Tell the listener that the stream is being disposed.
this.listener.StreamDisposed();
this.listener = null;
}
else
{
this.listener.StreamDisposedAsync().Wait();
}
}

this.textReader.Dispose();
this.disposed = true;
this.textReader?.Dispose();
base.Dispose(disposing);
}

#if NETSTANDARD2_0
public async ValueTask DisposeAsync()
{
if (!this.disposed)
{
await this.listener.StreamDisposedAsync()
.ConfigureAwait(false);
}

// Dispose unmanaged resources
// Pass `false` to ensure functional equivalence with the synchronous dispose pattern
this.Dispose(false);
}
#endif
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//---------------------------------------------------------------------
// <copyright file="ODataNotificationReaderTests.cs" company="Microsoft">
// Copyright (C) Microsoft Corporation. All rights reserved. See License.txt in the project root for license information.
// </copyright>
//---------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.OData.Tests
{
public class ODataNotificationReaderTests : IDisposable
{
private MemoryStream stream;
private TextReader reader;
private TextWriter streamListenerWriter;
private IODataStreamListener streamListener;

public ODataNotificationReaderTests()
{
this.stream = new MemoryStream();
gathogojr marked this conversation as resolved.
Show resolved Hide resolved
this.reader = new StreamReader(this.stream, encoding: Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: true);
this.streamListenerWriter = new StreamWriter(this.stream, encoding: Encoding.UTF8, bufferSize: 1024, leaveOpen: true);
this.streamListener = new MockODataStreamListener(this.streamListenerWriter);
}

[Theory]
[InlineData(true, "StreamDisposed")]
[InlineData(false, "StreamDisposedAsync")]
public void NotificationReaderDisposeShouldInvokeStreamDisposed(bool synchronous, string expected)
{
// We care about the notification reader being disposed
// We don't care about the reader passed to the notification reader
using (var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener,
synchronous))
{
}

var result = ReadStreamContents();

Assert.Equal(expected, result);
}

[Theory]
[InlineData(true, "StreamDisposed")]
[InlineData(false, "StreamDisposedAsync")]
public void NotificationReaderDisposeShouldBeIdempotent(bool synchronous, string expected)
{
var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener,
synchronous);

// 1st call to Dispose
notificationReader.Dispose();
// 2nd call to Dispose
notificationReader.Dispose();

var result = ReadStreamContents();

// StreamDisposed/StreamDisposeAsync was written only once
Assert.Equal(expected, result);
}

#if NETCOREAPP3_1
gathogojr marked this conversation as resolved.
Show resolved Hide resolved
[Fact]
public async Task NotificationReaderDisposeShouldInvokeStreamDisposedAsync()
{
await using (var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener)) // `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync
{
}

var result = await this.ReadStreamContentsAsync();

Assert.Equal("StreamDisposedAsync", result);
}

[Fact]
public async Task NotificationReaderDisposeAsyncShouldBeIdempotent()
{
var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener);

// 1st call to DisposeAsync
await notificationReader.DisposeAsync();
// 2nd call to DisposeAsync
await notificationReader.DisposeAsync();

var result = await this.ReadStreamContentsAsync();

// StreamDisposeAsync was written only once
Assert.Equal("StreamDisposedAsync", result);
}

#else
[Fact]
public async Task NotificationReaderDisposeShouldInvokeStreamDisposedAsync()
{
using (var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener,
/*synchronous*/ false))
{
}

var result = await this.ReadStreamContentsAsync();

Assert.Equal("StreamDisposedAsync", result);
}
#endif

public void Dispose() // Fired after every test is ran
{
this.streamListenerWriter.Dispose();
this.reader.Dispose();
this.stream.Dispose();
}

private string ReadStreamContents()
{
string streamContents;

using (var reader = new StreamReader(
this.stream,
encoding: Encoding.UTF8,
detectEncodingFromByteOrderMarks: true,
bufferSize: 1024,
leaveOpen: true))
{

this.stream.Position = 0;
streamContents = reader.ReadToEnd();
}

return streamContents;
}

private async Task<string> ReadStreamContentsAsync()
{
string streamContents;

using (var reader = new StreamReader(
this.stream,
encoding: Encoding.UTF8,
detectEncodingFromByteOrderMarks: true,
bufferSize: 1024,
leaveOpen: true))
{

this.stream.Position = 0;
streamContents = await reader.ReadToEndAsync();
}

return streamContents;
}

private class MockODataStreamListener : IODataStreamListener
{
private TextWriter writer;

public MockODataStreamListener(TextWriter writer)
{
this.writer = writer;
}

public void StreamDisposed()
{
writer.Write("StreamDisposed");
writer.Flush();
}

public async Task StreamDisposedAsync()
{
await writer.WriteAsync("StreamDisposedAsync").ConfigureAwait(false);
await writer.FlushAsync().ConfigureAwait(false);
}

public void StreamRequested()
{
throw new NotImplementedException();
}

public Task StreamRequestedAsync()
{
throw new NotImplementedException();
}
}
}
}