Skip to content

Commit

Permalink
Implement asynchronous support in ODataNotificationReader (#2324)
Browse files Browse the repository at this point in the history
* Implement asynchronous support in ODataNotificationReader

* Address review comments
  • Loading branch information
gathogojr committed Feb 25, 2022
1 parent 7e433d5 commit 8f2401f
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 8 deletions.
52 changes: 44 additions & 8 deletions src/Microsoft.OData.Core/ODataNotificationReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,42 @@

namespace Microsoft.OData
{
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

/// <summary>
/// Wrapper for TextReader to listen for dispose.
/// </summary>
#if NETSTANDARD2_0
internal sealed class ODataNotificationReader : TextReader, IAsyncDisposable
#else
internal sealed class ODataNotificationReader : TextReader
#endif
{
private readonly TextReader textReader;
private IODataStreamListener listener;
private readonly IODataStreamListener listener;
private readonly bool synchronous;
private bool disposed = false;

internal ODataNotificationReader(TextReader textReader, IODataStreamListener listener)
/// <summary>
/// Initializes a new instance of the <see cref="ODataNotificationReader"/> class.
/// </summary>
/// <param name="textReader">The wrapped text reader.</param>
/// <param name="listener">Listener to notify when the text reader is being disposed.</param>
/// <param name="synchronous">true if execution context is synchronous; otherwise false.</param>
/// <remarks>
/// When an instance of this class is disposed, it in turn disposes the wrapped text reader.
/// </remarks>
internal ODataNotificationReader(TextReader textReader, IODataStreamListener listener, bool synchronous = true)
{
Debug.Assert(textReader != null, "Creating a notification reader for a null textReader.");
Debug.Assert(listener != null, "Creating a notification reader with a null textReader.");
Debug.Assert(listener != null, "Creating a notification reader with a null listener.");

this.textReader = textReader;
this.listener = listener;
this.synchronous = synchronous;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -113,18 +130,37 @@ public override Task<string> ReadToEndAsync()
/// <param name="disposing">True if called from Dispose; false if called from the finalizer.</param>
protected override void Dispose(bool disposing)
{
if (disposing)
if (!this.disposed && disposing)
{
if (this.listener != null)
// Tell the listener that the stream is being disposed.
if (synchronous)
{
// Tell the listener that the stream is being disposed.
this.listener.StreamDisposed();
this.listener = null;
}
else
{
this.listener.StreamDisposedAsync().Wait();
}
}

this.textReader.Dispose();
this.disposed = true;
this.textReader?.Dispose();
base.Dispose(disposing);
}

#if NETSTANDARD2_0
public async ValueTask DisposeAsync()
{
if (!this.disposed)
{
await this.listener.StreamDisposedAsync()
.ConfigureAwait(false);
}

// Dispose unmanaged resources
// Pass `false` to ensure functional equivalence with the synchronous dispose pattern
this.Dispose(false);
}
#endif
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//---------------------------------------------------------------------
// <copyright file="ODataNotificationReaderTests.cs" company="Microsoft">
// Copyright (C) Microsoft Corporation. All rights reserved. See License.txt in the project root for license information.
// </copyright>
//---------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.OData.Tests
{
public class ODataNotificationReaderTests : IDisposable
{
private MemoryStream stream;
private TextReader reader;
private TextWriter streamListenerWriter;
private IODataStreamListener streamListener;

public ODataNotificationReaderTests()
{
this.stream = new MemoryStream();
this.reader = new StreamReader(this.stream, encoding: Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: true);
this.streamListenerWriter = new StreamWriter(this.stream, encoding: Encoding.UTF8, bufferSize: 1024, leaveOpen: true);
this.streamListener = new MockODataStreamListener(this.streamListenerWriter);
}

[Theory]
[InlineData(true, "StreamDisposed")]
[InlineData(false, "StreamDisposedAsync")]
public void NotificationReaderDisposeShouldInvokeStreamDisposed(bool synchronous, string expected)
{
// We care about the notification reader being disposed
// We don't care about the reader passed to the notification reader
using (var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener,
synchronous))
{
}

var result = ReadStreamContents();

Assert.Equal(expected, result);
}

[Theory]
[InlineData(true, "StreamDisposed")]
[InlineData(false, "StreamDisposedAsync")]
public void NotificationReaderDisposeShouldBeIdempotent(bool synchronous, string expected)
{
var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener,
synchronous);

// 1st call to Dispose
notificationReader.Dispose();
// 2nd call to Dispose
notificationReader.Dispose();

var result = ReadStreamContents();

// StreamDisposed/StreamDisposeAsync was written only once
Assert.Equal(expected, result);
}

#if NETCOREAPP3_1
[Fact]
public async Task NotificationReaderDisposeShouldInvokeStreamDisposedAsync()
{
await using (var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener)) // `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync
{
}

var result = await this.ReadStreamContentsAsync();

Assert.Equal("StreamDisposedAsync", result);
}

[Fact]
public async Task NotificationReaderDisposeAsyncShouldBeIdempotent()
{
var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener);

// 1st call to DisposeAsync
await notificationReader.DisposeAsync();
// 2nd call to DisposeAsync
await notificationReader.DisposeAsync();

var result = await this.ReadStreamContentsAsync();

// StreamDisposeAsync was written only once
Assert.Equal("StreamDisposedAsync", result);
}

#else
[Fact]
public async Task NotificationReaderDisposeShouldInvokeStreamDisposedAsync()
{
using (var notificationReader = new ODataNotificationReader(
this.reader,
this.streamListener,
/*synchronous*/ false))
{
}

var result = await this.ReadStreamContentsAsync();

Assert.Equal("StreamDisposedAsync", result);
}
#endif

public void Dispose() // Fired after every test is ran
{
this.streamListenerWriter.Dispose();
this.reader.Dispose();
this.stream.Dispose();
}

private string ReadStreamContents()
{
string streamContents;

using (var reader = new StreamReader(
this.stream,
encoding: Encoding.UTF8,
detectEncodingFromByteOrderMarks: true,
bufferSize: 1024,
leaveOpen: true))
{

this.stream.Position = 0;
streamContents = reader.ReadToEnd();
}

return streamContents;
}

private async Task<string> ReadStreamContentsAsync()
{
string streamContents;

using (var reader = new StreamReader(
this.stream,
encoding: Encoding.UTF8,
detectEncodingFromByteOrderMarks: true,
bufferSize: 1024,
leaveOpen: true))
{

this.stream.Position = 0;
streamContents = await reader.ReadToEndAsync();
}

return streamContents;
}

private class MockODataStreamListener : IODataStreamListener
{
private TextWriter writer;

public MockODataStreamListener(TextWriter writer)
{
this.writer = writer;
}

public void StreamDisposed()
{
writer.Write("StreamDisposed");
writer.Flush();
}

public async Task StreamDisposedAsync()
{
await writer.WriteAsync("StreamDisposedAsync").ConfigureAwait(false);
await writer.FlushAsync().ConfigureAwait(false);
}

public void StreamRequested()
{
throw new NotImplementedException();
}

public Task StreamRequestedAsync()
{
throw new NotImplementedException();
}
}
}
}

0 comments on commit 8f2401f

Please sign in to comment.