From 88979ff011bb353e30aa066c3f7ee35170eb1a0d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 25 May 2018 14:24:08 +0200 Subject: [PATCH 1/2] 4.x: Fix PeriodicTimerSystemClockMonitor concurrency & failure behavior --- .../Internal/SystemClock.Default.cs | 66 +++++++++++++++---- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs b/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs index 4c8c1a3937..2e8d035c4d 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs @@ -5,6 +5,8 @@ using System.ComponentModel; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Threading; +using System.Threading.Tasks; namespace System.Reactive.PlatformServices { @@ -39,7 +41,20 @@ public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged private readonly TimeSpan _period; private readonly SerialDisposable _timer; - private DateTimeOffset _lastTime; + /// + /// Counts the number of listeners to + /// , starts the + /// timer for the first listener and stops it after + /// the last one. + /// + private int _timerActive; + + /// + /// Use the Unix milliseconds for the current time + /// so it can be atomically read/written without locking. + /// + private long _lastTimeUnixMillis; + private EventHandler _systemClockChanged; private const int SYNC_MAXRETRIES = 100; @@ -63,7 +78,10 @@ public event EventHandler SystemClockChanged { add { - NewTimer(); + if (Interlocked.Increment(ref _timerActive) == 1) + { + NewTimer(); + } _systemClockChanged += value; } @@ -72,7 +90,10 @@ public event EventHandler SystemClockChanged { _systemClockChanged -= value; - _timer.Disposable = Disposable.Empty; + if (Interlocked.Decrement(ref _timerActive) == 0) + { + _timer.Disposable = Disposable.Empty; + } } } @@ -80,30 +101,47 @@ private void NewTimer() { _timer.Disposable = Disposable.Empty; - var n = 0; - do + var n = 0L; + for (; ; ) { - _lastTime = SystemClock.UtcNow; + var now = SystemClock.UtcNow.ToUnixTimeMilliseconds(); + Interlocked.Exchange(ref _lastTimeUnixMillis, now); + _timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period); - } while (Math.Abs((SystemClock.UtcNow - _lastTime).TotalMilliseconds) > SYNC_MAXDELTA && ++n < SYNC_MAXRETRIES); - if (n >= SYNC_MAXRETRIES) - throw new InvalidOperationException(Strings_Core.FAILED_CLOCK_MONITORING); + if (Math.Abs(SystemClock.UtcNow.ToUnixTimeMilliseconds() - now) <= SYNC_MAXDELTA) + { + break; + } + if (Volatile.Read(ref _timerActive) == 0) + { + break; + } + if (++n >= SYNC_MAXRETRIES) + { + Task.Delay((int)SYNC_MAXDELTA).Wait(); + } + }; } private void TimeChanged() { - var now = SystemClock.UtcNow; - var diff = now - (_lastTime + _period); - if (Math.Abs(diff.TotalMilliseconds) >= MAXERROR) + var newTime = SystemClock.UtcNow; + var now = newTime.ToUnixTimeMilliseconds(); + var last = Volatile.Read(ref _lastTimeUnixMillis); + + var oldTime = (long)(last + _period.TotalMilliseconds); + var diff = now - oldTime; + if (Math.Abs(diff) >= MAXERROR) { - _systemClockChanged?.Invoke(this, new SystemClockChangedEventArgs(_lastTime + _period, now)); + _systemClockChanged?.Invoke(this, new SystemClockChangedEventArgs( + DateTimeOffset.FromUnixTimeMilliseconds(oldTime), newTime)); NewTimer(); } else { - _lastTime = SystemClock.UtcNow; + Interlocked.Exchange(ref _lastTimeUnixMillis, SystemClock.UtcNow.ToUnixTimeMilliseconds()); } } } From 2b0f8ce6885ae0be690040216c1de070ed29ad04 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 5 Jun 2018 13:29:17 +0200 Subject: [PATCH 2/2] Revert to non-threadsafe registration --- .../Internal/SystemClock.Default.cs | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs b/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs index 2e8d035c4d..a48bc8e1b7 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs @@ -41,14 +41,6 @@ public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged private readonly TimeSpan _period; private readonly SerialDisposable _timer; - /// - /// Counts the number of listeners to - /// , starts the - /// timer for the first listener and stops it after - /// the last one. - /// - private int _timerActive; - /// /// Use the Unix milliseconds for the current time /// so it can be atomically read/written without locking. @@ -78,10 +70,7 @@ public event EventHandler SystemClockChanged { add { - if (Interlocked.Increment(ref _timerActive) == 1) - { - NewTimer(); - } + NewTimer(); _systemClockChanged += value; } @@ -90,10 +79,7 @@ public event EventHandler SystemClockChanged { _systemClockChanged -= value; - if (Interlocked.Decrement(ref _timerActive) == 0) - { - _timer.Disposable = Disposable.Empty; - } + _timer.Disposable = Disposable.Empty; } } @@ -113,7 +99,7 @@ private void NewTimer() { break; } - if (Volatile.Read(ref _timerActive) == 0) + if (_timer.Disposable == Disposable.Empty) { break; }