Skip to content

Commit 3592719

Browse files
committed
[diag] add agent-side debug RPC over existing QUIC control channel
Adds a long-lived bidirectional nd-json RPC stream from tunnelproxy to each connected agent on the existing QUIC connection (no new listener, no new PKI). The agent dispatches incoming Request frames against a process-scoped Registry of read-only probes; auth is inherited from the QUIC mTLS to tunnelproxy.

Wire format (pkg/diag/protocol):
  Down (server -> agent): {"id":N, "command":"<name>", "args":{...}}
  Up (agent -> server):   {"id":N, "result":{...}}
                        | {"id":N, "error":{code,message}}
                        | {"id":N, "chunk":{...}} ... {"id":N, "done":true}

Tier-1 commands:
  agent - version, build, runtime, uptime, flag inventory, manifest of every command
  clock - wall + monotonic + zone (clock-skew sanity check)

Server-side per-agent Sessions registry exposed via TunnelServer.DiagSessions(); apoxy-cloud's tunnelproxy reaches it through TunnelNodeReconciler.DiagSessions() (separate PR).

Wired into both reconciler call sites:
  - pkg/cmd/run/tunnel.go (embedded-agent path in apiserver/backplane)
  - pkg/cmd/tunnel/run.go (CLI 'apoxy tunnel run')

Tests: pkg/diag covers protocol round-trip via io.Pipe (TestEndToEnd exercises Session<->Dispatcher with the real agent + clock commands) plus dispatcher unit tests for ceiling/busy/streaming/unknown-command.
1 parent 8fe2939 commit 3592719

15 files changed

Lines changed: 1304 additions & 10 deletions

File tree

pkg/cmd/run/tunnel.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import (
3535
configv1alpha1 "github.com/apoxy-dev/apoxy/api/config/v1alpha1"
3636
corev1alpha "github.com/apoxy-dev/apoxy/api/core/v1alpha"
3737
"github.com/apoxy-dev/apoxy/client/versioned"
38+
"github.com/apoxy-dev/apoxy/pkg/diag"
39+
"github.com/apoxy-dev/apoxy/pkg/diag/commands"
3840
"github.com/apoxy-dev/apoxy/pkg/log"
3941
"github.com/apoxy-dev/apoxy/pkg/net/dns"
4042
"github.com/apoxy-dev/apoxy/pkg/tunnel"
@@ -182,6 +184,9 @@ type runtimeTunnelReconciler struct {
182184
lastAddresses []string
183185
lastSelected string
184186

187+
// Process-scoped diag command registry, shared across reconnects.
188+
diagRegistry *diag.Registry
189+
185190
// Dial parameters protected by dialMu.
186191
dialMu sync.RWMutex
187192
tunnelUID uuid.UUID
@@ -239,6 +244,9 @@ func (r *runtimeTunnelReconciler) reconcile(ctx context.Context, req ctrl.Reques
239244
if len(r.tunCfg.Labels) > 0 {
240245
cOpts = append(cOpts, tunnel.WithLabels(r.tunCfg.Labels))
241246
}
247+
if r.diagRegistry != nil {
248+
cOpts = append(cOpts, tunnel.WithDiagRegistry(r.diagRegistry))
249+
}
242250

243251
var srvAddr string
244252
if !r.cfg.IsLocalMode {
@@ -497,6 +505,7 @@ func runTunnel(ctx context.Context, cfg *configv1alpha1.Config, tc *configv1alph
497505
endpointSelector: selector,
498506
tunConns: make([]*runtimeTunConn, 0),
499507
tunDialer: &tunnel.TunnelDialer{Router: r},
508+
diagRegistry: commands.NewDefaultRegistry(),
500509
}
501510

502511
if err := rec.setupWithManager(mgr, tn.Name); err != nil {

pkg/cmd/tunnel/run.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import (
4040
"github.com/apoxy-dev/apoxy/config"
4141
"github.com/apoxy-dev/apoxy/pkg/cmd/tunnel/tui"
4242
"github.com/apoxy-dev/apoxy/pkg/cmd/utils"
43+
"github.com/apoxy-dev/apoxy/pkg/diag"
44+
"github.com/apoxy-dev/apoxy/pkg/diag/commands"
4345
"github.com/apoxy-dev/apoxy/pkg/log"
4446
"github.com/apoxy-dev/apoxy/pkg/net/dns"
4547
"github.com/apoxy-dev/apoxy/pkg/tunnel"
@@ -116,6 +118,9 @@ type tunnelNodeReconciler struct {
116118
// Endpoint selector for choosing tunnel server addresses.
117119
endpointSelector endpointselect.Selector
118120

121+
// Process-scoped diag command registry, shared across reconnects.
122+
diagRegistry *diag.Registry
123+
119124
// Cached endpoint selection — only re-probe when addresses change.
120125
lastAddresses []string
121126
lastSelected string
@@ -209,6 +214,7 @@ var tunnelRunCmd = &cobra.Command{
209214
cfg: cfg,
210215
a3y: a3y,
211216
endpointSelector: selector,
217+
diagRegistry: commands.NewDefaultRegistry(),
212218

213219
tunDialerWorkers: make([]*tunConn, 0),
214220
}
@@ -442,6 +448,9 @@ func (t *tunnelNodeReconciler) reconcile(ctx context.Context, req ctrl.Request)
442448
} else {
443449
cOpts = append(cOpts, tunnel.WithAuthToken(tunnelNode.Status.Credentials.Token))
444450
}
451+
if t.diagRegistry != nil {
452+
cOpts = append(cOpts, tunnel.WithDiagRegistry(t.diagRegistry))
453+
}
445454

446455
var srvAddr string
447456
var srvAddrs []string

pkg/diag/command.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Package diag implements the agent-side debug surface that runs over
2+
// the existing QUIC control channel to tunnelproxy.
3+
package diag
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
"fmt"
9+
"sort"
10+
)
11+
12+
func sortSpecs(s []Spec) {
13+
sort.Slice(s, func(i, j int) bool { return s[i].Name < s[j].Name })
14+
}
15+
16+
// ArgType names the type of a command argument as it appears on the
// wire (it is the value of ArgSpec's "type" JSON field).
type ArgType string

// Wire tokens for the argument types a command can declare.
const (
	ArgTypeString   = ArgType("string")
	ArgTypeInt      = ArgType("int")
	ArgTypeBool     = ArgType("bool")
	ArgTypeDuration = ArgType("duration")
)
25+
26+
// ArgSpec describes one argument of a Command. It is serialized into
27+
// the manifest returned by the built-in `agent` command so an operator
28+
// can discover the surface with a single GET.
29+
type ArgSpec struct {
30+
Type ArgType `json:"type"`
31+
Required bool `json:"required,omitempty"`
32+
Default any `json:"default,omitempty"`
33+
Max any `json:"max,omitempty"`
34+
Min any `json:"min,omitempty"`
35+
Desc string `json:"description,omitempty"`
36+
}
37+
38+
// Spec is the manifest entry for a single Command.
39+
type Spec struct {
40+
Name string `json:"name"`
41+
Desc string `json:"description,omitempty"`
42+
Args map[string]ArgSpec `json:"args,omitempty"`
43+
Streams bool `json:"streams,omitempty"`
44+
CeilingMs int `json:"ceiling_ms"`
45+
}
46+
47+
// Emitter is the sink a streaming Command writes its chunks to. The
// dispatcher supplies it for the duration of a single Run call;
// Commands MUST NOT hold on to it after Run returns.
type Emitter interface {
	// Chunk sends v upstream as one streaming frame.
	Chunk(v any) error
}
54+
55+
// Command is the contract every probe implements. Non-streaming
56+
// commands return a JSON-serializable Result and ignore the Emitter.
57+
// Streaming commands return Result == nil and emit chunks via e until
58+
// Run returns.
59+
type Command interface {
60+
// Spec returns the manifest entry for this command. It is called
61+
// once at registration; the result must be safe to share.
62+
Spec() Spec
63+
64+
// Run executes the command. args is the raw `args` field from the
65+
// Request; implementations decode it as needed. ctx carries the
66+
// dispatcher-imposed wall-clock ceiling and is cancelled when the
67+
// stream goes away.
68+
Run(ctx context.Context, args json.RawMessage, e Emitter) (result any, err error)
69+
}
70+
71+
// Registry holds the set of registered commands. It is built once at
72+
// startup and treated as immutable thereafter.
73+
type Registry struct {
74+
cmds map[string]Command
75+
}
76+
77+
// NewRegistry returns an empty Registry.
78+
func NewRegistry() *Registry { return &Registry{cmds: map[string]Command{}} }
79+
80+
// Register adds c to r. Panics on duplicate name — registration is a
81+
// startup-time operation.
82+
func (r *Registry) Register(c Command) {
83+
name := c.Spec().Name
84+
if _, ok := r.cmds[name]; ok {
85+
panic(fmt.Sprintf("diag: duplicate command %q", name))
86+
}
87+
r.cmds[name] = c
88+
}
89+
90+
// Lookup returns the command with the given name and whether it
91+
// exists.
92+
func (r *Registry) Lookup(name string) (Command, bool) {
93+
c, ok := r.cmds[name]
94+
return c, ok
95+
}
96+
97+
// Specs returns the manifest entries for every registered command,
98+
// sorted by name. Used by the built-in `agent` command.
99+
func (r *Registry) Specs() []Spec {
100+
out := make([]Spec, 0, len(r.cmds))
101+
for _, c := range r.cmds {
102+
out = append(out, c.Spec())
103+
}
104+
// Stable order so manifest diffs are reviewable.
105+
sortSpecs(out)
106+
return out
107+
}

pkg/diag/commands/agent.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Package commands holds the built-in diag commands. Each file
2+
// registers one command; the dispatcher composes them into a Registry.
3+
package commands
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
"flag"
9+
"runtime"
10+
"time"
11+
12+
"github.com/apoxy-dev/apoxy/build"
13+
"github.com/apoxy-dev/apoxy/pkg/diag"
14+
)
15+
16+
// Agent returns a Command that reports static + slow-changing agent
17+
// state and the manifest of every other command in r. It is the first
18+
// command an operator runs against an unfamiliar agent.
19+
func Agent(r *diag.Registry) diag.Command { return &agentCmd{r: r, started: time.Now()} }
20+
21+
type agentCmd struct {
22+
r *diag.Registry
23+
started time.Time
24+
}
25+
26+
func (a *agentCmd) Spec() diag.Spec {
27+
return diag.Spec{
28+
Name: "agent",
29+
Desc: "Agent identity, build, runtime, and the manifest of every available diag command.",
30+
CeilingMs: 5_000,
31+
}
32+
}
33+
34+
type agentResult struct {
35+
Version string `json:"version"`
36+
BuildDate string `json:"build_date"`
37+
CommitHash string `json:"commit_hash"`
38+
GoVersion string `json:"go_version"`
39+
GOOS string `json:"goos"`
40+
GOARCH string `json:"goarch"`
41+
NumCPU int `json:"num_cpu"`
42+
UptimeSec float64 `json:"uptime_sec"`
43+
Flags []flagState `json:"flags"`
44+
Commands []diag.Spec `json:"commands"`
45+
}
46+
47+
// flagState is one entry of the flag inventory in agentResult.
type flagState struct {
	// Name is the flag's name.
	Name string `json:"name"`
	// Value is the flag's current value rendered as a string.
	Value string `json:"value"`
	// Default is the flag's default value rendered as a string.
	Default string `json:"default"`
}
52+
53+
func (a *agentCmd) Run(_ context.Context, _ json.RawMessage, _ diag.Emitter) (any, error) {
54+
return agentResult{
55+
Version: build.BuildVersion,
56+
BuildDate: build.BuildDate,
57+
CommitHash: build.CommitHash,
58+
GoVersion: runtime.Version(),
59+
GOOS: runtime.GOOS,
60+
GOARCH: runtime.GOARCH,
61+
NumCPU: runtime.NumCPU(),
62+
UptimeSec: time.Since(a.started).Seconds(),
63+
Flags: collectFlags(),
64+
Commands: a.r.Specs(),
65+
}, nil
66+
}
67+
68+
func collectFlags() []flagState {
69+
var out []flagState
70+
flag.VisitAll(func(f *flag.Flag) {
71+
out = append(out, flagState{Name: f.Name, Value: f.Value.String(), Default: f.DefValue})
72+
})
73+
return out
74+
}

pkg/diag/commands/clock.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package commands
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"runtime"
7+
"time"
8+
9+
"github.com/apoxy-dev/apoxy/pkg/diag"
10+
)
11+
12+
// Clock returns a Command that reports the agent's current notion of
13+
// time. Useful for diagnosing TLS / JWT failures rooted in clock skew.
14+
func Clock() diag.Command { return clockCmd{} }
15+
16+
type clockCmd struct{}
17+
18+
func (clockCmd) Spec() diag.Spec {
19+
return diag.Spec{
20+
Name: "clock",
21+
Desc: "Wall, monotonic, and runtime-reported time as observed by the agent.",
22+
CeilingMs: 1_000,
23+
}
24+
}
25+
26+
// clockResult is the JSON payload returned by the `clock` command.
type clockResult struct {
	// WallUnixNs is the wall-clock time in nanoseconds since the Unix epoch.
	WallUnixNs int64 `json:"wall_unix_ns"`
	// WallRFC3339 is the same instant in UTC, formatted as RFC 3339 with
	// nanoseconds.
	WallRFC3339 string `json:"wall_rfc3339"`
	// MonotonicNs is an elapsed duration in nanoseconds; Run defines the
	// reference point it is measured from.
	MonotonicNs int64 `json:"monotonic_ns"`
	// TimezoneName is the abbreviated name of the agent's local zone.
	TimezoneName string `json:"timezone_name"`
	// TimezoneOffS is the local zone's offset east of UTC, in seconds.
	TimezoneOffS int `json:"timezone_offset_s"`
	// GoVersion is the Go runtime version string.
	GoVersion string `json:"go_version"`
}
34+
35+
func (clockCmd) Run(_ context.Context, _ json.RawMessage, _ diag.Emitter) (any, error) {
36+
now := time.Now()
37+
zoneName, zoneOff := now.Zone()
38+
// time.Since(epoch) yields the monotonic delta; we report it raw so
39+
// the caller can compare two reads without trusting the wall clock.
40+
mono := time.Since(time.Unix(0, 0))
41+
return clockResult{
42+
WallUnixNs: now.UnixNano(),
43+
WallRFC3339: now.UTC().Format(time.RFC3339Nano),
44+
MonotonicNs: int64(mono),
45+
TimezoneName: zoneName,
46+
TimezoneOffS: zoneOff,
47+
GoVersion: runtime.Version(),
48+
}, nil
49+
}

pkg/diag/commands/register.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package commands
2+
3+
import "github.com/apoxy-dev/apoxy/pkg/diag"
4+
5+
// RegisterAll registers every built-in diag command into r. The agent
6+
// main calls this once at startup with a fresh registry. New commands
7+
// just add a single line here.
8+
//
9+
// Order does not matter for correctness — Registry sorts the manifest
10+
// — but listing in dependency order helps reviewers see what's new.
11+
func RegisterAll(r *diag.Registry) {
12+
r.Register(Agent(r))
13+
r.Register(Clock())
14+
}
15+
16+
// NewDefaultRegistry returns a fresh Registry with every built-in
17+
// command pre-registered. Equivalent to NewRegistry + RegisterAll.
18+
func NewDefaultRegistry() *diag.Registry {
19+
r := diag.NewRegistry()
20+
RegisterAll(r)
21+
return r
22+
}

0 commit comments

Comments
 (0)