Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Internal/ExceptionHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Linq;

namespace System.Reactive
{
/// <summary>
/// Utility methods to handle lock-free combining of Exceptions
/// as well as hosting a terminal-exception indicator for
/// lock-free termination support.
/// </summary>
internal static class ExceptionHelper
{
/// <summary>
/// The singleton instance of the exception indicating a terminal state,
/// DO NOT LEAK or signal this via OnError!
/// </summary>
public static Exception Terminated { get; } = new TerminatedException();

/// <summary>
/// Tries to atomically set the Exception on the given field if it is
/// still null.
/// </summary>
/// <param name="field">The target field to try to set atomically.</param>
/// <param name="ex">The exception to set, not null (not verified).</param>
/// <returns>True if the operation succeeded, false if the target was not null.</returns>
public static bool TrySetException(ref Exception field, Exception ex)
{
return Interlocked.CompareExchange(ref field, ex, null) == null;
}

/// <summary>
/// Atomically swaps in the Terminated exception into the field and
/// returns the previous exception in that field (which could be the
/// Terminated instance too).
/// </summary>
/// <param name="field">The target field to terminate.</param>
/// <returns>The previous exception in that field before the termination.</returns>
public static Exception Terminate(ref Exception field)
{
var current = Volatile.Read(ref field);
if (current != Terminated)
{
current = Interlocked.Exchange(ref field, Terminated);
}
return current;
}

/// <summary>
/// Atomically sets the field to the given new exception or combines
/// it with any pre-existing exception as a new AggregateException
/// unless the field contains the Terminated instance.
/// </summary>
/// <param name="field">The field to set or combine with.</param>
/// <param name="ex">The exception to combine with.</param>
/// <returns>True if successful, false if the field contains the Terminated instance.</returns>
/// <remarks>This type of atomic aggregation helps with operators that
/// want to delay all errors until all of their sources terminate in some way.</remarks>
public static bool TryAddException(ref Exception field, Exception ex)
{
for (; ; )
{
var current = Volatile.Read(ref field);
if (current == Terminated)
{
return false;
}

var b = default(Exception);

if (current == null)
{
b = ex;
}
else
if (current is AggregateException a)
{
var list = new List<Exception>(a.InnerExceptions);
list.Add(ex);
b = new AggregateException(list);
}
else
{
b = new AggregateException(current, ex);
}
if (Interlocked.CompareExchange(ref field, b, current) == current)
{
return true;
}
}
}

/// <summary>
/// The class indicating a terminal state as an Exception type.
/// </summary>
sealed class TerminatedException : Exception
{
internal TerminatedException() : base("No further exceptions")
{

}
}
}
}
102 changes: 102 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace System.Reactive
{
/// <summary>
/// Utility methods for dealing with serializing OnXXX signals
/// for an IObserver where concurrent OnNext is still not allowed
/// but concurrent OnError/OnCompleted may happen.
/// This serialization case is generally lower overhead than
/// a full SerializedObserver wrapper and doesn't need
/// allocation.
/// </summary>
internal static class HalfSerializer
{
/// <summary>
/// Signals the given item to the observer in a serialized fashion
/// allowing a concurrent OnError or OnCompleted emission to be delayed until
/// the observer.OnNext returns.
/// Do not call OnNext from multiple threads as it may lead to ignored items.
/// Use a full SerializedObserver wrapper for merging multiple sequences.
/// </summary>
/// <typeparam name="T">The element type of the observer.</typeparam>
/// <param name="observer">The observer to signal events in a serialized fashion.</param>
/// <param name="item">The item to signal.</param>
/// <param name="wip">Indicates there is an emission going on currently.</param>
/// <param name="error">The field containing an error or terminal indicator.</param>
public static void OnNext<T>(IObserver<T> observer, T item, ref int wip, ref Exception error)
{
if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
{
observer.OnNext(item);
if (Interlocked.Decrement(ref wip) != 0)
{
var ex = error;
if (ex != ExceptionHelper.Terminated)
{
error = ExceptionHelper.Terminated;
observer.OnError(ex);
}
else
{
observer.OnCompleted();
}
}
}
}

/// <summary>
/// Signals the given exception to the observer. If there is a concurrent
/// OnNext emission is happening, saves the exception into the given field
/// otherwise to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/>.
/// This method can be called concurrently with itself and the other methods of this
/// helper class but only one terminal signal may actually win.
/// </summary>
/// <typeparam name="T">The element type of the observer.</typeparam>
/// <param name="observer">The observer to signal events in a serialized fashion.</param>
/// <param name="ex">The exception to signal sooner or later.</param>
/// <param name="wip">Indicates there is an emission going on currently.</param>
/// <param name="error">The field containing an error or terminal indicator.</param>
public static void OnError<T>(IObserver<T> observer, Exception ex, ref int wip, ref Exception error)
{
if (ExceptionHelper.TrySetException(ref error, ex))
{
if (Interlocked.Increment(ref wip) == 1)
{
error = ExceptionHelper.Terminated;
observer.OnError(ex);
}
}
}

/// <summary>
/// Signals OnCompleted on the observer. If there is a concurrent
/// OnNext emission happening, the error field will host a special
/// terminal exception signal to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/> once it finishes with OnNext and signal the
/// OnCompleted as well.
/// This method can be called concurrently with itself and the other methods of this
/// helper class but only one terminal signal may actually win.
/// </summary>
/// <typeparam name="T">The element type of the observer.</typeparam>
/// <param name="observer">The observer to signal events in a serialized fashion.</param>
/// <param name="wip">Indicates there is an emission going on currently.</param>
/// <param name="error">The field containing an error or terminal indicator.</param>
public static void OnCompleted<T>(IObserver<T> observer, ref int wip, ref Exception error)
{
if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
{
if (Interlocked.Increment(ref wip) == 1)
{
observer.OnCompleted();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive;
using Xunit;
using System;

namespace ReactiveTests.Tests
{

public class ExceptionHelperTest
{
Exception errors;

[Fact]
public void ExceptionHelper_TrySetException_Empty()
{
var ex = new InvalidOperationException();

Assert.True(ExceptionHelper.TrySetException(ref errors, ex));

Assert.Equal(ex, errors);
}

[Fact]
public void ExceptionHelper_TrySetException_Not_Empty()
{
var ex1 = new InvalidOperationException();
errors = ex1;

var ex2 = new NotSupportedException();

Assert.False(ExceptionHelper.TrySetException(ref errors, ex2));

Assert.Equal(ex1, errors);
}

[Fact]
public void ExceptionHelper_TrySetException_Terminate_Empty()
{
var ex = ExceptionHelper.Terminate(ref errors);

Assert.Null(ex);
Assert.Equal(errors, ExceptionHelper.Terminated);
}

[Fact]
public void ExceptionHelper_TrySetException_Terminate_Not_Empty()
{
var ex1 = new InvalidOperationException();
errors = ex1;

var ex = ExceptionHelper.Terminate(ref errors);

Assert.Equal(ex, ex1);
Assert.Equal(errors, ExceptionHelper.Terminated);
}


[Fact]
public void ExceptionHelper_TrySetException_Terminate_Twice()
{
var ex1 = new InvalidOperationException();
errors = ex1;

var ex = ExceptionHelper.Terminate(ref errors);

Assert.Equal(ex, ex1);
Assert.Equal(errors, ExceptionHelper.Terminated);

ex = ExceptionHelper.Terminate(ref errors);

Assert.Equal(ex, ExceptionHelper.Terminated);
Assert.Equal(errors, ExceptionHelper.Terminated);
}

[Fact]
public void ExceptionHelper_TryAddException_Empty()
{
var ex1 = new InvalidOperationException();

Assert.True(ExceptionHelper.TryAddException(ref errors, ex1));

Assert.Equal(ex1, errors);
}

[Fact]
public void ExceptionHelper_TryAddException_Not_Empty()
{
var ex1 = new InvalidOperationException();
errors = ex1;

var ex2 = new NotImplementedException();

Assert.True(ExceptionHelper.TryAddException(ref errors, ex2));

Assert.True(errors is AggregateException);
var x = errors as AggregateException;

Assert.Equal(2, x.InnerExceptions.Count);
Assert.True(x.InnerExceptions[0] is InvalidOperationException);
Assert.True(x.InnerExceptions[1] is NotImplementedException);
}

[Fact]
public void ExceptionHelper_TryAddException_Aggregated()
{
var ex1 = new InvalidOperationException();
var ex2 = new NotImplementedException();

errors = new AggregateException(ex1, ex2);

var ex3 = new InvalidCastException();

Assert.True(ExceptionHelper.TryAddException(ref errors, ex3));

Assert.True(errors is AggregateException);
var x = errors as AggregateException;

Assert.Equal(3, x.InnerExceptions.Count);
Assert.True(x.InnerExceptions[0] is InvalidOperationException);
Assert.True(x.InnerExceptions[1] is NotImplementedException);
Assert.True(x.InnerExceptions[2] is InvalidCastException);
}

[Fact]
public void ExceptionHelper_TryAddException_Terminated()
{
errors = ExceptionHelper.Terminated;

var ex = new InvalidCastException();

Assert.False(ExceptionHelper.TryAddException(ref errors, ex));

Assert.Equal(errors, ExceptionHelper.Terminated);
}
}
}
Loading