diff --git a/docker-compose.observability.yml b/docker-compose.observability.yml new file mode 100644 index 000000000..00b4d64e5 --- /dev/null +++ b/docker-compose.observability.yml @@ -0,0 +1,60 @@ +services: + jaeger: + image: jaegertracing/all-in-one:latest + container_name: aevatar-jaeger + environment: + COLLECTOR_OTLP_ENABLED: "true" + ports: + - "16686:16686" + restart: unless-stopped + + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + container_name: aevatar-otel-collector + command: + - --config=/etc/otelcol-contrib/config.yml + ports: + - "4317:4317" + - "4318:4318" + - "9464:9464" + volumes: + - ./tools/observability/otel-collector-config.yml:/etc/otelcol-contrib/config.yml:ro + depends_on: + - jaeger + restart: unless-stopped + + prometheus: + image: prom/prometheus:latest + container_name: aevatar-prometheus + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + - --web.enable-lifecycle + ports: + - "9090:9090" + volumes: + - ./tools/observability/prometheus:/etc/prometheus:ro + - prometheus-data:/prometheus + depends_on: + - otel-collector + restart: unless-stopped + + grafana: + image: grafana/grafana:latest + container_name: aevatar-grafana + ports: + - "3000:3000" + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: admin + GF_USERS_ALLOW_SIGN_UP: "false" + volumes: + - grafana-data:/var/lib/grafana + - ./tools/observability/grafana/provisioning:/etc/grafana/provisioning:ro + depends_on: + - prometheus + restart: unless-stopped + +volumes: + prometheus-data: + grafana-data: diff --git a/docs/architecture/metrics-baseline-plan.md b/docs/architecture/metrics-baseline-plan.md new file mode 100644 index 000000000..68b943afd --- /dev/null +++ b/docs/architecture/metrics-baseline-plan.md @@ -0,0 +1,249 @@ +# Metrics Baseline Plan + +> Status: In progress on branch `feature/metrics-observability-plan`. + +## 1. Purpose + +Provide a low-cardinality, production-safe observability baseline for workflow runtime and API: + +- Separate platform health metrics from user-perceived latency metrics. +- Support SSE/WS first-response analysis (TTFB-like) in addition to full request duration. +- Keep instrumentation generic, minimal, and layered. + +## 2. Design Principles + +- One meter per domain boundary: `Aevatar.Agents` for runtime, `Aevatar.Api` for API layer. +- Prefer low-cardinality labels over high-cardinality identifiers. +- Track both user-perceived latency (API) and backend processing overhead (runtime). +- Use OpenTelemetry OTLP export with Prometheus + Grafana as the default metrics display stack. + +## 3. Anti-Patterns to Avoid + +- Per-actor/per-session/per-command labels in metrics. +- Parallel metric systems with overlapping names and meanings. +- Multiple `Meter` objects sharing the same name across assemblies. +- Dashboard-only metrics that are not used in SLO/SLA decisions. +- `TypeUrl.Contains(...)` in src (blocked by CI guard). + +## 4. Current Implementation Status + +Implemented: + +- OTLP metric export from `Workflow.Host.Api`. +- Collector-centered local stack (`docker-compose.observability.yml`) with Jaeger, Prometheus, and Grafana. +- Runtime metric cleanup and low-cardinality refactor: + - removed high-cardinality labels (`agent_id`, `publisher_id`). + - removed low-value instruments (`RouteTargets`, `StateLoads`, `StateSaves`, `HandlerDuration`). +- Runtime metrics emitted from both Local and Orleans paths. +- API metrics for request count and full duration (meter `Aevatar.Api`) on all interaction endpoints: + - `POST /api/chat` + - `POST /api/workflows/resume` + - `POST /api/workflows/signal` + - command-style HTTP request path + - `GET /api/ws/chat` +- API first-response duration metric for streaming paths and WS parse error responses. +- Unified instrumentation scopes that compose tracing + logging + metrics: + - `EventHandleScope` (runtime): single scope drives Activity span, log scope, and metrics recording. + - `ApiRequestScope` (API): single scope drives stopwatch, result classification, and first-response tracking. + - Eliminates duplicate Stopwatch and independent error-tracking across tracing/metrics. +- `OperationCanceledException` consistently classified as `result=ok` across HTTP and WebSocket paths. +- `ChatWebSocketRunCoordinator` decoupled from metrics (no metrics return value; scope passed from caller). +- Histogram views are configured explicitly for AI-oriented request latency and runtime event latency buckets. +- Grafana dashboard panels for: + - health/error ratio + - runtime/API throughput and latency + - window totals (`increase`) + - first-response vs full-response comparison + +Pending: + +- Add explicit SLO panel with thresholds and status coloring. +- Tune alert thresholds with production baselines after traffic observation. +- Tune histogram bucket boundaries with production latency samples if workload profile changes. + +## 5. Metric Contract (Current) + +### 5.1 Runtime Metrics (Meter: `Aevatar.Agents`) + +| Name | Type | Labels | Meaning | +|---|---|---|---| +| `aevatar_runtime_events_handled_total` | Counter | `direction`, `result` | Number of handled runtime events | +| `aevatar_runtime_event_handle_duration_ms` | Histogram | `result` | Event handling duration (platform overhead) | +| `aevatar_runtime_active_actors` | UpDownCounter | none | Active actor count | + +### 5.2 API Metrics (Meter: `Aevatar.Api`) + +| Name | Type | Labels | Meaning | +|---|---|---|---| +| `aevatar_api_requests_total` | Counter | `transport`, `result` | API request volume | +| `aevatar_api_request_duration_ms` | Histogram | `transport` | End-to-end request duration | +| `aevatar_api_first_response_duration_ms` | Histogram | `transport`, `result` | Time to first response frame/ack | + +`transport` values: `http`, `ws` + +`result` values: `ok`, `error` + +### 5.3 First-Response Semantics (Explicit Contract) + +`aevatar_api_first_response_duration_ms` is defined as "time from request accepted to first response frame/ack/error sent to client". + +- `http` path: + - recorded on the first response signal sent to the client: + - run-context bootstrap frame (`aevatar.run.context`), or + - first streamed output frame (`emitAsync`) if no bootstrap frame was written first. + - not recorded for prompt validation early return (400) because no response stream frame is produced. +- `ws` path: + - recorded on first outbound message among `command ack`, `agui event`, or `command error`. + - if the request fails before any websocket message is sent, no first-response metric is recorded. + +This contract intentionally tracks "first observable response signal" rather than "request finished". + +### 5.4 Request-Duration Tagging Trade-Off + +`aevatar_api_request_duration_ms` intentionally omits the `result` label. This is a deliberate cardinality and dashboard-simplicity trade-off: + +- error ratio is derived from `aevatar_api_requests_total{result=...}`. +- latency panels focus on user wait time split by `transport`. +- adding `result` to the histogram would double API latency series without materially improving the primary SLO view. + +If per-result latency distributions become operationally necessary later, introduce them as a separate histogram with an explicit need, rather than retrofitting the core baseline. + +### 5.5 Cancellation Semantics + +`OperationCanceledException` is treated as `result=ok` in the request metric. Client-initiated cancellation is not a service error; it reflects normal connection lifecycle behavior (e.g., user navigates away, timeout). This keeps the error ratio focused on genuine service-side failures. + +## 6. Diagnostic Model + +When AI is part of the core request path, end-to-end latency alone cannot diagnose health. The layered metric approach separates concerns: + +| Metric | Answers | Normal Range | +|---|---|---| +| **error ratio** | Is the service stable? | < 1% | +| **first_response p95** | Is user-perceived responsiveness OK? | Acceptable TTFB | +| **request_duration p95** | Is total time reasonable? | High variance expected with AI | +| **runtime event duration p95** | Is platform overhead normal? | Should be consistently low | + +Interpretation rules: + +- High full latency + normal first-response + low error ratio → AI generation variance (normal). +- Rising error ratio + dropping throughput → service incident. +- Rising runtime event latency → platform bottleneck (investigate regardless of AI content). +- Full request duration minus first response → AI generation time (expected to vary). + +### Why NOT split AI/core at the runtime event level + +The runtime event layer processes individual envelopes. A single AI chat request fans out to many events (ChatRequest, TextMessageStart, N × TextMessageContent, TextMessageEnd). Only one event (the LLM call trigger) is slow; the rest are fast streaming relays. A `pipeline=ai|core` label at this level gives misleading distributions (p50 looks fast because most AI events are fast) and adds label cardinality without clear diagnostic value. + +The correct separation is at the API level: `first_response_duration` captures the real AI-latency impact on user experience, and `request_duration` captures the total including AI generation. + +## 7. Dashboard and Display + +Grafana dashboard file: + +- `tools/observability/grafana/provisioning/dashboards/aevatar-runtime-overview.json` + +Panels (SLO section, default view): + +1. SLO Read Guide (text) +2. Error Ratio — API and runtime (timeseries) +3. User Latency: First Response p95 (timeseries) +4. User Latency: Full Request p95 (timeseries) +5. API Request Latency p99 (timeseries) +6. Runtime Event Handle Latency p95/p99 (timeseries) + +Panels (Runtime Diagnostics section, drill-down): + +7. Runtime Diagnostics Guide (text) +8. Active Actors (stat) +9. Runtime Events — Self, Window Total (stat) +10. API Requests — Window Total (stat) +11. Runtime Self Events / API Request (stat) +12. Runtime Events Rate — Self, by result (timeseries) +13. API Requests Rate by result (timeseries) + +Note: "Runtime Self Events / API Request" is computed only when window API request count is greater than 0; otherwise the panel is intentionally empty to avoid misleading inflation from background self events. + +Local stack: + +- `docker-compose.observability.yml` +- OpenTelemetry Collector OTLP HTTP ingest endpoint: `http://localhost:4318` (ingest only, no UI) +- Prometheus: `http://localhost:9090` +- Grafana: `http://localhost:3000` +- Jaeger: `http://localhost:16686` +- Prometheus alert examples: `tools/observability/prometheus/alerts.yml` + +## 8. Verification Checklist + +- Prometheus target `aevatar-otel-collector` is `UP`. +- Collector-exposed metrics include: + - `aevatar_runtime_events_handled_total` + - `aevatar_api_requests_total` + - `aevatar_api_first_response_duration_ms` +- No high-cardinality labels appear in metric series. +- No `pipeline` label in runtime metrics. +- Dashboard shows first-response vs full-response latency. +- `alerts.yml` loads successfully in Prometheus rule status page. + +## 9. Metric Quick Reference (Runbook) + +### 9.1 Core SLO Metrics + +| Metric | PromQL (Reference) | Why it matters | Suggested alert threshold | Common misread | +|---|---|---|---|---| +| API error ratio (5m) | `sum(increase(aevatar_api_requests_total{result="error"}[5m])) / clamp_min(sum(increase(aevatar_api_requests_total[5m])), 1)` | Primary service stability signal for client requests | `> 1%` for 5-10 minutes | Counting 4xx as service error (current contract treats only 5xx as `error`) | +| Runtime event error ratio (5m) | `sum(increase(aevatar_runtime_events_handled_total{result="error"}[5m])) / clamp_min(sum(increase(aevatar_runtime_events_handled_total[5m])), 1)` | Runtime pipeline health signal | `> 1%` for 5-10 minutes | Comparing directly with API ratio without considering event fan-out | +| First response latency p95 | `histogram_quantile(0.95, sum by (le) (rate(aevatar_api_first_response_duration_ms_bucket[$__rate_interval])))` | User-perceived responsiveness (TTFB-like) | Set per workload; start from your current p95 baseline + 30% | Treating missing first-response samples as zero (they are "not emitted", not "fast") | +| Full request latency p95 | `histogram_quantile(0.95, sum by (le) (rate(aevatar_api_request_duration_ms_bucket[$__rate_interval])))` | End-to-end user waiting time | Set per workflow family; usually looser than first response | Assuming high full latency alone means platform issue | +| API request latency p99 | `histogram_quantile(0.99, sum by (le) (rate(aevatar_api_request_duration_ms_bucket[$__rate_interval])))` | Tail-latency regression detection | Alert when sustained spike exceeds SLO budget | Using p99 as primary product KPI instead of engineering diagnostic | +| Runtime event handle latency p95/p99 | `histogram_quantile(0.95, sum by (le) (rate(aevatar_runtime_event_handle_duration_ms_bucket[$__rate_interval])))` and p99 equivalent | Detects platform/runtime overhead changes independent of model generation variance | Trigger when both p95 and p99 trend up with stable traffic | Correlating directly to user latency without checking first/full API latency pair | + +### 9.2 Diagnostic Order (Fast Triage) + +1. Check API and runtime event error ratio (incident vs non-incident). +2. Check first-response p95 (user "is it responsive?" signal). +3. Check full-request p95 and API p99 (overall wait and tail behavior). +4. Check runtime event latency p95/p99 (platform overhead confirmation). +5. If only full-request worsens while first-response is stable, prioritize model/downstream generation analysis. + +Error-ratio panel implementation note: dashboard queries use "empty-as-zero" (`or on() vector(0)`) to keep both API and runtime ratio series visible even when a 5-minute window has no error samples. + +## 10. Next Plan Items + +1. Add dashboard status coloring for core SLO panels. +2. Re-baseline alert thresholds after collecting production traffic. +3. Add tests: + - WebSocket path first-response integration tests (requires WebSocket mock infrastructure) +4. Re-evaluate `IMeterFactory` only if host lifecycle isolation or meter injection becomes a proven need. + +## 11. Default Histogram Buckets and Alerts + +### 11.1 Default Histogram Buckets + +Configured in `ObservabilityExtensions`: + +- API latency histograms (`aevatar_api_request_duration_ms`, `aevatar_api_first_response_duration_ms`): + - `25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 20000, 30000, 45000, 60000, 90000, 120000` ms +- Runtime event latency histogram (`aevatar_runtime_event_handle_duration_ms`): + - `1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000` ms + +Configuration overrides: + +- `Observability:Metrics:ApiLatencyBucketsMs` +- `Observability:Metrics:RuntimeLatencyBucketsMs` + +Both settings accept comma-separated millisecond boundaries in ascending order. + +### 11.2 Default Alert Examples + +The repository now includes Prometheus alert examples in `tools/observability/prometheus/alerts.yml`: + +- API error ratio above 1% for 10 minutes +- Runtime event error ratio above 1% for 10 minutes +- API first-response p95 above 5000 ms for 10 minutes + +These are default guardrails for local and pre-production validation. They should be tuned after collecting real workload baselines. + +## 12. Related Documents + +- `docs/architecture/stream-first-tracing-design.md` +- `docs/architecture/workflow-jaeger-observability-guide.md` diff --git a/docs/architecture/workflow-jaeger-observability-guide.md b/docs/architecture/workflow-jaeger-observability-guide.md index 68ceb609b..93d184e0f 100644 --- a/docs/architecture/workflow-jaeger-observability-guide.md +++ b/docs/architecture/workflow-jaeger-observability-guide.md @@ -37,15 +37,10 @@ Expected runtime logging behavior: ## 3. Local Setup -Start Jaeger all-in-one: +Start the local observability stack: ```bash -docker run --rm --name jaeger \ - -e COLLECTOR_OTLP_ENABLED=true \ - -p 16686:16686 \ - -p 4317:4317 \ - -p 4318:4318 \ - jaegertracing/all-in-one:latest +docker compose -f docker-compose.observability.yml up -d jaeger otel-collector prometheus grafana ``` Set OTEL environment variables before host startup: @@ -88,7 +83,8 @@ Validate all checks: 2. Response body includes `correlationId` for async accepted flow. 3. Runtime logs include `trace_id`, `correlation_id`, and `causation_id`. 4. Jaeger UI (`http://localhost:16686`) shows trace under `Aevatar.Workflow.Host.Api`. -5. Log `trace_id` equals Jaeger trace id (trace_id is not exposed in API responses, only in logs). +5. Prometheus target page (`http://localhost:9090/targets`) shows `aevatar-otel-collector` as `UP`. +6. Log `trace_id` equals Jaeger trace id (trace_id is not exposed in API responses, only in logs). ## 5. Automated Test Checklist @@ -105,7 +101,7 @@ Recommended minimum checks: No traces in Jaeger: -- verify Jaeger container is healthy and OTLP ports are reachable +- verify the collector and Jaeger containers are healthy and OTLP ports are reachable - verify OTEL environment variables are visible to host process - verify sampling is not effectively zero diff --git a/docs/audit-scorecard/metrics-observability-dev-diff-scorecard-2026-03-06.md b/docs/audit-scorecard/metrics-observability-dev-diff-scorecard-2026-03-06.md new file mode 100644 index 000000000..0d86a9780 --- /dev/null +++ b/docs/audit-scorecard/metrics-observability-dev-diff-scorecard-2026-03-06.md @@ -0,0 +1,45 @@ +# Metrics Observability Dev-Diff Scorecard + +> Branch: `feature/metrics-observability-plan` vs `dev` +> Date: 2026-03-06 +> Scope: API/runtime metrics instrumentation, Prometheus alerting baseline, Grafana dashboard provisioning + +## Executive Score + +**Overall Score: 9.1 / 10** + +The branch is in a strong state for observability baseline quality. +Instrumentation boundaries are clean, metric cardinality remains controlled, tests are passing, and local observability stack provisioning works end-to-end. + +## Dimension Scores + +| # | Dimension | Score | Weight | Weighted | +|---|---|:---:|:---:|:---:| +| 1 | Architectural alignment (layering and scope ownership) | 9.2 | 20% | 1.84 | +| 2 | Metric correctness and low-cardinality safety | 9.2 | 20% | 1.84 | +| 3 | Test coverage and regression confidence | 9.0 | 20% | 1.80 | +| 4 | Operational readiness (Prometheus/Grafana/alerts) | 9.0 | 20% | 1.80 | +| 5 | Documentation quality and maintainability | 9.0 | 20% | 1.80 | +| | **Total** | | | **9.08** | + +Rounded verdict: **9.1 / 10** + +## Validation Evidence (Local) + +- `dotnet test test/Aevatar.Workflow.Host.Api.Tests/Aevatar.Workflow.Host.Api.Tests.csproj --nologo` + - Result: **261 passed, 0 failed, 0 skipped** +- `docker compose -f docker-compose.observability.yml up -d prometheus grafana` + - Result: both `aevatar-prometheus` and `aevatar-grafana` are up +- Generated API traffic to produce metrics: + - `POST /api/chat` returned HTTP 200 three times +- Prometheus query validation: + - `sum(increase(aevatar_api_requests_total[5m]))` returned value `3.080946740700676` (greater than zero) +- Grafana startup/provisioning logs: + - dashboard provisioning finished + - dashboard live channel initialized for UID `aevatar-runtime-overview` + +## Notes and Follow-Ups + +- Grafana HTTP auth check returned `401` with `admin/admin`, indicating existing persisted credentials in local volume differ from defaults. This does not block provisioning validation, but UI login verification should use local real credentials. +- Alert thresholds and histogram buckets are good defaults; tune with real traffic after baseline observation. +- Keep API and runtime metric contracts stable to preserve dashboard/query continuity across future refactors. diff --git a/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActor.cs b/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActor.cs index b2bb11adc..6ebba120d 100644 --- a/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActor.cs +++ b/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActor.cs @@ -1,7 +1,3 @@ -// LocalActor - IActor implementation. -// Focuses on two responsibilities: mailbox serialization and stream subscription management. - -using System.Diagnostics; using Aevatar.Foundation.Runtime.Routing; using Aevatar.Foundation.Runtime.Observability; using Aevatar.Foundation.Runtime.Actors; @@ -115,11 +111,7 @@ internal Task UnsubscribeFromParentAsync() private async Task EnqueueAsync(EventEnvelope envelope, bool propagateFailure = false) { - using var instrumentation = TracingContextHelpers.BeginHandleEnvelopeInstrumentation(_logger, Id, envelope); - var activity = instrumentation.Activity; - - var sw = Stopwatch.StartNew(); - var status = "ok"; + using var scope = EventHandleScope.Begin(_logger, Id, envelope); await _mailbox.WaitAsync(); try { @@ -127,9 +119,7 @@ private async Task EnqueueAsync(EventEnvelope envelope, bool propagateFailure = } catch (Exception ex) { - status = "error"; - activity?.SetTag("aevatar.error", true); - activity?.SetTag("aevatar.error.message", ex.Message); + scope.MarkError(ex); _logger.LogError(ex, "LocalActor {Id} failed to handle event", Id); if (propagateFailure) throw; @@ -137,20 +127,6 @@ private async Task EnqueueAsync(EventEnvelope envelope, bool propagateFailure = finally { _mailbox.Release(); - sw.Stop(); - AgentMetrics.EventsHandled.Add(1, - [ - new("agent.id", Id), - new("event.direction", envelope.Direction.ToString()), - new("event.type", envelope.Payload?.TypeUrl ?? "unknown"), - new("status", status), - ]); - AgentMetrics.EventHandleDuration.Record(sw.Elapsed.TotalMilliseconds, - [ - new("agent.id", Id), - new("event.direction", envelope.Direction.ToString()), - new("status", status), - ]); } } diff --git a/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActorPublisher.cs b/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActorPublisher.cs index 4771e4e2b..6ffaa63d6 100644 --- a/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActorPublisher.cs +++ b/src/Aevatar.Foundation.Runtime.Implementations.Local/Actors/LocalActorPublisher.cs @@ -3,7 +3,6 @@ using Aevatar.Foundation.Abstractions.Propagation; using Aevatar.Foundation.Core.Propagation; -using Aevatar.Foundation.Runtime.Observability; using Aevatar.Foundation.Runtime.Propagation; using Aevatar.Foundation.Runtime.Routing; using Google.Protobuf; @@ -56,12 +55,6 @@ public async Task PublishAsync( _envelopePropagationPolicy, _actorId, routeTargetCount); - AgentMetrics.RouteTargets.Add(routeTargetCount, - [ - new("publisher.id", _actorId), - new("direction", direction.ToString()), - new("event.type", evt.Descriptor.Name), - ]); switch (direction) { @@ -106,12 +99,6 @@ public async Task SendToAsync( _actorId, routeTargetCount: 1); await _streams.GetStream(targetActorId).ProduceAsync(envelope, ct); - AgentMetrics.RouteTargets.Add(1, - [ - new("publisher.id", _actorId), - new("direction", "Direct"), - new("event.type", evt.Descriptor.Name), - ]); } private long GetRouteTargetCount(EventDirection direction) => diff --git a/src/Aevatar.Foundation.Runtime.Implementations.Orleans/Grains/RuntimeActorGrain.cs b/src/Aevatar.Foundation.Runtime.Implementations.Orleans/Grains/RuntimeActorGrain.cs index 3da5ab70e..64a28b8c2 100644 --- a/src/Aevatar.Foundation.Runtime.Implementations.Orleans/Grains/RuntimeActorGrain.cs +++ b/src/Aevatar.Foundation.Runtime.Implementations.Orleans/Grains/RuntimeActorGrain.cs @@ -123,10 +123,6 @@ public async Task HandleEnvelopeAsync(byte[] envelopeBytes) } var envelope = EventEnvelope.Parser.ParseFrom(envelopeBytes); - using var instrumentation = TracingContextHelpers.BeginHandleEnvelopeInstrumentation( - _logger, - this.GetPrimaryKeyString(), - envelope); if (await TryHandleCompatibilityRetryAsync(envelope)) return; @@ -165,6 +161,7 @@ public async Task HandleEnvelopeAsync(byte[] envelopeBytes) return; } + using var scope = EventHandleScope.Begin(_logger, this.GetPrimaryKeyString(), envelope); try { using var stateBinding = _stateBindingAccessor?.Bind(_state); @@ -172,6 +169,7 @@ public async Task HandleEnvelopeAsync(byte[] envelopeBytes) } catch (Exception ex) { + scope.MarkError(ex); if (await TryScheduleRetryAsync(envelope, ex)) return; diff --git a/src/Aevatar.Foundation.Runtime/Observability/AgentMetrics.cs b/src/Aevatar.Foundation.Runtime/Observability/AgentMetrics.cs index 8c84c1e66..ccabedadd 100644 --- a/src/Aevatar.Foundation.Runtime/Observability/AgentMetrics.cs +++ b/src/Aevatar.Foundation.Runtime/Observability/AgentMetrics.cs @@ -11,25 +11,36 @@ namespace Aevatar.Foundation.Runtime.Observability; public static class AgentMetrics { private static readonly Meter Meter = new("Aevatar.Agents", "1.0.0"); + public const string DirectionTag = "direction"; + public const string ResultTag = "result"; + public const string ResultOk = "ok"; + public const string ResultError = "error"; - /// Processed event counter. - public static readonly Counter EventsHandled = Meter.CreateCounter("aevatar.agent.events_handled"); + /// Total events handled by runtime actor pipelines. + public static readonly Counter RuntimeEventsHandled = Meter.CreateCounter( + "aevatar.runtime.events_handled", + description: "Total number of runtime events handled."); - /// Handler duration histogram in milliseconds. - public static readonly Histogram HandlerDuration = Meter.CreateHistogram("aevatar.agent.handler_duration_ms"); - - /// End-to-end event handle duration in milliseconds. - public static readonly Histogram EventHandleDuration = Meter.CreateHistogram("aevatar.agent.event_handle_duration_ms"); - - /// Published route target count by direction. - public static readonly Counter RouteTargets = Meter.CreateCounter("aevatar.runtime.route_targets"); - - /// State store load counter. - public static readonly Counter StateLoads = Meter.CreateCounter("aevatar.state.loads"); - - /// State store save counter. - public static readonly Counter StateSaves = Meter.CreateCounter("aevatar.state.saves"); + /// Runtime event handle duration in milliseconds. + public static readonly Histogram RuntimeEventHandleDurationMs = Meter.CreateHistogram( + "aevatar.runtime.event_handle_duration_ms", + description: "Runtime event handling duration in milliseconds."); /// Active actor count (up/down counter). - public static readonly UpDownCounter ActiveActors = Meter.CreateUpDownCounter("aevatar.runtime.active_actors"); + public static readonly UpDownCounter ActiveActors = Meter.CreateUpDownCounter( + "aevatar.runtime.active_actors", + description: "Current number of active actors."); + + public static void RecordEventHandled(string direction, string result, double durationMs) + { + RuntimeEventsHandled.Add(1, + [ + new(DirectionTag, direction), + new(ResultTag, result), + ]); + RuntimeEventHandleDurationMs.Record(durationMs, + [ + new(ResultTag, result), + ]); + } } diff --git a/src/Aevatar.Foundation.Runtime/Observability/EventHandleScope.cs b/src/Aevatar.Foundation.Runtime/Observability/EventHandleScope.cs new file mode 100644 index 000000000..fb82aa47b --- /dev/null +++ b/src/Aevatar.Foundation.Runtime/Observability/EventHandleScope.cs @@ -0,0 +1,65 @@ +using System.Diagnostics; +using Microsoft.Extensions.Logging; + +namespace Aevatar.Foundation.Runtime.Observability; + +/// +/// Unified instrumentation scope for runtime event handling. +/// Composes tracing (Activity), structured logging (log scope), and metrics +/// into a single disposable scope, eliminating duplicate Stopwatch/error-tracking. +/// +public struct EventHandleScope : IDisposable +{ + private readonly Stopwatch _sw; + private readonly Activity? _activity; + private readonly IDisposable? _logScope; + private readonly string _direction; + private string _result; + private bool _disposed; + + public Activity? Activity => _activity; + + private EventHandleScope( + Stopwatch sw, + Activity? activity, + IDisposable? logScope, + string direction) + { + _sw = sw; + _activity = activity; + _logScope = logScope; + _direction = direction; + _result = AgentMetrics.ResultOk; + _disposed = false; + } + + public static EventHandleScope Begin(ILogger logger, string actorId, EventEnvelope envelope) + { + ArgumentNullException.ThrowIfNull(logger); + ArgumentException.ThrowIfNullOrWhiteSpace(actorId); + ArgumentNullException.ThrowIfNull(envelope); + + var activity = AevatarActivitySource.StartHandleEvent(actorId, envelope); + var logScope = logger.BeginScope(TracingContextHelpers.CreateLogScopeState(envelope)); + return new EventHandleScope(Stopwatch.StartNew(), activity, logScope, envelope.Direction.ToString()); + } + + public void MarkError(Exception ex) + { + _result = AgentMetrics.ResultError; + _activity?.SetTag("aevatar.error", true); + _activity?.SetTag("aevatar.error.message", ex.Message); + } + + public void Dispose() + { + if (_disposed) + return; + _disposed = true; + + _sw.Stop(); + AgentMetrics.RecordEventHandled(_direction, _result, _sw.Elapsed.TotalMilliseconds); + _logScope?.Dispose(); + _activity?.Dispose(); + } +} diff --git a/src/Aevatar.Foundation.Runtime/Observability/TracingContextHelpers.cs b/src/Aevatar.Foundation.Runtime/Observability/TracingContextHelpers.cs index c2da9aefe..6eb43e8f2 100644 --- a/src/Aevatar.Foundation.Runtime/Observability/TracingContextHelpers.cs +++ b/src/Aevatar.Foundation.Runtime/Observability/TracingContextHelpers.cs @@ -6,20 +6,6 @@ namespace Aevatar.Foundation.Runtime.Observability; public static class TracingContextHelpers { - public static HandleEnvelopeInstrumentation BeginHandleEnvelopeInstrumentation( - ILogger logger, - string agentId, - EventEnvelope envelope) - { - ArgumentNullException.ThrowIfNull(logger); - ArgumentException.ThrowIfNullOrWhiteSpace(agentId); - ArgumentNullException.ThrowIfNull(envelope); - - var activity = AevatarActivitySource.StartHandleEvent(agentId, envelope); - var logScope = logger.BeginScope(CreateLogScopeState(envelope)); - return new HandleEnvelopeInstrumentation(activity, logScope); - } - public static void PopulateTraceId(EventEnvelope envelope, bool overwrite = false) { var activity = Activity.Current; @@ -69,22 +55,4 @@ private static string ResolveCausationId(EventEnvelope envelope) => !string.IsNullOrWhiteSpace(causationId) ? causationId : string.Empty; - - public sealed class HandleEnvelopeInstrumentation : IDisposable - { - private readonly IDisposable? _logScope; - public Activity? Activity { get; } - - internal HandleEnvelopeInstrumentation(Activity? activity, IDisposable? logScope) - { - Activity = activity; - _logScope = logScope; - } - - public void Dispose() - { - _logScope?.Dispose(); - Activity?.Dispose(); - } - } } diff --git a/src/Aevatar.Foundation.Runtime/Persistence/InMemoryStateStore.cs b/src/Aevatar.Foundation.Runtime/Persistence/InMemoryStateStore.cs index 0f2e49abe..c0c841181 100644 --- a/src/Aevatar.Foundation.Runtime/Persistence/InMemoryStateStore.cs +++ b/src/Aevatar.Foundation.Runtime/Persistence/InMemoryStateStore.cs @@ -4,7 +4,6 @@ // ───────────────────────────────────────────────────────────── using System.Collections.Concurrent; -using Aevatar.Foundation.Runtime.Observability; using Aevatar.Foundation.Abstractions.Persistence; namespace Aevatar.Foundation.Runtime.Persistence; @@ -18,7 +17,6 @@ public sealed class InMemoryStateStore : IStateStore where TStat /// Loads state for the specified agent. public Task LoadAsync(string agentId, CancellationToken ct = default) { - AgentMetrics.StateLoads.Add(1, [new("state.type", typeof(TState).Name)]); return Task.FromResult(_store.GetValueOrDefault(agentId)); } @@ -26,7 +24,6 @@ public sealed class InMemoryStateStore : IStateStore where TStat public Task SaveAsync(string agentId, TState state, CancellationToken ct = default) { _store[agentId] = state; - AgentMetrics.StateSaves.Add(1, [new("state.type", typeof(TState).Name)]); return Task.CompletedTask; } diff --git a/src/workflow/Aevatar.Workflow.Host.Api/Aevatar.Workflow.Host.Api.csproj b/src/workflow/Aevatar.Workflow.Host.Api/Aevatar.Workflow.Host.Api.csproj index 4fd0cebf6..8948ccbae 100644 --- a/src/workflow/Aevatar.Workflow.Host.Api/Aevatar.Workflow.Host.Api.csproj +++ b/src/workflow/Aevatar.Workflow.Host.Api/Aevatar.Workflow.Host.Api.csproj @@ -14,6 +14,7 @@ + diff --git a/src/workflow/Aevatar.Workflow.Host.Api/ObservabilityExtensions.cs b/src/workflow/Aevatar.Workflow.Host.Api/ObservabilityExtensions.cs index 6abe2b177..0d8c6e7bc 100644 --- a/src/workflow/Aevatar.Workflow.Host.Api/ObservabilityExtensions.cs +++ b/src/workflow/Aevatar.Workflow.Host.Api/ObservabilityExtensions.cs @@ -1,5 +1,6 @@ using Microsoft.AspNetCore.Builder; using OpenTelemetry; +using OpenTelemetry.Metrics; using OpenTelemetry.Resources; using OpenTelemetry.Trace; using System.Globalization; @@ -9,8 +10,8 @@ namespace Aevatar.Workflow.Host.Api; internal static class ObservabilityExtensions { /// - /// Registers OpenTelemetry tracing. Service name is resolved from - /// OTEL_SERVICE_NAME; OTLP exporter is enabled when + /// Registers OpenTelemetry tracing and metrics. Service name is resolved from + /// OTEL_SERVICE_NAME; OTLP exporters are enabled when /// OTEL_EXPORTER_OTLP_ENDPOINT is set. /// internal static WebApplicationBuilder AddAevatarWorkflowObservability( @@ -20,6 +21,13 @@ internal static WebApplicationBuilder AddAevatarWorkflowObservability( var serviceName = builder.Configuration["OTEL_SERVICE_NAME"] ?? defaultServiceName; var defaultSamplingRatio = builder.Environment.IsDevelopment() ? 1.0 : 0.1; var samplingRatio = ResolveSamplingRatio(builder, defaultSamplingRatio); + var apiLatencyBucketsMs = ResolveHistogramBuckets( + builder.Configuration["Observability:Metrics:ApiLatencyBucketsMs"], + [25d, 50d, 100d, 250d, 500d, 1000d, 2500d, 5000d, 10000d, 20000d, 30000d, 45000d, 60000d, 90000d, 120000d]); + var runtimeLatencyBucketsMs = ResolveHistogramBuckets( + builder.Configuration["Observability:Metrics:RuntimeLatencyBucketsMs"], + [1d, 5d, 10d, 25d, 50d, 100d, 250d, 500d, 1000d, 2500d, 5000d, 10000d]); + var otlpEndpoint = ResolveOtlpEndpoint(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]); builder.Services .AddOpenTelemetry() @@ -32,11 +40,31 @@ internal static WebApplicationBuilder AddAevatarWorkflowObservability( .AddSource("Aevatar.Agents") .SetSampler(new ParentBasedSampler(new TraceIdRatioBasedSampler(samplingRatio))); - var endpoint = builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]; - if (!string.IsNullOrWhiteSpace(endpoint) && - Uri.TryCreate(endpoint, UriKind.Absolute, out var uri)) + if (otlpEndpoint is not null) { - tracing.AddOtlpExporter(options => options.Endpoint = uri); + tracing.AddOtlpExporter(options => options.Endpoint = otlpEndpoint); + } + }) + .WithMetrics(metrics => + { + metrics + .AddAspNetCoreInstrumentation() + .AddRuntimeInstrumentation() + .AddMeter("Aevatar.Agents") + .AddMeter("Aevatar.Api") + .AddView( + instrumentName: "aevatar.api.request_duration_ms", + new ExplicitBucketHistogramConfiguration { Boundaries = apiLatencyBucketsMs }) + .AddView( + instrumentName: "aevatar.api.first_response_duration_ms", + new ExplicitBucketHistogramConfiguration { Boundaries = apiLatencyBucketsMs }) + .AddView( + instrumentName: "aevatar.runtime.event_handle_duration_ms", + new ExplicitBucketHistogramConfiguration { Boundaries = runtimeLatencyBucketsMs }); + + if (otlpEndpoint is not null) + { + metrics.AddOtlpExporter(options => options.Endpoint = otlpEndpoint); } }); @@ -70,4 +98,47 @@ private static double ResolveSamplingRatio(WebApplicationBuilder builder, double return ratio; } + + private static Uri? ResolveOtlpEndpoint(string? configuredValue) + { + if (string.IsNullOrWhiteSpace(configuredValue)) + return null; + + if (!Uri.TryCreate(configuredValue, UriKind.Absolute, out var uri)) + { + throw new InvalidOperationException( + $"Invalid OTLP endpoint '{configuredValue}' in 'OTEL_EXPORTER_OTLP_ENDPOINT'. Expected an absolute URI."); + } + + return uri; + } + + private static double[] ResolveHistogramBuckets(string? configuredValue, double[] defaultBuckets) + { + if (string.IsNullOrWhiteSpace(configuredValue)) + return defaultBuckets; + + var values = configuredValue + .Split(',', StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries) + .Select(value => + { + if (!double.TryParse(value, NumberStyles.Float, CultureInfo.InvariantCulture, out var parsed) || + !double.IsFinite(parsed) || + parsed <= 0d) + { + throw new InvalidOperationException( + $"Invalid histogram bucket '{value}' in '{configuredValue}'. Expected positive finite numbers."); + } + + return parsed; + }) + .Distinct() + .OrderBy(value => value) + .ToArray(); + + if (values.Length == 0) + throw new InvalidOperationException("Histogram bucket configuration must contain at least one value."); + + return values; + } } diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ApiMetrics.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ApiMetrics.cs new file mode 100644 index 000000000..c1cb79cdb --- /dev/null +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ApiMetrics.cs @@ -0,0 +1,53 @@ +using System.Diagnostics.Metrics; + +namespace Aevatar.Workflow.Infrastructure.CapabilityApi; + +internal static class ApiMetrics +{ + private static readonly Meter Meter = new("Aevatar.Api", "1.0.0"); + internal const string TransportTag = "transport"; + internal const string ResultTag = "result"; + internal const string TransportHttp = "http"; + internal const string TransportWebSocket = "ws"; + internal const string ResultOk = "ok"; + internal const string ResultError = "error"; + + public static readonly Counter RequestsTotal = Meter.CreateCounter( + "aevatar.api.requests_total", + description: "Total API requests by transport and result."); + + public static readonly Histogram RequestDurationMs = Meter.CreateHistogram( + "aevatar.api.request_duration_ms", + description: "API request duration in milliseconds."); + + public static readonly Histogram FirstResponseDurationMs = Meter.CreateHistogram( + "aevatar.api.first_response_duration_ms", + description: "API first-response duration in milliseconds."); + + public static string ResolveResult(int statusCode) + { + return statusCode >= 500 ? ResultError : ResultOk; + } + + public static void RecordRequest(string transport, string result, double durationMs) + { + RequestsTotal.Add(1, + [ + new(TransportTag, transport), + new(ResultTag, result), + ]); + RequestDurationMs.Record(durationMs, + [ + new(TransportTag, transport), + ]); + } + + public static void RecordFirstResponse(string transport, string result, double durationMs) + { + FirstResponseDurationMs.Record(durationMs, + [ + new(TransportTag, transport), + new(ResultTag, result), + ]); + } +} diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ApiRequestScope.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ApiRequestScope.cs new file mode 100644 index 000000000..9b0dbd47a --- /dev/null +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ApiRequestScope.cs @@ -0,0 +1,62 @@ +using System.Diagnostics; +using System.Threading; + +namespace Aevatar.Workflow.Infrastructure.CapabilityApi; + +/// +/// Unified instrumentation scope for API request handling. +/// Encapsulates stopwatch, result tracking, first-response recording, +/// and auto-records the request metric on dispose. +/// Reference type so mutations propagate through callee boundaries. +/// +internal sealed class ApiRequestScope : IDisposable +{ + private readonly Stopwatch _sw; + private readonly string _transport; + private string _result; + private int _firstResponseRecorded; + private bool _disposed; + + private ApiRequestScope(string transport) + { + _sw = Stopwatch.StartNew(); + _transport = transport; + _result = ApiMetrics.ResultOk; + } + + public static ApiRequestScope BeginHttp() => new(ApiMetrics.TransportHttp); + + public static ApiRequestScope BeginWebSocket() => new(ApiMetrics.TransportWebSocket); + + public double ElapsedMs => _sw.Elapsed.TotalMilliseconds; + + public void MarkResult(int statusCode) + { + _result = ApiMetrics.ResolveResult(statusCode); + } + + public void MarkError() + { + _result = ApiMetrics.ResultError; + } + + /// + /// Records the first-response metric exactly once (thread-safe). + /// Uses the current result classification. Subsequent calls are no-ops. + /// + public void RecordFirstResponse() + { + if (Interlocked.CompareExchange(ref _firstResponseRecorded, 1, 0) == 0) + ApiMetrics.RecordFirstResponse(_transport, _result, _sw.Elapsed.TotalMilliseconds); + } + + public void Dispose() + { + if (_disposed) + return; + _disposed = true; + + _sw.Stop(); + ApiMetrics.RecordRequest(_transport, _result, _sw.Elapsed.TotalMilliseconds); + } +} diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs index 88d7f873d..1ff90156d 100644 --- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs @@ -56,6 +56,7 @@ internal static async Task HandleChat( ICommandExecutionService chatRunService, CancellationToken ct = default) { + using var scope = ApiRequestScope.BeginHttp(); if (string.IsNullOrWhiteSpace(input.Prompt)) { http.Response.StatusCode = StatusCodes.Status400BadRequest; @@ -73,35 +74,34 @@ internal static async Task HandleChat( if (!normalizedRequest.Succeeded) { var (code, message) = ChatRunStartErrorMapper.ToCommandError(normalizedRequest.Error); - await WriteJsonErrorResponseAsync( - http, - ChatRunStartErrorMapper.ToHttpStatusCode(normalizedRequest.Error), - code, - message, - ct); + var statusCode = ChatRunStartErrorMapper.ToHttpStatusCode(normalizedRequest.Error); + scope.MarkResult(statusCode); + await WriteJsonErrorResponseAsync(http, statusCode, code, message, ct); return; } var result = await chatRunService.ExecuteAsync( normalizedRequest.Request!, - (frame, token) => writer.WriteAsync(frame, token), + async (frame, token) => + { + await writer.WriteAsync(frame, token); + scope.RecordFirstResponse(); + }, onStartedAsync: async (started, token) => { CapabilityTraceContext.ApplyCorrelationHeader(http.Response, started.CommandId); await writer.StartAsync(token); await writer.WriteAsync(BuildRunContextFrame(started), token); + scope.RecordFirstResponse(); }, ct); if (result.Error != WorkflowChatRunStartError.None && !writer.Started) { var (code, message) = ChatRunStartErrorMapper.ToCommandError(result.Error); - await WriteJsonErrorResponseAsync( - http, - ChatRunStartErrorMapper.ToHttpStatusCode(result.Error), - code, - message, - ct); + var statusCode = ChatRunStartErrorMapper.ToHttpStatusCode(result.Error); + scope.MarkResult(statusCode); + await WriteJsonErrorResponseAsync(http, statusCode, code, message, ct); } } catch (OperationCanceledException) @@ -109,6 +109,7 @@ await WriteJsonErrorResponseAsync( } catch (Exception ex) { + scope.MarkError(); logger?.LogError(ex, "Workflow chat execution failed."); if (!writer.Started) { @@ -131,6 +132,11 @@ internal static async Task HandleCommand( ILoggerFactory loggerFactory, CancellationToken ct = default) { + using var scope = ApiRequestScope.BeginHttp(); + var logger = loggerFactory.CreateLogger("Aevatar.Workflow.Host.Api.Command"); + var startSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + Task> executionTask; + if (string.IsNullOrWhiteSpace(input.Prompt)) { return Results.BadRequest(new @@ -140,25 +146,17 @@ internal static async Task HandleCommand( }); } - var logger = loggerFactory.CreateLogger("Aevatar.Workflow.Host.Api.Command"); - var startSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Task> executionTask; + var normalizedRequest = ChatRunRequestNormalizer.Normalize(input); + if (!normalizedRequest.Succeeded) + { + var (code, message) = ChatRunStartErrorMapper.ToCommandError(normalizedRequest.Error); + var statusCode = ChatRunStartErrorMapper.ToHttpStatusCode(normalizedRequest.Error); + scope.MarkResult(statusCode); + return Results.Json(new { code, message }, statusCode: statusCode); + } try { - var normalizedRequest = ChatRunRequestNormalizer.Normalize(input); - if (!normalizedRequest.Succeeded) - { - var (code, message) = ChatRunStartErrorMapper.ToCommandError(normalizedRequest.Error); - return Results.Json( - new - { - code, - message, - }, - statusCode: ChatRunStartErrorMapper.ToHttpStatusCode(normalizedRequest.Error)); - } - executionTask = chatRunService.ExecuteAsync( normalizedRequest.Request!, static (_, _) => ValueTask.CompletedTask, @@ -169,8 +167,13 @@ internal static async Task HandleCommand( }, ct); } + catch (OperationCanceledException) + { + return Results.StatusCode(499); + } catch (Exception ex) { + scope.MarkError(); logger.LogError(ex, "Workflow command execution failed before start signal"); return Results.Json( new { code = "EXECUTION_FAILED", message = "Command execution failed." }, @@ -204,8 +207,13 @@ internal static async Task HandleCommand( { result = await executionTask; } + catch (OperationCanceledException) + { + return Results.StatusCode(499); + } catch (Exception ex) { + scope.MarkError(); logger.LogError(ex, "Workflow command execution failed before start signal"); return Results.Json( new { code = "EXECUTION_FAILED", message = "Command execution failed." }, @@ -215,13 +223,11 @@ internal static async Task HandleCommand( if (result.Error != WorkflowChatRunStartError.None) { var mappedError = ChatRunStartErrorMapper.ToCommandError(result.Error); + var statusCode = ChatRunStartErrorMapper.ToHttpStatusCode(result.Error); + scope.MarkResult(statusCode); return Results.Json( - new - { - code = mappedError.Code, - message = mappedError.Message, - }, - statusCode: ChatRunStartErrorMapper.ToHttpStatusCode(result.Error)); + new { code = mappedError.Code, message = mappedError.Message }, + statusCode: statusCode); } if (result.Started != null) @@ -231,6 +237,7 @@ internal static async Task HandleCommand( CapabilityTraceContext.CreateAcceptedPayload(result.Started)); } + scope.MarkError(); return Results.StatusCode(StatusCodes.Status500InternalServerError); } @@ -239,62 +246,78 @@ internal static async Task HandleResume( IWorkflowRunActorPort actorPort, CancellationToken ct = default) { + using var scope = ApiRequestScope.BeginHttp(); ArgumentNullException.ThrowIfNull(input); ArgumentNullException.ThrowIfNull(actorPort); - var actorId = (input.ActorId ?? string.Empty).Trim(); - var runId = (input.RunId ?? string.Empty).Trim(); - var stepId = (input.StepId ?? string.Empty).Trim(); - if (string.IsNullOrWhiteSpace(actorId) || - string.IsNullOrWhiteSpace(runId) || - string.IsNullOrWhiteSpace(stepId)) + try { - return Results.BadRequest(new { error = "actorId, runId and stepId are required." }); - } + var actorId = (input.ActorId ?? string.Empty).Trim(); + var runId = (input.RunId ?? string.Empty).Trim(); + var stepId = (input.StepId ?? string.Empty).Trim(); + if (string.IsNullOrWhiteSpace(actorId) || + string.IsNullOrWhiteSpace(runId) || + string.IsNullOrWhiteSpace(stepId)) + { + scope.MarkResult(StatusCodes.Status400BadRequest); + return Results.BadRequest(new { error = "actorId, runId and stepId are required." }); + } - var actor = await actorPort.GetAsync(actorId, ct); - if (actor == null) - return Results.NotFound(new { error = $"Actor '{actorId}' not found." }); + var actor = await actorPort.GetAsync(actorId, ct); + if (actor == null) + { + scope.MarkResult(StatusCodes.Status404NotFound); + return Results.NotFound(new { error = $"Actor '{actorId}' not found." }); + } - if (!await actorPort.IsWorkflowActorAsync(actor, ct)) - return Results.BadRequest(new { error = $"Actor '{actorId}' is not a workflow actor." }); + if (!await actorPort.IsWorkflowActorAsync(actor, ct)) + { + scope.MarkResult(StatusCodes.Status400BadRequest); + return Results.BadRequest(new { error = $"Actor '{actorId}' is not a workflow actor." }); + } - var resumed = new WorkflowResumedEvent - { - RunId = runId, - StepId = stepId, - Approved = input.Approved, - UserInput = input.UserInput ?? string.Empty, - }; - if (input.Metadata is { Count: > 0 }) - { - foreach (var (key, value) in input.Metadata) - resumed.Metadata[key] = value; - } - var commandId = (input.CommandId ?? string.Empty).Trim(); - var correlationId = string.IsNullOrWhiteSpace(commandId) - ? Guid.NewGuid().ToString("N") - : commandId; + var resumed = new WorkflowResumedEvent + { + RunId = runId, + StepId = stepId, + Approved = input.Approved, + UserInput = input.UserInput ?? string.Empty, + }; + if (input.Metadata is { Count: > 0 }) + { + foreach (var (key, value) in input.Metadata) + resumed.Metadata[key] = value; + } + var commandId = (input.CommandId ?? string.Empty).Trim(); + var correlationId = string.IsNullOrWhiteSpace(commandId) + ? Guid.NewGuid().ToString("N") + : commandId; - await actor.HandleEventAsync(new EventEnvelope - { - Id = Guid.NewGuid().ToString("N"), - Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), - Payload = Any.Pack(resumed), - PublisherId = "api.workflow.resume", - Direction = EventDirection.Self, - CorrelationId = correlationId, - TargetActorId = actor.Id, - }, ct); - - return Results.Ok(new + await actor.HandleEventAsync(new EventEnvelope + { + Id = Guid.NewGuid().ToString("N"), + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Payload = Any.Pack(resumed), + PublisherId = "api.workflow.resume", + Direction = EventDirection.Self, + CorrelationId = correlationId, + TargetActorId = actor.Id, + }, ct); + + return Results.Ok(new + { + accepted = true, + actorId, + runId, + stepId, + commandId = correlationId, + }); + } + catch (Exception ex) when (ex is not OperationCanceledException) { - accepted = true, - actorId, - runId, - stepId, - commandId = correlationId, - }); + scope.MarkError(); + throw; + } } internal static async Task HandleSignal( @@ -302,55 +325,71 @@ internal static async Task HandleSignal( IWorkflowRunActorPort actorPort, CancellationToken ct = default) { + using var scope = ApiRequestScope.BeginHttp(); ArgumentNullException.ThrowIfNull(input); ArgumentNullException.ThrowIfNull(actorPort); - var actorId = (input.ActorId ?? string.Empty).Trim(); - var runId = (input.RunId ?? string.Empty).Trim(); - var signalName = (input.SignalName ?? string.Empty).Trim(); - if (string.IsNullOrWhiteSpace(actorId) || - string.IsNullOrWhiteSpace(runId) || - string.IsNullOrWhiteSpace(signalName)) + try { - return Results.BadRequest(new { error = "actorId, runId and signalName are required." }); - } + var actorId = (input.ActorId ?? string.Empty).Trim(); + var runId = (input.RunId ?? string.Empty).Trim(); + var signalName = (input.SignalName ?? string.Empty).Trim(); + if (string.IsNullOrWhiteSpace(actorId) || + string.IsNullOrWhiteSpace(runId) || + string.IsNullOrWhiteSpace(signalName)) + { + scope.MarkResult(StatusCodes.Status400BadRequest); + return Results.BadRequest(new { error = "actorId, runId and signalName are required." }); + } - var actor = await actorPort.GetAsync(actorId, ct); - if (actor == null) - return Results.NotFound(new { error = $"Actor '{actorId}' not found." }); + var actor = await actorPort.GetAsync(actorId, ct); + if (actor == null) + { + scope.MarkResult(StatusCodes.Status404NotFound); + return Results.NotFound(new { error = $"Actor '{actorId}' not found." }); + } - if (!await actorPort.IsWorkflowActorAsync(actor, ct)) - return Results.BadRequest(new { error = $"Actor '{actorId}' is not a workflow actor." }); + if (!await actorPort.IsWorkflowActorAsync(actor, ct)) + { + scope.MarkResult(StatusCodes.Status400BadRequest); + return Results.BadRequest(new { error = $"Actor '{actorId}' is not a workflow actor." }); + } - var commandId = (input.CommandId ?? string.Empty).Trim(); - var correlationId = string.IsNullOrWhiteSpace(commandId) - ? Guid.NewGuid().ToString("N") - : commandId; + var commandId = (input.CommandId ?? string.Empty).Trim(); + var correlationId = string.IsNullOrWhiteSpace(commandId) + ? Guid.NewGuid().ToString("N") + : commandId; - await actor.HandleEventAsync(new EventEnvelope - { - Id = Guid.NewGuid().ToString("N"), - Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), - Payload = Any.Pack(new SignalReceivedEvent + await actor.HandleEventAsync(new EventEnvelope { - RunId = runId, - SignalName = signalName, - Payload = input.Payload ?? string.Empty, - }), - PublisherId = "api.workflow.signal", - Direction = EventDirection.Self, - CorrelationId = correlationId, - TargetActorId = actor.Id, - }, ct); - - return Results.Ok(new + Id = Guid.NewGuid().ToString("N"), + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Payload = Any.Pack(new SignalReceivedEvent + { + RunId = runId, + SignalName = signalName, + Payload = input.Payload ?? string.Empty, + }), + PublisherId = "api.workflow.signal", + Direction = EventDirection.Self, + CorrelationId = correlationId, + TargetActorId = actor.Id, + }, ct); + + return Results.Ok(new + { + accepted = true, + actorId, + runId, + signalName, + commandId = correlationId, + }); + } + catch (Exception ex) when (ex is not OperationCanceledException) { - accepted = true, - actorId, - runId, - signalName, - commandId = correlationId, - }); + scope.MarkError(); + throw; + } } private static WorkflowOutputFrame BuildRunContextFrame(WorkflowChatRunStarted started) => @@ -426,6 +465,7 @@ internal static async Task HandleChatWebSocket( ILoggerFactory loggerFactory, CancellationToken ct = default) { + using var scope = ApiRequestScope.BeginWebSocket(); if (!http.WebSockets.IsWebSocketRequest) { http.Response.StatusCode = StatusCodes.Status400BadRequest; @@ -456,17 +496,19 @@ await ChatWebSocketProtocol.SendAsync( parseContext.CorrelationId), ct, parseError.ResponseMessageType); + scope.RecordFirstResponse(); return; } responseMessageType = ChatWebSocketProtocol.NormalizeMessageType(command.ResponseMessageType); - await ChatWebSocketRunCoordinator.ExecuteAsync(socket, command, chatRunService, ct); + await ChatWebSocketRunCoordinator.ExecuteAsync(socket, command, chatRunService, scope, ct); } catch (OperationCanceledException) { } catch (Exception ex) { + scope.MarkError(); logger?.LogWarning(ex, "Failed to execute websocket chat command"); if (socket.State == System.Net.WebSockets.WebSocketState.Open) { diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatWebSocketRunCoordinator.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatWebSocketRunCoordinator.cs index c9ef5f84b..f2856730b 100644 --- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatWebSocketRunCoordinator.cs +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatWebSocketRunCoordinator.cs @@ -10,6 +10,7 @@ public static async Task ExecuteAsync( WebSocket socket, ChatWebSocketCommandEnvelope command, ICommandExecutionService chatRunService, + ApiRequestScope scope, CancellationToken ct = default) { var responseMessageType = ChatWebSocketProtocol.NormalizeMessageType(command.ResponseMessageType); @@ -21,61 +22,66 @@ CapabilityMessageTraceContext ResolveContext() => if (!normalizedRequest.Succeeded) { var (code, message) = ChatRunStartErrorMapper.ToCommandError(normalizedRequest.Error); + var statusCode = ChatRunStartErrorMapper.ToHttpStatusCode(normalizedRequest.Error); + scope.MarkResult(statusCode); var context = ResolveContext(); await ChatWebSocketProtocol.SendAsync( socket, ChatWebSocketEnvelopeFactory.CreateCommandError( - command.RequestId, - code, - message, - context.CorrelationId), + command.RequestId, code, message, context.CorrelationId), ct, responseMessageType); + scope.RecordFirstResponse(); return; } var executionResult = await chatRunService.ExecuteAsync( normalizedRequest.Request!, - (frame, token) => - { - var context = ResolveContext(); - return new ValueTask(ChatWebSocketProtocol.SendAsync( - socket, - ChatWebSocketEnvelopeFactory.CreateAguiEvent( - command.RequestId, - frame, - context.CorrelationId), - token, - responseMessageType)); - }, - onStartedAsync: (started, token) => - { - correlationId = started.CommandId; - return new ValueTask(ChatWebSocketProtocol.SendAsync( - socket, - ChatWebSocketEnvelopeFactory.CreateCommandAck(command.RequestId, started), - token, - responseMessageType)); - }, + SendAguiEventAndRecordAsync, + onStartedAsync: SendAckAndRecordAsync, ct); if (executionResult.Error != WorkflowChatRunStartError.None) { var (code, message) = ChatRunStartErrorMapper.ToCommandError(executionResult.Error); + var statusCode = ChatRunStartErrorMapper.ToHttpStatusCode(executionResult.Error); + scope.MarkResult(statusCode); var context = ResolveContext(); await ChatWebSocketProtocol.SendAsync( socket, ChatWebSocketEnvelopeFactory.CreateCommandError( - command.RequestId, - code, - message, - context.CorrelationId), + command.RequestId, code, message, context.CorrelationId), ct, responseMessageType); + scope.RecordFirstResponse(); return; } if (executionResult.Started != null) correlationId = executionResult.Started.CommandId; + return; + + async ValueTask SendAguiEventAndRecordAsync(WorkflowOutputFrame frame, CancellationToken token) + { + var context = ResolveContext(); + await ChatWebSocketProtocol.SendAsync( + socket, + ChatWebSocketEnvelopeFactory.CreateAguiEvent( + command.RequestId, frame, context.CorrelationId), + token, + responseMessageType); + scope.RecordFirstResponse(); + } + + async ValueTask SendAckAndRecordAsync(WorkflowChatRunStarted started, CancellationToken token) + { + correlationId = started.CommandId; + await ChatWebSocketProtocol.SendAsync( + socket, + ChatWebSocketEnvelopeFactory.CreateCommandAck(command.RequestId, started), + token, + responseMessageType); + scope.RecordFirstResponse(); + } } } diff --git a/test/Aevatar.Foundation.Runtime.Hosting.Tests/RuntimeObservabilityAndTypeProbeCoverageTests.cs b/test/Aevatar.Foundation.Runtime.Hosting.Tests/RuntimeObservabilityAndTypeProbeCoverageTests.cs index 7893ff0f7..b52f97c7a 100644 --- a/test/Aevatar.Foundation.Runtime.Hosting.Tests/RuntimeObservabilityAndTypeProbeCoverageTests.cs +++ b/test/Aevatar.Foundation.Runtime.Hosting.Tests/RuntimeObservabilityAndTypeProbeCoverageTests.cs @@ -134,12 +134,7 @@ public void AgentMetrics_Instruments_ShouldAllowRecording() { Action act = () => { - AgentMetrics.EventsHandled.Add(1); - AgentMetrics.HandlerDuration.Record(12.5); - AgentMetrics.EventHandleDuration.Record(18.3); - AgentMetrics.RouteTargets.Add(2); - AgentMetrics.StateLoads.Add(1); - AgentMetrics.StateSaves.Add(1); + AgentMetrics.RecordEventHandled("Self", AgentMetrics.ResultOk, 18.3); AgentMetrics.ActiveActors.Add(1); }; diff --git a/test/Aevatar.Foundation.Runtime.Hosting.Tests/TracingContextHelpersTests.cs b/test/Aevatar.Foundation.Runtime.Hosting.Tests/TracingContextHelpersTests.cs index 6daf40904..fbcc7435c 100644 --- a/test/Aevatar.Foundation.Runtime.Hosting.Tests/TracingContextHelpersTests.cs +++ b/test/Aevatar.Foundation.Runtime.Hosting.Tests/TracingContextHelpersTests.cs @@ -75,7 +75,7 @@ public void PopulateTraceId_ShouldPopulateTraceAndSpanMetadata_FromCurrentActivi } [Fact] - public void BeginHandleEnvelopeInstrumentation_ShouldCreateActivityAndAttachScope() + public void EventHandleScope_ShouldCreateActivityAndAttachScope() { using var listener = new ActivityListener { @@ -96,9 +96,9 @@ public void BeginHandleEnvelopeInstrumentation_ShouldCreateActivityAndAttachScop PublisherId = "publisher-3", }; - using var instrumentation = TracingContextHelpers.BeginHandleEnvelopeInstrumentation(logger, "agent-3", envelope); - instrumentation.Activity.Should().NotBeNull(); - instrumentation.Activity!.GetTagItem("aevatar.agent.id").Should().Be("agent-3"); + using var scope = EventHandleScope.Begin(logger, "agent-3", envelope); + scope.Activity.Should().NotBeNull(); + scope.Activity!.GetTagItem("aevatar.agent.id").Should().Be("agent-3"); logger.LogInformation("runtime log line"); provider.Entries.Should().ContainSingle(); diff --git a/test/Aevatar.Workflow.Host.Api.Tests/ChatEndpointsInternalTests.cs b/test/Aevatar.Workflow.Host.Api.Tests/ChatEndpointsInternalTests.cs index 7ae521ac2..3147ea1da 100644 --- a/test/Aevatar.Workflow.Host.Api.Tests/ChatEndpointsInternalTests.cs +++ b/test/Aevatar.Workflow.Host.Api.Tests/ChatEndpointsInternalTests.cs @@ -6,6 +6,7 @@ using Aevatar.Workflow.Application.Abstractions.Queries; using Aevatar.Workflow.Application.Abstractions.Runs; using Aevatar.Workflow.Abstractions; +using Aevatar.Workflow.Host.Api.Tests.Helpers; using FluentAssertions; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; @@ -381,6 +382,7 @@ await WorkflowCapabilityEndpoints.HandleChat( [Fact] public async Task HandleResume_WhenValid_ShouldDispatchWorkflowResumedEvent() { + using var metricCapture = new ApiMetricCapture(); var actor = new RecordingActor("actor-1"); var actorPort = new FakeWorkflowRunActorPort { @@ -420,11 +422,15 @@ public async Task HandleResume_WhenValid_ShouldDispatchWorkflowResumedEvent() resumed.UserInput.Should().Be("approved"); resumed.Metadata["operator"].Should().Be("alice"); actor.LastHandledEnvelope.CorrelationId.Should().Be("cmd-1"); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "http")); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); } [Fact] public async Task HandleResume_WhenActorMissing_ShouldReturnNotFound() { + using var metricCapture = new ApiMetricCapture(); var actorPort = new FakeWorkflowRunActorPort { ActorToReturn = null, @@ -442,11 +448,14 @@ public async Task HandleResume_WhenActorMissing_ShouldReturnNotFound() var (statusCode, _) = await ExecuteResultAsync(result); statusCode.Should().Be(StatusCodes.Status404NotFound); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); } [Fact] public async Task HandleSignal_WhenValid_ShouldDispatchSignalReceivedEvent() { + using var metricCapture = new ApiMetricCapture(); var actor = new RecordingActor("actor-1"); var actorPort = new FakeWorkflowRunActorPort { @@ -479,6 +488,9 @@ public async Task HandleSignal_WhenValid_ShouldDispatchSignalReceivedEvent() signal.SignalName.Should().Be("ops_window_open"); signal.Payload.Should().Be("window=2026-02-26T10:00:00Z"); actor.LastHandledEnvelope.CorrelationId.Should().Be("cmd-s1"); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "http")); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); } [Fact] @@ -621,6 +633,7 @@ public async Task HandleCommand_WhenAgentIdProvidedWithoutWorkflow_ShouldKeepWor [Fact] public async Task HandleChat_WithEmptyPrompt_ShouldReturn400() { + using var metricCapture = new ApiMetricCapture(); var http = CreateHttpContext(); var service = new FakeChatRunApplicationService(); @@ -631,6 +644,215 @@ await WorkflowCapabilityEndpoints.HandleChat( CancellationToken.None); http.Response.StatusCode.Should().Be(StatusCodes.Status400BadRequest); + metricCapture.FirstResponseMeasurements.Should().BeEmpty(); + } + + [Fact] + public async Task HandleChat_WhenStreaming_ShouldRecordFirstResponseMetric() + { + using var metricCapture = new ApiMetricCapture(); + var http = CreateHttpContext(); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = async (_, emitAsync, onStartedAsync, ct) => + { + var started = new WorkflowChatRunStarted("actor-1", "direct", "cmd-1"); + if (onStartedAsync != null) + await onStartedAsync(started, ct); + + await emitAsync(new WorkflowOutputFrame + { + Type = WorkflowRunEventTypes.RunStarted, + ThreadId = "actor-1", + }, ct); + + return ToCoreResult( + new WorkflowChatRunExecutionResult( + WorkflowChatRunStartError.None, + started, + new WorkflowChatRunFinalizeResult( + WorkflowProjectionCompletionStatus.Completed, + true))); + }, + }; + + await WorkflowCapabilityEndpoints.HandleChat( + http, + new ChatInput { Prompt = "hello", Workflow = "direct" }, + service, + CancellationToken.None); + + http.Response.StatusCode.Should().Be(StatusCodes.Status200OK); + metricCapture.FirstResponseMeasurements.Should().ContainSingle(); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "http")); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + } + + [Fact] + public async Task HandleChat_WhenOnlyRunContextIsWritten_ShouldRecordFirstResponseMetric() + { + using var metricCapture = new ApiMetricCapture(); + var http = CreateHttpContext(); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = async (_, _, onStartedAsync, ct) => + { + var started = new WorkflowChatRunStarted("actor-1", "direct", "cmd-context-only"); + if (onStartedAsync != null) + await onStartedAsync(started, ct); + + return ToCoreResult( + new WorkflowChatRunExecutionResult( + WorkflowChatRunStartError.None, + started, + new WorkflowChatRunFinalizeResult( + WorkflowProjectionCompletionStatus.Completed, + true))); + }, + }; + + await WorkflowCapabilityEndpoints.HandleChat( + http, + new ChatInput { Prompt = "hello", Workflow = "direct" }, + service, + CancellationToken.None); + + http.Response.StatusCode.Should().Be(StatusCodes.Status200OK); + metricCapture.FirstResponseMeasurements.Should().ContainSingle(); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "http")); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + } + + [Fact] + public async Task HandleChat_WhenWorkflowNotFound_ShouldClassify404AsResultOk() + { + using var metricCapture = new ApiMetricCapture(); + var http = CreateHttpContext(); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = (_, _, _, _) => Task.FromResult(ToCoreResult( + new WorkflowChatRunExecutionResult( + WorkflowChatRunStartError.WorkflowNotFound, + null, + null))), + }; + + await WorkflowCapabilityEndpoints.HandleChat( + http, + new ChatInput { Prompt = "hello", Workflow = "missing" }, + service, + CancellationToken.None); + + http.Response.StatusCode.Should().Be(StatusCodes.Status404NotFound); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + } + + [Fact] + public async Task HandleCommand_WhenStarted_ShouldRecordRequestMetric() + { + using var metricCapture = new ApiMetricCapture(); + using var loggerFactory = LoggerFactory.Create(_ => { }); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = async (_, _, onStartedAsync, ct) => + { + var started = new WorkflowChatRunStarted("actor-1", "direct", "cmd-1"); + if (onStartedAsync != null) + await onStartedAsync(started, ct); + + return ToCoreResult( + new WorkflowChatRunExecutionResult( + WorkflowChatRunStartError.None, + started, + new WorkflowChatRunFinalizeResult( + WorkflowProjectionCompletionStatus.Completed, + true))); + }, + }; + + await WorkflowCapabilityEndpoints.HandleCommand( + new ChatInput { Prompt = "hello", Workflow = "direct" }, + service, + loggerFactory, + CancellationToken.None); + + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "http")); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + } + + [Fact] + public async Task HandleCommand_WhenExecutionThrows_ShouldRecordRequestMetricAsError() + { + using var metricCapture = new ApiMetricCapture(); + using var loggerFactory = LoggerFactory.Create(_ => { }); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = (_, _, _, _) => throw new InvalidOperationException("boom"), + }; + + await WorkflowCapabilityEndpoints.HandleCommand( + new ChatInput { Prompt = "hello", Workflow = "direct" }, + service, + loggerFactory, + CancellationToken.None); + + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "error")); + } + + [Fact] + public async Task HandleCommand_WhenExecutionCanceled_ShouldRecordRequestMetricAsOk() + { + using var metricCapture = new ApiMetricCapture(); + using var loggerFactory = LoggerFactory.Create(_ => { }); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = (_, _, _, _) => throw new OperationCanceledException(), + }; + + var result = await WorkflowCapabilityEndpoints.HandleCommand( + new ChatInput { Prompt = "hello", Workflow = "direct" }, + service, + loggerFactory, + CancellationToken.None); + + var (statusCode, _) = await ExecuteResultAsync(result); + statusCode.Should().Be(499); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + } + + [Fact] + public async Task HandleCommand_WhenProjectionDisabled_ShouldClassify503AsResultError() + { + using var metricCapture = new ApiMetricCapture(); + using var loggerFactory = LoggerFactory.Create(_ => { }); + var service = new FakeChatRunApplicationService + { + ExecuteHandler = (_, _, _, _) => Task.FromResult(ToCoreResult( + new WorkflowChatRunExecutionResult( + WorkflowChatRunStartError.ProjectionDisabled, + null, + null))), + }; + + var result = await WorkflowCapabilityEndpoints.HandleCommand( + new ChatInput { Prompt = "hello", Workflow = "direct" }, + service, + loggerFactory, + CancellationToken.None); + + var (statusCode, _) = await ExecuteResultAsync(result); + statusCode.Should().Be(StatusCodes.Status503ServiceUnavailable); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "error")); } [Fact] @@ -1098,4 +1320,5 @@ public RecordingAgent(string id) private static CommandExecutionResult ToCoreResult( WorkflowChatRunExecutionResult source) => new(source.Error, source.Started, source.FinalizeResult); + } diff --git a/test/Aevatar.Workflow.Host.Api.Tests/ChatWebSocketCoordinatorAndProtocolTests.cs b/test/Aevatar.Workflow.Host.Api.Tests/ChatWebSocketCoordinatorAndProtocolTests.cs index 20b993035..9c9676710 100644 --- a/test/Aevatar.Workflow.Host.Api.Tests/ChatWebSocketCoordinatorAndProtocolTests.cs +++ b/test/Aevatar.Workflow.Host.Api.Tests/ChatWebSocketCoordinatorAndProtocolTests.cs @@ -35,6 +35,7 @@ public async Task ExecuteAsync_WhenSuccess_ShouldSendAckAndRunEvents() }, }; + var scope = ApiRequestScope.BeginWebSocket(); await ChatWebSocketRunCoordinator.ExecuteAsync( socket, new ChatWebSocketCommandEnvelope("req-1", new ChatInput @@ -45,6 +46,7 @@ await ChatWebSocketRunCoordinator.ExecuteAsync( AgentId = "actor-1", }, WebSocketMessageType.Text), service, + scope, CancellationToken.None); var types = socket.SentTexts @@ -84,10 +86,12 @@ public async Task ExecuteAsync_WhenStartFails_ShouldSendCommandErrorOnly() null)), }; + var scope = ApiRequestScope.BeginWebSocket(); await ChatWebSocketRunCoordinator.ExecuteAsync( socket, new ChatWebSocketCommandEnvelope("req-2", new ChatInput { Prompt = "hello" }, WebSocketMessageType.Text), service, + scope, CancellationToken.None); socket.SentTexts.Should().ContainSingle(); @@ -114,6 +118,7 @@ public async Task ExecuteAsync_WhenAgentIdProvidedWithoutWorkflow_ShouldKeepWork null)), }; + var scope = ApiRequestScope.BeginWebSocket(); await ChatWebSocketRunCoordinator.ExecuteAsync( socket, new ChatWebSocketCommandEnvelope( @@ -125,6 +130,7 @@ await ChatWebSocketRunCoordinator.ExecuteAsync( }, WebSocketMessageType.Text), service, + scope, CancellationToken.None); service.LastCommand.Should().NotBeNull(); @@ -156,10 +162,12 @@ public async Task ExecuteAsync_WhenRunEventArrivesBeforeAck_ShouldUseRequestIdAs }, }; + var scope = ApiRequestScope.BeginWebSocket(); await ChatWebSocketRunCoordinator.ExecuteAsync( socket, new ChatWebSocketCommandEnvelope("req-fallback", new ChatInput { Prompt = "hello" }, WebSocketMessageType.Text), service, + scope, CancellationToken.None); socket.SentTexts.Should().HaveCount(2); diff --git a/test/Aevatar.Workflow.Host.Api.Tests/Helpers/ApiMetricCapture.cs b/test/Aevatar.Workflow.Host.Api.Tests/Helpers/ApiMetricCapture.cs new file mode 100644 index 000000000..13e89a1b6 --- /dev/null +++ b/test/Aevatar.Workflow.Host.Api.Tests/Helpers/ApiMetricCapture.cs @@ -0,0 +1,44 @@ +using System.Diagnostics.Metrics; + +namespace Aevatar.Workflow.Host.Api.Tests.Helpers; + +internal sealed class ApiMetricCapture : IDisposable +{ + private const string ApiMeterName = "Aevatar.Api"; + private const string FirstResponseMetricName = "aevatar.api.first_response_duration_ms"; + private const string RequestsTotalMetricName = "aevatar.api.requests_total"; + private readonly MeterListener _listener = new(); + + public ApiMetricCapture() + { + _listener.InstrumentPublished = (instrument, listener) => + { + if (instrument.Meter.Name == ApiMeterName) + listener.EnableMeasurementEvents(instrument); + }; + + _listener.SetMeasurementEventCallback((instrument, measurement, tags, _) => + { + if (instrument.Name == FirstResponseMetricName) + FirstResponseMeasurements.Add(new MetricMeasurement(measurement, tags.ToArray())); + }); + + _listener.SetMeasurementEventCallback((instrument, measurement, tags, _) => + { + if (instrument.Name == RequestsTotalMetricName) + RequestMeasurements.Add(new MetricMeasurement(measurement, tags.ToArray())); + }); + + _listener.Start(); + } + + public List FirstResponseMeasurements { get; } = []; + public List RequestMeasurements { get; } = []; + + public void Dispose() + { + _listener.Dispose(); + } +} + +internal sealed record MetricMeasurement(double Value, KeyValuePair[] Tags); diff --git a/test/Aevatar.Workflow.Host.Api.Tests/WorkflowCapabilityEndpointsCoverageTests.cs b/test/Aevatar.Workflow.Host.Api.Tests/WorkflowCapabilityEndpointsCoverageTests.cs index 1679088f2..896a0e239 100644 --- a/test/Aevatar.Workflow.Host.Api.Tests/WorkflowCapabilityEndpointsCoverageTests.cs +++ b/test/Aevatar.Workflow.Host.Api.Tests/WorkflowCapabilityEndpointsCoverageTests.cs @@ -4,6 +4,7 @@ using Aevatar.CQRS.Core.Abstractions.Commands; using Aevatar.Workflow.Application.Abstractions.Runs; using Aevatar.Workflow.Infrastructure.CapabilityApi; +using Aevatar.Workflow.Host.Api.Tests.Helpers; using FluentAssertions; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http.Features; @@ -169,6 +170,7 @@ await WorkflowCapabilityEndpoints.HandleChatWebSocket( [Fact] public async Task HandleChatWebSocket_WhenParseFails_ShouldSendCommandError() { + using var metricCapture = new ApiMetricCapture(); using var activity = new System.Diagnostics.Activity("ws-parse-trace").Start(); var socket = new FakeWebSocket(WebSocketState.Open); socket.EnqueueReceive(WebSocketMessageType.Text, Encoding.UTF8.GetBytes("""{"type":"chat.command","requestId":"req-parse","payload":{"prompt":""}}"""), true); @@ -193,11 +195,15 @@ await WorkflowCapabilityEndpoints.HandleChatWebSocket( doc.RootElement.GetProperty("correlationId").GetString().Should().Be("req-parse"); doc.RootElement.TryGetProperty("traceId", out _).Should().BeFalse(); doc.RootElement.TryGetProperty("payload", out _).Should().BeFalse(); + metricCapture.FirstResponseMeasurements.Should().ContainSingle(); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "ws")); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); } [Fact] public async Task HandleChatWebSocket_WhenExecutionThrows_ShouldSendRunExecutionFailed() { + using var metricCapture = new ApiMetricCapture(); using var activity = new System.Diagnostics.Activity("ws-exception-trace").Start(); var socket = new FakeWebSocket(WebSocketState.Open); socket.EnqueueReceive( @@ -226,6 +232,10 @@ await WorkflowCapabilityEndpoints.HandleChatWebSocket( doc.RootElement.GetProperty("code").GetString().Should().Be("RUN_EXECUTION_FAILED"); doc.RootElement.TryGetProperty("traceId", out _).Should().BeFalse(); doc.RootElement.TryGetProperty("payload", out _).Should().BeFalse(); + metricCapture.FirstResponseMeasurements.Should().BeEmpty(); + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "ws")); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "error")); } [Fact] @@ -261,6 +271,83 @@ await WorkflowCapabilityEndpoints.HandleChatWebSocket( doc.RootElement.TryGetProperty("traceId", out _).Should().BeFalse(); } + [Fact] + public async Task HandleChatWebSocket_WhenStarted_ShouldRecordFirstResponseMetric() + { + using var metricCapture = new ApiMetricCapture(); + var socket = new FakeWebSocket(WebSocketState.Open); + socket.EnqueueReceive( + WebSocketMessageType.Text, + Encoding.UTF8.GetBytes("""{"type":"chat.command","requestId":"req-ok","payload":{"prompt":"hello"}}"""), + true); + + var http = new DefaultHttpContext(); + http.Features.Set(new FakeWebSocketFeature(socket)); + + var service = new FakeCommandExecutionService + { + Handler = async (_, _, onStartedAsync, ct) => + { + if (onStartedAsync != null) + await onStartedAsync(new WorkflowChatRunStarted("actor-1", "direct", "cmd-ok"), ct); + + return new CommandExecutionResult( + WorkflowChatRunStartError.None, + new WorkflowChatRunStarted("actor-1", "direct", "cmd-ok"), + new WorkflowChatRunFinalizeResult(WorkflowProjectionCompletionStatus.Completed, true)); + }, + }; + using var loggerFactory = LoggerFactory.Create(_ => { }); + + await WorkflowCapabilityEndpoints.HandleChatWebSocket( + http, + service, + loggerFactory, + CancellationToken.None); + + metricCapture.FirstResponseMeasurements.Should().ContainSingle(); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "ws")); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "ok")); + } + + [Fact] + public async Task HandleChatWebSocket_WhenProjectionDisabled_ShouldRecordRequestAndFirstResponseAsError() + { + using var metricCapture = new ApiMetricCapture(); + var socket = new FakeWebSocket(WebSocketState.Open); + socket.EnqueueReceive( + WebSocketMessageType.Text, + Encoding.UTF8.GetBytes("""{"type":"chat.command","requestId":"req-proj-off","payload":{"prompt":"hello","workflow":"direct"}}"""), + true); + + var http = new DefaultHttpContext(); + http.Features.Set(new FakeWebSocketFeature(socket)); + + var service = new FakeCommandExecutionService + { + Handler = (_, _, _, _) => Task.FromResult( + new CommandExecutionResult( + WorkflowChatRunStartError.ProjectionDisabled, + null, + null)), + }; + using var loggerFactory = LoggerFactory.Create(_ => { }); + + await WorkflowCapabilityEndpoints.HandleChatWebSocket( + http, + service, + loggerFactory, + CancellationToken.None); + + metricCapture.FirstResponseMeasurements.Should().ContainSingle(); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "ws")); + metricCapture.FirstResponseMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "error")); + + metricCapture.RequestMeasurements.Should().ContainSingle(); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "transport" && Equals(t.Value, "ws")); + metricCapture.RequestMeasurements[0].Tags.Should().Contain(t => t.Key == "result" && Equals(t.Value, "error")); + } + private static async Task<(int StatusCode, string Body)> ExecuteResultAsync(IResult result) { var http = new DefaultHttpContext @@ -384,4 +471,5 @@ public override Task SendAsync(ArraySegment buffer, WebSocketMessageType m return Task.CompletedTask; } } + } diff --git a/tools/observability/README.md b/tools/observability/README.md new file mode 100644 index 000000000..b9d95cbe2 --- /dev/null +++ b/tools/observability/README.md @@ -0,0 +1,108 @@ +# Local Observability Stack + +This repository uses an OpenTelemetry Collector in front of Prometheus, Grafana, and Jaeger for local observability. + +## 1. Start Workflow Host + +Run the API host with OTLP pointed at the local collector: + +```bash +export OTEL_SERVICE_NAME=Aevatar.Workflow.Host.Api +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +export OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +ASPNETCORE_URLS=http://localhost:5000 \ +dotnet run --project src/workflow/Aevatar.Workflow.Host.Api +``` + +## 2. Start Observability Stack + +```bash +docker compose -f docker-compose.observability.yml up -d +``` + +Services: + +- OpenTelemetry Collector OTLP HTTP ingest endpoint: `http://localhost:4318` (ingest only, no UI) +- Prometheus: `http://localhost:9090` +- Grafana: `http://localhost:3000` (`admin` / `admin`) +- Jaeger: `http://localhost:16686` + +## 3. Validate Scraping + +Open Prometheus targets page: + +`http://localhost:9090/targets` + +Expect `aevatar-otel-collector` to be `UP`. + +## 4. Explore Metrics in Grafana + +Open Grafana Explore and query: + +- `aevatar_runtime_events_handled_total` +- `aevatar_runtime_event_handle_duration_ms` +- `aevatar_api_requests_total` +- `aevatar_api_request_duration_ms` +- `aevatar_api_first_response_duration_ms` + +A provisioned dashboard is available after startup: + +- Folder: `Aevatar` +- Dashboard: `Aevatar Runtime Overview` +- Includes two sections: + - SLO section (error ratio, first-response p95, full-request p95, runtime/api latency) + - Runtime diagnostics section (self-direction runtime events, event amplification signals) + +## 5. Stop Stack + +```bash +docker compose -f docker-compose.observability.yml down +``` + +## 6. Alert Rules + +Prometheus alert examples are provisioned from: + +- `tools/observability/prometheus/alerts.yml` + +Default examples: + +- API error ratio > 1% for 10 minutes +- Runtime event error ratio > 1% for 10 minutes +- API first-response p95 > 5000 ms for 10 minutes + +Inspect loaded rules in Prometheus: + +- `http://localhost:9090/rules` + +## 7. Histogram Buckets + +The workflow host configures explicit histogram buckets for AI-oriented latency profiles: + +- API request / first-response latency: + - `25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 20000, 30000, 45000, 60000, 90000, 120000` ms +- Runtime event latency: + - `1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000` ms + +Override them with configuration values: + +- `Observability:Metrics:ApiLatencyBucketsMs` +- `Observability:Metrics:RuntimeLatencyBucketsMs` + +Use comma-separated millisecond values in ascending order. + +## 8. Data Flow + +The local metric and trace path is: + +- Workflow host exports traces and metrics to OTLP (`localhost:4317`) +- OpenTelemetry Collector forwards traces to Jaeger +- OpenTelemetry Collector exposes Prometheus metrics on `:9464` +- Prometheus scrapes the collector and Grafana queries Prometheus + +## 9. Docker Host Notes + +The application runs on the host by default and exports OTLP to `localhost:4317`, which is published by the collector container. + +If you change the collector port mapping, update `OTEL_EXPORTER_OTLP_ENDPOINT` before starting the workflow host. diff --git a/tools/observability/grafana/provisioning/dashboards/aevatar-runtime-overview.json b/tools/observability/grafana/provisioning/dashboards/aevatar-runtime-overview.json new file mode 100644 index 000000000..679b11ea9 --- /dev/null +++ b/tools/observability/grafana/provisioning/dashboards/aevatar-runtime-overview.json @@ -0,0 +1,384 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": false, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 50 } + ] + } + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 24 }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto" + }, + "targets": [ + { + "editorMode": "code", + "expr": "aevatar_runtime_active_actors", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Runtime Active Actors (Instances)", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 6, "y": 24 }, + "id": 7, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "value" + }, + "targets": [ + { + "editorMode": "code", + "expr": "round(sum(increase(aevatar_runtime_events_handled_total{direction=\"Self\"}[$__range])))", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Runtime Events (Self, Window Total)", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 12, "y": 24 }, + "id": 8, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "value" + }, + "targets": [ + { + "editorMode": "code", + "expr": "round(sum(increase(aevatar_api_requests_total[$__range])))", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "API Requests (Window Total)", + "type": "stat" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "gridPos": { "h": 4, "w": 24, "x": 0, "y": 0 }, + "id": 9, + "options": { + "content": "### SLO (Default View)\n- If **error ratios** are low and **first response p95** is stable, service is healthy.\n- **First vs full** gap mainly reflects AI generation time variance.\n- Use lower section only when SLO signals degrade.", + "mode": "markdown" + }, + "title": "SLO Read Guide", + "type": "text" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "reqps" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 30 }, + "id": 2, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "sum by (result) (rate(aevatar_runtime_events_handled_total{direction=\"Self\"}[$__rate_interval]))", + "legendFormat": "{{result}}", + "range": true, + "refId": "A" + } + ], + "title": "Runtime Events Rate (Self, Diagnostics)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "reqps" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 30 }, + "id": 4, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "sum by (result) (rate(aevatar_api_requests_total[$__rate_interval]))", + "legendFormat": "{{result}}", + "range": true, + "refId": "A" + } + ], + "title": "API Requests Rate (Diagnostics)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "ms" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 12 }, + "id": 3, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by (le) (rate(aevatar_runtime_event_handle_duration_ms_bucket[$__rate_interval])))", + "legendFormat": "runtime p95", + "range": true, + "refId": "A" + }, + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum by (le) (rate(aevatar_runtime_event_handle_duration_ms_bucket[$__rate_interval])))", + "legendFormat": "runtime p99", + "range": true, + "refId": "B" + } + ], + "title": "Runtime Event Handle Latency (p95/p99)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "ms" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 }, + "id": 5, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum by (le) (rate(aevatar_api_request_duration_ms_bucket[$__rate_interval])))", + "legendFormat": "api p99", + "range": true, + "refId": "A" + } + ], + "title": "API Request Latency (p99)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "percentunit" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 8, "x": 0, "y": 4 }, + "id": 6, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "((sum(increase(aevatar_api_requests_total{result=\"error\"}[5m])) or on() vector(0)) / clamp_min((sum(increase(aevatar_api_requests_total[5m])) or on() vector(0)), 1))", + "legendFormat": "api error ratio", + "range": true, + "refId": "A" + }, + { + "editorMode": "code", + "expr": "((sum(increase(aevatar_runtime_events_handled_total{result=\"error\"}[5m])) or on() vector(0)) / clamp_min((sum(increase(aevatar_runtime_events_handled_total[5m])) or on() vector(0)), 1))", + "legendFormat": "runtime event error ratio", + "range": true, + "refId": "B" + } + ], + "title": "Error Ratio", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "ms" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 8, "x": 8, "y": 4 }, + "id": 12, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by (le) (rate(aevatar_api_first_response_duration_ms_bucket[$__rate_interval])))", + "legendFormat": "first response p95", + "range": true, + "refId": "A" + } + ], + "title": "User Latency: First Response (p95)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" }, "mappings": [], "unit": "ms" }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 8, "x": 16, "y": 4 }, + "id": 15, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by (le) (rate(aevatar_api_request_duration_ms_bucket[$__rate_interval])))", + "legendFormat": "full request p95", + "range": true, + "refId": "A" + } + ], + "title": "User Latency: Full Request (p95)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "gridPos": { "h": 4, "w": 24, "x": 0, "y": 20 }, + "id": 13, + "options": { + "content": "### Runtime Diagnostics (Only when SLO degrades)\n- **Self** events are closest to request-correlated runtime work.\n- **Events per request** highlights fan-out/internal amplification.\n- Use these panels for root-cause direction, not primary health judgement.", + "mode": "markdown" + }, + "title": "Runtime Diagnostics Guide", + "type": "text" + }, + { + "datasource": { "type": "prometheus", "uid": "Prometheus" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 6, "x": 18, "y": 24 }, + "id": 14, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "value" + }, + "targets": [ + { + "editorMode": "code", + "expr": "round(sum(increase(aevatar_runtime_events_handled_total{direction=\"Self\"}[$__range])) / sum(increase(aevatar_api_requests_total[$__range])), 2) and on() (sum(increase(aevatar_api_requests_total[$__range])) > 0)", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Runtime Self Events / API Request (API>0)", + "type": "stat" + } + ], + "refresh": "10s", + "schemaVersion": 39, + "style": "dark", + "tags": ["aevatar", "runtime", "metrics"], + "templating": { "list": [] }, + "time": { "from": "now-30m", "to": "now" }, + "timepicker": {}, + "timezone": "", + "title": "Aevatar Runtime Overview", + "uid": "aevatar-runtime-overview", + "version": 7, + "weekStart": "" +} diff --git a/tools/observability/grafana/provisioning/dashboards/dashboards.yml b/tools/observability/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 000000000..be100d269 --- /dev/null +++ b/tools/observability/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +providers: + - name: Aevatar + orgId: 1 + folder: Aevatar + type: file + disableDeletion: true + allowUiUpdates: false + updateIntervalSeconds: 10 + options: + path: /etc/grafana/provisioning/dashboards diff --git a/tools/observability/grafana/provisioning/datasources/prometheus.yml b/tools/observability/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 000000000..9e3c7b4e4 --- /dev/null +++ b/tools/observability/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,10 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + uid: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false diff --git a/tools/observability/otel-collector-config.yml b/tools/observability/otel-collector-config.yml new file mode 100644 index 000000000..2eca2633c --- /dev/null +++ b/tools/observability/otel-collector-config.yml @@ -0,0 +1,31 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: "0.0.0.0:4317" + http: + endpoint: "0.0.0.0:4318" + +processors: + batch: + +exporters: + prometheus: + endpoint: "0.0.0.0:9464" + + otlp/jaeger: + endpoint: "jaeger:4317" + tls: + insecure: true + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] + + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp/jaeger] diff --git a/tools/observability/prometheus/alerts.yml b/tools/observability/prometheus/alerts.yml new file mode 100644 index 000000000..df931eb77 --- /dev/null +++ b/tools/observability/prometheus/alerts.yml @@ -0,0 +1,32 @@ +groups: + - name: aevatar-observability + rules: + - alert: AevatarApiErrorRatioHigh + expr: sum(increase(aevatar_api_requests_total{result="error"}[5m])) / clamp_min(sum(increase(aevatar_api_requests_total[5m])), 1) > 0.01 + for: 10m + labels: + severity: warning + service: aevatar-workflow-host + annotations: + summary: API error ratio is above 1%. + description: API requests have exceeded a 1% error ratio for 10 minutes. + + - alert: AevatarRuntimeEventErrorRatioHigh + expr: sum(increase(aevatar_runtime_events_handled_total{result="error"}[5m])) / clamp_min(sum(increase(aevatar_runtime_events_handled_total[5m])), 1) > 0.01 + for: 10m + labels: + severity: warning + service: aevatar-runtime + annotations: + summary: Runtime event error ratio is above 1%. + description: Runtime event handling has exceeded a 1% error ratio for 10 minutes. + + - alert: AevatarApiFirstResponseP95High + expr: histogram_quantile(0.95, sum by (le) (rate(aevatar_api_first_response_duration_ms_bucket[5m]))) > 5000 + for: 10m + labels: + severity: warning + service: aevatar-workflow-host + annotations: + summary: API first-response p95 is above 5000 ms. + description: User-perceived first response latency has remained above 5000 ms for 10 minutes. diff --git a/tools/observability/prometheus/prometheus.yml b/tools/observability/prometheus/prometheus.yml new file mode 100644 index 000000000..87132de9c --- /dev/null +++ b/tools/observability/prometheus/prometheus.yml @@ -0,0 +1,12 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +rule_files: + - /etc/prometheus/alerts.yml + +scrape_configs: + - job_name: "aevatar-otel-collector" + metrics_path: /metrics + static_configs: + - targets: ["otel-collector:9464"]