Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Rx.NET/Source/Directory.build.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
</PropertyGroup>

<ItemGroup Condition="'$(IsTestProject)' != 'true' and '$(SourceLinkEnabled)' != 'false'">
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta-62925-02" PrivateAssets="All"/>
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class HistoricalScheduler : HistoricalSchedulerBase
/// Creates a new historical scheduler with the minimum value of <see cref="DateTimeOffset"/> as the initial clock value.
/// </summary>
public HistoricalScheduler()
: base()
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,15 @@ private static void UpdateLongTermProcessingTimer()
var dueCapped = TimeSpan.FromTicks(Math.Min(dueEarly.Ticks, MAXSUPPORTEDTIMER.Ticks));

_nextLongTermWorkItem = next;
_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(_ => EvaluateLongTermQueue(_), null, dueCapped);
_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(_ => EvaluateLongTermQueue(), null, dueCapped);
}
}

/// <summary>
/// Evaluates the long term queue, transitioning short term work to the short term list,
/// and adjusting the new long term processing timer accordingly.
/// </summary>
/// <param name="state">Ignored.</param>
private static void EvaluateLongTermQueue(object state)
private static void EvaluateLongTermQueue()
{
lock (_staticGate)
{
Expand Down Expand Up @@ -408,7 +407,7 @@ internal void SystemClockChanged(object sender, SystemClockChangedEventArgs args
// method to create a new timer for the new first long term item.
//
_nextLongTermWorkItem = null;
EvaluateLongTermQueue(null);
EvaluateLongTermQueue();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private static IDisposable InvokeRec3<TState>(IScheduler scheduler, (TState stat
return recursiveInvoker;
}

private abstract class InvokeRecBaseState<TState> : IDisposable
private abstract class InvokeRecBaseState : IDisposable
{
protected readonly IScheduler Scheduler;

Expand All @@ -190,7 +190,7 @@ public void Dispose()

}

private sealed class InvokeRec1State<TState> : InvokeRecBaseState<TState>
private sealed class InvokeRec1State<TState> : InvokeRecBaseState
{
private readonly Action<TState, Action<TState>> _action;
private readonly Action<TState> _recurseCallback;
Expand Down Expand Up @@ -219,7 +219,7 @@ internal void InvokeFirst(TState state)
}
}

private sealed class InvokeRec2State<TState> : InvokeRecBaseState<TState>
private sealed class InvokeRec2State<TState> : InvokeRecBaseState
{
private readonly Action<TState, Action<TState, TimeSpan>> _action;
private readonly Action<TState, TimeSpan> _recurseCallback;
Expand Down Expand Up @@ -247,7 +247,7 @@ internal void InvokeFirst(TState state)
}
}

private sealed class InvokeRec3State<TState> : InvokeRecBaseState<TState>
private sealed class InvokeRec3State<TState> : InvokeRecBaseState
{
private readonly Action<TState, Action<TState, DateTimeOffset>> _action;
private readonly Action<TState, DateTimeOffset> _recurseCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ public abstract class VirtualTimeScheduler<TAbsolute, TRelative> : VirtualTimeSc
/// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value.
/// </summary>
protected VirtualTimeScheduler()
: base()
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ private void Dispatch(ICancelable cancel)
}
catch
{
var nop = default(T);
while (_queue.TryDequeue(out nop))
while (_queue.TryDequeue(out _))
{
;
}

throw;
Expand Down Expand Up @@ -287,10 +285,8 @@ private void Run(object state, Action<object> recurse)
{
Interlocked.Exchange(ref _state, FAULTED);

var nop = default(T);
while (_queue.TryDequeue(out nop))
while (_queue.TryDequeue(out _))
{
;
}

throw;
Expand Down
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Internal/SystemClock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private static void CollectHandlers()

foreach (var handler in _systemClockChanged)
{
if (!handler.TryGetTarget(out var scheduler))
if (!handler.TryGetTarget(out _))
{
if (remove == null)
{
Expand Down
4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Creation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public static IObservable<TResult> Never<TResult>(TResult witness)
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero. -or- <paramref name="start"/> + <paramref name="count"/> - 1 is larger than <see cref="int.MaxValue"/>.</exception>
public static IObservable<int> Range(int start, int count)
{
var max = ((long)start) + count - 1;
var max = (long)start + count - 1;
if (count < 0 || max > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(count));
Expand All @@ -427,7 +427,7 @@ public static IObservable<int> Range(int start, int count)
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
var max = ((long)start) + count - 1;
var max = (long)start + count - 1;
if (count < 0 || max > int.MaxValue)
{
throw new ArgumentOutOfRangeException(nameof(count));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public virtual void OnError(Exception error)

internal sealed class FirstBlocking<T> : BaseBlocking<T>
{
internal FirstBlocking() : base() { }

public override void OnCompleted()
{
Unblock();
Expand Down Expand Up @@ -86,8 +84,6 @@ public override void OnNext(T value)

internal sealed class LastBlocking<T> : BaseBlocking<T>
{
internal LastBlocking() : base() { }

public override void OnCompleted()
{
Unblock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)

public bool Remove(TKey key)
{
return _map.TryRemove(key, out var value);
return _map.TryRemove(key, out _);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public IEnumerator<TResult> GetEnumerator()
protected abstract PushToPullSink<TSource, TResult> Run();
}

internal abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>, IDisposable
internal abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>
{
private IDisposable _upstream;

Expand Down

This file was deleted.

3 changes: 1 addition & 2 deletions Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace System.Reactive.Subjects
/// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class AsyncSubject<T> : SubjectBase<T>, IDisposable, INotifyCompletion
public sealed class AsyncSubject<T> : SubjectBase<T>, INotifyCompletion
{
#region Fields

Expand Down Expand Up @@ -146,7 +146,6 @@ public override void OnError(Exception error)
_exception = error;
if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
{
var ex = _exception;
foreach (var o in observers)
{
if (!o.IsDisposed())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Reactive.Subjects
/// Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class BehaviorSubject<T> : SubjectBase<T>, IDisposable
public sealed class BehaviorSubject<T> : SubjectBase<T>
{
#region Fields

Expand Down
3 changes: 1 addition & 2 deletions Rx.NET/Source/src/System.Reactive/Subjects/ReplaySubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace System.Reactive.Subjects
/// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class ReplaySubject<T> : SubjectBase<T>, IDisposable
public sealed class ReplaySubject<T> : SubjectBase<T>
{
#region Fields

Expand Down Expand Up @@ -656,7 +656,6 @@ private abstract class ReplayManyBase : ReplayBufferBase
protected readonly Queue<T> _queue;

protected ReplayManyBase(int queueSize)
: base()
{
_queue = new Queue<T>(Math.Min(queueSize, 64));
}
Expand Down
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Reactive.Subjects
/// Each notification is broadcasted to all subscribed observers.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class Subject<T> : SubjectBase<T>, IDisposable
public sealed class Subject<T> : SubjectBase<T>
{
#region Fields

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ namespace System.Reactive
TResult OnError(System.Exception exception);
TResult OnNext(TValue value);
}
[System.Runtime.CompilerServices.AsyncMethodBuilderAttribute(typeof(System.Runtime.CompilerServices.TaskObservableMethodBuilder<>))]
public interface ITaskObservable<out T> : System.IObservable<T>
{
System.Reactive.ITaskObservableAwaiter<T> GetAwaiter();
Expand Down Expand Up @@ -2406,7 +2407,7 @@ namespace System.Reactive.PlatformServices
}
namespace System.Reactive.Subjects
{
public sealed class AsyncSubject<T> : System.Reactive.Subjects.SubjectBase<T>, System.IDisposable, System.Runtime.CompilerServices.INotifyCompletion
public sealed class AsyncSubject<T> : System.Reactive.Subjects.SubjectBase<T>, System.Runtime.CompilerServices.INotifyCompletion
{
public AsyncSubject() { }
public override bool HasObservers { get; }
Expand All @@ -2421,7 +2422,7 @@ namespace System.Reactive.Subjects
public override void OnNext(T value) { }
public override System.IDisposable Subscribe(System.IObserver<T> observer) { }
}
public sealed class BehaviorSubject<T> : System.Reactive.Subjects.SubjectBase<T>, System.IDisposable
public sealed class BehaviorSubject<T> : System.Reactive.Subjects.SubjectBase<T>
{
public BehaviorSubject(T value) { }
public override bool HasObservers { get; }
Expand All @@ -2440,7 +2441,7 @@ namespace System.Reactive.Subjects
}
public interface ISubject<T> : System.IObservable<T>, System.IObserver<T>, System.Reactive.Subjects.ISubject<T, T> { }
public interface ISubject<in TSource, out TResult> : System.IObservable<TResult>, System.IObserver<TSource> { }
public sealed class ReplaySubject<T> : System.Reactive.Subjects.SubjectBase<T>, System.IDisposable
public sealed class ReplaySubject<T> : System.Reactive.Subjects.SubjectBase<T>
{
public ReplaySubject() { }
public ReplaySubject(System.Reactive.Concurrency.IScheduler scheduler) { }
Expand All @@ -2467,7 +2468,7 @@ namespace System.Reactive.Subjects
public static System.Reactive.Subjects.ISubject<TSource, TResult> Synchronize<TSource, TResult>(System.Reactive.Subjects.ISubject<TSource, TResult> subject, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Subjects.ISubject<TSource> Synchronize<TSource>(System.Reactive.Subjects.ISubject<TSource> subject, System.Reactive.Concurrency.IScheduler scheduler) { }
}
public sealed class Subject<T> : System.Reactive.Subjects.SubjectBase<T>, System.IDisposable
public sealed class Subject<T> : System.Reactive.Subjects.SubjectBase<T>
{
public Subject() { }
public override bool HasObservers { get; }
Expand Down