-
Notifications
You must be signed in to change notification settings - Fork 739
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
64 changed files
with
6,187 additions
and
2,139 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
113 changes: 113 additions & 0 deletions
113
Ix.NET/Source/System.Interactive.Async/AsyncEnumerableHelpers.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// Licensed to the .NET Foundation under one or more agreements. | ||
// The .NET Foundation licenses this file to you under the Apache 2.0 License. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace System.Collections.Generic | ||
{ | ||
// Based on https://github.com/dotnet/corefx/blob/ec2685715b01d12f16b08d0dfa326649b12db8ec/src/Common/src/System/Collections/Generic/EnumerableHelpers.cs | ||
internal static class AsyncEnumerableHelpers | ||
{ | ||
internal static async Task<T[]> ToArray<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken) | ||
{ | ||
var result = await ToArrayWithLength(source, cancellationToken) | ||
.ConfigureAwait(false); | ||
Array.Resize(ref result.array, result.length); | ||
return result.array; | ||
} | ||
|
||
internal static async Task<ArrayWithLength<T>> ToArrayWithLength<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken) | ||
{ | ||
var result = new ArrayWithLength<T>(); | ||
// Check for short circuit optimizations. This one is very unlikely | ||
// but could be here as a group | ||
var ic = source as ICollection<T>; | ||
if (ic != null) | ||
{ | ||
var count = ic.Count; | ||
if (count != 0) | ||
{ | ||
// Allocate an array of the desired size, then copy the elements into it. Note that this has the same | ||
// issue regarding concurrency as other existing collections like List<T>. If the collection size | ||
// concurrently changes between the array allocation and the CopyTo, we could end up either getting an | ||
// exception from overrunning the array (if the size went up) or we could end up not filling as many | ||
// items as 'count' suggests (if the size went down). This is only an issue for concurrent collections | ||
// that implement ICollection<T>, which as of .NET 4.6 is just ConcurrentDictionary<TKey, TValue>. | ||
result.array = new T[count]; | ||
ic.CopyTo(result.array, 0); | ||
result.length = count; | ||
return result; | ||
} | ||
} | ||
else | ||
{ | ||
using (var en = source.GetEnumerator()) | ||
{ | ||
if (await en.MoveNext(cancellationToken) | ||
.ConfigureAwait(false)) | ||
{ | ||
const int DefaultCapacity = 4; | ||
var arr = new T[DefaultCapacity]; | ||
arr[0] = en.Current; | ||
var count = 1; | ||
|
||
while (await en.MoveNext(cancellationToken) | ||
.ConfigureAwait(false)) | ||
{ | ||
if (count == arr.Length) | ||
{ | ||
// MaxArrayLength is defined in Array.MaxArrayLength and in gchelpers in CoreCLR. | ||
// It represents the maximum number of elements that can be in an array where | ||
// the size of the element is greater than one byte; a separate, slightly larger constant, | ||
// is used when the size of the element is one. | ||
const int MaxArrayLength = 0x7FEFFFFF; | ||
|
||
// This is the same growth logic as in List<T>: | ||
// If the array is currently empty, we make it a default size. Otherwise, we attempt to | ||
// double the size of the array. Doubling will overflow once the size of the array reaches | ||
// 2^30, since doubling to 2^31 is 1 larger than Int32.MaxValue. In that case, we instead | ||
// constrain the length to be MaxArrayLength (this overflow check works because of the | ||
// cast to uint). Because a slightly larger constant is used when T is one byte in size, we | ||
// could then end up in a situation where arr.Length is MaxArrayLength or slightly larger, such | ||
// that we constrain newLength to be MaxArrayLength but the needed number of elements is actually | ||
// larger than that. For that case, we then ensure that the newLength is large enough to hold | ||
// the desired capacity. This does mean that in the very rare case where we've grown to such a | ||
// large size, each new element added after MaxArrayLength will end up doing a resize. | ||
var newLength = count << 1; | ||
if ((uint)newLength > MaxArrayLength) | ||
{ | ||
newLength = MaxArrayLength <= count ? count + 1 : MaxArrayLength; | ||
} | ||
|
||
Array.Resize(ref arr, newLength); | ||
} | ||
|
||
arr[count++] = en.Current; | ||
} | ||
|
||
result.length = count; | ||
result.array = arr; | ||
return result; | ||
} | ||
} | ||
} | ||
|
||
result.length = 0; | ||
#if NO_ARRAY_EMPTY | ||
result.array = EmptyArray<T>.Value; | ||
#else | ||
result.array = Array.Empty<T>(); | ||
#endif | ||
return result; | ||
} | ||
|
||
internal struct ArrayWithLength<T> | ||
{ | ||
public T[] array; | ||
public int length; | ||
} | ||
} | ||
} |
138 changes: 138 additions & 0 deletions
138
Ix.NET/Source/System.Interactive.Async/AsyncIterator.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
// Licensed to the .NET Foundation under one or more agreements. | ||
// The .NET Foundation licenses this file to you under the Apache 2.0 License. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace System.Linq | ||
{ | ||
public static partial class AsyncEnumerable | ||
{ | ||
internal abstract class AsyncIterator<TSource> : IAsyncEnumerable<TSource>, IAsyncEnumerator<TSource> | ||
{ | ||
private readonly int threadId; | ||
|
||
private CancellationTokenSource cancellationTokenSource; | ||
private bool currentIsInvalid = true; | ||
|
||
internal TSource current; | ||
internal AsyncIteratorState state = AsyncIteratorState.New; | ||
|
||
protected AsyncIterator() | ||
{ | ||
threadId = Environment.CurrentManagedThreadId; | ||
} | ||
|
||
public IAsyncEnumerator<TSource> GetEnumerator() | ||
{ | ||
var enumerator = state == AsyncIteratorState.New && threadId == Environment.CurrentManagedThreadId ? | ||
this : | ||
Clone(); | ||
|
||
enumerator.state = AsyncIteratorState.Allocated; | ||
enumerator.cancellationTokenSource = new CancellationTokenSource(); | ||
|
||
try | ||
{ | ||
enumerator.OnGetEnumerator(); | ||
} | ||
catch | ||
{ | ||
enumerator.Dispose(); | ||
throw; | ||
} | ||
|
||
return enumerator; | ||
} | ||
|
||
|
||
public virtual void Dispose() | ||
{ | ||
if (cancellationTokenSource != null) | ||
{ | ||
if (!cancellationTokenSource.IsCancellationRequested) | ||
{ | ||
cancellationTokenSource.Cancel(); | ||
} | ||
cancellationTokenSource.Dispose(); | ||
} | ||
|
||
current = default(TSource); | ||
state = AsyncIteratorState.Disposed; | ||
} | ||
|
||
public TSource Current | ||
{ | ||
get | ||
{ | ||
if (currentIsInvalid) | ||
throw new InvalidOperationException("Enumerator is in an invalid state"); | ||
return current; | ||
} | ||
} | ||
|
||
public async Task<bool> MoveNext(CancellationToken cancellationToken) | ||
{ | ||
// Note: MoveNext *must* be implemented as an async method to ensure | ||
// that any exceptions thrown from the MoveNextCore call are handled | ||
// by the try/catch, whether they're sync or async | ||
|
||
if (state == AsyncIteratorState.Disposed) | ||
{ | ||
return false; | ||
} | ||
|
||
using (cancellationToken.Register(Dispose)) | ||
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cancellationTokenSource.Token)) | ||
{ | ||
try | ||
{ | ||
// Short circuit and don't even call MoveNexCore | ||
cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
var result = await MoveNextCore(cts.Token) | ||
.ConfigureAwait(false); | ||
|
||
currentIsInvalid = !result; // if move next is false, invalid otherwise valid | ||
|
||
return result; | ||
} | ||
catch | ||
{ | ||
currentIsInvalid = true; | ||
Dispose(); | ||
throw; | ||
} | ||
} | ||
} | ||
|
||
public abstract AsyncIterator<TSource> Clone(); | ||
|
||
public virtual IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, TResult> selector) | ||
{ | ||
return new SelectEnumerableAsyncIterator<TSource, TResult>(this, selector); | ||
} | ||
|
||
public virtual IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate) | ||
{ | ||
return new WhereEnumerableAsyncIterator<TSource>(this, predicate); | ||
} | ||
|
||
protected abstract Task<bool> MoveNextCore(CancellationToken cancellationToken); | ||
|
||
protected virtual void OnGetEnumerator() | ||
{ | ||
} | ||
} | ||
|
||
internal enum AsyncIteratorState | ||
{ | ||
New = 0, | ||
Allocated = 1, | ||
Iterating = 2, | ||
Disposed = -1 | ||
} | ||
} | ||
} |
Oops, something went wrong.