Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Used Orleans for data syncing instead of observable collection #2

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions ControlCenter/AgentProxy.cs
@@ -0,0 +1,26 @@


// Wrapper to workaround bug in signalr
using Contracts;
using Microsoft.AspNetCore.SignalR;

public class AgentProxy : IAgent
{
private readonly ISingleClientProxy _clientProxy;
private AgentProxy(ISingleClientProxy clientProxy)
{
_clientProxy = clientProxy;
}

public Task<double> GetTemperature()
{
return _clientProxy.InvokeAsync<double>(nameof(GetTemperature));
}

public Task Shutdown()
{
return _clientProxy.SendAsync(nameof(Shutdown));
}

public static IAgent Create(ISingleClientProxy clientProxy) => new AgentProxy(clientProxy);
}
5 changes: 5 additions & 0 deletions ControlCenter/ControlCenter.csproj
Expand Up @@ -8,6 +8,11 @@

<ItemGroup>
<ProjectReference Include="..\Contracts\Contracts.csproj" />
<PackageReference Include="Microsoft.Orleans.Server" Version="3.6.2" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.6.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
Expand Down
12 changes: 8 additions & 4 deletions ControlCenter/Data/AgentManager.cs
@@ -1,7 +1,11 @@
using Contracts;
using System.Collections.ObjectModel;
using Orleans.Collections;

public class AgentManager
{
public ObservableCollection<(string, IAgent)> Agents { get; } = new();
}
public AgentManager(IDistributedCollectionFactory factory)
{
Collection = factory.CreateObservableCollection<string>(0);
}

public DistributedObservableCollection<string> Collection { get; }
}
44 changes: 44 additions & 0 deletions ControlCenter/Data/CollectionGrain.cs
@@ -0,0 +1,44 @@
using Orleans.Concurrency;

namespace Orleans.Collections;

[Reentrant]
public class CollectionGrain<T> : Grain, ICollectionGrain<T>
{
private readonly List<T> _items = new();
private readonly List<ICollectionObserver> _subs = new();

public async Task AddItem(T item)
{
_items.Add(item);

foreach (var s in _subs)
{
await s.OnCollectionChanged();
}
}

public async Task RemoveItem(T item)
{
_items.Remove(item);

foreach (var s in _subs)
{
await s.OnCollectionChanged();
}
}

public Task<T[]> GetItems() => Task.FromResult(_items.ToArray());

public Task Subscribe(ICollectionObserver observer)
{
_subs.Add(observer);
return Task.CompletedTask;
}

public Task Unsubscribe(ICollectionObserver observer)
{
_subs.Remove(observer);
return Task.CompletedTask;
}
}
16 changes: 16 additions & 0 deletions ControlCenter/Data/DistributedCollectionFactory.cs
@@ -0,0 +1,16 @@

namespace Orleans.Collections;

public class DistributedCollectionFactory : IDistributedCollectionFactory
{
private readonly IGrainFactory _grainFactory;
public DistributedCollectionFactory(IGrainFactory grainFactory)
{
_grainFactory = grainFactory;
}

public DistributedObservableCollection<T> CreateObservableCollection<T>(long key)
{
return new DistributedObservableCollection<T>(_grainFactory, key);
}
}
76 changes: 76 additions & 0 deletions ControlCenter/Data/DistributedObservableCollection.cs
@@ -0,0 +1,76 @@
namespace Orleans.Collections;

public class DistributedObservableCollection<T>
{
private readonly ICollectionGrain<T> _collectionGrain;
private readonly IGrainFactory _grainFactory;
private ICollectionObserver? _collectionObserverReference;
private Func<Task>? _changed;
private readonly Observer _observer;

public DistributedObservableCollection(IGrainFactory grainFactory, long key)
{
_grainFactory = grainFactory;
_collectionGrain = grainFactory.GetGrain<ICollectionGrain<T>>(key);
_observer = new Observer(this);
}

public Task AddItem(T item) => _collectionGrain.AddItem(item);
public Task RemoveItem(T item) => _collectionGrain.RemoveItem(item);
public Task<T[]> GetItems() => _collectionGrain.GetItems();

private Task OnCollectionChanged()
{
return _changed?.Invoke() ?? Task.CompletedTask;
}

public async Task<IDisposable> SubscribeAsync(Func<Task> onChanged)
{
_changed += onChanged;

if (_collectionObserverReference is null)
{
_collectionObserverReference = await _grainFactory.CreateObjectReference<ICollectionObserver>(_observer);
await _collectionGrain.Subscribe(_collectionObserverReference);
}

return new Disposable(() =>
{
_changed -= onChanged;
});
}

public async ValueTask DisposeAsync()
{
if (_collectionObserverReference is not null)
{
await _collectionGrain.Unsubscribe(_collectionObserverReference);
}
}

private class Observer : ICollectionObserver
{
private readonly DistributedObservableCollection<T> _parent;
public Observer(DistributedObservableCollection<T> parent)
{
_parent = parent;
}

public Task OnCollectionChanged() => _parent.OnCollectionChanged();
}

private class Disposable : IDisposable
{
private readonly Action _callback;
public Disposable(Action callback)
{
_callback = callback;
}

public void Dispose()
{
_callback();
}
}
}

12 changes: 12 additions & 0 deletions ControlCenter/Data/ICollectionGrain.cs
@@ -0,0 +1,12 @@
using Orleans;

namespace Orleans.Collections;

public interface ICollectionGrain<T> : IGrainWithIntegerKey
{
Task AddItem(T item);
Task RemoveItem(T item);
Task Subscribe(ICollectionObserver observer);
Task Unsubscribe(ICollectionObserver observer);
Task<T[]> GetItems();
}
6 changes: 6 additions & 0 deletions ControlCenter/Data/ICollectionObserver.cs
@@ -0,0 +1,6 @@
namespace Orleans.Collections;

public interface ICollectionObserver : IGrainObserver
{
Task OnCollectionChanged();
}
7 changes: 7 additions & 0 deletions ControlCenter/Data/IDistributedCollectionFactory.cs
@@ -0,0 +1,7 @@

namespace Orleans.Collections;

public interface IDistributedCollectionFactory
{
DistributedObservableCollection<T> CreateObservableCollection<T>(long key);
}
11 changes: 2 additions & 9 deletions ControlCenter/Hubs/AgentHub.cs
Expand Up @@ -15,21 +15,14 @@ public AgentHub(AgentManager agentManager)

public override Task OnConnectedAsync()
{
lock (_agentManager.Agents)
{
_agentManager.Agents.Add((Context.ConnectionId, Clients.Single(Context.ConnectionId)));
}
_agentManager.Collection.AddItem(Context.ConnectionId);

return base.OnConnectedAsync();
}

public override Task OnDisconnectedAsync(Exception? exception)
{
lock (_agentManager.Agents)
{
var item = _agentManager.Agents.FirstOrDefault(a => a.Item1 == Context.ConnectionId);
_agentManager.Agents.Remove(item);
}
_agentManager.Collection.RemoveItem(Context.ConnectionId);

return base.OnDisconnectedAsync(exception);
}
Expand Down
21 changes: 18 additions & 3 deletions ControlCenter/Pages/Index.razor
@@ -1,6 +1,10 @@
@page "/"
@using Contracts
@using ControlCenter.Hubs
@using Microsoft.AspNetCore.SignalR

@inject AgentManager Manager
@inject IHubContext<AgentHub> HubContext

<PageTitle>Agents</PageTitle>

Expand All @@ -15,8 +19,9 @@
</tr>
</thead>
<tbody>
@foreach (var (id, agent) in Manager.Agents)
@foreach (var id in agents)
{
var agent = AgentProxy.Create(HubContext.Clients.Single(id));
<tr>
<td>@id</td>
<td>
Expand Down Expand Up @@ -44,10 +49,20 @@

@code {
Dictionary<string, double> _lastTemperature = new();
string[] agents = Array.Empty<string>();

protected override void OnInitialized()
protected override async Task OnInitializedAsync()
{
Manager.Agents.CollectionChanged += (_, _) => InvokeAsync(() => StateHasChanged());
_ = await Manager.Collection.SubscribeAsync(async () =>
{
agents = await Manager.Collection.GetItems();

await InvokeAsync(() => StateHasChanged());
});

agents = await Manager.Collection.GetItems();

await base.OnInitializedAsync();
}

async Task CheckTemperature(string id, IAgent agent)
Expand Down
7 changes: 6 additions & 1 deletion ControlCenter/Program.cs
@@ -1,11 +1,16 @@
using ControlCenter.Hubs;
using Orleans;
using Orleans.Collections;
using Orleans.Hosting;

var builder = WebApplication.CreateBuilder(args);

builder.Host.UseOrleans(builder => builder.UseLocalhostClustering());

// Add services to the container.
builder.Services.AddRazorPages();
builder.Services.AddServerSideBlazor();
builder.Services.AddSignalR(o => o.MaximumParallelInvocationsPerClient = 2);
builder.Services.AddSingleton<IDistributedCollectionFactory, DistributedCollectionFactory>();
builder.Services.AddSingleton<AgentManager>();

var app = builder.Build();
Expand Down