diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs index 22b0498166..8837b9f559 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs @@ -29,8 +29,6 @@ public _(Func accumulator, IObserver observe : base(observer) { _accumulator = accumulator; - _accumulation = default; - _hasAccumulation = false; } public override void OnNext(TSource value) @@ -48,11 +46,18 @@ public override void OnNext(TSource value) } catch (Exception exception) { + _accumulation = default; ForwardOnError(exception); } } } + public override void OnError(Exception error) + { + _accumulation = default; + ForwardOnError(error); + } + public override void OnCompleted() { if (!_hasAccumulation) @@ -61,7 +66,9 @@ public override void OnCompleted() } else { - ForwardOnNext(_accumulation); + var accumulation = _accumulation; + _accumulation = default; + ForwardOnNext(accumulation); ForwardOnCompleted(); } } @@ -105,18 +112,22 @@ public override void OnNext(TSource value) } catch (Exception exception) { + _accumulation = default; ForwardOnError(exception); } } public override void OnError(Exception error) { + _accumulation = default; ForwardOnError(error); } public override void OnCompleted() { - ForwardOnNext(_accumulation); + var accumulation = _accumulation; + _accumulation = default; + ForwardOnNext(accumulation); ForwardOnCompleted(); } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs index 28a99287d0..fbb66ae98f 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs @@ -27,8 +27,7 @@ internal sealed class _ : IdentitySink public _(IObserver observer) : base(observer) { - _value = default; - _seenValue = false; + } public override void OnNext(TSource value) @@ -37,6 +36,12 @@ public override void OnNext(TSource value) _seenValue = true; } + public override void OnError(Exception error) + { + _value = default; + ForwardOnError(error); + } + public override void OnCompleted() { if (!_seenValue) @@ -45,7 +50,9 @@ public override void OnCompleted() } else { - ForwardOnNext(_value); + var value = _value; + _value = default; + ForwardOnNext(value); ForwardOnCompleted(); } } @@ -77,9 +84,6 @@ public _(Func predicate, IObserver observer) : base(observer) { _predicate = predicate; - - _value = default; - _seenValue = false; } public override void OnNext(TSource value) @@ -92,6 +96,7 @@ public override void OnNext(TSource value) } catch (Exception ex) { + _value = default; ForwardOnError(ex); return; } @@ -103,6 +108,12 @@ public override void OnNext(TSource value) } } + public override void OnError(Exception error) + { + _value = default; + ForwardOnError(error); + } + public override void OnCompleted() { if (!_seenValue) @@ -111,7 +122,9 @@ public override void OnCompleted() } else { - ForwardOnNext(_value); + var value = _value; + _value = default; + ForwardOnNext(value); ForwardOnCompleted(); } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs index 545c06df91..bdf0c89093 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs @@ -26,7 +26,7 @@ internal sealed class _ : IdentitySink public _(IObserver observer) : base(observer) { - _value = default; + } public override void OnNext(TSource value) @@ -34,9 +34,17 @@ public override void OnNext(TSource value) _value = value; } + public override void OnError(Exception error) + { + _value = default; + ForwardOnError(error); + } + public override void OnCompleted() { - ForwardOnNext(_value); + var value = _value; + _value = default; + ForwardOnNext(value); ForwardOnCompleted(); } } @@ -66,8 +74,6 @@ public _(Func predicate, IObserver observer) : base(observer) { _predicate = predicate; - - _value = default; } public override void OnNext(TSource value) @@ -80,6 +86,7 @@ public override void OnNext(TSource value) } catch (Exception ex) { + _value = default; ForwardOnError(ex); return; } @@ -90,9 +97,17 @@ public override void OnNext(TSource value) } } + public override void OnError(Exception error) + { + _value = default; + ForwardOnError(error); + } + public override void OnCompleted() { - ForwardOnNext(_value); + var value = _value; + _value = default; + ForwardOnNext(value); ForwardOnCompleted(); } } diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests.System.Reactive.csproj b/Rx.NET/Source/tests/Tests.System.Reactive/Tests.System.Reactive.csproj index 0803528374..c323df57bd 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests.System.Reactive.csproj +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests.System.Reactive.csproj @@ -2,6 +2,7 @@ net46;netcoreapp2.0 $(NoWarn);CS0618 + 7.1 diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/UsingTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/UsingTest.cs index 271effdedb..db2c5d0ec3 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/UsingTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/UsingTest.cs @@ -46,7 +46,7 @@ public void Using_Null() _d = d; createInvoked++; xs = scheduler.CreateColdObservable( - OnNext(100, scheduler.Clock), + OnNext(100, scheduler.Clock), OnCompleted(200)); return xs; }