Skip to content

Commit 1894a0b

Browse files
committed
[tunnel] add agent metrics, overlay scraper, and re-export collector
Add Prometheus metrics to the tunnel agent (info, uptime, per-protocol packet/byte counters, BFD heartbeats, DNS query/cache stats, mirror reconciler counters) and a scraper + re-export system for tunnelproxy. Each connected agent gets a dedicated scrape goroutine that polls its /metrics endpoint through the overlay network. Results flow through a buffered channel to a single aggregator goroutine, avoiding concurrent map writes. The ReexportCollector serves the aggregated metrics on the tunnelproxy's /metrics endpoint with tunnel_node/agent/project_id labels injected. The agent advertises its metrics port to the server via a connection label (CONNECT-IP) or ConnectRequest field (relay API), so the scraper knows which port to hit regardless of the configured --metrics-addr. Per-protocol counters on the packet hot path use pre-resolved prometheus.Counter objects to avoid WithLabelValues map lookups on every packet.
1 parent 938c50f commit 1894a0b

File tree

16 files changed

+1048
-19
lines changed

16 files changed

+1048
-19
lines changed

pkg/cmd/tunnel/run.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"io"
77
"log/slog"
88
"math/rand"
9-
"slices"
9+
"net"
1010
"net/http"
1111
"net/netip"
1212
"os"
13+
"slices"
14+
"strconv"
1315
"sync"
1416
"time"
1517

@@ -476,6 +478,13 @@ func (t *tunnelNodeReconciler) reconcile(ctx context.Context, req ctrl.Request)
476478
if t.cfg.IsLocalMode || insecureSkipVerify {
477479
cOpts = append(cOpts, tunnel.WithInsecureSkipVerify(true))
478480
}
481+
// Advertise the metrics port so the server can scrape agent metrics
482+
// through the overlay network. Best-effort: if metricsAddr has no parseable, positive port, nothing is advertised.
483+
if _, portStr, err := net.SplitHostPort(metricsAddr); err == nil {
484+
if p, err := strconv.Atoi(portStr); err == nil && p > 0 {
485+
cOpts = append(cOpts, tunnel.WithClientMetricsPort(p))
486+
}
487+
}
479488

480489
t.dialMu.Lock()
481490
t.tunnelUID = tnUUID

pkg/kube-controller/controllers/mirror.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,18 +286,23 @@ func (r *MirrorReconciler) syncGateway(ctx context.Context, gw *gwapiv1.Gateway)
286286
if apierrors.IsNotFound(err) {
287287
log.Infof("Mirror: creating Gateway %s (from %s/%s)", apoxyName, gw.Namespace, gw.Name)
288288
if _, err := r.apoxyClient.GatewayV1().Gateways().Create(ctx, apoxy, metav1.CreateOptions{}); err != nil {
289+
MirrorSyncErrors.WithLabelValues("Gateway").Inc()
289290
return reconcile.Result{}, fmt.Errorf("creating Apoxy Gateway %s: %w", apoxyName, err)
290291
}
292+
MirrorSyncedResources.WithLabelValues("Gateway").Inc()
291293
return reconcile.Result{}, nil
292294
} else if err != nil {
295+
MirrorSyncErrors.WithLabelValues("Gateway").Inc()
293296
return reconcile.Result{}, fmt.Errorf("getting Apoxy Gateway %s: %w", apoxyName, err)
294297
}
295298

296299
apoxy.ResourceVersion = existing.ResourceVersion
297300
log.Infof("Mirror: updating Gateway %s (from %s/%s)", apoxyName, gw.Namespace, gw.Name)
298301
if _, err := r.apoxyClient.GatewayV1().Gateways().Update(ctx, apoxy, metav1.UpdateOptions{}); err != nil {
302+
MirrorSyncErrors.WithLabelValues("Gateway").Inc()
299303
return reconcile.Result{}, fmt.Errorf("updating Apoxy Gateway %s: %w", apoxyName, err)
300304
}
305+
MirrorSyncedResources.WithLabelValues("Gateway").Inc()
301306
return reconcile.Result{}, nil
302307
}
303308

@@ -348,18 +353,23 @@ func (r *MirrorReconciler) syncHTTPRoute(ctx context.Context, route *gwapiv1.HTT
348353
if apierrors.IsNotFound(err) {
349354
log.Infof("Mirror: creating HTTPRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
350355
if _, err := r.apoxyClient.GatewayV1().HTTPRoutes().Create(ctx, apoxy, metav1.CreateOptions{}); err != nil {
356+
MirrorSyncErrors.WithLabelValues("HTTPRoute").Inc()
351357
return reconcile.Result{}, fmt.Errorf("creating Apoxy HTTPRoute %s: %w", apoxyName, err)
352358
}
359+
MirrorSyncedResources.WithLabelValues("HTTPRoute").Inc()
353360
return reconcile.Result{}, nil
354361
} else if err != nil {
362+
MirrorSyncErrors.WithLabelValues("HTTPRoute").Inc()
355363
return reconcile.Result{}, fmt.Errorf("getting Apoxy HTTPRoute %s: %w", apoxyName, err)
356364
}
357365

358366
apoxy.ResourceVersion = existing.ResourceVersion
359367
log.Infof("Mirror: updating HTTPRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
360368
if _, err := r.apoxyClient.GatewayV1().HTTPRoutes().Update(ctx, apoxy, metav1.UpdateOptions{}); err != nil {
369+
MirrorSyncErrors.WithLabelValues("HTTPRoute").Inc()
361370
return reconcile.Result{}, fmt.Errorf("updating Apoxy HTTPRoute %s: %w", apoxyName, err)
362371
}
372+
MirrorSyncedResources.WithLabelValues("HTTPRoute").Inc()
363373
return reconcile.Result{}, nil
364374
}
365375

@@ -410,18 +420,23 @@ func (r *MirrorReconciler) syncGRPCRoute(ctx context.Context, route *gwapiv1.GRP
410420
if apierrors.IsNotFound(err) {
411421
log.Infof("Mirror: creating GRPCRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
412422
if _, err := r.apoxyClient.GatewayV1().GRPCRoutes().Create(ctx, apoxy, metav1.CreateOptions{}); err != nil {
423+
MirrorSyncErrors.WithLabelValues("GRPCRoute").Inc()
413424
return reconcile.Result{}, fmt.Errorf("creating Apoxy GRPCRoute %s: %w", apoxyName, err)
414425
}
426+
MirrorSyncedResources.WithLabelValues("GRPCRoute").Inc()
415427
return reconcile.Result{}, nil
416428
} else if err != nil {
429+
MirrorSyncErrors.WithLabelValues("GRPCRoute").Inc()
417430
return reconcile.Result{}, fmt.Errorf("getting Apoxy GRPCRoute %s: %w", apoxyName, err)
418431
}
419432

420433
apoxy.ResourceVersion = existing.ResourceVersion
421434
log.Infof("Mirror: updating GRPCRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
422435
if _, err := r.apoxyClient.GatewayV1().GRPCRoutes().Update(ctx, apoxy, metav1.UpdateOptions{}); err != nil {
436+
MirrorSyncErrors.WithLabelValues("GRPCRoute").Inc()
423437
return reconcile.Result{}, fmt.Errorf("updating Apoxy GRPCRoute %s: %w", apoxyName, err)
424438
}
439+
MirrorSyncedResources.WithLabelValues("GRPCRoute").Inc()
425440
return reconcile.Result{}, nil
426441
}
427442

@@ -472,18 +487,23 @@ func (r *MirrorReconciler) syncTCPRoute(ctx context.Context, route *gwapiv1alpha
472487
if apierrors.IsNotFound(err) {
473488
log.Infof("Mirror: creating TCPRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
474489
if _, err := r.apoxyClient.GatewayV1alpha2().TCPRoutes().Create(ctx, apoxy, metav1.CreateOptions{}); err != nil {
490+
MirrorSyncErrors.WithLabelValues("TCPRoute").Inc()
475491
return reconcile.Result{}, fmt.Errorf("creating Apoxy TCPRoute %s: %w", apoxyName, err)
476492
}
493+
MirrorSyncedResources.WithLabelValues("TCPRoute").Inc()
477494
return reconcile.Result{}, nil
478495
} else if err != nil {
496+
MirrorSyncErrors.WithLabelValues("TCPRoute").Inc()
479497
return reconcile.Result{}, fmt.Errorf("getting Apoxy TCPRoute %s: %w", apoxyName, err)
480498
}
481499

482500
apoxy.ResourceVersion = existing.ResourceVersion
483501
log.Infof("Mirror: updating TCPRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
484502
if _, err := r.apoxyClient.GatewayV1alpha2().TCPRoutes().Update(ctx, apoxy, metav1.UpdateOptions{}); err != nil {
503+
MirrorSyncErrors.WithLabelValues("TCPRoute").Inc()
485504
return reconcile.Result{}, fmt.Errorf("updating Apoxy TCPRoute %s: %w", apoxyName, err)
486505
}
506+
MirrorSyncedResources.WithLabelValues("TCPRoute").Inc()
487507
return reconcile.Result{}, nil
488508
}
489509

@@ -534,18 +554,23 @@ func (r *MirrorReconciler) syncTLSRoute(ctx context.Context, route *gwapiv1alpha
534554
if apierrors.IsNotFound(err) {
535555
log.Infof("Mirror: creating TLSRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
536556
if _, err := r.apoxyClient.GatewayV1alpha2().TLSRoutes().Create(ctx, apoxy, metav1.CreateOptions{}); err != nil {
557+
MirrorSyncErrors.WithLabelValues("TLSRoute").Inc()
537558
return reconcile.Result{}, fmt.Errorf("creating Apoxy TLSRoute %s: %w", apoxyName, err)
538559
}
560+
MirrorSyncedResources.WithLabelValues("TLSRoute").Inc()
539561
return reconcile.Result{}, nil
540562
} else if err != nil {
563+
MirrorSyncErrors.WithLabelValues("TLSRoute").Inc()
541564
return reconcile.Result{}, fmt.Errorf("getting Apoxy TLSRoute %s: %w", apoxyName, err)
542565
}
543566

544567
apoxy.ResourceVersion = existing.ResourceVersion
545568
log.Infof("Mirror: updating TLSRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
546569
if _, err := r.apoxyClient.GatewayV1alpha2().TLSRoutes().Update(ctx, apoxy, metav1.UpdateOptions{}); err != nil {
570+
MirrorSyncErrors.WithLabelValues("TLSRoute").Inc()
547571
return reconcile.Result{}, fmt.Errorf("updating Apoxy TLSRoute %s: %w", apoxyName, err)
548572
}
573+
MirrorSyncedResources.WithLabelValues("TLSRoute").Inc()
549574
return reconcile.Result{}, nil
550575
}
551576

@@ -596,18 +621,23 @@ func (r *MirrorReconciler) syncUDPRoute(ctx context.Context, route *gwapiv1alpha
596621
if apierrors.IsNotFound(err) {
597622
log.Infof("Mirror: creating UDPRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
598623
if _, err := r.apoxyClient.GatewayV1alpha2().UDPRoutes().Create(ctx, apoxy, metav1.CreateOptions{}); err != nil {
624+
MirrorSyncErrors.WithLabelValues("UDPRoute").Inc()
599625
return reconcile.Result{}, fmt.Errorf("creating Apoxy UDPRoute %s: %w", apoxyName, err)
600626
}
627+
MirrorSyncedResources.WithLabelValues("UDPRoute").Inc()
601628
return reconcile.Result{}, nil
602629
} else if err != nil {
630+
MirrorSyncErrors.WithLabelValues("UDPRoute").Inc()
603631
return reconcile.Result{}, fmt.Errorf("getting Apoxy UDPRoute %s: %w", apoxyName, err)
604632
}
605633

606634
apoxy.ResourceVersion = existing.ResourceVersion
607635
log.Infof("Mirror: updating UDPRoute %s (from %s/%s)", apoxyName, route.Namespace, route.Name)
608636
if _, err := r.apoxyClient.GatewayV1alpha2().UDPRoutes().Update(ctx, apoxy, metav1.UpdateOptions{}); err != nil {
637+
MirrorSyncErrors.WithLabelValues("UDPRoute").Inc()
609638
return reconcile.Result{}, fmt.Errorf("updating Apoxy UDPRoute %s: %w", apoxyName, err)
610639
}
640+
MirrorSyncedResources.WithLabelValues("UDPRoute").Inc()
611641
return reconcile.Result{}, nil
612642
}
613643

@@ -636,6 +666,7 @@ func (r *MirrorReconciler) RunHeartbeat(ctx context.Context, namespace string) e
636666
for {
637667
if err := r.renewLease(ctx, namespace, leaseName, durationSecs); err != nil {
638668
log.Errorf("Mirror heartbeat: failed to renew lease %s: %v", leaseName, err)
669+
MirrorHeartbeatFailures.Inc()
639670
}
640671

641672
select {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package controllers
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"sigs.k8s.io/controller-runtime/pkg/metrics"
6+
)
7+
8+
var (
9+
// MirrorSyncedResources counts successful resource sync operations.
10+
MirrorSyncedResources = prometheus.NewCounterVec(
11+
prometheus.CounterOpts{
12+
Name: "tunnel_mirror_synced_resources_total",
13+
Help: "Total mirror resource sync operations that succeeded.",
14+
},
15+
[]string{"resource_type"},
16+
)
17+
// MirrorSyncErrors counts failed resource sync operations.
18+
MirrorSyncErrors = prometheus.NewCounterVec(
19+
prometheus.CounterOpts{
20+
Name: "tunnel_mirror_sync_errors_total",
21+
Help: "Total mirror resource sync operations that failed.",
22+
},
23+
[]string{"resource_type"},
24+
)
25+
// MirrorHeartbeatFailures counts heartbeat lease renewal failures.
26+
MirrorHeartbeatFailures = prometheus.NewCounter(
27+
prometheus.CounterOpts{
28+
Name: "tunnel_mirror_heartbeat_failures_total",
29+
Help: "Total mirror heartbeat lease renewal failures.",
30+
},
31+
)
32+
)
33+
34+
func init() {
35+
metrics.Registry.MustRegister(
36+
MirrorSyncedResources,
37+
MirrorSyncErrors,
38+
MirrorHeartbeatFailures,
39+
)
40+
}

pkg/tunnel/api/client.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ import (
1919
)
2020

2121
type Client struct {
22-
http *http.Client
23-
h3 *http3.Transport
24-
baseURL *url.URL
25-
tunnelName string
26-
token string
27-
agent string
22+
http *http.Client
23+
h3 *http3.Transport
24+
baseURL *url.URL
25+
tunnelName string
26+
token string
27+
agent string
28+
metricsPort int
2829
}
2930

3031
type ClientOptions struct {
@@ -44,6 +45,9 @@ type ClientOptions struct {
4445
// PacketConn is an optional UDP PacketConn to use for QUIC connections.
4546
// If nil, a new UDP socket will be created for each connection.
4647
PacketConn net.PacketConn
48+
// MetricsPort is the port the agent's Prometheus metrics server listens on.
49+
// Advertised to the relay so it can scrape metrics through the overlay; 0 (the zero value) is omitted from the connect request.
50+
MetricsPort int
4751
}
4852

4953
func NewClient(opts ClientOptions) (*Client, error) {
@@ -104,12 +108,13 @@ func NewClient(opts ClientOptions) (*Client, error) {
104108
}
105109

106110
return &Client{
107-
http: hc,
108-
h3: t,
109-
baseURL: u,
110-
tunnelName: opts.TunnelName,
111-
token: opts.Token,
112-
agent: opts.Agent,
111+
http: hc,
112+
h3: t,
113+
baseURL: u,
114+
tunnelName: opts.TunnelName,
115+
token: opts.Token,
116+
agent: opts.Agent,
117+
metricsPort: opts.MetricsPort,
113118
}, nil
114119
}
115120

@@ -119,7 +124,7 @@ func (c *Client) Close() error {
119124

120125
// Connect to the relay and establish a new tunnel connection.
121126
func (c *Client) Connect(ctx context.Context) (*ConnectResponse, error) {
122-
reqBody := ConnectRequest{Agent: c.agent}
127+
reqBody := ConnectRequest{Agent: c.agent, MetricsPort: c.metricsPort}
123128
var resp ConnectResponse
124129
if err := c.doJSON(ctx, http.MethodPost, c.path("/v1/tunnel/"+c.tunnelName), reqBody, &resp, http.StatusCreated); err != nil {
125130
return nil, err

pkg/tunnel/api/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ type Request struct {
1616
type ConnectRequest struct {
1717
// Agent is the name of the agent.
1818
Agent string `json:"agent"`
19+
// MetricsPort is the port the agent's Prometheus metrics server listens on.
20+
// 0 means the agent does not expose metrics.
21+
MetricsPort int `json:"metricsPort,omitempty"`
1922
}
2023

2124
type ConnectResponse struct {

pkg/tunnel/bfdl/metrics.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ var (
6767
},
6868
[]string{"role", "direction"},
6969
)
70+
71+
// BFDHeartbeatsReceived counts valid BFD heartbeat packets received.
72+
// Labels: "role" (server|client).
73+
BFDHeartbeatsReceived = prometheus.NewCounterVec(
74+
prometheus.CounterOpts{
75+
Name: "tunnel_bfd_heartbeats_received_total",
76+
Help: "Total valid BFD heartbeat packets received.",
77+
},
78+
[]string{"role"},
79+
)
7080
)
7181

7282
func init() {
@@ -77,5 +87,6 @@ func init() {
7787
BFDStateTransitions,
7888
BFDDetectTimeouts,
7989
BFDPacketErrors,
90+
BFDHeartbeatsReceived,
8091
)
8192
}

pkg/tunnel/bfdl/server.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,11 @@ func (s *Server) Start(ctx context.Context) error {
211211
continue
212212
}
213213

214-
if connID != "" && s.onAlive != nil {
215-
s.onAlive(ctx, connID)
214+
if connID != "" {
215+
BFDHeartbeatsReceived.WithLabelValues("server").Inc()
216+
if s.onAlive != nil {
217+
s.onAlive(ctx, connID)
218+
}
216219
}
217220
}
218221
}

pkg/tunnel/client.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"log/slog"
1010
"net"
11+
"strconv"
1112
"net/http"
1213
"net/netip"
1314
"net/url"
@@ -26,6 +27,7 @@ import (
2627
alog "github.com/apoxy-dev/apoxy/pkg/log"
2728
"github.com/apoxy-dev/apoxy/pkg/tunnel/bfdl"
2829
tunnelconn "github.com/apoxy-dev/apoxy/pkg/tunnel/connection"
30+
"github.com/apoxy-dev/apoxy/pkg/tunnel/metrics"
2931
"github.com/apoxy-dev/apoxy/pkg/tunnel/router"
3032
)
3133

@@ -73,6 +75,9 @@ type tunnelClientOptions struct {
7375
packetObserver tunnelconn.PacketObserver
7476
// Labels to send on tunnel connections.
7577
labels map[string]string
78+
// metricsPort is the port the agent's metrics server listens on.
79+
// Advertised to the server for overlay scraping; values <= 0 are not advertised.
80+
metricsPort int
7681
}
7782

7883
func defaultClientOptions() *tunnelClientOptions {
@@ -163,6 +168,14 @@ func WithLabels(labels map[string]string) TunnelClientOption {
163168
}
164169
}
165170

171+
// WithClientMetricsPort advertises the agent's metrics port to the server
172+
// so the server can scrape metrics from this agent through the overlay. Ports <= 0 are not advertised.
173+
func WithClientMetricsPort(port int) TunnelClientOption {
174+
return func(o *tunnelClientOptions) {
175+
o.metricsPort = port
176+
}
177+
}
178+
166179
// BuildClientRouter builds a router for the client tunnel side using provided
167180
// options and sane defaults.
168181
func BuildClientRouter(opts ...TunnelClientOption) (router.Router, error) {
@@ -288,6 +301,9 @@ func (d *TunnelDialer) Dial(
288301
for k, v := range options.labels {
289302
q.Add("label."+k, v)
290303
}
304+
if options.metricsPort > 0 {
305+
q.Add("label."+metrics.LabelMetricsPort, strconv.Itoa(options.metricsPort))
306+
}
291307
addrUrl.RawQuery = q.Encode()
292308

293309
tmpl, err := uritemplate.New(addrUrl.String())

0 commit comments

Comments
 (0)