-
Notifications
You must be signed in to change notification settings - Fork 321
/
ResourceOutgoingPeerResolver.cs
133 lines (113 loc) · 4.06 KB
/
ResourceOutgoingPeerResolver.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
namespace Aspire.Dashboard.Model;
public sealed class ResourceOutgoingPeerResolver : IOutgoingPeerResolver, IAsyncDisposable
{
private readonly IDashboardViewModelService _dashboardViewModelService;
private readonly ConcurrentDictionary<string, ResourceViewModel> _resourceNameMapping = new();
private readonly CancellationTokenSource _watchContainersTokenSource = new();
private readonly Task _watchTask;
private readonly List<Subscription> _subscriptions;
private readonly object _lock = new object();
public ResourceOutgoingPeerResolver(IDashboardViewModelService dashboardViewModelService)
{
_dashboardViewModelService = dashboardViewModelService;
_subscriptions = new List<Subscription>();
var viewModelMonitor = _dashboardViewModelService.GetResources();
var initialList = viewModelMonitor.Snapshot;
var watch = viewModelMonitor.Watch;
foreach (var result in initialList)
{
_resourceNameMapping[result.Name] = result;
}
_watchTask = Task.Run(async () =>
{
await foreach (var resourceChanged in watch.WithCancellation(_watchContainersTokenSource.Token))
{
await OnResourceListChanged(resourceChanged.ObjectChangeType, resourceChanged.Resource).ConfigureAwait(false);
}
});
}
private async Task OnResourceListChanged(ObjectChangeType changeType, ResourceViewModel resourceViewModel)
{
if (changeType == ObjectChangeType.Added)
{
_resourceNameMapping[resourceViewModel.Name] = resourceViewModel;
}
else if (changeType == ObjectChangeType.Modified)
{
_resourceNameMapping[resourceViewModel.Name] = resourceViewModel;
}
else if (changeType == ObjectChangeType.Deleted)
{
_resourceNameMapping.TryRemove(resourceViewModel.Name, out _);
}
await RaisePeerChangesAsync().ConfigureAwait(false);
}
public string ResolvePeerName(string networkAddress)
{
foreach (var (resourceName, resource) in _resourceNameMapping)
{
foreach (var service in resource.Services)
{
if (string.Equals(service.AddressAndPort, networkAddress, StringComparison.OrdinalIgnoreCase))
{
return resource.Name;
}
}
}
return networkAddress;
}
public IDisposable OnPeerChanges(Func<Task> callback)
{
lock (_lock)
{
var subscription = new Subscription(callback, RemoveSubscription);
_subscriptions.Add(subscription);
return subscription;
}
}
private void RemoveSubscription(Subscription subscription)
{
lock (_lock)
{
_subscriptions.Remove(subscription);
}
}
private async Task RaisePeerChangesAsync()
{
if (_subscriptions.Count == 0)
{
return;
}
Subscription[] subscriptions;
lock (_lock)
{
subscriptions = _subscriptions.ToArray();
}
foreach (var subscription in subscriptions)
{
await subscription.ExecuteAsync().ConfigureAwait(false);
}
}
public async ValueTask DisposeAsync()
{
_watchContainersTokenSource.Cancel();
_watchContainersTokenSource.Dispose();
try
{
await _watchTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}
private sealed class Subscription(Func<Task> callback, Action<Subscription> onDispose) : IDisposable
{
private readonly Func<Task> _callback = callback;
private readonly Action<Subscription> _onDispose = onDispose;
public void Dispose() => _onDispose(this);
public Task ExecuteAsync() => _callback();
}
}