Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CancellationToken to TextReader.ReadXAsync #61898

Merged
merged 10 commits into from Jan 25, 2022
15 changes: 15 additions & 0 deletions src/libraries/System.Console/src/System/IO/SyncTextReader.cs
Expand Up @@ -3,6 +3,7 @@

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
Expand Down Expand Up @@ -95,11 +96,25 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
return cancellationToken.IsCancellationRequested ?
ValueTask.FromCanceled<string?>(cancellationToken) :
new ValueTask<string?>(ReadLine());
}

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ?
Task.FromCanceled<string>(cancellationToken) :
Task.FromResult(ReadToEnd());
}

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
58 changes: 58 additions & 0 deletions src/libraries/System.IO/tests/StreamReader/StreamReaderTests.cs
Expand Up @@ -110,6 +110,43 @@ public async Task ReadToEndAsync()
Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCancellationToken()
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
using var sw = new StreamReader(GetLargeStream());
var result = await sw.ReadToEndAsync(default);

Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
using var cts = new CancellationTokenSource();
cts.Cancel();
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await sw.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public async Task ReadToEndAsync_WithCancellation()
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
{
string path = GetTestFilePath();

// create large (~100MB) file
File.WriteAllLines(path, Enumerable.Repeat("A very large file used for testing StreamReader cancellation. 0123456789012345678901234567890123456789.", 1_000_000));

using StreamReader reader = File.OpenText(path);
using CancellationTokenSource cts = new (TimeSpan.FromMilliseconds(50));
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await reader.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public void GetBaseStream()
{
Expand Down Expand Up @@ -301,6 +338,27 @@ public void VanillaReadLines2()
Assert.Equal(valueString.Substring(1, valueString.IndexOf('\r') - 1), data);
}

[Fact]
public async Task VanillaReadLineAsync()
{
var baseInfo = GetCharArrayStream();
var sr = baseInfo.Item2;

string valueString = new string(baseInfo.Item1);

var data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(0, valueString.IndexOf('\r')), data);

data = await sr.ReadLineAsync(default);
Assert.Equal(valueString.Substring(valueString.IndexOf('\r') + 1, 3), data);

data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(valueString.IndexOf('\n') + 1, 2), data);

data = await sr.ReadLineAsync(default);
Assert.Equal((valueString.Substring(valueString.LastIndexOf('\n') + 1)), data);
}

[Fact]
public async Task ContinuousNewLinesAndTabsAsync()
{
Expand Down
Expand Up @@ -68,6 +68,23 @@ public static void ReadLine()
}
}

[Fact]
public static async Task ReadLineAsync()
{
string str1 = "Hello\0\t\v \\ World";
string str2 = str1 + Environment.NewLine + str1;

using (StringReader sr = new StringReader(str1))
{
Assert.Equal(str1, await sr.ReadLineAsync());
}
using (StringReader sr = new StringReader(str2))
{
Assert.Equal(str1, await sr.ReadLineAsync(default));
Assert.Equal(str1, await sr.ReadLineAsync(default));
}
}

[Fact]
public static void ReadPseudoRandomString()
{
Expand Down Expand Up @@ -155,6 +172,14 @@ public static void ReadToEndEmptyString()
Assert.Equal(str1, sr.ReadToEnd());
}

[Fact]
public static async Task ReadToEndAsyncString()
{
string str1 = "Hello\0\t\v \\ World";
StringReader sr = new StringReader(str1);
Assert.Equal(str1, await sr.ReadToEndAsync(default));
}

[Fact]
public static void Closed_DisposedExceptions()
{
Expand Down Expand Up @@ -278,6 +303,8 @@ public async Task Precanceled_ThrowsException()

await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadBlockAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadLineAsync(new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadToEndAsync(new CancellationToken(true)));
}

private static void ValidateDisposedExceptions(StringReader sr)
Expand Down
21 changes: 21 additions & 0 deletions src/libraries/System.IO/tests/TextReader/TextReaderTests.cs
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand Down Expand Up @@ -54,6 +55,26 @@ public async Task ReadToEndAsync()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
var result = await tr.ReadToEndAsync(default);
Assert.Equal(5000, result.Length);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
using var cts = new CancellationTokenSource();
cts.Cancel();
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await tr.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public void TestRead()
{
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/System.Private.CoreLib/src/System/IO/File.cs
Expand Up @@ -649,7 +649,7 @@ private static async Task<string[]> InternalReadAllLinesAsync(string path, Encod
cancellationToken.ThrowIfCancellationRequested();
string? line;
List<string> lines = new List<string>();
while ((line = await sr.ReadLineAsync().ConfigureAwait(false)) != null)
while ((line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
lines.Add(line);
cancellationToken.ThrowIfCancellationRequested();
Expand Down
31 changes: 18 additions & 13 deletions src/libraries/System.Private.CoreLib/src/System/IO/StreamReader.cs
Expand Up @@ -845,29 +845,32 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
return sb.ToString();
}

public override Task<string?> ReadLineAsync()
public override Task<string?> ReadLineAsync() =>
ReadLineAsync(default).AsTask();

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadLineAsync();
return base.ReadLineAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string?> task = ReadLineAsyncInternal();
Task<string?> task = ReadLineAsyncInternal(cancellationToken).AsTask();
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
_asyncReadTask = task;

return task;
return new ValueTask<string?>(task);
}

private async Task<string?> ReadLineAsyncInternal()
private async ValueTask<string?> ReadLineAsyncInternal(CancellationToken cancellationToken)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
if (_charPos == _charLen && (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) == 0)
if (_charPos == _charLen && (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) == 0)
{
return null;
}
Expand Down Expand Up @@ -903,7 +906,7 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)

_charPos = tmpCharPos = i + 1;

if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) > 0))
if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) > 0))
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
tmpCharPos = _charPos;
if (_charBuffer[tmpCharPos] == '\n')
Expand All @@ -921,32 +924,34 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
i = tmpCharLen - tmpCharPos;
sb ??= new StringBuilder(i + 80);
sb.Append(tmpCharBuffer, tmpCharPos, i);
} while (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false) > 0);
} while (await ReadBufferAsync(cancellationToken).ConfigureAwait(false) > 0);

return sb.ToString();
}

public override Task<string> ReadToEndAsync()
public override Task<string> ReadToEndAsync() => ReadToEndAsync(default);

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadToEndAsync();
return base.ReadToEndAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string> task = ReadToEndAsyncInternal();
Task<string> task = ReadToEndAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
}

private async Task<string> ReadToEndAsyncInternal()
private async Task<string> ReadToEndAsyncInternal(CancellationToken cancellationToken)
{
// Call ReadBuffer, then pull data out of charBuffer.
StringBuilder sb = new StringBuilder(_charLen - _charPos);
Expand All @@ -955,7 +960,7 @@ private async Task<string> ReadToEndAsyncInternal()
int tmpCharPos = _charPos;
sb.Append(_charBuffer, tmpCharPos, _charLen - tmpCharPos);
_charPos = _charLen; // We consumed these characters
await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false);
await ReadBufferAsync(cancellationToken).ConfigureAwait(false);
} while (_charLen > 0);

return sb.ToString();
Expand Down
10 changes: 10 additions & 0 deletions src/libraries/System.Private.CoreLib/src/System/IO/StringReader.cs
Expand Up @@ -224,11 +224,21 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested
? ValueTask.FromCanceled<string?>(cancellationToken)
: new ValueTask<string?>(ReadLine());

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested
? Task.FromCanceled<string>(cancellationToken)
: Task.FromResult(ReadToEnd());

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
23 changes: 19 additions & 4 deletions src/libraries/System.Private.CoreLib/src/System/IO/TextReader.cs
Expand Up @@ -203,18 +203,25 @@ public virtual int ReadBlock(Span<char> buffer)
}

#region Task based Async APIs
public virtual Task<string?> ReadLineAsync() =>
public virtual Task<string?> ReadLineAsync() => ReadLineCoreAsync(default);

public virtual ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) =>
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
new ValueTask<string?>(ReadLineCoreAsync(cancellationToken));

private Task<string?> ReadLineCoreAsync(CancellationToken cancellationToken) =>
Task<string?>.Factory.StartNew(static state => ((TextReader)state!).ReadLine(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

public virtual async Task<string> ReadToEndAsync()
public virtual Task<string> ReadToEndAsync() => ReadToEndAsync(default);

public virtual async Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
var sb = new StringBuilder(4096);
char[] chars = ArrayPool<char>.Shared.Rent(4096);
try
{
int len;
while ((len = await ReadAsyncInternal(chars, default).ConfigureAwait(false)) != 0)
while ((len = await ReadAsyncInternal(chars, cancellationToken).ConfigureAwait(false)) != 0)
{
sb.Append(chars, 0, len);
}
Expand Down Expand Up @@ -368,9 +375,17 @@ protected override void Dispose(bool disposing)
[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string?> ReadLineAsync() => Task.FromResult(ReadLine());

[MethodImpl(MethodImplOptions.Synchronized)]
public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
=> cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled<string?>(cancellationToken) : new ValueTask<string?>(ReadLine());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string> ReadToEndAsync() => Task.FromResult(ReadToEnd());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
=> cancellationToken.IsCancellationRequested ? Task.FromCanceled<string>(cancellationToken) : Task.FromResult(ReadToEnd());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
Expand Down
6 changes: 6 additions & 0 deletions src/libraries/System.Runtime/ref/System.Runtime.cs
Expand Up @@ -10855,8 +10855,10 @@ public partial class StreamReader : System.IO.TextReader
public override System.Threading.Tasks.ValueTask<int> ReadBlockAsync(System.Memory<char> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override string? ReadLine() { throw null; }
public override System.Threading.Tasks.Task<string?> ReadLineAsync() { throw null; }
public override System.Threading.Tasks.ValueTask<string?> ReadLineAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public override string ReadToEnd() { throw null; }
public override System.Threading.Tasks.Task<string> ReadToEndAsync() { throw null; }
public override System.Threading.Tasks.Task<string> ReadToEndAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class StreamWriter : System.IO.TextWriter
{
Expand Down Expand Up @@ -10920,8 +10922,10 @@ public partial class StringReader : System.IO.TextReader
public override System.Threading.Tasks.ValueTask<int> ReadBlockAsync(System.Memory<char> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override string? ReadLine() { throw null; }
public override System.Threading.Tasks.Task<string?> ReadLineAsync() { throw null; }
public override System.Threading.Tasks.ValueTask<string?> ReadLineAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public override string ReadToEnd() { throw null; }
public override System.Threading.Tasks.Task<string> ReadToEndAsync() { throw null; }
public override System.Threading.Tasks.Task<string> ReadToEndAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class StringWriter : System.IO.TextWriter
{
Expand Down Expand Up @@ -10972,8 +10976,10 @@ public abstract partial class TextReader : System.MarshalByRefObject, System.IDi
public virtual System.Threading.Tasks.ValueTask<int> ReadBlockAsync(System.Memory<char> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual string? ReadLine() { throw null; }
public virtual System.Threading.Tasks.Task<string?> ReadLineAsync() { throw null; }
public virtual System.Threading.Tasks.ValueTask<string?> ReadLineAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
public virtual string ReadToEnd() { throw null; }
public virtual System.Threading.Tasks.Task<string> ReadToEndAsync() { throw null; }
public virtual System.Threading.Tasks.Task<string> ReadToEndAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few other TextReader-derived types in dotnet/runtime, e.g. a couple in System.Console, and NullTextReader in corelib. We should likely override the new overloads on those as well. (I'm not sure why NullTextReader doesn't already override most of the virtuals, but it seems it should, for perf.) Same goes for NullStreamReader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why NullTextReader doesn't already override most of the virtuals, but it seems it should, for perf.

Is it OK to add all the overrides as part of this PR, or should that be separate work (and this PR just add ReadXAsync)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a commit at bgrainger@92e92cc ready to go once this PR is merged.

public static System.IO.TextReader Synchronized(System.IO.TextReader reader) { throw null; }
}
public abstract partial class TextWriter : System.MarshalByRefObject, System.IAsyncDisposable, System.IDisposable
Expand Down