feat(aggregator): PR 11+15a: upstream-proxy rewrite + spec.suspended landing early #720
Open
QuentinBisson wants to merge 17 commits into
… (PR 11/14)
muster's aggregator now dials agentgateway at <UpstreamProxy>/mcp/<server-name>
(streamable-http, regardless of spec.type) for every external MCPServer.
AggregatorConfig.UpstreamProxy is required; cluster mode reads
MUSTER_AGW_UPSTREAM_URL (Helm value muster.agentgateway.upstreamURL),
filesystem mode defaults to http://localhost:8080.
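The dial-URL rule above can be sketched as a small helper. This is a minimal illustration only: the name proxyURLFor appears in this PR, but the signature, the error handling, and the path escaping below are assumptions, not the code that landed.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// proxyURLFor returns <upstreamProxy>/mcp/<serverName>.
// Hypothetical signature: the real helper may take different arguments.
func proxyURLFor(upstreamProxy, serverName string) (string, error) {
	if upstreamProxy == "" {
		// Mirrors the "UpstreamProxy is required" rule: fail fast when unset.
		return "", errors.New("UpstreamProxy is required")
	}
	if serverName == "" {
		return "", errors.New("server name is required")
	}
	// Trim trailing slashes so the result never contains "//mcp".
	base := strings.TrimRight(upstreamProxy, "/")
	// Escaping the name is an assumption; it keeps odd names inside one path segment.
	return base + "/mcp/" + url.PathEscape(serverName), nil
}

func main() {
	dial, err := proxyURLFor("http://localhost:8080", "github")
	if err != nil {
		panic(err)
	}
	fmt.Println(dial)
}
```

The same URL shape is used whatever spec.type says, which is why the sketch takes no type argument.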
Federation registration is reconciler-driven via aggregator.RegisterUpstream /
DeregisterUpstream on every MCPServer reconcile / delete. With the dial path
unified through agentgateway, the orchestrator-driven MCPServer lifecycle
(processAutoStartMCPServers, createMCPServerService, handleAuthRequiredServer,
retryFailedMCPServers, waitForMCPServerRegistration, DisableMCPServerAutoStart)
and the now-unreachable aggregator manager paths (registerHealthyMCPServers,
EventHandler, retryFailedRegistrations) are deleted in the same PR.
WithDisableLocalSpawn no longer has anything to gate.
internal/services/aggregator/AggregatorService BaseService wrapper collapses
into a plain aggregator.NewAggregatorManager construction; runOrchestrator
supervises the manager directly. internal/services/aggregator/ package is gone.
Cluster-mode stdio MCPServers stay unsupported: upstream AgentgatewayBackend
CRD has no stdio variant (McpTarget is {Host, BackendRef, Port, Path,
Protocol}). ErrStdioNotSupportedInCluster and ConditionTypeNotSupportedInCluster
are untouched.
Token exchange, server-family grouping, and ADR-006 session-scoped tool
filtering continue to be handled by muster — agentgateway is transparent L7
middleware for tracing, audit, and metrics.
Net diff: ~550 insertions, ~4000 deletions across 21 files.
Adds the test surface that proves PR 11's OAuth pending-auth path still works under the new muster→agentgateway→upstream topology, and sets a sensible cluster-mode default so operators don't have to set MUSTER_AGW_UPSTREAM_URL by hand for the common case.
- internal/testing/integration/agw_auth_proxy_test.go: dials a httptest server that emits the same RFC 9728 WWW-Authenticate format muster's protected_mcp_server mock uses, through a real agentgateway v1.2.1 subprocess; asserts the 401, the Bearer scheme, the resource_metadata parameter, and the upstream's full URL all survive the round-trip. Skipped unless MUSTER_AGW_BINARY is set (same gate as the existing subprocess-topology test).
- internal/testing/scenarios/mcpserver-auth-required-through-agentgateway.yaml: end-to-end BDD scenario asserting an OAuth-protected MCPServer lands in auth_required state and surfaces the synthetic auth tool through muster's API once RegisterUpstream catches the proxied 401.
- helm/muster/templates/_helpers.tpl + values.yaml: default the cluster-mode upstream URL to http://agentgateway.<release-namespace>.svc.cluster.local:8080 (Giant Swarm convention; operators with divergent topologies override via muster.agentgateway.upstreamURL).
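The core of that round-trip assertion can be sketched as follows. This is a simplified stand-in, not the contents of agw_auth_proxy_test.go: it dials the httptest server directly instead of going through an agentgateway subprocess, the metadata URL is a placeholder, and the header parser is deliberately naive (it splits on commas and would mishandle quoted values that contain commas).

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// metadataURL is a placeholder value, not a real endpoint.
const metadataURL = "https://mcp.example.com/.well-known/oauth-protected-resource"

// resourceMetadata extracts the resource_metadata parameter from a
// Bearer challenge such as: Bearer resource_metadata="https://...".
func resourceMetadata(challenge string) (string, bool) {
	scheme, params, ok := strings.Cut(strings.TrimSpace(challenge), " ")
	if !ok || !strings.EqualFold(scheme, "Bearer") {
		return "", false
	}
	for _, part := range strings.Split(params, ",") {
		key, value, found := strings.Cut(strings.TrimSpace(part), "=")
		if found && key == "resource_metadata" {
			return strings.Trim(value, `"`), true
		}
	}
	return "", false
}

// probe starts a protected upstream, calls it without a token, and reports
// the status code plus the parsed resource_metadata value.
func probe() (int, string, bool, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("WWW-Authenticate", `Bearer resource_metadata="`+metadataURL+`"`)
		w.WriteHeader(http.StatusUnauthorized)
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		return 0, "", false, err
	}
	defer resp.Body.Close()
	meta, ok := resourceMetadata(resp.Header.Get("WWW-Authenticate"))
	return resp.StatusCode, meta, ok, nil
}

func main() {
	status, meta, ok, err := probe()
	if err != nil {
		panic(err)
	}
	fmt.Println(status, meta, ok)
}
```

In the real test the request would pass through the proxy, so the same three checks (401, Bearer scheme, resource_metadata) would show whether the header survives the extra hop.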
… fallback
PR 11's deletion of the orchestrator-driven MCPServer auto-start path left two gaps the BDD harness immediately surfaced:
- The filesystem-mode reconciler uses fsnotify, which fires only on changes after Start. Pre-existing MCPServer YAMLs were invisible without processAutoStartMCPServers: every MCPServer the operator had defined before muster started never reconciled.
- core_service_status (orchestrator adapter) queried the orchestrator's service registry, which now never contains MCPServer entries. Tools that depended on it (the agentgateway-subprocess-data-plane BDD scenario, any operator dashboard) reported "service not found" for healthy upstreams.
Fixes:
- Add reconciler.InitialSyncProvider, an optional Reconciler interface the Manager type-asserts on Start and uses to queue one reconcile per existing resource. MCPServerReconciler implements it through ListMCPServers().
- Orchestrator.GetServiceStatus and the orchestrator APIAdapter both fall back to the aggregator's UpstreamServerState when their service registry has no entry, mapping Connected → state=connected and AuthRequired → state=auth_required so core_service_status keeps reporting MCPServer state without resurrecting the deleted lifecycle path.
- Tighten the mcpserver-auth-required-through-agentgateway BDD scenario to poll core_service_status (with wait_for_state) instead of asserting on core_mcpserver_get's spec-only response.
Both BDD scenarios now pass:
- agentgateway-subprocess-data-plane (4/4 steps)
- mcpserver-auth-required-through-agentgateway (3/3 steps)
…itialSyncProvider
The previous fix added an optional InitialSyncProvider interface so the Reconciler manager could ask each registered reconciler for its initial resource list. That seam is the wrong layer: the gap is filesystem-mode specific, parallel to how the Kubernetes informer surfaces existing resources via ADDED events from its initial list().
Move the initial scan into FilesystemDetector.Start: after setupWatches, walk each watched resource-type directory and emit a Create ChangeEvent for every YAML file present. Same outcome, no Reconciler interface extension, automatic coverage for any future reconciler.
Drop:
- reconciler.InitialSyncProvider interface
- Manager.triggerInitialSync
- MCPServerReconciler.InitialResources
Both BDD scenarios still pass:
- agentgateway-subprocess-data-plane (4/4)
- mcpserver-auth-required-through-agentgateway (3/3)
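The initial scan described above might look roughly like this. It is a sketch under stated assumptions: the ChangeEvent fields, the initialScan name, and the rule that both .yaml and .yml count as YAML files are all illustrative, not taken from FilesystemDetector.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// ChangeEvent is a hypothetical stand-in for the detector's event type.
type ChangeEvent struct {
	Operation    string
	ResourceType string
	Name         string
}

// initialScan emits one Create event per YAML file already present in dir,
// so resources written before the watcher started still get reconciled.
func initialScan(dir, resourceType string, emit func(ChangeEvent)) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return fmt.Errorf("initial scan of %s: %w", dir, err)
	}
	for _, entry := range entries {
		ext := filepath.Ext(entry.Name())
		if entry.IsDir() || (ext != ".yaml" && ext != ".yml") {
			continue
		}
		emit(ChangeEvent{
			Operation:    "Create",
			ResourceType: resourceType,
			Name:         strings.TrimSuffix(entry.Name(), ext),
		})
	}
	return nil
}

// demoScan writes three files into a temp directory and scans it.
func demoScan() ([]ChangeEvent, error) {
	dir, err := os.MkdirTemp("", "mcpservers-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	for _, name := range []string{"github.yaml", "jira.yml", "notes.txt"} {
		if err := os.WriteFile(filepath.Join(dir, name), []byte("kind: MCPServer\n"), 0o600); err != nil {
			return nil, err
		}
	}
	var events []ChangeEvent
	err = initialScan(dir, "MCPServer", func(ev ChangeEvent) { events = append(events, ev) })
	return events, err
}

func main() {
	events, err := demoScan()
	if err != nil {
		panic(err)
	}
	for _, ev := range events {
		fmt.Printf("%s %s/%s\n", ev.Operation, ev.ResourceType, ev.Name)
	}
}
```

Because the scan produces the same event type as the watcher, downstream reconcilers need no special case for pre-existing files, which is the point of moving the fix into the detector.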
Five fixes that together get the existing BDD suite back to a working state after PR 11's aggregator rewrite.
- internal/testing/muster_manager.go: readiness check now requires each expected MCPServer to report a non-empty, ready-shaped state (Connected / Auth Required / Running / Connecting / Starting). The old "name appears in core_mcpserver_list" gate raced the reconciler; PR 11 surfaced this race by moving registration off the synchronous orchestrator path, but the harness was always too loose.
- internal/aggregator/upstream.go: initializeWithConnectRetry wraps StreamableHTTPClient.Initialize in a 6-attempt × 250ms retry loop scoped to syscall.ECONNREFUSED. Rides out agentgateway's ~300ms file-watch reload window without dragging the reconcile manager through a full 1s+ backoff cycle.
- internal/orchestrator/api_adapter.go: StartService/StopService/RestartService fall back to api.AggregatorHandler.RegisterUpstream / DeregisterUpstream when the name is not in the orchestrator registry. core_service_status fallback synthesises State=Stopped for known MCPServer CRDs whose aggregator state is Absent (e.g. after core_service_stop), so the lifecycle scenarios keep working.
- internal/app/services.go: reserveFilesystemModeAgwPort picks a free TCP port for agentgateway at startup so parallel muster instances (BDD test harness, multiple local dev sessions) don't all collide on :8080. The port flows through AggregatorConfig.AgentgatewayListenerPort and yamlapply.WithListenerPort so the spawned agentgateway binds where UpstreamProxy expects.
- internal/aggregator/manager.go: AgentgatewayListenerPort() accessor + RegisterUpstream calls initializeWithConnectRetry instead of bare Initialize.
mcpserver suite: 33/58 → 49/58 passing. Remaining failures are timing sensitive lifecycle scenarios that need further investigation.
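A retry loop scoped to connection-refused errors, as described for initializeWithConnectRetry, could be sketched like this. The signature is an assumption (the real function wraps StreamableHTTPClient.Initialize rather than taking a callback), and the demo uses a 1ms delay instead of 250ms to stay fast.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"syscall"
	"time"
)

// initializeWithConnectRetry retries initialize only while it fails with
// ECONNREFUSED; any other error (or success) is returned immediately.
func initializeWithConnectRetry(ctx context.Context, attempts int, delay time.Duration, initialize func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for attempt := 1; attempt <= attempts; attempt++ {
		err = initialize(ctx)
		if err == nil || !errors.Is(err, syscall.ECONNREFUSED) {
			return err
		}
		if attempt == attempts {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return fmt.Errorf("still refused after %d attempts: %w", attempts, err)
}

// runDemo simulates a listener that refuses the first `refusals` dials.
func runDemo(refusals int) (int, error) {
	calls := 0
	initialize := func(context.Context) error {
		calls++
		if calls <= refusals {
			return fmt.Errorf("dial tcp 127.0.0.1:8080: %w", syscall.ECONNREFUSED)
		}
		return nil
	}
	err := initializeWithConnectRetry(context.Background(), 6, time.Millisecond, initialize)
	return calls, err
}

func main() {
	calls, err := runDemo(2)
	fmt.Println(calls, err)
}
```

Scoping the retry to ECONNREFUSED keeps genuine failures (bad TLS, 401, timeouts) on the normal reconcile backoff path; only the brief window where the proxy listener is not yet bound is absorbed here.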
…_status
The orchestrator-driven path used to surface stdio MCPServers as state=running (local subprocess) and remote types as state=connected. PR 11's aggregator view collapsed both onto UpstreamServerConnected; the fallback was unconditionally returning "connected" and breaking BDD scenarios that key on the legacy state names.
Read info.Type via api.GetMCPServerManager().GetMCPServer(name) and:
- stdio + Connected → running, stdio + Absent → stopped
- remote + Connected → connected, remote + Absent → disconnected
- either + AuthRequired → auth_required
Matches the pre-PR-11 state-name convention so existing scenarios keep working without per-scenario edits.
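The mapping in that list is small enough to write out in full. The type and constant names below are hypothetical stand-ins for the aggregator's state values; only the five input/output pairs come from the commit message.

```go
package main

import "fmt"

// UpstreamServerState is a hypothetical stand-in for the aggregator's state type.
type UpstreamServerState int

const (
	Absent UpstreamServerState = iota
	Connected
	AuthRequired
)

// legacyStateName maps (server type, aggregator state) onto the
// pre-PR-11 state names that existing scenarios key on.
func legacyStateName(serverType string, state UpstreamServerState) string {
	if state == AuthRequired {
		return "auth_required" // same for stdio and remote servers
	}
	stdio := serverType == "stdio"
	switch {
	case stdio && state == Connected:
		return "running"
	case stdio:
		return "stopped"
	case state == Connected:
		return "connected"
	default:
		return "disconnected"
	}
}

func main() {
	for _, t := range []string{"stdio", "streamable-http"} {
		for _, s := range []UpstreamServerState{Connected, Absent, AuthRequired} {
			fmt.Printf("%s/%d -> %s\n", t, s, legacyStateName(t, s))
		}
	}
}
```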
PR 720 gosec G115: the *net.TCPAddr.Port field is int, but TCP ports fit in uint16. The kernel never returns out-of-range values for listen, but a defensive check + inline rationale silences the lint without adding a file-level golangci.yml exclusion.
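A defensive int-to-uint16 conversion of the kind described could look like the sketch below. The function name reserveFreePort and the returned type are assumptions; only the idea (range-check *net.TCPAddr.Port before narrowing it) comes from the commit message.

```go
package main

import (
	"fmt"
	"net"
)

// reserveFreePort asks the kernel for a free loopback TCP port and
// returns it as uint16 after an explicit range check.
func reserveFreePort() (uint16, error) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, fmt.Errorf("reserve port: %w", err)
	}
	defer listener.Close()

	addr, ok := listener.Addr().(*net.TCPAddr)
	if !ok {
		return 0, fmt.Errorf("unexpected listener address type %T", listener.Addr())
	}
	// TCPAddr.Port is an int; the kernel should never hand back a value
	// outside 1..65535, but checking keeps the narrowing conversion safe.
	if addr.Port < 1 || addr.Port > 65535 {
		return 0, fmt.Errorf("port %d out of range", addr.Port)
	}
	return uint16(addr.Port), nil
}

func main() {
	port, err := reserveFreePort()
	if err != nil {
		panic(err)
	}
	fmt.Println("reserved port:", port)
}
```

Note that closing the listener before handing the port to another process leaves a short window in which something else could bind it; the sketch does not try to close that gap.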
… BDD
agentgateway's admin / stats / readiness endpoints are hardcoded to :15000 / :15020 / :15021. With muster spawning agentgateway as a subprocess, parallel muster instances all tried to bind those ports; only the first won. The remaining instances' agentgateway processes crashed on startup, taking down the BDD harness's parallel runs.
- internal/reconciler/agentgateway/yaml: LocalManagementConfig wraps adminAddr / statsAddr / readinessAddr under the agentgateway-native `config:` block (not top-level; the schema docs lied). WithAdminAddr option lets services.go pass per-instance overrides.
- internal/config + internal/aggregator: AgentgatewayAdminPort / StatsPort / ReadinessPort flow through AggregatorConfig from muster's config.yaml to the yaml.Applier and the subprocess.Manager readiness probe.
- internal/app/services.go: resolveFilesystemModeAgwPorts reserves four ports total (data + admin + stats + readiness); explicit config values override the random reservation so the BDD harness can pick consistent per-instance ports.
- internal/testing/muster_manager.go: harness reserves all four agentgateway ports up-front through portMu so concurrent muster instances don't collide. Search window grew from 100 to 1000 to accommodate the 5-port-per-instance footprint across long suites.
mcpserver suite parallel × 4: 33/58 → 53/58 (~57% → ~91%).
…ate semantics
The mcpserver BDD suite expected pre-PR-11 lifecycle semantics:
- core_service_stop should be sticky against the reconciler's 30s pulse
- core_service_list should reflect per-session SSO state changes
Both gone with PR 11's reconciler-driven federation. The fixes:
- AggregatorManager tracks a userStopped name set. MarkUserStopped records operator intent (added to api.AggregatorHandler), MarkUserStarted clears it, RegisterUpstream short-circuits when set. Orchestrator adapter's StartService / StopService / RestartService thread through these so core_service_stop survives the reconciler's next pass.
- oauth-sso-state-sync-after-login expected state="connected" in core_service_list after SSO login. The global registration in PR 11 stays in pending-auth (per-session SSO lives in the session connection pool, not the global registry). Scenario updated to expect "auth_required" with a comment explaining the layering.
mcpserver suite parallel × 4: 53/58 → 58/58 (100%).
Cascade after PR 717 was updated. PR 720's upstream-proxy refactor is the authoritative shape for aggregator + reconciler + orchestrator. PR 717 brings: teleport removal, ArgType enum on api, inline gosec nolint annotations, pinned-checksum binary resolver, mcp-go/otel adapter, dead event_handler.go + services/aggregator/ stay removed.
Took PR 717's versions of api/oauth.go, oauth/manager.go, oauth/api_adapter.go, api/types.go, internal/cli/columns.go + mcp_output.go, internal/workflow/operations.go + api_adapter.go, internal/agentgateway/binary/* (now uses pinnedChecksums), and the mock test files that lost ExchangeTokenForRemoteClusterWithClient. Took PR 720's versions of aggregator + reconciler + orchestrator changes.
Added ReadHeaderTimeout to all aggregator/server.go http.Server blocks (consumes httpReadHeaderTimeout). Dropped dead agentgatewayDefaultAdminPort / agentgatewayDefaultStatsPort constants from internal/app/services.go.
PR 720's wholesale aggregator + reconciler + orchestrator refactor wins
for the architectural files. PR 717 contributes the teleport removal +
api.ArgType enum + binary/ pinned-checksum resolver + mcp-go/otel
adapter + .golangci.yml exclusions across the rest of the tree.
Resolution:
- Took PR 720 wholesale for internal/aggregator/ (keeps instrument/
subpackage; PR 717 had flattened it, but PR 720's manager.go + server.go
still depend on it).
- Took PR 717 versions of internal/api/{oauth.go,types.go},
internal/oauth/{manager.go,api_adapter.go},
internal/agentgateway/binary/* (pinnedChecksums),
internal/cli/{columns.go,mcp_output.go}, internal/workflow/
{operations.go,api_adapter.go}, internal/metatools/provider_test.go.
- Deleted internal/services/aggregator/api_adapter.go (PR 720 design).
- Deleted internal/aggregator/event_handler.go (PR 720 design).
- Dropped dead agentgatewayDefaultAdminPort/StatsPort constants from
internal/app/services.go.
go build, go test -race, golangci-lint: clean.
This was referenced May 19, 2026
Stop-conditions and the userStopped / readiness majors from the review are fixed in #733 (draft, stacked on this branch).
…place userStopped with spec.suspended (#733)
PR 720 silently deleted spec.family server-grouping from the aggregator registry while the CRD field, API round-trip, and ten mcpserver-family-* BDD scenarios survived; commit 8ba23c1 then relaxed oauth-sso-state-sync-after-login from "connected" to "auth_required" to mask the post-SSO state-sync regression, and added an in-memory userStopped set to keep core_service_stop sticky against the reconciler's periodic pulse. This commit fixes the underlying behaviour instead of papering over it in scenarios or memory:
- Restore the family-aware ServerRegistry from feat/translator-pr10-federation: serverFamilies / familyMappings / setServerFamilyLocked / familyExposedName / assembleExposedTools / IsFamilyTool / FamilyInstanceArgFor / GetToolServerNames / ResolveToolNameForServer. aggregator.Register and RegisterPendingAuth take ServerRegistration / PendingAuthRegistration structs again so Family is plumbed through; flat-form RegisterPendingAuth / RegisterPendingAuthWithConfig wrappers preserve the PR 720 test surface. AggregatorManager.RegisterUpstream and registerServerPendingAuthForUpstream feed info.Family into the registry on every reconciler-driven pulse, so family grouping is active without operator action.
- Reinstate family dispatch in AggregatorServer.CallToolInternal: IsFamilyTool tools require the family.instanceArg parameter, strip it from the forwarded payload, and route via ResolveToolNameForServer. Solo and family paths share a new dispatchResolvedTool helper that consolidates the global-client vs session-scoped vs token-exchange branching.
- Revert oauth-sso-state-sync-after-login.yaml step 7 back to expecting "connected"; the underlying state-sync now resolves at the source. AggregatorHandler gains UpstreamServerStateForSession(ctx, name), which promotes pending-auth servers to Connected when the calling session has a live pooled connection (AggregatorServer.HasPooledConnection exposes the SessionConnectionPool lookup). core_service_list / core_service_status thread context through GetAllServices / GetServiceStatus / mcpServerAPIStatusFromAggregator so the session-aware variant is used.
- Introduce MCPServer.spec.suspended (CRD field, API field, reconciler short-circuit, filesystem adapter SetSuspended, generated CRD YAML) and drop the in-memory userStopped set, MarkUserStopped / MarkUserStarted, and isUserStopped check. core_service_stop / core_service_start now flip spec.suspended via MCPServerManagerHandler.SetSuspended and also dispatch Deregister/Register synchronously so the next status query reflects the change immediately; the reconciler's next pulse keeps the upstream gated as long as spec.suspended stays true, so operator intent survives muster restarts and concurrent reconciles.
Verified end-to-end: muster test --concept mcpserver --parallel 4 is 65/65 (was 58/58 with the relaxation and family deletion masked).
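The spec.suspended short-circuit can be illustrated with a toy reconciler. Everything here is a simplified assumption: the interface, the spec struct, and the in-memory fake are hypothetical, and the real reconcile path also applies config and syncs status, which this sketch omits.

```go
package main

import "fmt"

// MCPServerSpec carries only the two fields this sketch needs.
type MCPServerSpec struct {
	AutoStart bool
	Suspended bool
}

// upstreamRegistrar is a hypothetical slice of the aggregator handler.
type upstreamRegistrar interface {
	RegisterUpstream(name string) error
	DeregisterUpstream(name string) error
}

// fakeAggregator records which upstreams are currently registered.
type fakeAggregator struct{ registered map[string]bool }

func newFakeAggregator() *fakeAggregator {
	return &fakeAggregator{registered: map[string]bool{}}
}

func (f *fakeAggregator) RegisterUpstream(name string) error {
	f.registered[name] = true
	return nil
}

func (f *fakeAggregator) DeregisterUpstream(name string) error {
	delete(f.registered, name)
	return nil
}

// reconcile gates registration on the persisted spec, so a stop request
// keeps holding on every later pulse and after a restart.
func reconcile(agg upstreamRegistrar, name string, spec MCPServerSpec) error {
	if spec.Suspended {
		return agg.DeregisterUpstream(name) // short-circuit: never re-register
	}
	if spec.AutoStart {
		return agg.RegisterUpstream(name)
	}
	return nil
}

func main() {
	agg := newFakeAggregator()
	spec := MCPServerSpec{AutoStart: true}
	_ = reconcile(agg, "github", spec)
	fmt.Println("after start:", agg.registered["github"])

	spec.Suspended = true // what core_service_stop would persist
	_ = reconcile(agg, "github", spec)
	_ = reconcile(agg, "github", spec) // a later periodic pulse
	fmt.Println("after stop:", agg.registered["github"])
}
```

Because the decision reads only the spec, it needs no in-memory set: a fresh process reconciling the same resource reaches the same result.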
…fault
- CHANGELOG: spec.suspended and UpstreamServerStateForSession are functional additions, not security fixes; move them from ### Security to ### Added.
- values.schema.json: the agentgateway.upstreamURL description claimed "Required in cluster mode; muster fails at startup when unset" but muster.agentgatewayUpstreamURL helper resolves an in-namespace fallback. Reflect the helper-default behaviour so the schema and the chart agree.
Step 11/16 of the agentgateway adapter series. Plan: /home/quentin/.claude/plans/declarative-forging-lynx.md.
Summary
muster's aggregator now dials agentgateway at <UpstreamProxy>/mcp/<server-name> (streamable-http, regardless of spec.type) for every external MCPServer. Federation registration is reconciler-driven via aggregator.RegisterUpstream / DeregisterUpstream instead of orchestrator-driven processAutoStartMCPServers. With the dial path unified through agentgateway, every component that lived only to support the orchestrator-spawned MCPServer service path is deleted in the same PR.
This is the unblocking change for the muster-in-front-of-agentgateway intermediate topology: muster keeps token exchange, server-family grouping (spec.family, #670), and ADR-006 session-scoped tool filtering; agentgateway becomes transparent L7 middleware for tracing/audit/metrics.
Why this also ships PR 15's spec.suspended
MCPServer.spec.suspended is the persistence layer for core_service_stop. Once the orchestrator stops owning MCPServer lifecycle (this PR), the previous in-memory userStopped set on the aggregator has no home: operator intent would not survive a muster restart. Splitting the suspended flag out into PR 15 as originally scoped would have left core_service_stop / _start / _restart half-working against MCPServer names for one PR cycle. The CRD field, SetSuspended, the orchestrator-adapter Start/Stop/Restart branches, and the Suspended reconcile branch land together here.
Configuration
AggregatorConfig.UpstreamProxy is required at startup; muster fails fast when unset.
- Filesystem mode (!cfg.MusterConfig.Kubernetes): defaults to http://localhost:8080 (the agentgateway subprocess listener, gated on PR 9's :15021/healthz/ready probe).
- Cluster mode (cfg.MusterConfig.Kubernetes): reads MUSTER_AGW_UPSTREAM_URL from the muster pod's environment. The Helm chart sources it from muster.agentgateway.upstreamURL; when that value is unset the chart's muster.agentgatewayUpstreamURL helper falls back to http://agentgateway.<release-namespace>.svc.cluster.local:8080. Operators with a divergent agentgateway topology override muster.agentgateway.upstreamURL explicitly.
What landed
Aggregator
- AggregatorConfig.UpstreamProxy field + proxyURLFor helper.
- connection_helper.go (establishConnection, EstablishConnectionWithTokenForwarding, EstablishConnectionWithTokenExchange): dial URL substituted; serverInfo.URL retained for display/audit.
- AggregatorManager.RegisterUpstream / DeregisterUpstream / UpstreamServerState plus interface entries on api.AggregatorHandler.
- AggregatorManager.RegisterUpstream handles the OAuth pending-auth path via *mcpserver.AuthRequiredError → RegisterPendingAuthWithConfig (display URL stays info.URL; dial URL is the proxy).
Reconciler
- applyConfig success → RegisterUpstream(name) when AutoStart=true.
- reconcileDelete → DeregisterUpstream(name) + applier Delete.
- applyStatusFromAggregator replumbs status sync to read from api.AggregatorHandler.UpstreamServerState (Connected / AuthRequired / Absent).
- spec.suspended=true short-circuits reconcile to a deregister + status sync; operator intent persists on the CRD instead of in-memory aggregator state.
Dead code deleted in this PR
- internal/orchestrator/orchestrator.go: processAutoStartMCPServers, createMCPServerService, handleAuthRequiredServer, retryFailedMCPServers, attemptReconnectFailedServers, shouldAttemptRetry, waitForMCPServerRegistration, Config.DisableMCPServerAutoStart. Net file shrinks from 527 → 232 LOC.
- internal/reconciler/mcpserver_reconciler.go: WithDisableLocalSpawn + disableLocalSpawn field + every branch. orchestratorAPI + serviceRegistry parameters dropped from the constructors.
- internal/aggregator/manager.go: registerHealthyMCPServers, registerSingleServer, deregisterSingleServer, EventHandler wiring, retryFailedRegistrations, attemptPendingRegistrations, ManualRefresh, isServerAuthRequired, isServerSSOBased, GetEventHandler.
- internal/aggregator/event_handler.go + event_handler_test.go gone.
- internal/services/aggregator/ entire package. Its AggregatorService BaseService wrapper, onManagerErrorCallback, Restart, GetServiceData, service_test.go.
- internal/aggregator/api_adapter.go (new) wraps *AggregatorManager directly; internal/app/services.go and runOrchestrator construct/supervise the manager directly.
Helm chart
- helm/muster/templates/deployment.yaml: MUSTER_AGW_UPSTREAM_URL env var alongside K8S_*.
- helm/muster/templates/_helpers.tpl: muster.agentgatewayUpstreamURL template with namespaced default.
- helm/muster/values.yaml: muster.agentgateway.upstreamURL (empty default → helper-resolved fallback).
- helm/muster/values.schema.json updated to match.
Cluster-mode stdio MCPServers stay unsupported (deviates from plan)
The plan promised cluster-mode stdio would unblock here "as a side effect" of the proxy rewrite. It does not. Upstream AgentgatewayBackend CRD at controller/api/v1alpha1/agentgateway/agentgateway_backend_types.go:474 has no stdio variant: McpTarget is {Host, BackendRef, Port, Path, Protocol} with MCPProtocol ∈ {StreamableHTTP, SSE} only. So ErrStdioNotSupportedInCluster + ConditionTypeNotSupportedInCluster (added in PR 3 / #692) stay in place; cluster-stdio remains a future-PR concern that needs upstream agentgateway support before muster can deliver it.
Either the plan needs to drop the side-effect claim, or a follow-up has to land cluster-stdio through per-MCPServer pod isolation. Leaving the symbol in.
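For readers unfamiliar with the sentinel, the guard it implies is roughly the following. Only the name ErrStdioNotSupportedInCluster comes from this PR; the error text, the function name, and the string-typed server type are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrStdioNotSupportedInCluster mirrors the sentinel named in this PR.
// The message text is an assumption.
var ErrStdioNotSupportedInCluster = errors.New("stdio MCPServers are not supported in cluster mode")

// checkClusterSupport (hypothetical name) rejects stdio servers in cluster
// mode, where no agentgateway backend variant exists to carry them.
func checkClusterSupport(clusterMode bool, serverType string) error {
	if clusterMode && serverType == "stdio" {
		return ErrStdioNotSupportedInCluster
	}
	return nil
}

func main() {
	for _, serverType := range []string{"stdio", "streamable-http", "sse"} {
		err := checkClusterSupport(true, serverType)
		fmt.Printf("cluster mode, %s: %v\n", serverType, err)
	}
	fmt.Printf("filesystem mode, stdio: %v\n", checkClusterSupport(false, "stdio"))
}
```

A reconciler would typically translate this error into the NotSupportedInCluster condition rather than requeueing, since retrying cannot succeed until upstream support exists.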
Net diff
55 files changed, +2,869 / -4,608. Larger than the original PR 11 envelope because (a) the orchestrator + services/aggregator excisions that were going to be PR 13 + PR 14(d) are bundled here, since they are tightly coupled to the dial-path rewrite and splitting them would have required keeping dead code alive across two PRs, and (b) spec.suspended (originally PR 15) lands now for the reason above.
Verification evidence
- make test -race clean across internal/aggregator/..., internal/reconciler/..., internal/app/..., internal/orchestrator/....
- make helm-test clean (64 tests across 6 suites).
- make helm-lint clean.
- go vet ./... clean.
- internal/aggregator/upstream_test.go (proxyURLFor), internal/reconciler/mcpserver_reconciler_test.go (RegisterUpstream on AutoStart=true, skipped on AutoStart=false, NotSupportedInCluster on stdio sentinel, DeregisterUpstream on delete, register-failure requeue, AuthRequired mapping from aggregator state).
Open items the reviewer should weigh in on
- WWW-Authenticate round-trip through agentgateway v1.2.1. The OAuth pending-auth path relies on the 401 + resource_metadata reaching muster verbatim. The PR 16 BDD scenarios (#724, "docs: translator series final state (PR 16/16)") assert this; if agentgateway strips the header, we will need an upstream issue / workaround before merging.
- MCPServer.status.consecutiveFailures / lastAttempt / nextRetryAfter become permanently zero (the per-service retry state machine is gone). Surfaced in the CHANGELOG; if a Grafana dashboard or downstream automation depends on these fields, that consumer needs a migration plan.
- core_service_start/stop/restart adapter fall-through at internal/orchestrator/api_adapter.go:62-110 silently falls through to a.orchestrator.StartService(name) when the MCPServer manager lookup fails, masking the "service not found" error path. Worth a follow-up to surface api.ErrServiceNotFound explicitly.
Stacked PRs above this one (the remaining /16 plan)
- … Applier + Deleter into one port, single reconciler constructor, lift OwnerRefFor to a free function.
- … subprocess.Manager.Reload, WithListenerName, factory dispatch, SSE/stdio client packages).
- … core_service_{start,stop,restart} against MCPServer names; rename restart → core_mcpserver_reconnect. (The spec.suspended field itself lands in this PR; see "Why this also ships PR 15's spec.suspended" above.)
Branch base
Cut off origin/feat/translator-pr10-federation (the cumulative state of PRs 1-10 of the agentgateway-as-subprocess series); rebase to main once #714 (PR 9/16) and #717 (PR 10/16) merge.