Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public InspectorDemoScenarioService(

public async Task<InspectorDemoRunResponse> RunHierarchyAsync(CancellationToken ct = default)
{
await _registry.UnregisterActorAsync(nameof(InspectorTransformerAgent), "inspector-parent", ct);
await _registry.UnregisterActorAsync(nameof(InspectorCollectorAgent), "inspector-child", ct);

var parent = await _runtime.CreateAsync<InspectorTransformerAgent>("inspector-parent", ct);
var child = await _runtime.CreateAsync<InspectorCollectorAgent>("inspector-child", ct);
await _registry.RegisterActorAsync(nameof(InspectorTransformerAgent), parent.Id, ct);
Expand Down
1 change: 0 additions & 1 deletion demos/Aevatar.Demos.Inspector/InspectorApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public static void ConfigureServices(IServiceCollection services, IConfiguration
ProjectionKind = scopeKey.ProjectionKind,
},
context => new StudioMaterializationRuntimeLease(context));
services.TryAddSingleton<StudioCurrentStateProjectionPort>();
services.TryAddSingleton<IStudioActorBootstrap, InspectorStudioActorBootstrap>();
services.TryAddSingleton<
IProjectionDocumentMetadataProvider<GAgentRegistryCurrentStateDocument>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ public ScriptEvolutionProjectionPort(
_attachExistingLeaseLookup = attachExistingLeaseLookup ?? throw new ArgumentNullException(nameof(attachExistingLeaseLookup));
}

internal Task<IScriptEvolutionProjectionLease?> EnsureActorProjectionAsync(
string sessionActorId,
string proposalId,
CancellationToken ct = default) =>
EnsureProjectionAsync(
new ProjectionScopeStartRequest
{
RootActorId = sessionActorId,
ProjectionKind = ScriptProjectionKinds.EvolutionSession,
Mode = ProjectionRuntimeMode.SessionObservation,
SessionId = proposalId,
},
ct);

// Refactor (iter41/cluster-041-command-observation-projection-activation):
// Old pattern: command observation binders ensure/activate projection/readmodel sessions before dispatch.
// New principle: observation binders attach only to existing projection-owned sessions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,4 @@ public ScriptExecutionProjectionPort(
{
}

internal Task<IScriptExecutionProjectionLease?> EnsureActorProjectionAsync(
string actorId,
CancellationToken ct = default) =>
EnsureRunProjectionAsync(actorId, actorId, ct);

internal Task<IScriptExecutionProjectionLease?> EnsureRunProjectionAsync(
string actorId,
string runId,
CancellationToken ct = default) =>
EnsureProjectionAsync(
new ProjectionScopeStartRequest
{
RootActorId = actorId,
ProjectionKind = ScriptProjectionKinds.ExecutionSession,
Mode = ProjectionRuntimeMode.SessionObservation,
SessionId = runId,
},
ct);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public static IServiceCollection AddStudioProjectionComponents(
ProjectionKind = scopeKey.ProjectionKind,
},
context => new StudioMaterializationRuntimeLease(context));
services.TryAddSingleton<StudioCurrentStateProjectionPort>();

// ── Projectors ──

Expand Down

This file was deleted.

100 changes: 100 additions & 0 deletions test/Aevatar.Demos.Inspector.Tests/InspectorEndpointsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
using System.Diagnostics;
using System.Net.Http.Json;
using System.Text.Json;
using System.Threading.Channels;
using Aevatar.CQRS.Projection.Stores.Abstractions;
using Aevatar.Demos.Inspector;
using Aevatar.Demos.Inspector.Demo;
using Aevatar.Demos.Inspector.ReadModels;
using Aevatar.Demos.Inspector.Telemetry;
using Aevatar.Foundation.Runtime.Observability;
Expand Down Expand Up @@ -56,6 +58,39 @@ public async Task ReadModelEndpoint_ShouldExposeUnpackedProtobufJson()
serialized.Should().Contain("actor-a");
}

[Fact]
public async Task ActorsEndpoint_ShouldReflectInspectorUnregisterCleanup()
{
await using var host = await InspectorTestHost.StartAsync();
var registry = host.Services.GetRequiredService<InspectorGAgentRegistryService>();
var readModelUpserts = new ReadModelUpsertSignal();
using var listener = CreateGAgentRegistryReadModelUpsertListener(readModelUpserts);

await registry.RegisterActorAsync(nameof(InspectorTransformerAgent), "inspector-parent", CancellationToken.None);
await registry.RegisterActorAsync(nameof(InspectorTransformerAgent), "stale-parent", CancellationToken.None);

var registered = await WaitForInspectorTransformerGroupAsync(
host.Client,
readModelUpserts,
group =>
group.ActorIds.Contains("inspector-parent") &&
group.ActorIds.Contains("stale-parent"));
registered.Groups.Should().ContainSingle();

await registry.UnregisterActorAsync(nameof(InspectorTransformerAgent), "stale-parent", CancellationToken.None);

var response = await WaitForInspectorTransformerGroupAsync(
host.Client,
readModelUpserts,
group =>
group.ActorIds.SequenceEqual(["inspector-parent"]) &&
!group.ActorIds.Contains("stale-parent"));

var group = response.Groups.Should().ContainSingle().Subject;
group.ActorIds.Should().Equal("inspector-parent");
group.ActorIds.Should().NotContain("stale-parent");
}

[Fact]
public async Task WorkflowRunsEndpoint_ShouldReadWorkflowCurrentStateReadModel()
{
Expand Down Expand Up @@ -214,6 +249,71 @@ private static bool IsActorHandleActivity(Activity activity, string displayName,
actorId,
StringComparison.Ordinal);

private static async Task<InspectorActorsResponse> WaitForInspectorTransformerGroupAsync(
HttpClient client,
ReadModelUpsertSignal readModelUpserts,
Func<InspectorActorGroupDto, bool> isExpected)
{
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(3));
while (true)
{
var response = await client.GetFromJsonAsync<InspectorActorsResponse>(
"/api/inspector/actors",
timeout.Token);
response.Should().NotBeNull();

var group = response!.Groups.SingleOrDefault(group =>
group.Type == nameof(InspectorTransformerAgent));
if (group != null && isExpected(group))
return response;

await readModelUpserts.WaitForNextAsync(timeout.Token);
}
}

private static ActivityListener CreateGAgentRegistryReadModelUpsertListener(ReadModelUpsertSignal signal)
{
var listener = new ActivityListener
{
ShouldListenTo = source => source.Name == AevatarActivitySource.ActivitySourceName,
Sample = static (ref ActivityCreationOptions<ActivityContext> _) =>
ActivitySamplingResult.AllDataAndRecorded,
SampleUsingParentId = static (ref ActivityCreationOptions<string> _) =>
ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity =>
{
if (activity.DisplayName != AevatarActivitySource.ReadModelUpsertActivityName ||
!string.Equals(
activity.GetTagItem(AevatarActivitySource.ReadModelNameTag) as string,
nameof(GAgentRegistryCurrentStateDocument),
StringComparison.Ordinal))
{
return;
}

signal.Notify();
},
};
ActivitySource.AddActivityListener(listener);
return listener;
}

private sealed class ReadModelUpsertSignal
{
private readonly Channel<bool> _signals = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
});

public void Notify() => _signals.Writer.TryWrite(true);

public async ValueTask WaitForNextAsync(CancellationToken ct)
{
await _signals.Reader.ReadAsync(ct);
}
}

private sealed class InspectorTestHost : IAsyncDisposable
{
private readonly WebApplication _app;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private static string BuildDefinitionSnapshotKey(
var queryService = provider.GetRequiredService<IScriptReadModelQueryApplicationService>();
var projectionPort = provider.GetRequiredService<IScriptExecutionProjectionPort>();

var lease = await projectionPort.EnsureActorProjectionAsync(runtimeActorId, ct);
var lease = await provider.EnsureScriptExecutionProjectionAsync(runtimeActorId, ct);
lease.Should().NotBeNull();
await using var sink = new EventChannel<EventEnvelope>(capacity: 32);
var liveSinkLease = await projectionPort.AttachLiveSinkAsync(lease!, sink, ct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ await provisioningPortNode1.EnsureRuntimeAsync(
orchestratorDefinition.Snapshot,
CancellationToken.None);

var lease = await executionProjectionNode1.EnsureActorProjectionAsync(
var lease = await node1.Services.EnsureScriptExecutionProjectionAsync(
orchestratorRuntimeActorId,
CancellationToken.None);
lease.Should().NotBeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task ProvisionRunAndReadSnapshot_ShouldProduceProjectedReadModel()
CancellationToken.None);
resolvedRuntimeActorId.Should().Be(runtimeActorId);

var lease = await projectionPort.EnsureActorProjectionAsync(runtimeActorId, CancellationToken.None);
var lease = await provider.EnsureScriptExecutionProjectionAsync(runtimeActorId, CancellationToken.None);
lease.Should().NotBeNull();
await using var sink = new EventChannel<EventEnvelope>(capacity: 32);
var liveSinkLease = await projectionPort.AttachLiveSinkAsync(lease!, sink, CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Aevatar.CQRS.Core.Abstractions.Streaming;
using Aevatar.CQRS.Projection.Core.Abstractions;
using Aevatar.CQRS.Projection.Stores.Abstractions;
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Runtime.Implementations.Local.DependencyInjection;
Expand All @@ -13,6 +14,7 @@
using Aevatar.Scripting.Core.Ports;
using Aevatar.Scripting.Hosting.DependencyInjection;
using Aevatar.Scripting.Infrastructure.Ports;
using Aevatar.Scripting.Projection.Orchestration;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -45,12 +47,12 @@ public static IServiceCollection AddAttachOnlyScriptEvolutionApplicationService(
services.Replace(ServiceDescriptor.Singleton<IScriptEvolutionApplicationService>(sp =>
new AttachOnlyScriptEvolutionApplicationService(
new ScriptEvolutionApplicationService(sp.GetRequiredService<IScriptEvolutionProposalPort>()),
sp.GetRequiredService<IScriptEvolutionProjectionPort>(),
sp.GetRequiredService<IProjectionScopeActivationService<ScriptEvolutionRuntimeLease>>(),
sp.GetRequiredService<IScriptingActorAddressResolver>())));
services.Replace(ServiceDescriptor.Singleton<IScriptEvolutionProposalPort>(sp =>
new AttachOnlyScriptEvolutionProposalPort(
sp.GetRequiredService<RuntimeScriptEvolutionInteractionService>(),
sp.GetRequiredService<IScriptEvolutionProjectionPort>(),
sp.GetRequiredService<IProjectionScopeActivationService<ScriptEvolutionRuntimeLease>>(),
sp.GetRequiredService<IScriptingActorAddressResolver>())));
return services;
}
Expand Down Expand Up @@ -167,7 +169,7 @@ private static string BuildDefinitionSnapshotKey(
var queryService = provider.GetRequiredService<IScriptReadModelQueryApplicationService>();
var projectionPort = provider.GetRequiredService<IScriptExecutionProjectionPort>();

var lease = await projectionPort.EnsureActorProjectionAsync(runtimeActorId, ct)
var lease = await provider.EnsureScriptExecutionProjectionAsync(runtimeActorId, ct)
?? throw new InvalidOperationException($"Failed to ensure script execution projection. actor_id={runtimeActorId}");
await using var sink = new EventChannel<EventEnvelope>(capacity: 64);
var liveSinkLease = await projectionPort.AttachLiveSinkAsync(lease, sink, ct);
Expand Down Expand Up @@ -202,7 +204,7 @@ public static async Task<TextNormalizationReadModel> QueryNormalizationAsync(
{
var queryService = provider.GetRequiredService<IScriptReadModelQueryApplicationService>();
var projectionPort = provider.GetRequiredService<IScriptExecutionProjectionPort>();
var lease = await projectionPort.EnsureActorProjectionAsync(runtimeActorId, ct)
var lease = await provider.EnsureScriptExecutionProjectionAsync(runtimeActorId, ct)
?? throw new InvalidOperationException($"Failed to ensure script execution projection. actor_id={runtimeActorId}");

try
Expand Down Expand Up @@ -436,7 +438,7 @@ public static async Task ActivateAuthorityReadModelsAsync(

private sealed class AttachOnlyScriptEvolutionApplicationService(
IScriptEvolutionApplicationService inner,
IScriptEvolutionProjectionPort evolutionProjectionPort,
IProjectionScopeActivationService<ScriptEvolutionRuntimeLease> evolutionProjectionActivation,
IScriptingActorAddressResolver addressResolver)
: IScriptEvolutionApplicationService
{
Expand Down Expand Up @@ -464,7 +466,8 @@ public async Task<ScriptPromotionDecision> ProposeAsync(
var sessionActorId = addressResolver.GetEvolutionSessionActorId(
normalizedProposalId,
normalizedScopeId);
var lease = await evolutionProjectionPort.EnsureActorProjectionAsync(
var lease = await EnsureEvolutionProjectionAsync(
evolutionProjectionActivation,
sessionActorId,
normalizedProposalId,
ct);
Expand All @@ -480,7 +483,7 @@ public async Task<ScriptPromotionDecision> ProposeAsync(

private sealed class AttachOnlyScriptEvolutionProposalPort(
IScriptEvolutionProposalPort inner,
IScriptEvolutionProjectionPort evolutionProjectionPort,
IProjectionScopeActivationService<ScriptEvolutionRuntimeLease> evolutionProjectionActivation,
IScriptingActorAddressResolver addressResolver)
: IScriptEvolutionProposalPort
{
Expand All @@ -502,7 +505,8 @@ public async Task<ScriptPromotionDecision> ProposeAsync(
var sessionActorId = addressResolver.GetEvolutionSessionActorId(
normalizedProposalId,
normalizedScopeId);
var lease = await evolutionProjectionPort.EnsureActorProjectionAsync(
var lease = await EnsureEvolutionProjectionAsync(
evolutionProjectionActivation,
sessionActorId,
normalizedProposalId,
ct);
Expand All @@ -515,4 +519,23 @@ public async Task<ScriptPromotionDecision> ProposeAsync(
return await inner.ProposeAsync(normalizedProposal, ct);
}
}

private static async Task<IScriptEvolutionProjectionLease?> EnsureEvolutionProjectionAsync(
IProjectionScopeActivationService<ScriptEvolutionRuntimeLease> activationService,
string sessionActorId,
string proposalId,
CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(activationService);

return await activationService.EnsureAsync(
new ProjectionScopeStartRequest
{
RootActorId = sessionActorId,
ProjectionKind = ScriptProjectionKinds.EvolutionSession,
Mode = ProjectionRuntimeMode.SessionObservation,
SessionId = proposalId,
},
ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ await provisioningPort.EnsureRuntimeAsync(
definition.Snapshot,
CancellationToken.None);

var lease = await projectionPort.EnsureActorProjectionAsync(runtimeActorId, CancellationToken.None);
var lease = await provider.EnsureScriptExecutionProjectionAsync(runtimeActorId, CancellationToken.None);
lease.Should().NotBeNull();
await using var sink = new EventChannel<EventEnvelope>(capacity: 32);
var liveSinkLease = await projectionPort.AttachLiveSinkAsync(lease!, sink, CancellationToken.None);
Expand Down
Loading
Loading