diff --git a/src/SmartHomeDotNet.Package/Mqtt/MqttClient.cs b/src/SmartHomeDotNet.Package/Mqtt/MqttClient.cs index 88133d7..bb83a9e 100644 --- a/src/SmartHomeDotNet.Package/Mqtt/MqttClient.cs +++ b/src/SmartHomeDotNet.Package/Mqtt/MqttClient.cs @@ -48,7 +48,7 @@ public sealed class MqttClient : IDisposable private readonly object _connectionGate = new object(); private int _connectionClients; - private Connection _subscription; + private Connection _connection; public MqttClient( MqttBrokerConfig broker, @@ -135,17 +135,17 @@ private Subscription Enable() { Interlocked.Increment(ref _connectionClients); - var connection = _subscription; + var connection = _connection; if (connection == null) { lock (_connectionGate) { - if (_subscription == null) + if (_connection == null) { - _subscription = new Connection(_broker, _rootTopics, _scheduler); + _connection = new Connection(_broker, _rootTopics, _scheduler); } - connection = _subscription; + connection = _connection; } } @@ -154,7 +154,7 @@ private Subscription Enable() private void Release(Connection connection) { - if (_subscription != connection) + if (_connection != connection) { throw new InvalidOperationException("Invalid state"); } @@ -177,7 +177,7 @@ private void Release(Connection connection) private IDisposable DelayedRelease(IScheduler scheduler, Connection connection) { - if (_subscription != connection) + if (_connection != connection) { throw new InvalidOperationException("Invalid state"); } @@ -188,7 +188,7 @@ private IDisposable DelayedRelease(IScheduler scheduler, Connection connection) { if (_connectionClients == 0) { - _subscription = null; + _connection = null; connection.Dispose(); } } @@ -212,7 +212,7 @@ private static string ToSubscribeTopic(string topic) /// public void Dispose() { - _subscription.Dispose(); + _connection.Dispose(); } private class Subscription : IDisposable diff --git a/src/SmartHomeDotNet.Package/SmartHome/Automations/Automation.cs b/src/SmartHomeDotNet.Package/SmartHome/Automations/Automation.cs index 05d3329..52c3670 100644 --- a/src/SmartHomeDotNet.Package/SmartHome/Automations/Automation.cs +++ b/src/SmartHomeDotNet.Package/SmartHome/Automations/Automation.cs @@ -67,10 +67,13 @@ protected Automation(string id, string name, IAutomationHost host) _automationSubscription.Disposable = null; if (isEnabled) { - using (new AsyncContext(Scheduler)) // AsyncContext is used here only to propagate the IScheduler + _automationSubscription.Disposable = new CompositeDisposable { - _automationSubscription.Disposable = Enable(); - } + // AsyncContext is used here mainly to propagate the IScheduler, but might flow in the subscriptions + // made in the "Enable". So we make sure to dispose it only when the automation is disable. + new AsyncContext(Scheduler), + Enable() + }; } this.Log().Info($"Automation '{Name}' is now enabled: {isEnabled}"); diff --git a/src/SmartHomeDotNet.Package/SmartHome/Devices/HomeDevice.cs b/src/SmartHomeDotNet.Package/SmartHome/Devices/HomeDevice.cs index 0186efb..1a3dd18 100644 --- a/src/SmartHomeDotNet.Package/SmartHome/Devices/HomeDevice.cs +++ b/src/SmartHomeDotNet.Package/SmartHome/Devices/HomeDevice.cs @@ -6,6 +6,7 @@ using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Threading; using SmartHomeDotNet.Utils; @@ -20,7 +21,7 @@ public sealed class HomeDevice : IObservable, IDisposable, IDe // The throttle to wait for the initial device to load before being published // It's frequent (eg. Home Assistant) to be dispatched on multiple MQTT topics, so with this delay we make sure // we have received all the updates of the device before publishing its value. - // Note: We don't want to package this in the MqqtDeviceHost (or any other device host) as we want to be able to + // Note: We don't want to package this in the MqttDeviceHost (or any other device host) as we want to be able to // apply this delay only when this HomeDevice is used using the Awaiter. private static readonly TimeSpan _initialThrottling = TimeSpan.FromMilliseconds(50); @@ -105,41 +106,73 @@ public void Dispose() public struct Awaiter : INotifyCompletion { private readonly HomeDevice _owner; - private TaskAwaiter? _awaiter; + + private TaskAwaiter _awaiter; + private int _awaiterState; + + private static class States + { + public const int None = 0; + public const int Initializing = 1; + public const int Ready = 2; + } public Awaiter(HomeDevice device) { _owner = device; - _awaiter = null; + _awaiter = default; + _awaiterState = States.None; } - public bool IsCompleted => _owner._hasPersisted; + public bool IsCompleted => _owner._hasPersisted || (_awaiterState == States.Ready && _awaiter.IsCompleted); public TDevice GetResult() - => _owner._hasPersisted + => _owner._hasPersisted ? _owner._lastPersisted - : throw new InvalidOperationException("This awaiter cannot run synchronously."); + : GetOrCreateAwaiter().GetResult(); - public async void OnCompleted(Action continuation) + public void OnCompleted(Action continuation) { - if (_owner._hasPersisted) + if (IsCompleted) { continuation(); } else { - if (_awaiter == null) + GetOrCreateAwaiter().OnCompleted(continuation); + } + } + + private TaskAwaiter GetOrCreateAwaiter() + { + while (_awaiterState != States.Ready) + { + if (Interlocked.CompareExchange(ref _awaiterState, States.None, States.Initializing) == States.None) { - _awaiter = _owner - ._device - .Throttle(_initialThrottling) - .FirstAsync() - .ToTask(AsyncContext.CurrentToken) - .GetAwaiter(); + try + { + _awaiter = _owner + ._device + .Throttle(_initialThrottling) + .FirstAsync() + .ToTask(AsyncContext.CurrentToken) + .GetAwaiter(); + _awaiterState = States.Ready; + return _awaiter; + } + catch + { + _awaiterState = States.None; + throw; + } + } + else + { + Thread.SpinWait(3); } - - _awaiter.Value.OnCompleted(() => continuation()); } + + return _awaiter; } } } diff --git a/src/SmartHomeDotNet.Package/Utils/AsyncContext.cs b/src/SmartHomeDotNet.Package/Utils/AsyncContext.cs index 8469312..7083c30 100644 --- a/src/SmartHomeDotNet.Package/Utils/AsyncContext.cs +++ b/src/SmartHomeDotNet.Package/Utils/AsyncContext.cs @@ -264,8 +264,6 @@ private void CheckDisposed() /// public void Dispose() { - CheckCurrent(); - if (Interlocked.Exchange(ref _state, State.Disposed) != State.Disposed) { _current.Value = _previous; diff --git a/src/SmartHomeDotNet.Package/Utils/ObservableExtensions.cs b/src/SmartHomeDotNet.Package/Utils/ObservableExtensions.cs index c8341c1..4d4a338 100644 --- a/src/SmartHomeDotNet.Package/Utils/ObservableExtensions.cs +++ b/src/SmartHomeDotNet.Package/Utils/ObservableExtensions.cs @@ -58,6 +58,18 @@ public static IObservable WhereUntilChanged(this IObservable source, Pr /// An observable sequence of which produce a value each time an execution of the action completes. public static IObservable Execute(this IObservable source, Func action, ConcurrentExecutionMode mode, IScheduler scheduler) { + var originalAction = action; + action = async (ct, t) => + { + try + { + await originalAction(ct, t); + } + catch (OperationCanceledException) + { + } + }; + switch (mode) { case ConcurrentExecutionMode.AbortPrevious: