Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,37 @@ class ObserveOn<TSource> : Producer<TSource>
{
private readonly IObservable<TSource> _source;
private readonly IScheduler _scheduler;
private readonly SynchronizationContext _context;

public ObserveOn(IObservable<TSource> source, IScheduler scheduler)
{
_source = source;
_scheduler = scheduler;
}

#if !NO_SYNCCTX
private readonly SynchronizationContext _context;

public ObserveOn(IObservable<TSource> source, SynchronizationContext context)
{
_source = source;
_context = context;
}
#endif

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
{
#if !NO_SYNCCTX
if (_context != null)
{
var sink = new ObserveOnSink(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
#endif
{
var sink = new ObserveOnObserver<TSource>(_scheduler, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
}

#if !NO_SYNCCTX
class ObserveOnSink : Sink<TSource>, IObserver<TSource>
{
private readonly ObserveOn<TSource> _parent;
Expand Down Expand Up @@ -111,7 +105,6 @@ private void OnCompletedPosted(object ignored)
base.Dispose();
}
}
#endif
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
});
}

#if !NO_SYNCCTX
/// <summary>
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
/// </summary>
Expand Down Expand Up @@ -81,7 +80,6 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
return subscription;
});
}
#endif

#endregion

Expand Down Expand Up @@ -109,7 +107,6 @@ public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> sourc
#endif
}

#if !NO_SYNCCTX
/// <summary>
/// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
/// </summary>
Expand Down Expand Up @@ -152,7 +149,6 @@ public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> sourc
});
#endif
}
#endif

#endregion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

#if !NO_SYNCCTX
using System.Reactive.Disposables;
using System.Threading;

Expand Down Expand Up @@ -98,4 +97,3 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
}
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

#if !NO_SYNCCTX
using System.Reactive.Concurrency;
using System.Threading;

Expand Down Expand Up @@ -63,4 +62,3 @@ public void Dispose()
}
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

#if !NO_SYNCCTX
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace System.Reactive.Concurrency
Expand Down Expand Up @@ -50,4 +53,3 @@ public static void PostWithStartComplete(this SynchronizationContext context, Ac
}
}
}
#endif
6 changes: 2 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,11 @@ internal interface IQueryLanguage
#region * Concurrency *

IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler);
#if !NO_SYNCCTX
IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context);
#endif

IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler);
#if !NO_SYNCCTX
IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context);
#endif

IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source);
IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource>
return s_impl.ObserveOn<TSource>(source, scheduler);
}

#if !NO_SYNCCTX
/// <summary>
/// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
/// </summary>
Expand All @@ -55,7 +54,6 @@ public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource>

return s_impl.ObserveOn<TSource>(source, context);
}
#endif

#endregion

Expand Down Expand Up @@ -84,7 +82,6 @@ public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource
return s_impl.SubscribeOn<TSource>(source, scheduler);
}

#if !NO_SYNCCTX
/// <summary>
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used;
/// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
Expand All @@ -107,7 +104,6 @@ public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource

return s_impl.SubscribeOn<TSource>(source, context);
}
#endif

#endregion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,36 @@ class ObserveOn<TSource> : Producer<TSource>
{
private readonly IObservable<TSource> _source;
private readonly IScheduler _scheduler;
private readonly SynchronizationContext _context;

public ObserveOn(IObservable<TSource> source, IScheduler scheduler)
{
_source = source;
_scheduler = scheduler;
}

#if !NO_SYNCCTX
private readonly SynchronizationContext _context;

public ObserveOn(IObservable<TSource> source, SynchronizationContext context)
{
_source = source;
_context = context;
}
#endif

protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
{
#if !NO_SYNCCTX
if (_context != null)
{
var sink = new ObserveOnImpl(this, observer, cancel);
setSink(sink);
return _source.Subscribe(sink);
}
else
#endif
{
var sink = new ObserveOnObserver<TSource>(_scheduler, observer, cancel);
setSink(sink);
return _source.Subscribe(sink);
}
}

#if !NO_SYNCCTX
class ObserveOnImpl : Sink<TSource>, IObserver<TSource>
{
private readonly ObserveOn<TSource> _parent;
Expand Down Expand Up @@ -85,7 +79,6 @@ public void OnCompleted()
});
}
}
#endif
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ public virtual IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> sour
return Synchronization.ObserveOn<TSource>(source, scheduler);
}

#if !NO_SYNCCTX
public virtual IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
{
return Synchronization.ObserveOn<TSource>(source, context);
}
#endif

#endregion

Expand All @@ -37,12 +35,10 @@ public virtual IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> so
return Synchronization.SubscribeOn<TSource>(source, scheduler);
}

#if !NO_SYNCCTX
public virtual IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
{
return Synchronization.SubscribeOn<TSource>(source, context);
}
#endif

#endregion

Expand Down
4 changes: 1 addition & 3 deletions Rx.NET/Source/src/System.Reactive/Observer.Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler sc
return new ObserveOnObserver<T>(scheduler, observer, null);
}

#if !NO_SYNCCTX
/// <summary>
/// Schedules the invocation of observer methods on the given synchonization context.
/// </summary>
Expand All @@ -276,8 +275,7 @@ public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, Synchronizati

return new ObserveOnObserver<T>(new SynchronizationContextScheduler(context), observer, null);
}
#endif


/// <summary>
/// Converts an observer to a progress object.
/// </summary>
Expand Down