Skip to content

Commit

Permalink
Merge pull request #34 from dr1rrb/dev/dr/AutomationCtxFlow
Browse files Browse the repository at this point in the history
Miscs update and fixes
  • Loading branch information
dr1rrb committed Nov 28, 2019
2 parents 399994e + 5ac0232 commit 6b38d13
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 31 deletions.
18 changes: 9 additions & 9 deletions src/SmartHomeDotNet.Package/Mqtt/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public sealed class MqttClient : IDisposable

private readonly object _connectionGate = new object();
private int _connectionClients;
private Connection _subscription;
private Connection _connection;

public MqttClient(
MqttBrokerConfig broker,
Expand Down Expand Up @@ -135,17 +135,17 @@ private Subscription Enable()
{
Interlocked.Increment(ref _connectionClients);

var connection = _subscription;
var connection = _connection;
if (connection == null)
{
lock (_connectionGate)
{
if (_subscription == null)
if (_connection == null)
{
_subscription = new Connection(_broker, _rootTopics, _scheduler);
_connection = new Connection(_broker, _rootTopics, _scheduler);
}

connection = _subscription;
connection = _connection;
}
}

Expand All @@ -154,7 +154,7 @@ private Subscription Enable()

private void Release(Connection connection)
{
if (_subscription != connection)
if (_connection != connection)
{
throw new InvalidOperationException("Invalid state");
}
Expand All @@ -177,7 +177,7 @@ private void Release(Connection connection)

private IDisposable DelayedRelease(IScheduler scheduler, Connection connection)
{
if (_subscription != connection)
if (_connection != connection)
{
throw new InvalidOperationException("Invalid state");
}
Expand All @@ -188,7 +188,7 @@ private IDisposable DelayedRelease(IScheduler scheduler, Connection connection)
{
if (_connectionClients == 0)
{
_subscription = null;
_connection = null;
connection.Dispose();
}
}
Expand All @@ -212,7 +212,7 @@ private static string ToSubscribeTopic(string topic)
/// <inheritdoc />
public void Dispose()
{
_subscription.Dispose();
_connection.Dispose();
}

private class Subscription : IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ protected Automation(string id, string name, IAutomationHost host)
_automationSubscription.Disposable = null;
if (isEnabled)
{
using (new AsyncContext(Scheduler)) // AsyncContext is used here only to propagate the IScheduler
_automationSubscription.Disposable = new CompositeDisposable
{
_automationSubscription.Disposable = Enable();
}
// AsyncContext is used here mainly to propagate the IScheduler, but might flow in the subscriptions
// made in the "Enable". So we make sure to dispose it only when the automation is disable.
new AsyncContext(Scheduler),
Enable()
};
}
this.Log().Info($"Automation '{Name}' is now enabled: {isEnabled}");
Expand Down
67 changes: 50 additions & 17 deletions src/SmartHomeDotNet.Package/SmartHome/Devices/HomeDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using SmartHomeDotNet.Utils;

Expand All @@ -20,7 +21,7 @@ public sealed class HomeDevice<TDevice> : IObservable<TDevice>, IDisposable, IDe
// The throttle to wait for the initial device to load before being published
// It's frequent (eg. Home Assistant) to be dispatched on multiple MQTT topics, so with this delay we make sure
// we have received all the updates of the device before publishing its value.
// Note: We don't want to package this in the MqqtDeviceHost (or any other device host) as we want to be able to
// Note: We don't want to package this in the MqttDeviceHost (or any other device host) as we want to be able to
// apply this delay only when this HomeDevice is used using the Awaiter.
private static readonly TimeSpan _initialThrottling = TimeSpan.FromMilliseconds(50);

Expand Down Expand Up @@ -105,41 +106,73 @@ public void Dispose()
public struct Awaiter : INotifyCompletion
{
private readonly HomeDevice<TDevice> _owner;
private TaskAwaiter<TDevice>? _awaiter;

private TaskAwaiter<TDevice> _awaiter;
private int _awaiterState;

private static class States
{
public const int None = 0;
public const int Initializing = 1;
public const int Ready = 2;
}

public Awaiter(HomeDevice<TDevice> device)
{
_owner = device;
_awaiter = null;
_awaiter = default;
_awaiterState = States.None;
}

public bool IsCompleted => _owner._hasPersisted;
public bool IsCompleted => _owner._hasPersisted || (_awaiterState == States.Ready && _awaiter.IsCompleted);

public TDevice GetResult()
=> _owner._hasPersisted
=> _owner._hasPersisted
? _owner._lastPersisted
: throw new InvalidOperationException("This awaiter cannot run synchronously.");
: GetOrCreateAwaiter().GetResult();

public async void OnCompleted(Action continuation)
public void OnCompleted(Action continuation)
{
if (_owner._hasPersisted)
if (IsCompleted)
{
continuation();
}
else
{
if (_awaiter == null)
GetOrCreateAwaiter().OnCompleted(continuation);
}
}

private TaskAwaiter<TDevice> GetOrCreateAwaiter()
{
while (_awaiterState != States.Ready)
{
if (Interlocked.CompareExchange(ref _awaiterState, States.None, States.Initializing) == States.None)
{
_awaiter = _owner
._device
.Throttle(_initialThrottling)
.FirstAsync()
.ToTask(AsyncContext.CurrentToken)
.GetAwaiter();
try
{
_awaiter = _owner
._device
.Throttle(_initialThrottling)
.FirstAsync()
.ToTask(AsyncContext.CurrentToken)
.GetAwaiter();
_awaiterState = States.Ready;
return _awaiter;
}
catch
{
_awaiterState = States.None;
throw;
}
}
else
{
Thread.SpinWait(3);
}

_awaiter.Value.OnCompleted(() => continuation());
}

return _awaiter;
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/SmartHomeDotNet.Package/Utils/AsyncContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,6 @@ private void CheckDisposed()
/// <inheritdoc />
public void Dispose()
{
CheckCurrent();

if (Interlocked.Exchange(ref _state, State.Disposed) != State.Disposed)
{
_current.Value = _previous;
Expand Down
12 changes: 12 additions & 0 deletions src/SmartHomeDotNet.Package/Utils/ObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ public static IObservable<T> WhereUntilChanged<T>(this IObservable<T> source, Pr
/// <returns>An observable sequence of <see cref="Unit"/> which produce a value each time an execution of the action completes.</returns>
public static IObservable<Unit> Execute<T>(this IObservable<T> source, Func<CancellationToken, T, Task> action, ConcurrentExecutionMode mode, IScheduler scheduler)
{
var originalAction = action;
action = async (ct, t) =>
{
try
{
await originalAction(ct, t);
}
catch (OperationCanceledException)
{
}
};

switch (mode)
{
case ConcurrentExecutionMode.AbortPrevious:
Expand Down

0 comments on commit 6b38d13

Please sign in to comment.