-
Notifications
You must be signed in to change notification settings - Fork 351
/
ResourceNotificationService.cs
156 lines (128 loc) · 5.75 KB
/
ResourceNotificationService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
namespace Aspire.Hosting.ApplicationModel;
/// <summary>
/// A service that allows publishing and subscribing to changes in the state of a resource.
/// </summary>
public class ResourceNotificationService(ILogger<ResourceNotificationService> logger)
{
// Resource state is keyed by the resource and the unique name of the resource. This could be the name of the resource, or a replica ID.
private readonly ConcurrentDictionary<(IResource, string), ResourceNotificationState> _resourceNotificationStates = new();
private readonly ILogger<ResourceNotificationService> _logger = logger ?? throw new ArgumentNullException(nameof(logger));
private Action<ResourceEvent>? OnResourceUpdated { get; set; }
/// <summary>
/// Watch for changes to the state for all resources.
/// </summary>
/// <returns></returns>
public async IAsyncEnumerable<ResourceEvent> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Return the last snapshot for each resource.
foreach (var state in _resourceNotificationStates)
{
var (resource, resourceId) = state.Key;
if (state.Value.LastSnapshot is not null)
{
yield return new ResourceEvent(resource, resourceId, state.Value.LastSnapshot);
}
}
var channel = Channel.CreateUnbounded<ResourceEvent>();
void WriteToChannel(ResourceEvent resourceEvent) =>
channel.Writer.TryWrite(resourceEvent);
OnResourceUpdated += WriteToChannel;
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return item;
}
}
finally
{
OnResourceUpdated -= WriteToChannel;
channel.Writer.TryComplete();
}
}
/// <summary>
/// Updates the snapshot of the <see cref="CustomResourceSnapshot"/> for a resource.
/// </summary>
/// <param name="resource">The resource to update</param>
/// <param name="resourceId"> The id of the resource.</param>
/// <param name="stateFactory">A factory that creates the new state based on the previous state.</param>
public Task PublishUpdateAsync(IResource resource, string resourceId, Func<CustomResourceSnapshot, CustomResourceSnapshot> stateFactory)
{
var notificationState = GetResourceNotificationState(resource, resourceId);
lock (notificationState)
{
var previousState = GetCurrentSnapshot(resource, notificationState);
var newState = stateFactory(previousState);
notificationState.LastSnapshot = newState;
OnResourceUpdated?.Invoke(new ResourceEvent(resource, resourceId, newState));
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Resource {Resource}/{ResourceId} -> {State}", resource.Name, resourceId, newState.State);
}
return Task.CompletedTask;
}
}
/// <summary>
/// Updates the snapshot of the <see cref="CustomResourceSnapshot"/> for a resource.
/// </summary>
/// <param name="resource">The resource to update</param>
/// <param name="stateFactory">A factory that creates the new state based on the previous state.</param>
public Task PublishUpdateAsync(IResource resource, Func<CustomResourceSnapshot, CustomResourceSnapshot> stateFactory)
{
return PublishUpdateAsync(resource, resource.Name, stateFactory);
}
private static CustomResourceSnapshot GetCurrentSnapshot(IResource resource, ResourceNotificationState notificationState)
{
var previousState = notificationState.LastSnapshot;
if (previousState is null)
{
if (resource.Annotations.OfType<ResourceSnapshotAnnotation>().LastOrDefault() is { } annotation)
{
previousState = annotation.InitialSnapshot;
}
// If there is no initial snapshot, create an empty one.
previousState ??= new CustomResourceSnapshot()
{
ResourceType = resource.GetType().Name,
Properties = []
};
}
return previousState;
}
private ResourceNotificationState GetResourceNotificationState(IResource resource, string resourceId) =>
_resourceNotificationStates.GetOrAdd((resource, resourceId), _ => new ResourceNotificationState());
/// <summary>
/// The annotation that allows publishing and subscribing to changes in the state of a resource.
/// </summary>
private sealed class ResourceNotificationState
{
public CustomResourceSnapshot? LastSnapshot { get; set; }
}
}
/// <summary>
/// Represents a change in the state of a resource.
/// </summary>
/// <param name="resource">The resource associated with the event.</param>
/// <param name="resourceId">The unique id of the resource.</param>
/// <param name="snapshot">The snapshot of the resource state.</param>
public class ResourceEvent(IResource resource, string resourceId, CustomResourceSnapshot snapshot)
{
/// <summary>
/// The resource associated with the event.
/// </summary>
public IResource Resource { get; } = resource;
/// <summary>
/// The unique id of the resource.
/// </summary>
public string ResourceId { get; } = resourceId;
/// <summary>
/// The snapshot of the resource state.
/// </summary>
public CustomResourceSnapshot Snapshot { get; } = snapshot;
}