Skip to content

Commit

Permalink
Align safe observers (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcweber committed Jun 18, 2018
1 parent da20177 commit 0c5a53f
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 120 deletions.
19 changes: 4 additions & 15 deletions Rx.NET/Source/src/System.Reactive/AnonymousSafeObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ namespace System.Reactive
/// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
/// helps debugging and some performance.
/// </summary>
internal sealed class AnonymousSafeObserver<T> : ISafeObserver<T>
internal sealed class AnonymousSafeObserver<T> : SafeObserver<T>
{
private readonly Action<T> _onNext;
private readonly Action<Exception> _onError;
private readonly Action _onCompleted;
private IDisposable _disposable;

private int isStopped;

Expand All @@ -35,7 +34,7 @@ public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action
_onCompleted = onCompleted;
}

public void OnNext(T value)
public override void OnNext(T value)
{
if (isStopped == 0)
{
Expand All @@ -53,7 +52,7 @@ public void OnNext(T value)
}
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
{
Expand All @@ -64,7 +63,7 @@ public void OnError(Exception error)
}
}

public void OnCompleted()
public override void OnCompleted()
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
{
Expand All @@ -74,15 +73,5 @@ public void OnCompleted()
}
}
}

public void SetResource(IDisposable resource)
{
Disposable.SetSingle(ref _disposable, resource);
}

public void Dispose()
{
Disposable.TryDispose(ref _disposable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive
{
internal sealed class AutoDetachObserver<T> : ObserverBase<T>
internal sealed class AutoDetachObserver<T> : ObserverBase<T>, ISafeObserver<T>
{
private readonly IObserver<T> _observer;

Expand All @@ -18,9 +17,9 @@ public AutoDetachObserver(IObserver<T> observer)
_observer = observer;
}

public IDisposable Disposable
public void SetResource(IDisposable resource)
{
set => Disposables.Disposable.SetSingle(ref _disposable, value);
Disposable.SetSingle(ref _disposable, resource);
}

protected override void OnNextCore(T value)
Expand Down
60 changes: 0 additions & 60 deletions Rx.NET/Source/src/System.Reactive/Internal/ObserverWithToken.cs

This file was deleted.

4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Internal/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguar
//
if (enableSafeguard)
{
var safeObserver = SafeObserver<TSource>.Create(observer);
var safeObserver = SafeObserver<TSource>.Wrap(observer);
safeObserver.SetResource(subscription);
observer = safeObserver;
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguar
//
if (enableSafeguard)
{
observer = safeObserver = SafeObserver<TTarget>.Create(observer);
observer = safeObserver = SafeObserver<TTarget>.Wrap(observer);
}

var sink = CreateSink(observer);
Expand Down
85 changes: 50 additions & 35 deletions Rx.NET/Source/src/System.Reactive/Internal/SafeObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,70 +11,85 @@ namespace System.Reactive
// its implementation aspects.
//

internal sealed class SafeObserver<TSource> : ISafeObserver<TSource>
internal abstract class SafeObserver<TSource> : ISafeObserver<TSource>
{
private readonly IObserver<TSource> _observer;

private IDisposable _disposable;

public static ISafeObserver<TSource> Create(IObserver<TSource> observer)
private sealed class WrappingSafeObserver : SafeObserver<TSource>
{
if (observer is AnonymousObserver<TSource> a)
private readonly IObserver<TSource> _observer;

public WrappingSafeObserver(IObserver<TSource> observer)
{
return a.MakeSafe();
_observer = observer;
}
else

public override void OnNext(TSource value)
{
return new SafeObserver<TSource>(observer);
var __noError = false;
try
{
_observer.OnNext(value);
__noError = true;
}
finally
{
if (!__noError)
{
Dispose();
}
}
}
}

private SafeObserver(IObserver<TSource> observer)
{
_observer = observer;
}

public void OnNext(TSource value)
{
var __noError = false;
try
public override void OnError(Exception error)
{
_observer.OnNext(value);
__noError = true;
using (this)
{
_observer.OnError(error);
}
}
finally

public override void OnCompleted()
{
if (!__noError)
using (this)
{
Dispose();
_observer.OnCompleted();
}
}
}

public void OnError(Exception error)
public static ISafeObserver<TSource> Wrap(IObserver<TSource> observer)
{
using (this)
if (observer is AnonymousObserver<TSource> a)
{
_observer.OnError(error);
return a.MakeSafe();
}
}

public void OnCompleted()
{
using (this)
else
{
_observer.OnCompleted();
return new WrappingSafeObserver(observer);
}
}

private IDisposable _disposable;

public abstract void OnNext(TSource value);

public abstract void OnError(Exception error);

public abstract void OnCompleted();

public void SetResource(IDisposable resource)
{
Disposable.SetSingle(ref _disposable, resource);
}

public void Dispose()
{
Disposable.TryDispose(ref _disposable);
Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
Disposable.TryDispose(ref _disposable);
}
}
}
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private static void Subscribe_<T>(this IObservable<T> source, IObserver<T> obser
{
if (!token.IsCancellationRequested)
{
var consumer = new ObserverWithToken<T>(observer);
var consumer = SafeObserver<T>.Wrap(observer);

//
// [OK] Use of unsafe Subscribe: exception during Subscribe doesn't orphan CancellationTokenRegistration.
Expand Down
4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/ObservableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public IDisposable Subscribe(IObserver<T> observer)
{
try
{
autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
autoDetachObserver.SetResource(SubscribeCore(autoDetachObserver));
}
catch (Exception exception)
{
Expand All @@ -79,7 +79,7 @@ private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoD
{
try
{
autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
autoDetachObserver.SetResource(SubscribeCore(autoDetachObserver));
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SerializableObservable(RemotableObservable<T> remotableObservable)

public IDisposable Subscribe(IObserver<T> observer)
{
var consumer = new ObserverWithToken<T>(observer);
var consumer = SafeObserver<T>.Wrap(observer);

//
// [OK] Use of unsafe Subscribe: non-pretentious transparent wrapping through remoting; exception coming from the remote object is not re-routed.
Expand Down

0 comments on commit 0c5a53f

Please sign in to comment.