Skip to content
1 change: 1 addition & 0 deletions src/libraries/System.Text.Json/src/System.Text.Json.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The System.Text.Json library is built-in as part of the shared framework in .NET
<Compile Include="System\Text\Json\Schema\JsonSchemaExporterOptions.cs" />
<Compile Include="System\Text\Json\Schema\JsonSchemaExporterContext.cs" />
<Compile Include="System\Text\Json\Schema\JsonSchemaType.cs" />
<Compile Include="System\Text\Json\Serialization\AsyncEnumeratorState.cs" />
<Compile Include="System\Text\Json\Serialization\Arguments.cs" />
<Compile Include="System\Text\Json\Serialization\ArgumentState.cs" />
<Compile Include="System\Text\Json\Serialization\Attributes\JsonObjectCreationHandlingAttribute.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace System.Text.Json
{
/// <summary>
/// Tracks the state of an async enumerator within a <see cref="WriteStackFrame"/>.
/// </summary>
internal enum AsyncEnumeratorState : byte
{
/// <summary>
/// No async enumerator is active; the enumerator has not been created yet.
/// </summary>
None,

/// <summary>
/// The async enumerator has been created and is actively being iterated.
/// </summary>
Enumerating,

/// <summary>
/// The converter has been suspended due to a pending MoveNextAsync() task.
/// </summary>
PendingMoveNext,

/// <summary>
/// The converter has been suspended due to a pending DisposeAsync() task.
/// </summary>
PendingDisposal,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,47 +49,59 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
IAsyncEnumerator<TElement> enumerator;
ValueTask<bool> moveNextTask;

if (state.Current.AsyncDisposable is null)
switch (state.Current.AsyncEnumeratorState)
{
enumerator = value.GetAsyncEnumerator(state.CancellationToken);
// async enumerators can only be disposed asynchronously;
// store in the WriteStack for future disposal
// by the root async serialization context.
state.Current.AsyncDisposable = enumerator;
// enumerator.MoveNextAsync() calls can throw,
// ensure the enumerator already is stored
// in the WriteStack for proper disposal.
moveNextTask = enumerator.MoveNextAsync();
case AsyncEnumeratorState.None:
enumerator = value.GetAsyncEnumerator(state.CancellationToken);
// async enumerators can only be disposed asynchronously;
// store in the WriteStack for disposal on exception.
state.Current.AsyncEnumerator = enumerator;
state.Current.AsyncEnumeratorState = AsyncEnumeratorState.Enumerating;
// enumerator.MoveNextAsync() calls can throw,
// ensure the enumerator already is stored
// in the WriteStack for proper disposal.
moveNextTask = enumerator.MoveNextAsync();

if (!moveNextTask.IsCompleted)
{
// It is common for first-time MoveNextAsync() calls to return pending tasks,
// since typically that is when underlying network connections are being established.
// For this case only, suppress flushing the current buffer contents (e.g. the leading '[' token of the written array)
// to give the stream owner the ability to recover in case of a connection error.
state.SuppressFlush = true;
goto SuspendDueToPendingTask;
}
break;

case AsyncEnumeratorState.PendingMoveNext:
Debug.Assert(state.Current.AsyncEnumerator is IAsyncEnumerator<TElement>);
enumerator = (IAsyncEnumerator<TElement>)state.Current.AsyncEnumerator;

if (!moveNextTask.IsCompleted)
{
// It is common for first-time MoveNextAsync() calls to return pending tasks,
// since typically that is when underlying network connections are being established.
// For this case only, suppress flushing the current buffer contents (e.g. the leading '[' token of the written array)
// to give the stream owner the ability to recover in case of a connection error.
state.SuppressFlush = true;
goto SuspendDueToPendingTask;
}
}
else
{
Debug.Assert(state.Current.AsyncDisposable is IAsyncEnumerator<TElement>);
enumerator = (IAsyncEnumerator<TElement>)state.Current.AsyncDisposable;

if (state.Current.AsyncEnumeratorIsPendingCompletion)
{
// converter was previously suspended due to a pending MoveNextAsync() task
Debug.Assert(state.PendingTask is Task<bool> && state.PendingTask.IsCompleted);
moveNextTask = new ValueTask<bool>((Task<bool>)state.PendingTask);
state.Current.AsyncEnumeratorIsPendingCompletion = false;
state.Current.AsyncEnumeratorState = AsyncEnumeratorState.Enumerating;
state.PendingTask = null;
}
else
{
break;

case AsyncEnumeratorState.PendingDisposal:
// Converter was previously suspended due to a pending DisposeAsync() task.
Debug.Assert(state.Current.AsyncEnumerator is null);
Debug.Assert(state.PendingTask is not null && state.PendingTask.IsCompleted);
state.PendingTask.GetAwaiter().GetResult();
state.Current.AsyncEnumeratorState = AsyncEnumeratorState.None;
state.PendingTask = null;
return true;

default:
Debug.Assert(state.Current.AsyncEnumeratorState == AsyncEnumeratorState.Enumerating);
Debug.Assert(state.Current.AsyncEnumerator is IAsyncEnumerator<TElement>);
enumerator = (IAsyncEnumerator<TElement>)state.Current.AsyncEnumerator;

// converter was suspended for a different reason;
// the last MoveNextAsync() call can only have completed with 'true'.
moveNextTask = new ValueTask<bool>(true);
}
break;
}

Debug.Assert(moveNextTask.IsCompleted);
Expand All @@ -100,10 +112,21 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
{
if (!moveNextTask.Result)
{
// we have completed serialization for the enumerator,
// clear from the stack and schedule for async disposal.
state.Current.AsyncDisposable = null;
state.AddCompletedAsyncDisposable(enumerator);
// Enumeration complete, dispose the enumerator inline.
// Clear from the stack first to prevent double disposal on exception.
state.Current.AsyncEnumerator = null;
state.Current.AsyncEnumeratorState = AsyncEnumeratorState.None;
ValueTask disposeTask = enumerator.DisposeAsync();
if (!disposeTask.IsCompleted)
{
// DisposeAsync is pending; store as a pending task
// and yield control to the root-level async serialization loop.
state.PendingTask = disposeTask.AsTask();
state.Current.AsyncEnumeratorState = AsyncEnumeratorState.PendingDisposal;
return false;
}

disposeTask.GetAwaiter().GetResult();
return true;
}

Expand All @@ -128,7 +151,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
// mark the current stackframe as pending completion.
Debug.Assert(state.PendingTask is null);
state.PendingTask = moveNextTask.AsTask();
state.Current.AsyncEnumeratorIsPendingCompletion = true;
state.Current.AsyncEnumeratorState = AsyncEnumeratorState.PendingMoveNext;
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ rootValue is not null &&
}
finally
{
// Await any pending resumable converter tasks (currently these can only be IAsyncEnumerator.MoveNextAsync() tasks).
// Await any pending resumable converter tasks (currently these can only be IAsyncEnumerator.MoveNextAsync() or DisposeAsync() tasks).
// Note that pending tasks are always awaited, even if an exception has been thrown or the cancellation token has fired.
if (state.PendingTask is not null)
{
Expand All @@ -210,12 +210,6 @@ rootValue is not null &&
catch { }
#endif
}

// Dispose any pending async disposables (currently these can only be completed IAsyncEnumerators).
if (state.CompletedAsyncDisposables?.Count > 0)
{
await state.DisposeCompletedAsyncDisposables().ConfigureAwait(false);
}
}

} while (!isFinalBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Runtime.ExceptionServices;
Expand Down Expand Up @@ -78,11 +77,6 @@ public readonly ref WriteStackFrame Parent
/// </summary>
public Task? PendingTask;

/// <summary>
/// List of completed IAsyncDisposables that have been scheduled for disposal by converters.
/// </summary>
public List<IAsyncDisposable>? CompletedAsyncDisposables;

/// <summary>
/// The amount of bytes to write before the underlying Stream should be flushed and the
/// current buffer adjusted to remove the processed bytes.
Expand Down Expand Up @@ -277,35 +271,6 @@ public void Pop(bool success)
}
}

public void AddCompletedAsyncDisposable(IAsyncDisposable asyncDisposable)
=> (CompletedAsyncDisposables ??= new List<IAsyncDisposable>()).Add(asyncDisposable);

// Asynchronously dispose of any AsyncDisposables that have been scheduled for disposal
public readonly async ValueTask DisposeCompletedAsyncDisposables()
{
Debug.Assert(CompletedAsyncDisposables?.Count > 0);
Exception? exception = null;

foreach (IAsyncDisposable asyncDisposable in CompletedAsyncDisposables)
{
try
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
exception = e;
}
}

if (exception is not null)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}

CompletedAsyncDisposables.Clear();
}

/// <summary>
/// Walks the stack cleaning up any leftover IDisposables
/// in the event of an exception on serialization
Expand All @@ -314,7 +279,7 @@ public readonly void DisposePendingDisposablesOnException()
{
Exception? exception = null;

Debug.Assert(Current.AsyncDisposable is null);
Debug.Assert(Current.AsyncEnumerator is null);
DisposeFrame(Current.CollectionEnumerator, ref exception);

if (_stack is not null)
Expand All @@ -323,7 +288,7 @@ public readonly void DisposePendingDisposablesOnException()
int stackSize = Math.Max(currentIndex, _continuationCount);
for (int i = 0; i < stackSize; i++)
{
Debug.Assert(_stack[i].AsyncDisposable is null);
Debug.Assert(_stack[i].AsyncEnumerator is null);

if (i == currentIndex)
{
Expand Down Expand Up @@ -365,7 +330,7 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync()
{
Exception? exception = null;

exception = await DisposeFrame(Current.CollectionEnumerator, Current.AsyncDisposable, exception).ConfigureAwait(false);
exception = await DisposeFrame(Current.CollectionEnumerator, Current.AsyncEnumerator, exception).ConfigureAwait(false);

if (_stack is not null)
{
Expand All @@ -378,11 +343,11 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync()
{
// Matches the entry in Current, skip to avoid double disposal.
Debug.Assert(_stack[i].CollectionEnumerator is null || ReferenceEquals(Current.CollectionEnumerator, _stack[i].CollectionEnumerator));
Debug.Assert(_stack[i].AsyncDisposable is null || ReferenceEquals(Current.AsyncDisposable, _stack[i].AsyncDisposable));
Debug.Assert(_stack[i].AsyncEnumerator is null || ReferenceEquals(Current.AsyncEnumerator, _stack[i].AsyncEnumerator));
continue;
}

exception = await DisposeFrame(_stack[i].CollectionEnumerator, _stack[i].AsyncDisposable, exception).ConfigureAwait(false);
exception = await DisposeFrame(_stack[i].CollectionEnumerator, _stack[i].AsyncEnumerator, exception).ConfigureAwait(false);
}
}

Expand All @@ -391,17 +356,17 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync()
ExceptionDispatchInfo.Capture(exception).Throw();
}

static async ValueTask<Exception?> DisposeFrame(IEnumerator? collectionEnumerator, IAsyncDisposable? asyncDisposable, Exception? exception)
static async ValueTask<Exception?> DisposeFrame(IEnumerator? collectionEnumerator, object? asyncEnumerator, Exception? exception)
{
Debug.Assert(!(collectionEnumerator is not null && asyncDisposable is not null));
Debug.Assert(!(collectionEnumerator is not null && asyncEnumerator is not null));

try
{
if (collectionEnumerator is IDisposable disposable)
{
disposable.Dispose();
}
else if (asyncDisposable is not null)
else if (asyncEnumerator is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ internal struct WriteStackFrame
public IEnumerator? CollectionEnumerator;

/// <summary>
/// The enumerator for resumable async disposables.
/// The async enumerator for resumable async enumerable collections.
/// </summary>
public IAsyncDisposable? AsyncDisposable;
public object? AsyncEnumerator;

/// <summary>
/// The current stackframe has suspended serialization due to a pending task,
/// stored in the <see cref="WriteStack.PendingTask"/> property.
/// The state of the async enumerator for the current stack frame.
/// </summary>
public bool AsyncEnumeratorIsPendingCompletion;
public AsyncEnumeratorState AsyncEnumeratorState;

/// <summary>
/// The original JsonPropertyInfo that is not changed. It contains all properties.
Expand Down
Loading
Loading