diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj b/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj
index 6bce22805..32973954e 100644
--- a/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj
+++ b/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj
@@ -26,5 +26,6 @@
+
diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs
new file mode 100644
index 000000000..d0f0fe77c
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs
@@ -0,0 +1,10 @@
+namespace Aevatar.GAgentService.Abstractions.Ports;
+
+///
+/// Activation port for the durable service-run current-state projection.
+/// Mirrors shape but scoped to service-run actors.
+///
+public interface IServiceRunCurrentStateProjectionPort
+{
+ Task EnsureProjectionAsync(string actorId, CancellationToken ct = default);
+}
diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs
new file mode 100644
index 000000000..fa0122a14
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs
@@ -0,0 +1,26 @@
+using Aevatar.GAgentService.Abstractions.Queries;
+
+namespace Aevatar.GAgentService.Abstractions.Ports;
+
+///
+/// Read contract for the implementation-agnostic service-run registry.
+/// Backed by the durable ServiceRunGAgent projection.
+///
+public interface IServiceRunQueryPort
+{
+ Task> ListAsync(
+ ServiceRunQuery query,
+ CancellationToken ct = default);
+
+ Task GetByRunIdAsync(
+ string scopeId,
+ string serviceId,
+ string runId,
+ CancellationToken ct = default);
+
+ Task GetByCommandIdAsync(
+ string scopeId,
+ string serviceId,
+ string commandId,
+ CancellationToken ct = default);
+}
diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs
new file mode 100644
index 000000000..aded0ff6b
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs
@@ -0,0 +1,23 @@
+namespace Aevatar.GAgentService.Abstractions.Ports;
+
+///
+/// Write contract for the implementation-agnostic service-run registry.
+/// Used by the invocation dispatcher to register a run before returning the accepted receipt,
+/// so Studio Observe can query the run from the durable readmodel even on immediate refresh.
+///
+public interface IServiceRunRegistrationPort
+{
+ Task RegisterAsync(
+ ServiceRunRecord record,
+ CancellationToken ct = default);
+
+ Task UpdateStatusAsync(
+ string runActorId,
+ string runId,
+ ServiceRunStatus status,
+ CancellationToken ct = default);
+}
+
+public sealed record ServiceRunRegistrationResult(
+ string RunActorId,
+ string RunId);
diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto b/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto
new file mode 100644
index 000000000..db770367d
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto
@@ -0,0 +1,60 @@
+syntax = "proto3";
+
+package aevatar.gagentservice;
+
+option csharp_namespace = "Aevatar.GAgentService.Abstractions";
+
+import "google/protobuf/timestamp.proto";
+import "service_endpoint.proto";
+import "service_revision.proto";
+
+enum ServiceRunStatus {
+ SERVICE_RUN_STATUS_UNSPECIFIED = 0;
+ SERVICE_RUN_STATUS_ACCEPTED = 1;
+ SERVICE_RUN_STATUS_COMPLETED = 2;
+ SERVICE_RUN_STATUS_FAILED = 3;
+ SERVICE_RUN_STATUS_STOPPED = 4;
+}
+
+message ServiceRunRecord {
+ string scope_id = 1;
+ string service_id = 2;
+ string service_key = 3;
+ string run_id = 4;
+ string command_id = 5;
+ string correlation_id = 6;
+ string endpoint_id = 7;
+ ServiceImplementationKind implementation_kind = 8;
+ string target_actor_id = 9;
+ string revision_id = 10;
+ string deployment_id = 11;
+ ServiceRunStatus status = 12;
+ google.protobuf.Timestamp created_at = 13;
+ google.protobuf.Timestamp updated_at = 14;
+ ServiceIdentity identity = 15;
+}
+
+message ServiceRunState {
+ ServiceRunRecord record = 1;
+ int64 last_applied_event_version = 2;
+ string last_event_id = 3;
+}
+
+message RegisterServiceRunRequested {
+ ServiceRunRecord record = 1;
+}
+
+message UpdateServiceRunStatusRequested {
+ string run_id = 1;
+ ServiceRunStatus status = 2;
+}
+
+message ServiceRunRegisteredEvent {
+ ServiceRunRecord record = 1;
+}
+
+message ServiceRunStatusUpdatedEvent {
+ string run_id = 1;
+ ServiceRunStatus status = 2;
+ google.protobuf.Timestamp updated_at = 3;
+}
diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs b/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs
new file mode 100644
index 000000000..7a66f315b
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs
@@ -0,0 +1,28 @@
+namespace Aevatar.GAgentService.Abstractions.Queries;
+
+public sealed record ServiceRunSnapshot(
+ string ScopeId,
+ string ServiceId,
+ string ServiceKey,
+ string RunId,
+ string CommandId,
+ string CorrelationId,
+ string EndpointId,
+ ServiceImplementationKind ImplementationKind,
+ string TargetActorId,
+ string RevisionId,
+ string DeploymentId,
+ ServiceRunStatus Status,
+ string ActorId,
+ string TenantId,
+ string AppId,
+ string Namespace,
+ long StateVersion,
+ string LastEventId,
+ DateTimeOffset CreatedAt,
+ DateTimeOffset UpdatedAt);
+
+public sealed record ServiceRunQuery(
+ string ScopeId,
+ string ServiceId,
+ int Take = 50);
diff --git a/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs b/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs
new file mode 100644
index 000000000..6e4a1070d
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs
@@ -0,0 +1,23 @@
+namespace Aevatar.GAgentService.Abstractions;
+
+public static class ServiceRunIds
+{
+ public const string ActorPrefix = "service-run:";
+
+ public static string BuildKey(string scopeId, string serviceId, string runId)
+ {
+ if (string.IsNullOrWhiteSpace(scopeId))
+ throw new ArgumentException("scopeId is required.", nameof(scopeId));
+ if (string.IsNullOrWhiteSpace(serviceId))
+ throw new ArgumentException("serviceId is required.", nameof(serviceId));
+ if (string.IsNullOrWhiteSpace(runId))
+ throw new ArgumentException("runId is required.", nameof(runId));
+
+ return $"{Normalize(scopeId)}:{Normalize(serviceId)}:{Normalize(runId)}";
+ }
+
+ public static string BuildActorId(string scopeId, string serviceId, string runId) =>
+ ActorPrefix + BuildKey(scopeId, serviceId, runId);
+
+ private static string Normalize(string value) => value.Trim();
+}
diff --git a/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs
new file mode 100644
index 000000000..75dc14595
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs
@@ -0,0 +1,142 @@
+using Aevatar.Foundation.Abstractions.Attributes;
+using Aevatar.Foundation.Core;
+using Aevatar.Foundation.Core.EventSourcing;
+using Aevatar.GAgentService.Abstractions;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Aevatar.GAgentService.Core.GAgents;
+
+public sealed class ServiceRunGAgent : GAgentBase
+{
+ public ServiceRunGAgent()
+ {
+ InitializeId();
+ }
+
+ [EventHandler]
+ public async Task HandleRegisterAsync(RegisterServiceRunRequested command)
+ {
+ ArgumentNullException.ThrowIfNull(command);
+ ArgumentNullException.ThrowIfNull(command.Record);
+ ValidateRecord(command.Record);
+
+ var existing = State.Record;
+ if (existing != null && !string.IsNullOrWhiteSpace(existing.RunId))
+ {
+ EnsureExistingMatches(existing, command.Record);
+ return;
+ }
+
+ var record = command.Record.Clone();
+ if (record.CreatedAt == null)
+ record.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow);
+ record.UpdatedAt = record.CreatedAt;
+ if (record.Status == ServiceRunStatus.Unspecified)
+ record.Status = ServiceRunStatus.Accepted;
+
+ await PersistDomainEventAsync(new ServiceRunRegisteredEvent
+ {
+ Record = record,
+ });
+ }
+
+ [EventHandler]
+ public async Task HandleUpdateStatusAsync(UpdateServiceRunStatusRequested command)
+ {
+ ArgumentNullException.ThrowIfNull(command);
+ var existing = State.Record;
+ if (existing == null || string.IsNullOrWhiteSpace(existing.RunId))
+ {
+ throw new InvalidOperationException(
+ $"Service run actor '{Id}' has no registered run; status update rejected.");
+ }
+
+ if (!string.IsNullOrWhiteSpace(command.RunId) &&
+ !string.Equals(existing.RunId, command.RunId, StringComparison.Ordinal))
+ {
+ throw new InvalidOperationException(
+ $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot update run '{command.RunId}'.");
+ }
+
+ if (command.Status == ServiceRunStatus.Unspecified)
+ return;
+
+ if (existing.Status == command.Status)
+ return;
+
+ await PersistDomainEventAsync(new ServiceRunStatusUpdatedEvent
+ {
+ RunId = existing.RunId,
+ Status = command.Status,
+ UpdatedAt = Timestamp.FromDateTime(DateTime.UtcNow),
+ });
+ }
+
+ protected override ServiceRunState TransitionState(ServiceRunState current, IMessage evt) =>
+ StateTransitionMatcher
+ .Match(current, evt)
+ .On(ApplyRegistered)
+ .On(ApplyStatusUpdated)
+ .OrCurrent();
+
+ private static ServiceRunState ApplyRegistered(ServiceRunState state, ServiceRunRegisteredEvent evt)
+ {
+ var next = state.Clone();
+ next.Record = evt.Record?.Clone() ?? new ServiceRunRecord();
+ next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
+ next.LastEventId = $"{next.Record.RunId}:registered";
+ return next;
+ }
+
+ private static ServiceRunState ApplyStatusUpdated(ServiceRunState state, ServiceRunStatusUpdatedEvent evt)
+ {
+ var next = state.Clone();
+ if (next.Record == null)
+ next.Record = new ServiceRunRecord();
+ next.Record.Status = evt.Status;
+ next.Record.UpdatedAt = evt.UpdatedAt ?? Timestamp.FromDateTime(DateTime.UtcNow);
+ next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
+ next.LastEventId = $"{next.Record.RunId}:status:{(int)evt.Status}";
+ return next;
+ }
+
+ private void EnsureExistingMatches(ServiceRunRecord existing, ServiceRunRecord incoming)
+ {
+ if (!string.Equals(existing.RunId, incoming.RunId, StringComparison.Ordinal))
+ {
+ throw new InvalidOperationException(
+ $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot register run '{incoming.RunId}'.");
+ }
+ if (!string.Equals(existing.ScopeId, incoming.ScopeId, StringComparison.Ordinal))
+ {
+ throw new InvalidOperationException(
+ $"Service run actor '{Id}' is bound to scope '{existing.ScopeId}' and cannot re-register under scope '{incoming.ScopeId}'.");
+ }
+ if (!string.Equals(existing.ServiceId, incoming.ServiceId, StringComparison.Ordinal))
+ {
+ throw new InvalidOperationException(
+ $"Service run actor '{Id}' is bound to service '{existing.ServiceId}' and cannot re-register under service '{incoming.ServiceId}'.");
+ }
+ if (!string.IsNullOrWhiteSpace(incoming.TargetActorId) &&
+ !string.IsNullOrWhiteSpace(existing.TargetActorId) &&
+ !string.Equals(existing.TargetActorId, incoming.TargetActorId, StringComparison.Ordinal))
+ {
+ throw new InvalidOperationException(
+ $"Service run actor '{Id}' is bound to target '{existing.TargetActorId}' and cannot re-register against target '{incoming.TargetActorId}'.");
+ }
+ }
+
+ private static void ValidateRecord(ServiceRunRecord record)
+ {
+ ArgumentNullException.ThrowIfNull(record);
+ if (string.IsNullOrWhiteSpace(record.RunId))
+ throw new InvalidOperationException("run_id is required.");
+ if (string.IsNullOrWhiteSpace(record.ScopeId))
+ throw new InvalidOperationException("scope_id is required.");
+ if (string.IsNullOrWhiteSpace(record.ServiceId))
+ throw new InvalidOperationException("service_id is required.");
+ if (string.IsNullOrWhiteSpace(record.CommandId))
+ throw new InvalidOperationException("command_id is required.");
+ }
+}
diff --git a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs
index 73d1de118..c8d4d1515 100644
--- a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs
@@ -58,6 +58,7 @@ public static IServiceCollection AddGAgentServiceCapability(
services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
+ services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddEnumerable(ServiceDescriptor.Singleton());
services.TryAddEnumerable(ServiceDescriptor.Singleton());
@@ -118,6 +119,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders(
TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id);
+ TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id);
}
else
@@ -129,6 +131,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders(
TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id);
+ TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id);
}
@@ -146,6 +149,7 @@ private static bool HasAllGAgentServiceProjectionReaders(
&& HasProjectionDocumentReaderForProvider(services, providerKind)
&& HasProjectionDocumentReaderForProvider(services, providerKind)
&& HasProjectionDocumentReaderForProvider(services, providerKind)
+ && HasProjectionDocumentReaderForProvider(services, providerKind)
&& HasProjectionDocumentReaderForProvider(services, providerKind);
}
diff --git a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs
index f9c260126..a83087fb4 100644
--- a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs
+++ b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs
@@ -22,6 +22,7 @@
using Aevatar.Scripting.Abstractions.Queries;
using Aevatar.Scripting.Core.Ports;
using Aevatar.Workflow.Application.Abstractions.Queries;
+using Aevatar.GAgentService.Abstractions.Queries;
using Aevatar.GAgentService.Hosting.Serialization;
using Aevatar.Presentation.AGUI;
using Aevatar.Workflow.Application.Abstractions.Runs;
@@ -647,6 +648,7 @@ private static async Task HandleInvokeDefaultChatStreamAsync(
StreamScopeServiceHttpRequest request,
[FromServices] ServiceInvocationResolutionService resolutionService,
[FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer,
+ [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort,
[FromServices] ICommandInteractionService chatRunService,
[FromServices] ICommandInteractionService gagentDraftRunService,
[FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort,
@@ -671,6 +673,7 @@ await HandleInvokeStreamAsync(
appId: null,
resolutionService,
admissionAuthorizer,
+ serviceRunRegistrationPort,
chatRunService,
gagentDraftRunService,
scriptRuntimeCommandPort,
@@ -769,6 +772,7 @@ private static async Task HandleInvokeMemberStreamAsync(
[FromServices] IMemberPublishedServiceResolver memberPublishedServiceResolver,
[FromServices] ServiceInvocationResolutionService resolutionService,
[FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer,
+ [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort,
[FromServices] ICommandInteractionService chatRunService,
[FromServices] ICommandInteractionService gagentDraftRunService,
[FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort,
@@ -796,6 +800,7 @@ await HandleInvokeStreamAsync(
null,
resolutionService,
admissionAuthorizer,
+ serviceRunRegistrationPort,
chatRunService,
gagentDraftRunService,
scriptRuntimeCommandPort,
@@ -863,7 +868,7 @@ private static Task HandleListDefaultRunsAsync(
string scopeId,
int take,
[FromServices] IServiceLifecycleQueryPort lifecycleQueryPort,
- [FromServices] IWorkflowRunBindingReader workflowRunBindingReader,
+ [FromServices] IServiceRunQueryPort serviceRunQueryPort,
[FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
[FromServices] IOptions options,
CancellationToken ct) =>
@@ -873,7 +878,7 @@ private static Task HandleListDefaultRunsAsync(
ResolveDefaultScopeServiceId(options.Value),
take,
lifecycleQueryPort,
- workflowRunBindingReader,
+ serviceRunQueryPort,
workflowExecutionQueryService,
options,
ct);
@@ -884,7 +889,7 @@ private static Task HandleGetDefaultRunAsync(
string runId,
string? actorId,
[FromServices] IServiceLifecycleQueryPort lifecycleQueryPort,
- [FromServices] IWorkflowRunBindingReader workflowRunBindingReader,
+ [FromServices] IServiceRunQueryPort serviceRunQueryPort,
[FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
[FromServices] IOptions options,
CancellationToken ct) =>
@@ -895,7 +900,7 @@ private static Task HandleGetDefaultRunAsync(
runId,
actorId,
lifecycleQueryPort,
- workflowRunBindingReader,
+ serviceRunQueryPort,
workflowExecutionQueryService,
options,
ct);
@@ -906,7 +911,7 @@ private static Task HandleGetDefaultRunAuditAsync(
string runId,
string? actorId,
[FromServices] IServiceLifecycleQueryPort lifecycleQueryPort,
- [FromServices] IWorkflowRunBindingReader workflowRunBindingReader,
+ [FromServices] IServiceRunQueryPort serviceRunQueryPort,
[FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
[FromServices] IOptions options,
CancellationToken ct) =>
@@ -917,7 +922,7 @@ private static Task HandleGetDefaultRunAuditAsync(
runId,
actorId,
lifecycleQueryPort,
- workflowRunBindingReader,
+ serviceRunQueryPort,
workflowExecutionQueryService,
options,
ct);
@@ -1334,7 +1339,7 @@ private static async Task HandleListRunsAsync(
string serviceId,
int take,
[FromServices] IServiceLifecycleQueryPort lifecycleQueryPort,
- [FromServices] IWorkflowRunBindingReader workflowRunBindingReader,
+ [FromServices] IServiceRunQueryPort serviceRunQueryPort,
[FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
[FromServices] IOptions options,
CancellationToken ct)
@@ -1343,23 +1348,17 @@ private static async Task HandleListRunsAsync(
if (resolution.Failure != null)
return resolution.Failure;
- var bindings = await ListScopeServiceRunsAsync(
- scopeId,
- resolution.Service!,
- resolution.Deployments,
- workflowRunBindingReader,
- take,
+ var snapshots = await serviceRunQueryPort.ListAsync(
+ new ServiceRunQuery(scopeId, serviceId, Math.Clamp(take <= 0 ? 50 : take, 1, 200)),
ct);
- var summaries = new List(bindings.Count);
- foreach (var binding in bindings)
+ var summaries = new List(snapshots.Count);
+ foreach (var snapshot in snapshots)
{
- summaries.Add(await BuildScopeRunSummaryAsync(
+ summaries.Add(await BuildScopeRunSummaryFromRegistryAsync(
scopeId,
serviceId,
- binding,
- resolution.Service!,
- resolution.Deployments,
+ snapshot,
workflowExecutionQueryService,
ct));
}
@@ -1379,30 +1378,29 @@ private static async Task HandleGetRunAsync(
string runId,
string? actorId,
[FromServices] IServiceLifecycleQueryPort lifecycleQueryPort,
- [FromServices] IWorkflowRunBindingReader workflowRunBindingReader,
+ [FromServices] IServiceRunQueryPort serviceRunQueryPort,
[FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
[FromServices] IOptions options,
CancellationToken ct)
{
- var resolution = await ResolveScopeServiceRunAsync(
- http,
- options.Value,
- scopeId,
- serviceId,
- runId,
- actorId,
- lifecycleQueryPort,
- workflowRunBindingReader,
- ct);
- if (resolution.Failure != null)
- return resolution.Failure;
+ var serviceResolution = await ResolveScopeServiceAsync(http, scopeId, serviceId, lifecycleQueryPort, options.Value, ct);
+ if (serviceResolution.Failure != null)
+ return serviceResolution.Failure;
- return Results.Ok(await BuildScopeRunSummaryAsync(
+ var snapshot = await ResolveServiceRunSnapshotAsync(scopeId, serviceId, runId, serviceRunQueryPort, ct);
+ if (snapshot == null)
+ {
+ return Results.NotFound(new
+ {
+ code = "SERVICE_RUN_NOT_FOUND",
+ message = BuildScopeServiceRunNotFoundMessage(scopeId, serviceId, runId?.Trim() ?? string.Empty),
+ });
+ }
+
+ return Results.Ok(await BuildScopeRunSummaryFromRegistryAsync(
scopeId,
serviceId,
- resolution.Binding!,
- resolution.Service!,
- resolution.Deployments,
+ snapshot,
workflowExecutionQueryService,
ct));
}
@@ -1414,45 +1412,110 @@ private static async Task HandleGetRunAuditAsync(
string runId,
string? actorId,
[FromServices] IServiceLifecycleQueryPort lifecycleQueryPort,
- [FromServices] IWorkflowRunBindingReader workflowRunBindingReader,
+ [FromServices] IServiceRunQueryPort serviceRunQueryPort,
[FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
[FromServices] IOptions options,
CancellationToken ct)
{
- var resolution = await ResolveScopeServiceRunAsync(
- http,
- options.Value,
- scopeId,
- serviceId,
- runId,
- actorId,
- lifecycleQueryPort,
- workflowRunBindingReader,
- ct);
- if (resolution.Failure != null)
- return resolution.Failure;
+ var serviceResolution = await ResolveScopeServiceAsync(http, scopeId, serviceId, lifecycleQueryPort, options.Value, ct);
+ if (serviceResolution.Failure != null)
+ return serviceResolution.Failure;
+
+ var snapshot = await ResolveServiceRunSnapshotAsync(scopeId, serviceId, runId, serviceRunQueryPort, ct);
+ if (snapshot == null)
+ {
+ return Results.NotFound(new
+ {
+ code = "SERVICE_RUN_NOT_FOUND",
+ message = BuildScopeServiceRunNotFoundMessage(scopeId, serviceId, runId?.Trim() ?? string.Empty),
+ });
+ }
- var summary = await BuildScopeRunSummaryAsync(
+ var summary = await BuildScopeRunSummaryFromRegistryAsync(
scopeId,
serviceId,
- resolution.Binding!,
- resolution.Service!,
- resolution.Deployments,
+ snapshot,
workflowExecutionQueryService,
ct);
- var report = await workflowExecutionQueryService.GetActorReportAsync(resolution.Binding!.ActorId, ct);
+
+ if (snapshot.ImplementationKind != ServiceImplementationKind.Workflow ||
+ string.IsNullOrWhiteSpace(snapshot.TargetActorId))
+ {
+ return Results.NotFound(new
+ {
+ code = "SERVICE_RUN_AUDIT_NOT_AVAILABLE",
+ message = $"Audit detail for run '{snapshot.RunId}' is not available for {snapshot.ImplementationKind} services.",
+ });
+ }
+
+ var report = await workflowExecutionQueryService.GetActorReportAsync(snapshot.TargetActorId, ct);
if (report == null)
{
return Results.NotFound(new
{
code = "SERVICE_RUN_AUDIT_NOT_FOUND",
- message = $"Audit report for run '{resolution.Binding.RunId}' was not found on service '{serviceId}' in scope '{scopeId}'.",
+ message = $"Audit report for run '{snapshot.RunId}' was not found on service '{serviceId}' in scope '{scopeId}'.",
});
}
return Results.Ok(new ScopeServiceRunAuditHttpResponse(summary, report));
}
+ // Registers a stream-invocation run with the durable service-run registry using the
+ // actual run id that the implementation pipeline produced (workflow run actor id /
+ // draft-run command id / scripting-generated run id). Called once the downstream
+ // run id is known so /runs/{runId} resolves the same id the client receives via SSE.
+ private static ValueTask RegisterStreamServiceRunAsync(
+ IServiceRunRegistrationPort serviceRunRegistrationPort,
+ ServiceInvocationResolvedTarget target,
+ ServiceInvocationRequest invocationRequest,
+ string scopeId,
+ string serviceId,
+ string runId,
+ string commandId,
+ string correlationId,
+ string targetActorId,
+ CancellationToken ct)
+ {
+ var record = new ServiceRunRecord
+ {
+ ScopeId = scopeId,
+ ServiceId = serviceId,
+ ServiceKey = target.Service.ServiceKey ?? string.Empty,
+ RunId = runId,
+ CommandId = string.IsNullOrWhiteSpace(commandId) ? runId : commandId,
+ CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? runId : correlationId,
+ EndpointId = target.Endpoint.EndpointId ?? string.Empty,
+ ImplementationKind = target.Artifact.ImplementationKind,
+ TargetActorId = string.IsNullOrWhiteSpace(targetActorId)
+ ? target.Service.PrimaryActorId ?? string.Empty
+ : targetActorId,
+ RevisionId = target.Service.RevisionId ?? string.Empty,
+ DeploymentId = target.Service.DeploymentId ?? string.Empty,
+ Status = ServiceRunStatus.Accepted,
+ Identity = invocationRequest.Identity?.Clone(),
+ };
+ return new ValueTask(serviceRunRegistrationPort.RegisterAsync(record, ct));
+ }
+
+ private static async Task ResolveServiceRunSnapshotAsync(
+ string scopeId,
+ string serviceId,
+ string runId,
+ IServiceRunQueryPort serviceRunQueryPort,
+ CancellationToken ct)
+ {
+ var normalized = runId?.Trim() ?? string.Empty;
+ if (string.IsNullOrWhiteSpace(normalized))
+ return null;
+
+ var byRun = await serviceRunQueryPort.GetByRunIdAsync(scopeId, serviceId, normalized, ct);
+ if (byRun != null)
+ return byRun;
+
+ return await serviceRunQueryPort.GetByCommandIdAsync(scopeId, serviceId, normalized, ct);
+ }
+
private static async Task HandleInvokeStreamAsync(
HttpContext http,
string scopeId,
@@ -1462,6 +1525,7 @@ private static async Task HandleInvokeStreamAsync(
string? appId,
[FromServices] ServiceInvocationResolutionService resolutionService,
[FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer,
+ [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort,
[FromServices] ICommandInteractionService chatRunService,
[FromServices] ICommandInteractionService gagentDraftRunService,
[FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort,
@@ -1493,7 +1557,6 @@ await admissionAuthorizer.AuthorizeAsync(
target.Endpoint,
invocationRequest,
ct);
-
switch (target.Artifact.ImplementationKind)
{
case ServiceImplementationKind.Workflow:
@@ -1510,7 +1573,20 @@ await WorkflowCapabilityEndpoints.HandleChat(
Metadata = scopedHeaders,
},
chatRunService,
- ct);
+ ct,
+ onAcceptedHook: (receipt, token) => RegisterStreamServiceRunAsync(
+ serviceRunRegistrationPort,
+ target,
+ invocationRequest,
+ scopeId,
+ serviceId,
+ // For workflow, the SSE RunStarted carries the workflow run actor id as the run identifier;
+ // use the same id so /runs/{runId} resolves to this run after refresh.
+ runId: receipt.ActorId,
+ commandId: receipt.CommandId,
+ correlationId: receipt.CorrelationId,
+ targetActorId: receipt.ActorId,
+ token));
break;
case ServiceImplementationKind.Static:
@@ -1521,9 +1597,12 @@ await HandleStaticGAgentChatStreamAsync(
request.ActorId,
request.SessionId,
scopeId,
+ serviceId,
scopedHeaders,
request.InputParts,
gagentDraftRunService,
+ invocationRequest,
+ serviceRunRegistrationPort,
ct);
break;
@@ -1534,9 +1613,12 @@ await HandleScriptingServiceChatStreamAsync(
normalizedPrompt,
request.SessionId,
scopeId,
+ serviceId,
scopedHeaders,
scriptRuntimeCommandPort,
scriptExecutionProjectionPort,
+ invocationRequest,
+ serviceRunRegistrationPort,
ct);
break;
@@ -1572,9 +1654,12 @@ private static async Task HandleStaticGAgentChatStreamAsync(
string? actorId,
string? sessionId,
string scopeId,
+ string serviceId,
IReadOnlyDictionary? headers,
IReadOnlyList? inputParts,
ICommandInteractionService interactionService,
+ ServiceInvocationRequest invocationRequest,
+ IServiceRunRegistrationPort serviceRunRegistrationPort,
CancellationToken ct)
{
var plan = target.Artifact.DeploymentPlan.StaticPlan;
@@ -1607,6 +1692,19 @@ async ValueTask EmitAsync(AGUIEvent aguiEvent, CancellationToken token)
async ValueTask OnAcceptedAsync(GAgentDraftRunAcceptedReceipt receipt, CancellationToken token)
{
http.Response.Headers["X-Correlation-Id"] = receipt.CorrelationId;
+ // Register the service run with the same id we are about to send to the client
+ // so /runs/{runId} resolves immediately on refresh.
+ await RegisterStreamServiceRunAsync(
+ serviceRunRegistrationPort,
+ target,
+ invocationRequest,
+ scopeId,
+ serviceId,
+ runId: receipt.CommandId,
+ commandId: receipt.CommandId,
+ correlationId: receipt.CorrelationId,
+ targetActorId: receipt.ActorId,
+ token);
await EnsureSseStartedAsync(token);
await writer.WriteAsync(
new AGUIEvent
@@ -1694,9 +1792,12 @@ private static async Task HandleScriptingServiceChatStreamAsync(
string prompt,
string? sessionId,
string scopeId,
+ string serviceId,
IReadOnlyDictionary? headers,
IScriptRuntimeCommandPort scriptRuntimeCommandPort,
IScriptExecutionProjectionPort scriptExecutionProjectionPort,
+ ServiceInvocationRequest invocationRequest,
+ IServiceRunRegistrationPort serviceRunRegistrationPort,
CancellationToken ct)
{
var actorId = target.Service.PrimaryActorId;
@@ -1705,6 +1806,18 @@ private static async Task HandleScriptingServiceChatStreamAsync(
"Script runtime actor is not available. The service may not be activated.");
var runId = Guid.NewGuid().ToString("N");
+ // Register the service run with the same id the SSE RunStarted frame will carry.
+ await RegisterStreamServiceRunAsync(
+ serviceRunRegistrationPort,
+ target,
+ invocationRequest,
+ scopeId,
+ serviceId,
+ runId: runId,
+ commandId: runId,
+ correlationId: runId,
+ targetActorId: actorId,
+ ct);
var chatRequest = new ChatRequestEvent
{
Prompt = prompt,
@@ -2726,7 +2839,58 @@ private static async Task BuildScopeRunSumma
snapshot?.CompletedSteps ?? 0,
snapshot?.RoleReplyCount ?? 0,
snapshot?.LastOutput ?? string.Empty,
- snapshot?.LastError ?? string.Empty);
+ snapshot?.LastError ?? string.Empty,
+ ServiceImplementationKind.Workflow.ToString(),
+ ServiceRunStatus.Accepted.ToString(),
+ string.Empty,
+ string.Empty,
+ string.Empty,
+ binding.ActorId,
+ binding.CreatedAt);
+ }
+
+ private static async Task BuildScopeRunSummaryFromRegistryAsync(
+ string scopeId,
+ string serviceId,
+ ServiceRunSnapshot snapshot,
+ IWorkflowExecutionQueryApplicationService workflowExecutionQueryService,
+ CancellationToken ct)
+ {
+ var workflowSnapshot = snapshot.ImplementationKind == ServiceImplementationKind.Workflow &&
+ !string.IsNullOrWhiteSpace(snapshot.TargetActorId)
+ ? await workflowExecutionQueryService.GetActorSnapshotAsync(snapshot.TargetActorId, ct)
+ : null;
+
+ return new ScopeServiceRunSummaryHttpResponse(
+ scopeId,
+ serviceId,
+ snapshot.RunId,
+ // ActorId stays the controllable target so existing resume/signal/stop
+ // round-trips keep working; the registry actor is internal infra.
+ snapshot.TargetActorId,
+ string.Empty,
+ snapshot.RevisionId,
+ snapshot.DeploymentId,
+ workflowSnapshot?.WorkflowName ?? string.Empty,
+ workflowSnapshot?.CompletionStatus ?? WorkflowRunCompletionStatus.Unknown,
+ workflowSnapshot?.StateVersion ?? snapshot.StateVersion,
+ workflowSnapshot?.LastEventId ?? snapshot.LastEventId,
+ workflowSnapshot?.LastUpdatedAt ?? snapshot.UpdatedAt,
+ snapshot.CreatedAt,
+ snapshot.UpdatedAt,
+ workflowSnapshot?.LastSuccess,
+ workflowSnapshot?.TotalSteps ?? 0,
+ workflowSnapshot?.CompletedSteps ?? 0,
+ workflowSnapshot?.RoleReplyCount ?? 0,
+ workflowSnapshot?.LastOutput ?? string.Empty,
+ workflowSnapshot?.LastError ?? string.Empty,
+ snapshot.ImplementationKind.ToString(),
+ snapshot.Status.ToString(),
+ snapshot.CommandId,
+ snapshot.CorrelationId,
+ snapshot.EndpointId,
+ snapshot.TargetActorId,
+ snapshot.CreatedAt);
}
private static MemberScopeServiceRunSummaryHttpResponse BuildMemberRunSummaryResponse(
@@ -3489,7 +3653,14 @@ public sealed record ScopeServiceRunSummaryHttpResponse(
int CompletedSteps,
int RoleReplyCount,
string LastOutput,
- string LastError);
+ string LastError,
+ string ImplementationKind,
+ string Status,
+ string CommandId,
+ string CorrelationId,
+ string EndpointId,
+ string TargetActorId,
+ DateTimeOffset? CreatedAt = null);
public sealed record MemberScopeServiceRunSummaryHttpResponse(
string ScopeId,
diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs
new file mode 100644
index 000000000..a93cffc76
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs
@@ -0,0 +1,104 @@
+using Aevatar.Foundation.Abstractions;
+using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Abstractions.Ports;
+using Aevatar.GAgentService.Core.GAgents;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Aevatar.GAgentService.Infrastructure.Adapters;
+
+///
+/// Infrastructure adapter that registers and updates service runs by dispatching
+/// commands to actors. The actor commits the events
+/// and the current-state projection materializes them into the durable readmodel.
+///
+public sealed class ServiceRunRegistrationAdapter : IServiceRunRegistrationPort
+{
+ private const string PublisherId = "gagent-service.runs";
+
+ private readonly IActorRuntime _runtime;
+ private readonly IActorDispatchPort _dispatchPort;
+ private readonly IServiceRunCurrentStateProjectionPort _projectionPort;
+
+ public ServiceRunRegistrationAdapter(
+ IActorRuntime runtime,
+ IActorDispatchPort dispatchPort,
+ IServiceRunCurrentStateProjectionPort projectionPort)
+ {
+ _runtime = runtime ?? throw new ArgumentNullException(nameof(runtime));
+ _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort));
+ _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort));
+ }
+
+ public async Task RegisterAsync(
+ ServiceRunRecord record,
+ CancellationToken ct = default)
+ {
+ ArgumentNullException.ThrowIfNull(record);
+ if (string.IsNullOrWhiteSpace(record.RunId))
+ throw new InvalidOperationException("run_id is required.");
+ if (string.IsNullOrWhiteSpace(record.ScopeId))
+ throw new InvalidOperationException("scope_id is required.");
+ if (string.IsNullOrWhiteSpace(record.ServiceId))
+ throw new InvalidOperationException("service_id is required.");
+
+ var actorId = ServiceRunIds.BuildActorId(record.ScopeId, record.ServiceId, record.RunId);
+ var actor = await _runtime.CreateAsync(actorId, ct: ct);
+ await _projectionPort.EnsureProjectionAsync(actor.Id, ct);
+
+ var prepared = record.Clone();
+ if (prepared.CreatedAt == null)
+ prepared.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow);
+ prepared.UpdatedAt = prepared.CreatedAt;
+ if (prepared.Status == ServiceRunStatus.Unspecified)
+ prepared.Status = ServiceRunStatus.Accepted;
+
+ var envelope = CreateEnvelope(actor.Id, Any.Pack(new RegisterServiceRunRequested
+ {
+ Record = prepared,
+ }), prepared.CommandId, prepared.CorrelationId);
+
+ await _dispatchPort.DispatchAsync(actor.Id, envelope, ct);
+ return new ServiceRunRegistrationResult(actor.Id, prepared.RunId);
+ }
+
+ public async Task UpdateStatusAsync(
+ string runActorId,
+ string runId,
+ ServiceRunStatus status,
+ CancellationToken ct = default)
+ {
+ if (string.IsNullOrWhiteSpace(runActorId))
+ throw new ArgumentException("runActorId is required.", nameof(runActorId));
+ if (status == ServiceRunStatus.Unspecified)
+ return;
+
+ var commandId = Guid.NewGuid().ToString("N");
+ var envelope = CreateEnvelope(
+ runActorId,
+ Any.Pack(new UpdateServiceRunStatusRequested
+ {
+ RunId = runId ?? string.Empty,
+ Status = status,
+ }),
+ commandId,
+ commandId);
+ await _dispatchPort.DispatchAsync(runActorId, envelope, ct);
+ }
+
+ private static EventEnvelope CreateEnvelope(
+ string actorId,
+ Any payload,
+ string commandId,
+ string correlationId) =>
+ new()
+ {
+ Id = string.IsNullOrWhiteSpace(commandId) ? Guid.NewGuid().ToString("N") : commandId,
+ Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
+ Payload = payload,
+ Route = EnvelopeRouteSemantics.CreateDirect(PublisherId, actorId),
+ Propagation = new EnvelopePropagation
+ {
+ CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? commandId : correlationId,
+ },
+ };
+}
diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs
index 06f20d4c6..ee5a8bff9 100644
--- a/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs
+++ b/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs
@@ -14,15 +14,18 @@ public sealed class DefaultServiceInvocationDispatcher : IServiceInvocationDispa
private readonly IActorDispatchPort _dispatchPort;
private readonly IScriptRuntimeCommandPort _scriptRuntimeCommandPort;
private readonly IWorkflowRunActorPort _workflowRunActorPort;
+ private readonly IServiceRunRegistrationPort _serviceRunRegistrationPort;
public DefaultServiceInvocationDispatcher(
IActorDispatchPort dispatchPort,
IScriptRuntimeCommandPort scriptRuntimeCommandPort,
- IWorkflowRunActorPort workflowRunActorPort)
+ IWorkflowRunActorPort workflowRunActorPort,
+ IServiceRunRegistrationPort serviceRunRegistrationPort)
{
_dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort));
_scriptRuntimeCommandPort = scriptRuntimeCommandPort ?? throw new ArgumentNullException(nameof(scriptRuntimeCommandPort));
_workflowRunActorPort = workflowRunActorPort ?? throw new ArgumentNullException(nameof(workflowRunActorPort));
+ _serviceRunRegistrationPort = serviceRunRegistrationPort ?? throw new ArgumentNullException(nameof(serviceRunRegistrationPort));
}
public async Task DispatchAsync(
@@ -49,9 +52,12 @@ private async Task DispatchStaticAsync(
CancellationToken ct)
{
var commandId = ResolveCommandId(request);
- var envelope = CreateEnvelope(target.Service.PrimaryActorId, request.Payload, commandId, ResolveCorrelationId(request, commandId));
+ var correlationId = ResolveCorrelationId(request, commandId);
+ var runId = ResolveRunId(request, commandId);
+ await RegisterRunAsync(target, request, runId, commandId, correlationId, target.Service.PrimaryActorId, ServiceImplementationKind.Static, ct);
+ var envelope = CreateEnvelope(target.Service.PrimaryActorId, request.Payload, commandId, correlationId);
await _dispatchPort.DispatchAsync(target.Service.PrimaryActorId, envelope, ct);
- return CreateReceipt(target, target.Service.PrimaryActorId, commandId, ResolveCorrelationId(request, commandId));
+ return CreateReceipt(target, target.Service.PrimaryActorId, commandId, correlationId);
}
private async Task DispatchScriptingAsync(
@@ -61,6 +67,9 @@ private async Task DispatchScriptingAsync(
{
var plan = target.Artifact.DeploymentPlan.ScriptingPlan;
var commandId = ResolveCommandId(request);
+ var correlationId = ResolveCorrelationId(request, commandId);
+ var runId = ResolveRunId(request, commandId);
+ await RegisterRunAsync(target, request, runId, commandId, correlationId, target.Service.PrimaryActorId, ServiceImplementationKind.Scripting, ct);
await _scriptRuntimeCommandPort.RunRuntimeAsync(
target.Service.PrimaryActorId,
runId: commandId,
@@ -70,7 +79,7 @@ await _scriptRuntimeCommandPort.RunRuntimeAsync(
request.Payload?.TypeUrl ?? string.Empty,
request.Identity?.TenantId,
ct);
- return CreateReceipt(target, target.Service.PrimaryActorId, commandId, ResolveCorrelationId(request, commandId));
+ return CreateReceipt(target, target.Service.PrimaryActorId, commandId, correlationId);
}
private async Task DispatchWorkflowAsync(
@@ -91,11 +100,42 @@ private async Task DispatchWorkflowAsync(
ct);
var commandId = ResolveCommandId(request);
var correlationId = ResolveCorrelationId(request, commandId);
+ var runId = ResolveRunId(request, commandId);
+ await RegisterRunAsync(target, request, runId, commandId, correlationId, run.Actor.Id, ServiceImplementationKind.Workflow, ct);
var envelope = CreateEnvelope(run.Actor.Id, Any.Pack(chatRequest), commandId, correlationId);
await _dispatchPort.DispatchAsync(run.Actor.Id, envelope, ct);
return CreateReceipt(target, run.Actor.Id, commandId, correlationId);
}
+ private async Task RegisterRunAsync(
+ ServiceInvocationResolvedTarget target,
+ ServiceInvocationRequest request,
+ string runId,
+ string commandId,
+ string correlationId,
+ string targetActorId,
+ ServiceImplementationKind implementationKind,
+ CancellationToken ct)
+ {
+ var record = new ServiceRunRecord
+ {
+ ScopeId = request.Identity?.TenantId ?? string.Empty,
+ ServiceId = request.Identity?.ServiceId ?? string.Empty,
+ ServiceKey = target.Service.ServiceKey ?? string.Empty,
+ RunId = runId,
+ CommandId = commandId,
+ CorrelationId = correlationId,
+ EndpointId = target.Endpoint.EndpointId ?? string.Empty,
+ ImplementationKind = implementationKind,
+ TargetActorId = targetActorId ?? string.Empty,
+ RevisionId = target.Service.RevisionId ?? string.Empty,
+ DeploymentId = target.Service.DeploymentId ?? string.Empty,
+ Status = ServiceRunStatus.Accepted,
+ Identity = request.Identity?.Clone(),
+ };
+ await _serviceRunRegistrationPort.RegisterAsync(record, ct);
+ }
+
private static void EnsureEndpointPayloadMatch(ServiceEndpointDescriptor endpoint, ServiceInvocationRequest request)
{
if (request.Payload == null)
@@ -155,9 +195,13 @@ private static string ResolveCorrelationId(ServiceInvocationRequest request, str
? commandId
: request.CorrelationId;
+ private static string ResolveRunId(ServiceInvocationRequest request, string commandId) =>
+ string.IsNullOrWhiteSpace(request.CommandId)
+ ? commandId
+ : request.CommandId;
+
private static string ResolveAuthoritativeScopeId(ServiceInvocationRequest request, ChatRequestEvent chatRequest)
{
- // Path-level scope (Identity.TenantId) is authoritative; payload cannot override it.
if (!string.IsNullOrWhiteSpace(request.Identity?.TenantId))
return request.Identity.TenantId.Trim();
return ResolveScopeId(chatRequest);
diff --git a/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs b/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs
new file mode 100644
index 000000000..c895ede6a
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs
@@ -0,0 +1,9 @@
+namespace Aevatar.GAgentService.Projection.Contexts;
+
+public sealed class ServiceRunCurrentStateProjectionContext
+ : IProjectionMaterializationContext
+{
+ public required string RootActorId { get; init; }
+
+ public required string ProjectionKind { get; init; }
+}
diff --git a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs
index 869454b91..5b557a59c 100644
--- a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs
+++ b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs
@@ -73,6 +73,13 @@ public static IServiceCollection AddGAgentServiceProjection(
ProjectionKind = scopeKey.ProjectionKind,
},
static context => new ServiceProjectionRuntimeLease(context.RootActorId, context));
+ services.AddServiceProjectionRuntime>(
+ static scopeKey => new ServiceRunCurrentStateProjectionContext
+ {
+ RootActorId = scopeKey.RootActorId,
+ ProjectionKind = scopeKey.ProjectionKind,
+ },
+ static context => new ServiceProjectionRuntimeLease(context.RootActorId, context));
services.AddEventSinkProjectionRuntimeCore<
GAgentDraftRunProjectionContext,
GAgentDraftRunRuntimeLease,
@@ -92,6 +99,7 @@ public static IServiceCollection AddGAgentServiceProjection(
services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
+ services.TryAddSingleton();
services.TryAddSingleton, GAgentDraftRunSessionEventCodec>();
services.TryAddSingleton, ProjectionSessionEventHub>();
services.TryAddSingleton();
@@ -102,6 +110,7 @@ public static IServiceCollection AddGAgentServiceProjection(
services.TryAddSingleton, ServiceRolloutCommandObservationReadModelMetadataProvider>();
services.TryAddSingleton, ServiceTrafficViewReadModelMetadataProvider>();
services.TryAddSingleton, ServiceRevisionCatalogReadModelMetadataProvider>();
+ services.TryAddSingleton, ServiceRunCurrentStateReadModelMetadataProvider>();
services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
@@ -109,6 +118,7 @@ public static IServiceCollection AddGAgentServiceProjection(
services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
+ services.TryAddSingleton();
services.AddProjectionArtifactMaterializer<
ServiceCatalogProjectionContext,
ServiceCatalogProjector>();
@@ -130,6 +140,9 @@ public static IServiceCollection AddGAgentServiceProjection(
services.AddProjectionArtifactMaterializer<
ServiceRevisionCatalogProjectionContext,
ServiceRevisionCatalogProjector>();
+ services.AddCurrentStateProjectionMaterializer<
+ ServiceRunCurrentStateProjectionContext,
+ ServiceRunCurrentStateProjector>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<
IProjectionProjector,
GAgentDraftRunSessionEventProjector>());
diff --git a/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs b/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs
new file mode 100644
index 000000000..2b0bf8b27
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs
@@ -0,0 +1,13 @@
+using Aevatar.CQRS.Projection.Stores.Abstractions;
+using Aevatar.GAgentService.Projection.ReadModels;
+
+namespace Aevatar.GAgentService.Projection.Metadata;
+
+public sealed class ServiceRunCurrentStateReadModelMetadataProvider : IProjectionDocumentMetadataProvider
+{
+ public DocumentIndexMetadata Metadata { get; } = new(
+ "gagent-service-runs",
+ Mappings: new Dictionary(),
+ Settings: new Dictionary(),
+ Aliases: new Dictionary());
+}
diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs
index 9ac1f658a..752c2d8ad 100644
--- a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs
+++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs
@@ -9,4 +9,5 @@ internal static class ServiceProjectionKinds
public const string Rollouts = "service-rollouts";
public const string Traffic = "service-traffic";
public const string DraftRunSession = "service-draft-run-session";
+ public const string Runs = "service-runs";
}
diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs
new file mode 100644
index 000000000..164b5c929
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs
@@ -0,0 +1,21 @@
+using Aevatar.GAgentService.Abstractions.Ports;
+using Aevatar.GAgentService.Projection.Configuration;
+using Aevatar.GAgentService.Projection.Contexts;
+
+namespace Aevatar.GAgentService.Projection.Orchestration;
+
+public sealed class ServiceRunCurrentStateProjectionPort
+ : ServiceProjectionPortBase,
+ IServiceRunCurrentStateProjectionPort
+{
+ public ServiceRunCurrentStateProjectionPort(
+ ServiceProjectionOptions options,
+ IProjectionScopeActivationService> activationService,
+ IProjectionScopeReleaseService> releaseService)
+ : base(options, activationService, releaseService, ServiceProjectionKinds.Runs)
+ {
+ }
+
+ public Task EnsureProjectionAsync(string actorId, CancellationToken ct = default) =>
+ EnsureProjectionCoreAsync(actorId, ct);
+}
diff --git a/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs
new file mode 100644
index 000000000..1d3740011
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs
@@ -0,0 +1,77 @@
+using Aevatar.CQRS.Projection.Core.Orchestration;
+using Aevatar.CQRS.Projection.Runtime.Abstractions;
+using Aevatar.CQRS.Projection.Stores.Abstractions;
+using Aevatar.Foundation.Abstractions;
+using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Projection.Contexts;
+using Aevatar.GAgentService.Projection.ReadModels;
+
+namespace Aevatar.GAgentService.Projection.Projectors;
+
+public sealed class ServiceRunCurrentStateProjector
+ : ICurrentStateProjectionMaterializer
+{
+ private readonly IProjectionWriteDispatcher _writeDispatcher;
+ private readonly IProjectionClock _clock;
+
+ public ServiceRunCurrentStateProjector(
+ IProjectionWriteDispatcher writeDispatcher,
+ IProjectionClock clock)
+ {
+ _writeDispatcher = writeDispatcher ?? throw new ArgumentNullException(nameof(writeDispatcher));
+ _clock = clock ?? throw new ArgumentNullException(nameof(clock));
+ }
+
+ public async ValueTask ProjectAsync(
+ ServiceRunCurrentStateProjectionContext context,
+ EventEnvelope envelope,
+ CancellationToken ct = default)
+ {
+ if (!CommittedStateEventEnvelope.TryUnpackState(
+ envelope,
+ out _,
+ out var stateEvent,
+ out var state) ||
+ stateEvent == null ||
+ state?.Record == null)
+ {
+ return;
+ }
+
+ var record = state.Record;
+ if (string.IsNullOrWhiteSpace(record.RunId) ||
+ string.IsNullOrWhiteSpace(record.ScopeId) ||
+ string.IsNullOrWhiteSpace(record.ServiceId))
+ {
+ return;
+ }
+
+ var observedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow);
+ var document = new ServiceRunCurrentStateReadModel
+ {
+ Id = ServiceRunIds.BuildKey(record.ScopeId, record.ServiceId, record.RunId),
+ ActorId = context.RootActorId,
+ ScopeId = record.ScopeId ?? string.Empty,
+ ServiceId = record.ServiceId ?? string.Empty,
+ ServiceKey = record.ServiceKey ?? string.Empty,
+ RunId = record.RunId,
+ CommandId = record.CommandId ?? string.Empty,
+ CorrelationId = record.CorrelationId ?? string.Empty,
+ EndpointId = record.EndpointId ?? string.Empty,
+ ImplementationKind = (int)record.ImplementationKind,
+ TargetActorId = record.TargetActorId ?? string.Empty,
+ RevisionId = record.RevisionId ?? string.Empty,
+ DeploymentId = record.DeploymentId ?? string.Empty,
+ Status = (int)record.Status,
+ TenantId = record.Identity?.TenantId ?? string.Empty,
+ AppId = record.Identity?.AppId ?? string.Empty,
+ Namespace = record.Identity?.Namespace ?? string.Empty,
+ CreatedAt = record.CreatedAt?.ToDateTimeOffset() ?? observedAt,
+ UpdatedAt = record.UpdatedAt?.ToDateTimeOffset() ?? observedAt,
+ StateVersion = stateEvent.Version,
+ LastEventId = stateEvent.EventId ?? string.Empty,
+ };
+
+ await _writeDispatcher.UpsertAsync(document, ct);
+ }
+}
diff --git a/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs
new file mode 100644
index 000000000..e84eb69fc
--- /dev/null
+++ b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs
@@ -0,0 +1,183 @@
+using Aevatar.CQRS.Projection.Stores.Abstractions;
+using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Abstractions.Ports;
+using Aevatar.GAgentService.Abstractions.Queries;
+using Aevatar.GAgentService.Projection.Configuration;
+using Aevatar.GAgentService.Projection.ReadModels;
+
+namespace Aevatar.GAgentService.Projection.Queries;
+
+public sealed class ServiceRunQueryReader : IServiceRunQueryPort
+{
+ private readonly IProjectionDocumentReader _documentStore;
+ private readonly bool _enabled;
+
+ public ServiceRunQueryReader(
+ IProjectionDocumentReader documentStore,
+ ServiceProjectionOptions? options = null)
+ {
+ _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
+ _enabled = options?.Enabled ?? true;
+ }
+
+ public async Task> ListAsync(
+ ServiceRunQuery query,
+ CancellationToken ct = default)
+ {
+ ArgumentNullException.ThrowIfNull(query);
+ if (!_enabled)
+ return [];
+
+ var boundedTake = Math.Clamp(query.Take, 1, 200);
+ var filters = new List(2);
+ if (!string.IsNullOrWhiteSpace(query.ScopeId))
+ {
+ filters.Add(new ProjectionDocumentFilter
+ {
+ FieldPath = nameof(ServiceRunCurrentStateReadModel.ScopeId),
+ Operator = ProjectionDocumentFilterOperator.Eq,
+ Value = ProjectionDocumentValue.FromString(query.ScopeId),
+ });
+ }
+ if (!string.IsNullOrWhiteSpace(query.ServiceId))
+ {
+ filters.Add(new ProjectionDocumentFilter
+ {
+ FieldPath = nameof(ServiceRunCurrentStateReadModel.ServiceId),
+ Operator = ProjectionDocumentFilterOperator.Eq,
+ Value = ProjectionDocumentValue.FromString(query.ServiceId),
+ });
+ }
+
+ var result = await _documentStore.QueryAsync(
+ new ProjectionDocumentQuery
+ {
+ Take = boundedTake,
+ Filters = filters,
+ Sorts = new[]
+ {
+ new ProjectionDocumentSort
+ {
+ FieldPath = nameof(ServiceRunCurrentStateReadModel.UpdatedAt),
+ Direction = ProjectionDocumentSortDirection.Desc,
+ },
+ new ProjectionDocumentSort
+ {
+ FieldPath = nameof(ServiceRunCurrentStateReadModel.RunId),
+ Direction = ProjectionDocumentSortDirection.Asc,
+ },
+ },
+ },
+ ct);
+ return result.Items.Take(boundedTake).Select(Map).ToList();
+ }
+
+ public async Task GetByRunIdAsync(
+ string scopeId,
+ string serviceId,
+ string runId,
+ CancellationToken ct = default)
+ {
+ if (!_enabled)
+ return null;
+ if (string.IsNullOrWhiteSpace(runId) ||
+ string.IsNullOrWhiteSpace(scopeId) ||
+ string.IsNullOrWhiteSpace(serviceId))
+ {
+ return null;
+ }
+
+ var direct = await _documentStore.GetAsync(
+ ServiceRunIds.BuildKey(scopeId, serviceId, runId),
+ ct);
+ return direct == null ? null : Map(direct);
+ }
+
+ public async Task GetByCommandIdAsync(
+ string scopeId,
+ string serviceId,
+ string commandId,
+ CancellationToken ct = default)
+ {
+ if (!_enabled)
+ return null;
+ if (string.IsNullOrWhiteSpace(commandId))
+ return null;
+
+ var matches = await QueryByEqualityAsync(
+ scopeId,
+ serviceId,
+ nameof(ServiceRunCurrentStateReadModel.CommandId),
+ commandId.Trim(),
+ ct);
+ return matches.FirstOrDefault();
+ }
+
+ private async Task> QueryByEqualityAsync(
+ string scopeId,
+ string serviceId,
+ string fieldPath,
+ string value,
+ CancellationToken ct)
+ {
+ var filters = new List(3)
+ {
+ new ProjectionDocumentFilter
+ {
+ FieldPath = fieldPath,
+ Operator = ProjectionDocumentFilterOperator.Eq,
+ Value = ProjectionDocumentValue.FromString(value),
+ },
+ };
+ if (!string.IsNullOrWhiteSpace(scopeId))
+ {
+ filters.Add(new ProjectionDocumentFilter
+ {
+ FieldPath = nameof(ServiceRunCurrentStateReadModel.ScopeId),
+ Operator = ProjectionDocumentFilterOperator.Eq,
+ Value = ProjectionDocumentValue.FromString(scopeId),
+ });
+ }
+ if (!string.IsNullOrWhiteSpace(serviceId))
+ {
+ filters.Add(new ProjectionDocumentFilter
+ {
+ FieldPath = nameof(ServiceRunCurrentStateReadModel.ServiceId),
+ Operator = ProjectionDocumentFilterOperator.Eq,
+ Value = ProjectionDocumentValue.FromString(serviceId),
+ });
+ }
+
+ var result = await _documentStore.QueryAsync(
+ new ProjectionDocumentQuery
+ {
+ Take = 5,
+ Filters = filters,
+ },
+ ct);
+ return result.Items.Select(Map).ToList();
+ }
+
+ private static ServiceRunSnapshot Map(ServiceRunCurrentStateReadModel readModel) =>
+ new(
+ readModel.ScopeId,
+ readModel.ServiceId,
+ readModel.ServiceKey,
+ readModel.RunId,
+ readModel.CommandId,
+ readModel.CorrelationId,
+ readModel.EndpointId,
+ (ServiceImplementationKind)readModel.ImplementationKind,
+ readModel.TargetActorId,
+ readModel.RevisionId,
+ readModel.DeploymentId,
+ (ServiceRunStatus)readModel.Status,
+ readModel.ActorId,
+ readModel.TenantId,
+ readModel.AppId,
+ readModel.Namespace,
+ readModel.StateVersion,
+ readModel.LastEventId,
+ readModel.CreatedAt,
+ readModel.UpdatedAt);
+}
diff --git a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs
index a10c9a0c5..3347f068a 100644
--- a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs
+++ b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs
@@ -202,6 +202,21 @@ public IList Targets
}
}
+public sealed partial class ServiceRunCurrentStateReadModel : IProjectionReadModel
+{
+ public DateTimeOffset CreatedAt
+ {
+ get => ServiceProjectionReadModelSupport.ToDateTimeOffset(CreatedAtUtcValue);
+ set => CreatedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value);
+ }
+
+ public DateTimeOffset UpdatedAt
+ {
+ get => ServiceProjectionReadModelSupport.ToDateTimeOffset(UpdatedAtUtcValue);
+ set => UpdatedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value);
+ }
+}
+
internal static class ServiceProjectionReadModelSupport
{
public static Timestamp ToTimestamp(DateTimeOffset value) =>
diff --git a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto
index 1ebe6c536..684eb1c82 100644
--- a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto
+++ b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto
@@ -175,3 +175,31 @@ message ServiceTrafficTargetReadModel {
int32 allocation_weight = 4;
string serving_state = 5;
}
+
+// --- ServiceRunCurrentStateReadModel ---
+
+message ServiceRunCurrentStateReadModel {
+ string id = 1;
+ string actor_id = 2;
+ int64 state_version = 3;
+ string last_event_id = 4;
+
+ string scope_id = 5;
+ string service_id = 6;
+ string service_key = 7;
+ string run_id = 8;
+ string command_id = 9;
+ string correlation_id = 10;
+ string endpoint_id = 11;
+ int32 implementation_kind = 12;
+ string target_actor_id = 13;
+ string revision_id = 14;
+ string deployment_id = 15;
+ int32 status = 16;
+ string tenant_id = 17;
+ string app_id = 18;
+ string namespace = 19;
+
+ google.protobuf.Timestamp created_at_utc_value = 20;
+ google.protobuf.Timestamp updated_at_utc_value = 21;
+}
diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs
index fc85a8ca0..f36803851 100644
--- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs
+++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs
@@ -38,7 +38,8 @@ public static async Task HandleChat(
HttpContext http,
ChatInput input,
ICommandInteractionService chatRunService,
- CancellationToken ct = default)
+ CancellationToken ct = default,
+ Func? onAcceptedHook = null)
{
using var scope = ApiRequestScope.BeginHttp();
var writer = new ChatSseResponseWriter(http.Response);
@@ -73,6 +74,8 @@ public static async Task HandleChat(
onAcceptedAsync: async (receipt, token) =>
{
CapabilityTraceContext.ApplyCorrelationHeader(http.Response, receipt.CorrelationId);
+ if (onAcceptedHook != null)
+ await onAcceptedHook(receipt, token);
await writer.StartAsync(token);
await writer.WriteAsync(BuildRunContextFrame(receipt), token);
scope.RecordFirstResponse();
diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs
index ca5e595fb..9c64eda14 100644
--- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs
+++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs
@@ -8,6 +8,7 @@
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Abstractions.Streaming;
using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Abstractions.Ports;
using Aevatar.GAgentService.Abstractions.ScopeGAgents;
using Aevatar.GAgentService.Application.ScopeGAgents;
using Aevatar.GAgentService.Hosting.Endpoints;
@@ -68,8 +69,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldCreateActor_AndEmitSy
};
var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort);
- await InvokePrivateTaskAsync(
- HandleGAgentStreamMethod,
+ await InvokeStaticStreamAsync(
http,
CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"),
"hello",
@@ -120,8 +120,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldReuseExistingActor_An
};
var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort);
- await InvokePrivateTaskAsync(
- HandleGAgentStreamMethod,
+ await InvokeStaticStreamAsync(
http,
CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"),
"hello",
@@ -155,8 +154,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldMapAllInputPartKinds_
};
var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort);
- await InvokePrivateTaskAsync(
- HandleGAgentStreamMethod,
+ await InvokeStaticStreamAsync(
http,
CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"),
"hello",
@@ -215,8 +213,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldPreserveRunErrorWitho
};
var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort);
- await InvokePrivateTaskAsync(
- HandleGAgentStreamMethod,
+ await InvokeStaticStreamAsync(
http,
CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"),
"hello",
@@ -350,8 +347,7 @@ public async Task ScriptExecutionSessionEventProjector_ShouldRouteOnlyMatchingRu
[Fact]
public async Task HandleGAgentServiceChatStreamAsync_ShouldThrow_WhenAgentTypeCannotBeResolved()
{
- var act = () => InvokePrivateTaskAsync(
- HandleGAgentStreamMethod,
+ var act = () => InvokeStaticStreamAsync(
CreateHttpContext(),
CreateStaticTarget("Missing.Agent, Missing.Assembly", primaryActorId: "actor-1"),
"hello",
@@ -370,8 +366,7 @@ await act.Should().ThrowAsync()
[Fact]
public async Task HandleScriptingServiceChatStreamAsync_ShouldThrow_WhenPrimaryActorMissing()
{
- var act = () => InvokePrivateTaskAsync(
- HandleScriptingStreamMethod,
+ var act = () => InvokeScriptingStreamAsync(
CreateHttpContext(),
CreateScriptingTarget(primaryActorId: string.Empty),
"hello",
@@ -389,8 +384,7 @@ await act.Should().ThrowAsync()
[Fact]
public async Task HandleScriptingServiceChatStreamAsync_ShouldThrow_WhenActorCannotBeResolved()
{
- var act = () => InvokePrivateTaskAsync(
- HandleScriptingStreamMethod,
+ var act = () => InvokeScriptingStreamAsync(
CreateHttpContext(),
CreateScriptingTarget(primaryActorId: "actor-1"),
"hello",
@@ -421,8 +415,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldEmitSyntheticFinis
},
};
- await InvokePrivateTaskAsync(
- HandleScriptingStreamMethod,
+ await InvokeScriptingStreamAsync(
http,
CreateScriptingTarget(primaryActorId: "actor-1"),
"hello",
@@ -469,8 +462,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldPreserveRunErrorWi
},
};
- await InvokePrivateTaskAsync(
- HandleScriptingStreamMethod,
+ await InvokeScriptingStreamAsync(
http,
CreateScriptingTarget(primaryActorId: "actor-1"),
"hello",
@@ -509,8 +501,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldAvoidSyntheticDupl
},
};
- await InvokePrivateTaskAsync(
- HandleScriptingStreamMethod,
+ await InvokeScriptingStreamAsync(
http,
CreateScriptingTarget(primaryActorId: "actor-1"),
"hello",
@@ -620,6 +611,67 @@ private static ServiceInvocationResolvedTarget CreateScriptingTarget(string prim
artifact.Endpoints[0]);
}
+ private static Task InvokeStaticStreamAsync(
+ HttpContext http,
+ ServiceInvocationResolvedTarget target,
+ string prompt,
+ string? actorId,
+ string? sessionId,
+ string scopeId,
+ IReadOnlyDictionary? headers,
+ IReadOnlyList? inputParts,
+ ICommandInteractionService interactionService,
+ CancellationToken ct) =>
+ InvokePrivateTaskAsync(
+ HandleGAgentStreamMethod,
+ http,
+ target,
+ prompt,
+ actorId,
+ sessionId,
+ scopeId,
+ "svc-default",
+ headers,
+ inputParts,
+ interactionService,
+ new ServiceInvocationRequest(),
+ new NoOpServiceRunRegistrationPort(),
+ ct);
+
+ private static Task InvokeScriptingStreamAsync(
+ HttpContext http,
+ ServiceInvocationResolvedTarget target,
+ string prompt,
+ string? sessionId,
+ string scopeId,
+ IReadOnlyDictionary? headers,
+ IScriptRuntimeCommandPort scriptRuntimeCommandPort,
+ IScriptExecutionProjectionPort scriptExecutionProjectionPort,
+ CancellationToken ct) =>
+ InvokePrivateTaskAsync(
+ HandleScriptingStreamMethod,
+ http,
+ target,
+ prompt,
+ sessionId,
+ scopeId,
+ "svc-default",
+ headers,
+ scriptRuntimeCommandPort,
+ scriptExecutionProjectionPort,
+ new ServiceInvocationRequest(),
+ new NoOpServiceRunRegistrationPort(),
+ ct);
+
+ private sealed class NoOpServiceRunRegistrationPort : IServiceRunRegistrationPort
+ {
+ public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) =>
+ Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId));
+
+ public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) =>
+ Task.CompletedTask;
+ }
+
private static async Task InvokePrivateTaskAsync(MethodInfo method, params object?[] args)
{
var result = method.Invoke(null, args);
diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs
index 090aa5c15..26c816b9b 100644
--- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs
+++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs
@@ -1490,6 +1490,13 @@ await host.ArtifactStore.SaveAsync(
host.InteractionService.LastRequest!.ActorId.Should().Be("definition-actor-1");
host.InteractionService.LastRequest.ScopeId.Should().Be("scope-a");
host.InteractionService.LastRequest.Metadata.Should().ContainKey("source").WhoseValue.Should().Be("tests");
+ // Service-run registry receives the actual workflow run actor id as the run id, so
+ // /runs/{runId} can resolve the same id the SSE RunStarted frame carries.
+ host.ServiceRunRegistrationPort.RegisterCalls.Should().ContainSingle();
+ host.ServiceRunRegistrationPort.RegisterCalls[0].RunId.Should().Be("run-actor-1");
+ host.ServiceRunRegistrationPort.RegisterCalls[0].CommandId.Should().Be("cmd-1");
+ host.ServiceRunRegistrationPort.RegisterCalls[0].TargetActorId.Should().Be("run-actor-1");
+ host.ServiceRunRegistrationPort.RegisterCalls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Workflow);
}
[Fact]
@@ -1815,9 +1822,12 @@ public async Task ScopeServiceEndpointHelpers_ShouldRejectScriptingStream_WhenRu
"hello",
"session-1",
"scope-a",
+ "default",
new Dictionary(),
new NoOpScriptRuntimeCommandPort(),
new NoOpScriptExecutionProjectionPort(),
+ new ServiceInvocationRequest(),
+ new NoOpServiceRunRegistrationPort(),
CancellationToken.None))
.Should()
.ThrowAsync();
@@ -1876,9 +1886,12 @@ public async Task ScopeServiceEndpointHelpers_ShouldRejectScriptingStream_WhenRu
"hello",
"session-1",
"scope-a",
+ "default",
new Dictionary(),
new ThrowingScriptRuntimeCommandPort(new InvalidOperationException("Script runtime actor 'script-runtime-1' could not be resolved. The service may not be activated.")),
new NoOpScriptExecutionProjectionPort(),
+ new ServiceInvocationRequest(),
+ new NoOpServiceRunRegistrationPort(),
CancellationToken.None))
.Should()
.ThrowAsync();
@@ -2815,8 +2828,6 @@ public async Task ListDefaultRunsEndpoint_ShouldReturnDefaultServiceRunHistory()
response.Runs[0].RevisionId.Should().Be("rev-1");
response.Runs[0].DeploymentId.Should().Be("dep-old");
response.Runs[0].WorkflowName.Should().Be("default-flow");
- host.RunBindingReader.Queries.Should().ContainSingle();
- host.RunBindingReader.Queries[0].ScopeId.Should().Be("scope-a");
}
[Fact]
@@ -2927,8 +2938,6 @@ public async Task ListMemberRunsEndpoint_ShouldReturnMemberScopedRunHistory()
response.Runs[0].RevisionId.Should().Be("rev-1");
response.Runs[0].DeploymentId.Should().Be("dep-member-old");
response.Runs[0].StateVersion.Should().Be(13);
- host.RunBindingReader.Queries.Should().ContainSingle();
- host.RunBindingReader.Queries[0].DefinitionActorIds.Should().BeEquivalentTo(["def-member-active", "def-member-old"]);
}
[Fact]
@@ -3199,10 +3208,6 @@ public async Task ListRunsEndpoint_ShouldReturnScopeScopedRunHistory()
response.Runs[0].CompletionStatus.Should().Be(WorkflowRunCompletionStatus.Completed);
response.Runs[0].StateVersion.Should().Be(7);
response.Runs[0].LastEventId.Should().Be("evt-7");
- host.RunBindingReader.Queries.Should().ContainSingle();
- host.RunBindingReader.Queries[0].ScopeId.Should().Be("scope-a");
- host.RunBindingReader.Queries[0].Take.Should().Be(5);
- host.RunBindingReader.Queries[0].DefinitionActorIds.Should().BeEquivalentTo(["def-actor-active", "def-actor-old"]);
}
[Fact]
@@ -4152,7 +4157,9 @@ private ScopeServiceEndpointTestHost(
FakeWorkflowRunBindingReader runBindingReader,
RecordingResumeDispatchService resumeDispatchService,
RecordingSignalDispatchService signalDispatchService,
- RecordingStopDispatchService stopDispatchService)
+ RecordingStopDispatchService stopDispatchService,
+ RecordingServiceRunRegistrationPort serviceRunRegistrationPort,
+ FakeServiceRunQueryPort serviceRunQueryPort)
{
_app = app;
Client = client;
@@ -4172,6 +4179,8 @@ private ScopeServiceEndpointTestHost(
ResumeDispatchService = resumeDispatchService;
SignalDispatchService = signalDispatchService;
StopDispatchService = stopDispatchService;
+ ServiceRunRegistrationPort = serviceRunRegistrationPort;
+ ServiceRunQueryPort = serviceRunQueryPort;
}
public HttpClient Client { get; }
@@ -4208,6 +4217,10 @@ private ScopeServiceEndpointTestHost(
public RecordingStopDispatchService StopDispatchService { get; }
+ public RecordingServiceRunRegistrationPort ServiceRunRegistrationPort { get; }
+
+ public FakeServiceRunQueryPort ServiceRunQueryPort { get; }
+
public static async Task StartAsync(bool authenticationEnabled = true)
{
var builder = WebApplication.CreateBuilder(new WebApplicationOptions
@@ -4238,6 +4251,20 @@ public static async Task StartAsync(bool authentic
var stopDispatchService = new RecordingStopDispatchService();
var actorRuntime = new NoOpActorRuntime();
var eventSubscriptionProvider = new NoOpActorEventSubscriptionProvider();
+ var serviceRunQueryPort = new FakeServiceRunQueryPort
+ {
+ WorkflowBindingFallback = runBindingReader,
+ DeploymentResolver = binding =>
+ {
+ var deployment = lifecycleQueryPort.Deployments?.Deployments.FirstOrDefault(d =>
+ string.Equals(d.PrimaryActorId, binding.EffectiveDefinitionActorId, StringComparison.Ordinal));
+ return (deployment?.DeploymentId ?? string.Empty, deployment?.RevisionId ?? string.Empty);
+ },
+ };
+ var serviceRunRegistrationPort = new RecordingServiceRunRegistrationPort
+ {
+ LinkedQueryPort = serviceRunQueryPort,
+ };
builder.Services.AddSingleton(commandPort);
builder.Services.AddSingleton(queryPort);
builder.Services.AddSingleton(scopeBindingPort);
@@ -4262,6 +4289,8 @@ public static async Task StartAsync(bool authentic
builder.Services.AddSingleton>(stopDispatchService);
builder.Services.AddSingleton(actorRuntime);
builder.Services.AddSingleton(eventSubscriptionProvider);
+ builder.Services.AddSingleton(serviceRunRegistrationPort);
+ builder.Services.AddSingleton(serviceRunQueryPort);
builder.Services.AddSingleton>(
Options.Create(new ScopeWorkflowCapabilityOptions
{
@@ -4369,7 +4398,9 @@ public static async Task StartAsync(bool authentic
runBindingReader,
resumeDispatchService,
signalDispatchService,
- stopDispatchService);
+ stopDispatchService,
+ serviceRunRegistrationPort,
+ serviceRunQueryPort);
}
private static bool TryGetRequestedScopeId(string? path, out string scopeId)
@@ -4574,6 +4605,147 @@ private sealed class RecordingServiceGovernanceQueryPort : IServiceGovernanceQue
throw new NotSupportedException();
}
+ private sealed class RecordingServiceRunRegistrationPort : IServiceRunRegistrationPort
+ {
+ public List RegisterCalls { get; } = [];
+ public List<(string runActorId, string runId, ServiceRunStatus status)> StatusCalls { get; } = [];
+
+ public FakeServiceRunQueryPort? LinkedQueryPort { get; set; }
+
+ public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default)
+ {
+ RegisterCalls.Add(record.Clone());
+ LinkedQueryPort?.Upsert(BuildSnapshot(record));
+ return Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.ScopeId}:{record.ServiceId}:{record.RunId}", record.RunId));
+ }
+
+ public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default)
+ {
+ StatusCalls.Add((runActorId, runId, status));
+ return Task.CompletedTask;
+ }
+
+ private static ServiceRunSnapshot BuildSnapshot(ServiceRunRecord record) =>
+ new(
+ record.ScopeId,
+ record.ServiceId,
+ record.ServiceKey,
+ record.RunId,
+ record.CommandId,
+ record.CorrelationId,
+ record.EndpointId,
+ record.ImplementationKind,
+ record.TargetActorId,
+ record.RevisionId,
+ record.DeploymentId,
+ record.Status,
+ $"service-run:{record.ScopeId}:{record.ServiceId}:{record.RunId}",
+ record.Identity?.TenantId ?? string.Empty,
+ record.Identity?.AppId ?? string.Empty,
+ record.Identity?.Namespace ?? string.Empty,
+ StateVersion: 1,
+ LastEventId: $"{record.RunId}:registered",
+ CreatedAt: record.CreatedAt?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow,
+ UpdatedAt: record.UpdatedAt?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow);
+ }
+
+ private sealed class FakeServiceRunQueryPort : IServiceRunQueryPort
+ {
+ private readonly List _snapshots = [];
+
+ // Bridge to existing FakeWorkflowRunBindingReader fixtures so tests that pre-populate
+ // workflow run bindings also see the runs through the new IServiceRunQueryPort surface.
+ public FakeWorkflowRunBindingReader? WorkflowBindingFallback { get; set; }
+
+ // Optional resolver that maps a workflow run binding to (deploymentId, revisionId) so the
+ // bridged snapshot mirrors what production projector would write from the dispatcher.
+ public Func? DeploymentResolver { get; set; }
+
+ public IReadOnlyList Snapshots => _snapshots;
+
+ public void Upsert(ServiceRunSnapshot snapshot)
+ {
+ _snapshots.RemoveAll(x =>
+ string.Equals(x.ScopeId, snapshot.ScopeId, StringComparison.Ordinal) &&
+ string.Equals(x.ServiceId, snapshot.ServiceId, StringComparison.Ordinal) &&
+ string.Equals(x.RunId, snapshot.RunId, StringComparison.Ordinal));
+ _snapshots.Add(snapshot);
+ }
+
+ public Task> ListAsync(ServiceRunQuery query, CancellationToken ct = default)
+ {
+ var bridged = MaterializeForQuery(query.ScopeId, query.ServiceId).ToList();
+ IEnumerable results = bridged;
+ if (!string.IsNullOrWhiteSpace(query.ScopeId))
+ results = results.Where(s => string.Equals(s.ScopeId, query.ScopeId, StringComparison.Ordinal));
+ if (!string.IsNullOrWhiteSpace(query.ServiceId))
+ results = results.Where(s => string.Equals(s.ServiceId, query.ServiceId, StringComparison.Ordinal));
+ return Task.FromResult>(
+ results.OrderByDescending(s => s.UpdatedAt).Take(query.Take).ToList());
+ }
+
+ public Task GetByRunIdAsync(string scopeId, string serviceId, string runId, CancellationToken ct = default) =>
+ Task.FromResult(MaterializeForQuery(scopeId, serviceId).FirstOrDefault(s =>
+ string.Equals(s.ScopeId, scopeId, StringComparison.Ordinal) &&
+ string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal) &&
+ string.Equals(s.RunId, runId, StringComparison.Ordinal)));
+
+ public Task GetByCommandIdAsync(string scopeId, string serviceId, string commandId, CancellationToken ct = default) =>
+ Task.FromResult(MaterializeForQuery(scopeId, serviceId).FirstOrDefault(s =>
+ string.Equals(s.ScopeId, scopeId, StringComparison.Ordinal) &&
+ string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal) &&
+ string.Equals(s.CommandId, commandId, StringComparison.Ordinal)));
+
+ // Materializes snapshots, treating any workflow binding fixtures as belonging to the queried service
+ // (workflow bindings predate the service-run registry and don't carry serviceId in the test fixtures).
+ private IEnumerable MaterializeForQuery(string scopeId, string serviceId)
+ {
+ foreach (var snapshot in _snapshots)
+ yield return snapshot;
+ if (WorkflowBindingFallback != null)
+ {
+ foreach (var binding in WorkflowBindingFallback.AllBindings())
+ {
+ if (_snapshots.Any(s => string.Equals(s.RunId, binding.RunId, StringComparison.Ordinal) &&
+ string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal)))
+ {
+ continue;
+ }
+ var (deploymentId, revisionId) = DeploymentResolver?.Invoke(binding) ?? (string.Empty, string.Empty);
+ yield return BuildSnapshotFromBinding(binding, scopeId, serviceId, deploymentId, revisionId);
+ }
+ }
+ }
+
+ private static ServiceRunSnapshot BuildSnapshotFromBinding(
+ WorkflowActorBinding binding,
+ string scopeId,
+ string serviceId,
+ string deploymentId,
+ string revisionId) =>
+ new(
+ ScopeId: string.IsNullOrWhiteSpace(scopeId) ? binding.ScopeId ?? string.Empty : scopeId,
+ ServiceId: serviceId ?? string.Empty,
+ ServiceKey: string.Empty,
+ RunId: binding.RunId,
+ CommandId: binding.RunId,
+ CorrelationId: binding.RunId,
+ EndpointId: string.Empty,
+ ImplementationKind: ServiceImplementationKind.Workflow,
+ TargetActorId: binding.ActorId,
+ RevisionId: revisionId,
+ DeploymentId: deploymentId,
+ Status: ServiceRunStatus.Accepted,
+ ActorId: binding.ActorId,
+ TenantId: binding.ScopeId ?? string.Empty,
+ AppId: string.Empty,
+ Namespace: string.Empty,
+ StateVersion: binding.SourceVersion,
+ LastEventId: binding.SourceEventId ?? string.Empty,
+ CreatedAt: binding.CreatedAt ?? DateTimeOffset.UtcNow,
+ UpdatedAt: binding.UpdatedAt ?? DateTimeOffset.UtcNow);
+ }
+
private sealed class RecordingServiceInvocationPort : IServiceInvocationPort
{
public ServiceInvocationRequest? LastRequest { get; private set; }
@@ -4703,6 +4875,9 @@ private sealed class FakeWorkflowRunBindingReader : IWorkflowRunBindingReader
public List Queries { get; } = [];
+ public IEnumerable AllBindings() =>
+ BindingsByRunId.Values.SelectMany(x => x);
+
public Task> ListByRunIdAsync(
string runId,
int take = 20,
@@ -4818,6 +4993,15 @@ public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) =>
+ Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId));
+
+ public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) =>
+ Task.CompletedTask;
+ }
+
private sealed class NoOpScriptRuntimeCommandPort : IScriptRuntimeCommandPort
{
public Task RunRuntimeAsync(
diff --git a/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs
new file mode 100644
index 000000000..399ccfd4c
--- /dev/null
+++ b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs
@@ -0,0 +1,225 @@
+using Aevatar.Foundation.Runtime.Persistence;
+using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Core.GAgents;
+using Aevatar.GAgentService.Tests.TestSupport;
+using FluentAssertions;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Aevatar.GAgentService.Tests.Core;
+
+public sealed class ServiceRunGAgentTests
+{
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldPersistRecord_AndDefaultStatusToAccepted()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.ActivateAsync();
+
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ actor.State.Record.Should().NotBeNull();
+ actor.State.Record!.RunId.Should().Be("run-1");
+ actor.State.Record.Status.Should().Be(ServiceRunStatus.Accepted);
+ actor.State.LastAppliedEventVersion.Should().Be(1);
+ }
+
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldBeIdempotent_WhenRunIdAlreadyBound()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:run-1",
+ static () => new ServiceRunGAgent());
+
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ actor.State.LastAppliedEventVersion.Should().Be(1);
+ }
+
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldRejectMismatchedRunId()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-2"),
+ });
+
+ await act.Should().ThrowAsync()
+ .WithMessage("*run-1*cannot register run 'run-2'*");
+ }
+
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldRejectScopeMismatchOnReRegister()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:tenant-1:svc-1:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ var foreign = BuildRecord("run-1");
+ foreign.ScopeId = "tenant-2";
+ var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign });
+
+ await act.Should().ThrowAsync()
+ .WithMessage("*tenant-1*cannot re-register under scope 'tenant-2'*");
+ }
+
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldRejectServiceMismatchOnReRegister()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:tenant-1:svc-1:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ var foreign = BuildRecord("run-1");
+ foreign.ServiceId = "svc-2";
+ var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign });
+
+ await act.Should().ThrowAsync()
+ .WithMessage("*svc-1*cannot re-register under service 'svc-2'*");
+ }
+
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldRejectTargetMismatchOnReRegister()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:tenant-1:svc-1:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ var foreign = BuildRecord("run-1");
+ foreign.TargetActorId = "different-target";
+ var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign });
+
+ await act.Should().ThrowAsync()
+ .WithMessage("*target-run-1*cannot re-register against target 'different-target'*");
+ }
+
+ [Fact]
+ public async Task HandleRegisterAsync_ShouldRejectMissingRequiredFields()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:bad",
+ static () => new ServiceRunGAgent());
+
+ var noRunId = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = new ServiceRunRecord { ScopeId = "t", ServiceId = "s", CommandId = "c" },
+ });
+ await noRunId.Should().ThrowAsync().WithMessage("run_id*");
+ }
+
+ [Fact]
+ public async Task HandleUpdateStatusAsync_ShouldAdvanceStatusAndStamp()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ await actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested
+ {
+ RunId = "run-1",
+ Status = ServiceRunStatus.Completed,
+ });
+
+ actor.State.Record!.Status.Should().Be(ServiceRunStatus.Completed);
+ actor.State.LastAppliedEventVersion.Should().Be(2);
+ }
+
+ [Fact]
+ public async Task HandleUpdateStatusAsync_ShouldNoOp_WhenStatusUnchanged()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:run-1",
+ static () => new ServiceRunGAgent());
+ await actor.HandleRegisterAsync(new RegisterServiceRunRequested
+ {
+ Record = BuildRecord("run-1"),
+ });
+
+ await actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested
+ {
+ RunId = "run-1",
+ Status = ServiceRunStatus.Accepted,
+ });
+
+ actor.State.LastAppliedEventVersion.Should().Be(1);
+ }
+
+ [Fact]
+ public async Task HandleUpdateStatusAsync_ShouldRejectWhenNotRegistered()
+ {
+ var actor = GAgentServiceTestKit.CreateStatefulAgent(
+ new InMemoryEventStore(),
+ "service-run:run-1",
+ static () => new ServiceRunGAgent());
+
+ var act = () => actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested
+ {
+ RunId = "run-1",
+ Status = ServiceRunStatus.Completed,
+ });
+ await act.Should().ThrowAsync()
+ .WithMessage("*has no registered run*");
+ }
+
+ private static ServiceRunRecord BuildRecord(string runId) =>
+ new()
+ {
+ ScopeId = "tenant-1",
+ ServiceId = "svc-1",
+ ServiceKey = "tenant-1:svc-1",
+ RunId = runId,
+ CommandId = $"cmd-{runId}",
+ CorrelationId = $"corr-{runId}",
+ EndpointId = "run",
+ ImplementationKind = ServiceImplementationKind.Static,
+ TargetActorId = $"target-{runId}",
+ RevisionId = "r1",
+ DeploymentId = "dep-1",
+ Status = ServiceRunStatus.Unspecified,
+ CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow),
+ };
+}
diff --git a/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs b/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs
index 2af93dd78..60c849349 100644
--- a/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs
+++ b/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs
@@ -1,6 +1,7 @@
using Aevatar.AI.Abstractions;
using Aevatar.Foundation.Abstractions;
using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Abstractions.Ports;
using Aevatar.GAgentService.Infrastructure.Dispatch;
using Aevatar.GAgentService.Tests.TestSupport;
using Aevatar.Scripting.Core.Ports;
@@ -19,7 +20,8 @@ public async Task DispatchAsync_ShouldDispatchStaticEnvelope()
var dispatcher = new DefaultServiceInvocationDispatcher(
dispatchPort,
new RecordingScriptRuntimeCommandPort(),
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run");
var request = new ServiceInvocationRequest
{
@@ -46,7 +48,8 @@ public async Task DispatchAsync_ShouldDelegateScriptingRun()
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
scriptPort,
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Scripting,
endpointId: "run",
@@ -83,7 +86,8 @@ public async Task DispatchAsync_ShouldCreateWorkflowRun_AndSendEnvelope()
var dispatcher = new DefaultServiceInvocationDispatcher(
dispatchPort,
new RecordingScriptRuntimeCommandPort(),
- workflowPort);
+ workflowPort,
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Workflow,
endpointId: "chat",
@@ -127,7 +131,8 @@ public async Task DispatchAsync_ShouldPreferIdentityTenantIdOverPayloadScope()
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- workflowPort);
+ workflowPort,
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Workflow,
endpointId: "chat",
@@ -166,7 +171,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromRequestScopeBeforeMetada
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- workflowPort);
+ workflowPort,
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Workflow,
endpointId: "chat",
@@ -205,7 +211,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromWorkflowMetadataKey_When
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- workflowPort);
+ workflowPort,
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Workflow,
endpointId: "chat",
@@ -242,7 +249,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromLegacyMetadataKey_WhenOt
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- workflowPort);
+ workflowPort,
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Workflow,
endpointId: "chat",
@@ -277,7 +285,8 @@ public async Task DispatchAsync_ShouldRejectPayloadTypeMismatch()
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Static,
endpointId: "run",
@@ -302,7 +311,8 @@ public async Task DispatchAsync_ShouldGenerateCommandAndCorrelationIds_WhenMissi
var dispatcher = new DefaultServiceInvocationDispatcher(
dispatchPort,
new RecordingScriptRuntimeCommandPort(),
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run");
var receipt = await dispatcher.DispatchAsync(target, new ServiceInvocationRequest
@@ -325,7 +335,8 @@ public async Task DispatchAsync_ShouldRejectMissingPayload()
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run");
var act = () => dispatcher.DispatchAsync(target, new ServiceInvocationRequest
@@ -344,7 +355,8 @@ public async Task DispatchAsync_ShouldRejectWorkflowPayloadThatIsNotChatRequest(
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Workflow,
endpointId: "chat",
@@ -372,7 +384,8 @@ public async Task DispatchAsync_ShouldPassRequestedEventTypeAndGeneratedRunIdToS
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
scriptPort,
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(
ServiceImplementationKind.Scripting,
endpointId: "run",
@@ -398,13 +411,111 @@ public async Task DispatchAsync_ShouldPassRequestedEventTypeAndGeneratedRunIdToS
scriptPort.Calls[0].payload!.TypeUrl.Should().Be(Any.Pack(new StringValue()).TypeUrl);
}
+ [Fact]
+ public async Task DispatchAsync_ShouldRegisterServiceRun_ForStaticPath()
+ {
+ var registry = new RecordingServiceRunRegistrationPort();
+ var dispatcher = new DefaultServiceInvocationDispatcher(
+ new RecordingDispatchPort(),
+ new RecordingScriptRuntimeCommandPort(),
+ new RecordingWorkflowRunActorPort(),
+ registry);
+ var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run");
+ var request = new ServiceInvocationRequest
+ {
+ Identity = GAgentServiceTestKit.CreateIdentity(),
+ EndpointId = "run",
+ CommandId = "cmd-static",
+ Payload = Any.Pack(new StringValue { Value = "payload" }),
+ };
+
+ var receipt = await dispatcher.DispatchAsync(target, request);
+
+ registry.Calls.Should().ContainSingle();
+ registry.Calls[0].RunId.Should().Be(receipt.CommandId);
+ registry.Calls[0].CommandId.Should().Be("cmd-static");
+ registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Static);
+ registry.Calls[0].TargetActorId.Should().Be("primary-actor");
+ registry.Calls[0].ScopeId.Should().Be("tenant");
+ registry.Calls[0].ServiceId.Should().Be("svc");
+ }
+
+ [Fact]
+ public async Task DispatchAsync_ShouldRegisterServiceRun_ForScriptingPath()
+ {
+ var registry = new RecordingServiceRunRegistrationPort();
+ var dispatcher = new DefaultServiceInvocationDispatcher(
+ new RecordingDispatchPort(),
+ new RecordingScriptRuntimeCommandPort(),
+ new RecordingWorkflowRunActorPort(),
+ registry);
+ var target = CreateTarget(
+ ServiceImplementationKind.Scripting,
+ endpointId: "run",
+ requestTypeUrl: Any.Pack(new StringValue()).TypeUrl);
+ target.Artifact.DeploymentPlan.ScriptingPlan = new ScriptingServiceDeploymentPlan
+ {
+ Revision = "rev-1",
+ DefinitionActorId = "definition-1",
+ };
+ var request = new ServiceInvocationRequest
+ {
+ Identity = GAgentServiceTestKit.CreateIdentity(),
+ EndpointId = "run",
+ CommandId = "cmd-script",
+ Payload = Any.Pack(new StringValue { Value = "payload" }),
+ };
+
+ await dispatcher.DispatchAsync(target, request);
+
+ registry.Calls.Should().ContainSingle();
+ registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Scripting);
+ registry.Calls[0].CommandId.Should().Be("cmd-script");
+ }
+
+ [Fact]
+ public async Task DispatchAsync_ShouldRegisterServiceRun_ForWorkflowPath()
+ {
+ var registry = new RecordingServiceRunRegistrationPort();
+ var workflowPort = new RecordingWorkflowRunActorPort();
+ var dispatcher = new DefaultServiceInvocationDispatcher(
+ new RecordingDispatchPort(),
+ new RecordingScriptRuntimeCommandPort(),
+ workflowPort,
+ registry);
+ var target = CreateTarget(
+ ServiceImplementationKind.Workflow,
+ endpointId: "chat",
+ requestTypeUrl: Any.Pack(new ChatRequestEvent()).TypeUrl);
+ target.Artifact.DeploymentPlan.WorkflowPlan = new WorkflowServiceDeploymentPlan
+ {
+ WorkflowName = "wf",
+ WorkflowYaml = "name: wf",
+ };
+ var request = new ServiceInvocationRequest
+ {
+ Identity = GAgentServiceTestKit.CreateIdentity(),
+ EndpointId = "chat",
+ CommandId = "cmd-wf",
+ Payload = Any.Pack(new ChatRequestEvent { Prompt = "hi" }),
+ };
+
+ await dispatcher.DispatchAsync(target, request);
+
+ registry.Calls.Should().ContainSingle();
+ registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Workflow);
+ registry.Calls[0].TargetActorId.Should().Be(workflowPort.RunActor.Id);
+ registry.Calls[0].CommandId.Should().Be("cmd-wf");
+ }
+
[Fact]
public async Task DispatchAsync_ShouldRejectUnsupportedImplementationKind()
{
var dispatcher = new DefaultServiceInvocationDispatcher(
new RecordingDispatchPort(),
new RecordingScriptRuntimeCommandPort(),
- new RecordingWorkflowRunActorPort());
+ new RecordingWorkflowRunActorPort(),
+ new RecordingServiceRunRegistrationPort());
var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run");
target.Artifact.ImplementationKind = ServiceImplementationKind.Unspecified;
@@ -453,6 +564,20 @@ private static ServiceInvocationResolvedTarget CreateTarget(
});
}
+ private sealed class RecordingServiceRunRegistrationPort : IServiceRunRegistrationPort
+ {
+ public List Calls { get; } = [];
+
+ public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default)
+ {
+ Calls.Add(record.Clone());
+ return Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId));
+ }
+
+ public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) =>
+ Task.CompletedTask;
+ }
+
private sealed class RecordingDispatchPort : IActorDispatchPort
{
public List<(string actorId, EventEnvelope envelope)> Calls { get; } = [];
diff --git a/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs b/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs
new file mode 100644
index 000000000..fa94d5049
--- /dev/null
+++ b/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs
@@ -0,0 +1,193 @@
+using Aevatar.Foundation.Abstractions;
+using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Abstractions.Ports;
+using Aevatar.GAgentService.Core.GAgents;
+using Aevatar.GAgentService.Infrastructure.Adapters;
+using Aevatar.GAgentService.Tests.TestSupport;
+using FluentAssertions;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Aevatar.GAgentService.Tests.Infrastructure;
+
+public sealed class ServiceRunRegistrationAdapterTests
+{
+ [Fact]
+ public async Task RegisterAsync_ShouldCreateActorWithCompositeId_AndDispatchRegisterEnvelope()
+ {
+ var runtime = new RecordingRunRegistryRuntime();
+ var dispatchPort = new RecordingDispatchPort();
+ var projectionPort = new RecordingServiceRunProjectionPort();
+ var adapter = new ServiceRunRegistrationAdapter(runtime, dispatchPort, projectionPort);
+
+ var record = BuildRecord(scopeId: "tenant-1", serviceId: "svc-1", runId: "run-1");
+ var result = await adapter.RegisterAsync(record);
+
+ var expectedActorId = ServiceRunIds.BuildActorId("tenant-1", "svc-1", "run-1");
+ result.RunActorId.Should().Be(expectedActorId);
+ result.RunId.Should().Be("run-1");
+ runtime.CreateCalls.Should().ContainSingle();
+ runtime.CreateCalls[0].agentType.Should().Be(typeof(ServiceRunGAgent));
+ runtime.CreateCalls[0].actorId.Should().Be(expectedActorId);
+ projectionPort.EnsureCalls.Should().Equal(expectedActorId);
+ dispatchPort.Calls.Should().ContainSingle();
+ dispatchPort.Calls[0].actorId.Should().Be(expectedActorId);
+ dispatchPort.Calls[0].envelope.Payload.TypeUrl.Should().Contain("RegisterServiceRunRequested");
+ }
+
+ [Fact]
+ public async Task RegisterAsync_ShouldNotCollide_OnSameRunIdAcrossScopes()
+ {
+ var runtime = new RecordingRunRegistryRuntime();
+ var adapter = new ServiceRunRegistrationAdapter(
+ runtime,
+ new RecordingDispatchPort(),
+ new RecordingServiceRunProjectionPort());
+
+ await adapter.RegisterAsync(BuildRecord("tenant-a", "svc", "run-shared"));
+ await adapter.RegisterAsync(BuildRecord("tenant-b", "svc", "run-shared"));
+
+ runtime.CreateCalls.Should().HaveCount(2);
+ runtime.CreateCalls[0].actorId.Should().Be(ServiceRunIds.BuildActorId("tenant-a", "svc", "run-shared"));
+ runtime.CreateCalls[1].actorId.Should().Be(ServiceRunIds.BuildActorId("tenant-b", "svc", "run-shared"));
+ runtime.CreateCalls[0].actorId.Should().NotBe(runtime.CreateCalls[1].actorId);
+ }
+
+ [Fact]
+ public async Task RegisterAsync_ShouldRejectMissingRequiredFields()
+ {
+ var adapter = new ServiceRunRegistrationAdapter(
+ new RecordingRunRegistryRuntime(),
+ new RecordingDispatchPort(),
+ new RecordingServiceRunProjectionPort());
+
+ var noRun = BuildRecord("tenant", "svc", string.Empty);
+ var act = () => adapter.RegisterAsync(noRun);
+ await act.Should().ThrowAsync().WithMessage("run_id*");
+
+ var noScope = BuildRecord(string.Empty, "svc", "run-1");
+ var act2 = () => adapter.RegisterAsync(noScope);
+ await act2.Should().ThrowAsync().WithMessage("scope_id*");
+
+ var noService = BuildRecord("tenant", string.Empty, "run-1");
+ var act3 = () => adapter.RegisterAsync(noService);
+ await act3.Should().ThrowAsync().WithMessage("service_id*");
+ }
+
+ [Fact]
+ public async Task UpdateStatusAsync_ShouldDispatchUpdateEnvelope()
+ {
+ var dispatchPort = new RecordingDispatchPort();
+ var adapter = new ServiceRunRegistrationAdapter(
+ new RecordingRunRegistryRuntime(),
+ dispatchPort,
+ new RecordingServiceRunProjectionPort());
+
+ await adapter.UpdateStatusAsync("service-run:tenant:svc:run-1", "run-1", ServiceRunStatus.Completed);
+
+ dispatchPort.Calls.Should().ContainSingle();
+ dispatchPort.Calls[0].actorId.Should().Be("service-run:tenant:svc:run-1");
+ dispatchPort.Calls[0].envelope.Payload.TypeUrl.Should().Contain("UpdateServiceRunStatusRequested");
+ }
+
+ [Fact]
+ public async Task UpdateStatusAsync_ShouldNoOp_WhenStatusUnspecified()
+ {
+ var dispatchPort = new RecordingDispatchPort();
+ var adapter = new ServiceRunRegistrationAdapter(
+ new RecordingRunRegistryRuntime(),
+ dispatchPort,
+ new RecordingServiceRunProjectionPort());
+
+ await adapter.UpdateStatusAsync("service-run:tenant:svc:run-1", "run-1", ServiceRunStatus.Unspecified);
+
+ dispatchPort.Calls.Should().BeEmpty();
+ }
+
+ private static ServiceRunRecord BuildRecord(string scopeId, string serviceId, string runId) =>
+ new()
+ {
+ ScopeId = scopeId,
+ ServiceId = serviceId,
+ ServiceKey = $"{scopeId}:{serviceId}",
+ RunId = runId,
+ CommandId = $"cmd-{runId}",
+ CorrelationId = $"corr-{runId}",
+ EndpointId = "run",
+ ImplementationKind = ServiceImplementationKind.Static,
+ TargetActorId = "primary-actor",
+ RevisionId = "r1",
+ DeploymentId = "dep-1",
+ Status = ServiceRunStatus.Unspecified,
+ CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow),
+ };
+
+ private sealed class RecordingRunRegistryRuntime : IActorRuntime
+ {
+ public List<(System.Type agentType, string actorId)> CreateCalls { get; } = [];
+
+ public Task CreateAsync(string? id = null, CancellationToken ct = default)
+ where TAgent : IAgent =>
+ CreateAsync(typeof(TAgent), id, ct);
+
+ public Task CreateAsync(System.Type agentType, string? id = null, CancellationToken ct = default)
+ {
+ var actorId = id ?? $"created:{agentType.Name}";
+ CreateCalls.Add((agentType, actorId));
+ return Task.FromResult(new RecordingActor(actorId));
+ }
+
+ public Task DestroyAsync(string id, CancellationToken ct = default) => Task.CompletedTask;
+
+ public Task GetAsync(string id) => Task.FromResult(null);
+
+ public Task ExistsAsync(string id) => Task.FromResult(false);
+
+ public Task LinkAsync(string parentId, string childId, CancellationToken ct = default) => Task.CompletedTask;
+
+ public Task UnlinkAsync(string childId, CancellationToken ct = default) => Task.CompletedTask;
+ }
+
+ private sealed class RecordingDispatchPort : IActorDispatchPort
+ {
+ public List<(string actorId, EventEnvelope envelope)> Calls { get; } = [];
+
+ public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationToken ct = default)
+ {
+ Calls.Add((actorId, envelope));
+ return Task.CompletedTask;
+ }
+ }
+
+ private sealed class RecordingServiceRunProjectionPort : IServiceRunCurrentStateProjectionPort
+ {
+ public List EnsureCalls { get; } = [];
+
+ public Task EnsureProjectionAsync(string actorId, CancellationToken ct = default)
+ {
+ EnsureCalls.Add(actorId);
+ return Task.CompletedTask;
+ }
+ }
+
+ private sealed class RecordingActor : IActor
+ {
+ public RecordingActor(string id)
+ {
+ Id = id;
+ }
+
+ public string Id { get; }
+
+ public IAgent Agent { get; } = new TestStaticServiceAgent();
+
+ public Task ActivateAsync(CancellationToken ct = default) => Task.CompletedTask;
+
+ public Task DeactivateAsync(CancellationToken ct = default) => Task.CompletedTask;
+
+ public Task HandleEventAsync(EventEnvelope envelope, CancellationToken ct = default) => Task.CompletedTask;
+
+ public Task GetParentIdAsync() => Task.FromResult(null);
+
+ public Task> GetChildrenIdsAsync() => Task.FromResult>([]);
+ }
+}
diff --git a/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs
new file mode 100644
index 000000000..88c661218
--- /dev/null
+++ b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs
@@ -0,0 +1,253 @@
+using Aevatar.CQRS.Projection.Core.Abstractions;
+using Aevatar.Foundation.Abstractions;
+using Aevatar.GAgentService.Abstractions;
+using Aevatar.GAgentService.Projection.Contexts;
+using Aevatar.GAgentService.Projection.Projectors;
+using Aevatar.GAgentService.Projection.Queries;
+using Aevatar.GAgentService.Projection.ReadModels;
+using Aevatar.GAgentService.Abstractions.Queries;
+using FluentAssertions;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Aevatar.GAgentService.Tests.Projection;
+
+public sealed class ServiceRunCurrentStateProjectorTests
+{
+ [Fact]
+ public async Task ProjectAsync_ShouldMaterializeCurrentState_FromCommittedStateRoot()
+ {
+ var store = new RecordingDocumentStore(x => x.Id);
+ var projector = new ServiceRunCurrentStateProjector(
+ store,
+ new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00")));
+ var observedAt = DateTimeOffset.Parse("2026-04-27T01:00:00+00:00");
+ var record = BuildRecord(
+ scopeId: "tenant-1",
+ serviceId: "svc-1",
+ runId: "run-1",
+ commandId: "cmd-1",
+ implementation: ServiceImplementationKind.Workflow,
+ targetActorId: "workflow-run:abc",
+ createdAt: observedAt);
+ var envelope = WrapCommittedRunState(
+ record,
+ stateVersion: 3,
+ eventId: "evt-registered",
+ observedAt: observedAt);
+ var context = new ServiceRunCurrentStateProjectionContext
+ {
+ RootActorId = "service-run:run-1",
+ ProjectionKind = "service-runs",
+ };
+
+ await projector.ProjectAsync(context, envelope);
+
+ var doc = await store.GetAsync(ServiceRunIds.BuildKey("tenant-1", "svc-1", "run-1"));
+ doc.Should().NotBeNull();
+ doc!.RunId.Should().Be("run-1");
+ doc.CommandId.Should().Be("cmd-1");
+ doc.ScopeId.Should().Be("tenant-1");
+ doc.ServiceId.Should().Be("svc-1");
+ doc.ActorId.Should().Be("service-run:run-1");
+ doc.ImplementationKind.Should().Be((int)ServiceImplementationKind.Workflow);
+ doc.TargetActorId.Should().Be("workflow-run:abc");
+ doc.Status.Should().Be((int)ServiceRunStatus.Accepted);
+ doc.StateVersion.Should().Be(3);
+ doc.LastEventId.Should().Be("evt-registered");
+ }
+
+ [Fact]
+ public async Task ProjectAsync_ShouldIgnoreEnvelope_WithoutCommittedStateRoot()
+ {
+ var store = new RecordingDocumentStore(x => x.Id);
+ var projector = new ServiceRunCurrentStateProjector(
+ store,
+ new FixedProjectionClock(DateTimeOffset.UtcNow));
+ var context = new ServiceRunCurrentStateProjectionContext
+ {
+ RootActorId = "service-run:run-x",
+ ProjectionKind = "service-runs",
+ };
+
+ await projector.ProjectAsync(context, new EventEnvelope
+ {
+ Id = "raw",
+ Payload = Any.Pack(new StringValue { Value = "noop" }),
+ });
+
+ (await store.ReadItemsAsync()).Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task QueryReader_ShouldFilterByScopeAndService_AndResolveByRunIdAndCommandId()
+ {
+ var store = new RecordingDocumentStore(x => x.Id);
+ var projector = new ServiceRunCurrentStateProjector(
+ store,
+ new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00")));
+ var reader = new ServiceRunQueryReader(store);
+ await projector.ProjectAsync(
+ CreateContext("service-run:run-a"),
+ WrapCommittedRunState(
+ BuildRecord("tenant-1", "svc-1", "run-a", "cmd-a", ServiceImplementationKind.Static, "actor-a"),
+ stateVersion: 1,
+ eventId: "evt-a",
+ observedAt: DateTimeOffset.Parse("2026-04-27T01:00:00+00:00")));
+ await projector.ProjectAsync(
+ CreateContext("service-run:run-b"),
+ WrapCommittedRunState(
+ BuildRecord("tenant-1", "svc-1", "run-b", "cmd-b", ServiceImplementationKind.Workflow, "actor-b"),
+ stateVersion: 1,
+ eventId: "evt-b",
+ observedAt: DateTimeOffset.Parse("2026-04-27T02:00:00+00:00")));
+ await projector.ProjectAsync(
+ CreateContext("service-run:run-c"),
+ WrapCommittedRunState(
+ BuildRecord("tenant-2", "svc-1", "run-c", "cmd-c", ServiceImplementationKind.Scripting, "actor-c"),
+ stateVersion: 1,
+ eventId: "evt-c",
+ observedAt: DateTimeOffset.Parse("2026-04-27T03:00:00+00:00")));
+
+ var listForTenant1 = await reader.ListAsync(new ServiceRunQuery("tenant-1", "svc-1"));
+ listForTenant1.Should().HaveCount(2);
+ listForTenant1.Select(x => x.RunId).Should().BeEquivalentTo(new[] { "run-a", "run-b" });
+
+ var listForTenant2 = await reader.ListAsync(new ServiceRunQuery("tenant-2", "svc-1"));
+ listForTenant2.Select(x => x.RunId).Should().Equal("run-c");
+
+ var byRun = await reader.GetByRunIdAsync("tenant-1", "svc-1", "run-a");
+ byRun.Should().NotBeNull();
+ byRun!.CommandId.Should().Be("cmd-a");
+
+ var byCommand = await reader.GetByCommandIdAsync("tenant-1", "svc-1", "cmd-b");
+ byCommand.Should().NotBeNull();
+ byCommand!.RunId.Should().Be("run-b");
+
+ var byRunWrongScope = await reader.GetByRunIdAsync("tenant-1", "svc-1", "run-c");
+ byRunWrongScope.Should().BeNull();
+ }
+
+ [Fact]
+ public async Task ProjectAsync_ShouldNotCollide_WhenSameRunIdAcrossDifferentScopes()
+ {
+ var store = new RecordingDocumentStore(x => x.Id);
+ var projector = new ServiceRunCurrentStateProjector(
+ store,
+ new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00")));
+ var observedAt = DateTimeOffset.Parse("2026-04-27T01:00:00+00:00");
+
+ await projector.ProjectAsync(
+ CreateContext("service-run:tenant-a:svc:run-shared"),
+ WrapCommittedRunState(
+ BuildRecord("tenant-a", "svc", "run-shared", "cmd-x", ServiceImplementationKind.Static, "actor-a", observedAt),
+ stateVersion: 1,
+ eventId: "evt-a",
+ observedAt: observedAt));
+ await projector.ProjectAsync(
+ CreateContext("service-run:tenant-b:svc:run-shared"),
+ WrapCommittedRunState(
+ BuildRecord("tenant-b", "svc", "run-shared", "cmd-x", ServiceImplementationKind.Static, "actor-b", observedAt),
+ stateVersion: 1,
+ eventId: "evt-b",
+ observedAt: observedAt));
+
+ var docA = await store.GetAsync(ServiceRunIds.BuildKey("tenant-a", "svc", "run-shared"));
+ var docB = await store.GetAsync(ServiceRunIds.BuildKey("tenant-b", "svc", "run-shared"));
+ docA.Should().NotBeNull();
+ docB.Should().NotBeNull();
+ docA!.TargetActorId.Should().Be("actor-a");
+ docB!.TargetActorId.Should().Be("actor-b");
+ }
+
+ [Fact]
+ public async Task ProjectAsync_ShouldIgnoreState_WithMissingScopeOrService()
+ {
+ var store = new RecordingDocumentStore(x => x.Id);
+ var projector = new ServiceRunCurrentStateProjector(
+ store,
+ new FixedProjectionClock(DateTimeOffset.UtcNow));
+
+ var record = BuildRecord("tenant-1", "svc-1", "run-1", "cmd-1", ServiceImplementationKind.Static, "actor-1");
+ record.ScopeId = string.Empty;
+ await projector.ProjectAsync(
+ CreateContext("service-run:bad"),
+ WrapCommittedRunState(record, stateVersion: 1, eventId: "evt-bad", observedAt: DateTimeOffset.UtcNow));
+
+ (await store.ReadItemsAsync()).Should().BeEmpty();
+ }
+
+ private static ServiceRunCurrentStateProjectionContext CreateContext(string rootActorId) =>
+ new()
+ {
+ RootActorId = rootActorId,
+ ProjectionKind = "service-runs",
+ };
+
+ private static ServiceRunRecord BuildRecord(
+ string scopeId,
+ string serviceId,
+ string runId,
+ string commandId,
+ ServiceImplementationKind implementation,
+ string targetActorId,
+ DateTimeOffset? createdAt = null) =>
+ new()
+ {
+ ScopeId = scopeId,
+ ServiceId = serviceId,
+ ServiceKey = $"{scopeId}:{serviceId}",
+ RunId = runId,
+ CommandId = commandId,
+ CorrelationId = commandId,
+ EndpointId = "run",
+ ImplementationKind = implementation,
+ TargetActorId = targetActorId,
+ RevisionId = "r1",
+ DeploymentId = "dep-1",
+ Status = ServiceRunStatus.Accepted,
+ CreatedAt = createdAt.HasValue ? Timestamp.FromDateTimeOffset(createdAt.Value) : null,
+ UpdatedAt = createdAt.HasValue ? Timestamp.FromDateTimeOffset(createdAt.Value) : null,
+ Identity = new ServiceIdentity
+ {
+ TenantId = scopeId,
+ AppId = "app",
+ Namespace = "default",
+ ServiceId = serviceId,
+ },
+ };
+
+ private static EventEnvelope WrapCommittedRunState(
+ ServiceRunRecord record,
+ long stateVersion,
+ string eventId,
+ DateTimeOffset observedAt)
+ {
+ var state = new ServiceRunState
+ {
+ Record = record.Clone(),
+ LastAppliedEventVersion = stateVersion,
+ LastEventId = eventId,
+ };
+ return new EventEnvelope
+ {
+ Id = $"outer-{eventId}",
+ Timestamp = Timestamp.FromDateTimeOffset(observedAt),
+ Route = EnvelopeRouteSemantics.CreateObserverPublication("root-actor"),
+ Payload = Any.Pack(new CommittedStateEventPublished
+ {
+ StateEvent = new StateEvent
+ {
+ EventId = eventId,
+ Version = stateVersion,
+ Timestamp = Timestamp.FromDateTimeOffset(observedAt),
+ EventData = Any.Pack(new ServiceRunRegisteredEvent
+ {
+ Record = record.Clone(),
+ }),
+ },
+ StateRoot = Any.Pack(state),
+ }),
+ };
+ }
+}