Skip to content

Commit 042dc2d

Browse files
committed
[tunnel] add per-connection uptime metric and agent-process identifier
Before this change, apoxy_tunnel_agent_uptime_seconds was the only "uptime" signal re-exported from agents via the tunnelproxy, and its `agent` label was set to the connID. As a result, the same process-uptime value was duplicated across every conn_id a multi-conn agent held open, and there was no way to tell "one process with N conns" apart from "N independent processes".

This change adds two orthogonal signals:

* A first-party gauge, apoxy_tunnel_connection_uptime_seconds, emitted by the tunnelproxy for every registered conn and computed from the MetricsStore Register time. This gives the real per-session lifetime, independent of agent push cadence.
* A stable per-process identifier, generated once at agent start and carried on every CONNECT-IP dial via the agent_process_id query param. The agent prefers the CRI container ID parsed from /proc/self/cgroup (so the value matches kubelet/containerd/cri-o metadata) and falls back to a UUID when no container ID can be detected (for example on non-Linux or bare-metal hosts).

Re-exported metrics now carry conn_id and agent_process_id labels alongside the legacy agent label (retained as an alias for dashboard compatibility). A ReexportCollector helper keeps the injected label set in a single source of truth so the uptime desc and the per-result label slice stay aligned.

Useful queries this unlocks:

* apoxy_tunnel_connection_uptime_seconds{conn_id="..."} — true per-session lifetime, with no process-uptime duplication.
* max by (tunnel_node, agent_process_id)(apoxy_tunnel_agent_uptime_seconds) — de-duplicated per-process uptime for agents running with min-conns > 1.
* count by (tunnel_node, agent_process_id)(apoxy_tunnel_connections_active) — active conns per agent process.
1 parent 3b84338 commit 042dc2d

7 files changed

Lines changed: 386 additions & 65 deletions

File tree

pkg/tunnel/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
alog "github.com/apoxy-dev/apoxy/pkg/log"
3131
"github.com/apoxy-dev/apoxy/pkg/tunnel/bfdl"
3232
tunnelconn "github.com/apoxy-dev/apoxy/pkg/tunnel/connection"
33+
"github.com/apoxy-dev/apoxy/pkg/tunnel/metrics"
3334
"github.com/apoxy-dev/apoxy/pkg/tunnel/router"
3435
)
3536

@@ -292,6 +293,7 @@ func (d *TunnelDialer) Dial(
292293
for k, v := range options.labels {
293294
q.Add("label."+k, v)
294295
}
296+
q.Add(metrics.QueryParamAgentProcessID, metrics.AgentProcessID())
295297
addrUrl.RawQuery = q.Encode()
296298

297299
tmpl, err := uritemplate.New(addrUrl.String())

pkg/tunnel/metrics/metrics.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package metrics
22

33
import (
4+
"os"
5+
"regexp"
46
"sync"
57
"time"
68

9+
"github.com/google/uuid"
710
"github.com/prometheus/client_golang/prometheus"
811
"sigs.k8s.io/controller-runtime/pkg/metrics"
912

@@ -13,6 +16,48 @@ import (
1316
// startTime is the time the process started. Used for uptime calculation.
1417
var startTime = time.Now()
1518

19+
// QueryParamAgentProcessID is the CONNECT-IP query-string key the agent uses
20+
// to tell the server its stable per-process ID. Referenced by both the client
21+
// and the server handler so a rename can't silently break the wire.
22+
const QueryParamAgentProcessID = "agent_process_id"
23+
24+
// processID is stable for the process lifetime so callers can distinguish
25+
// "same process with multiple conns" from "multiple processes each with one
26+
// conn". Prefers a CRI container ID (cross-refs kubelet/containerd metadata)
27+
// and falls back to a UUID when none is detectable.
28+
var processID = initProcessID()
29+
30+
// containerIDRegex matches the 64-char hex token that CRI runtimes
31+
// (containerd, cri-o, docker, podman) embed in cgroup paths. Covers both
32+
// cgroup v1 and v2 layouts and the common systemd-slice wrappers
33+
// (`cri-containerd-<id>.scope`, `docker-<id>.scope`, etc).
34+
var containerIDRegex = regexp.MustCompile(`[0-9a-f]{64}`)
35+
36+
func initProcessID() string {
37+
// Linux-only: on macOS/Windows the read fails and we fall back to a UUID.
38+
// Both paths rotate on container/process restart, so cardinality is bounded
39+
// by the same restart rate either way.
40+
if id := detectContainerID("/proc/self/cgroup"); id != "" {
41+
return id
42+
}
43+
return uuid.NewString()
44+
}
45+
46+
func detectContainerID(path string) string {
47+
data, err := os.ReadFile(path)
48+
if err != nil {
49+
return ""
50+
}
51+
return parseCgroupForContainerID(data)
52+
}
53+
54+
func parseCgroupForContainerID(data []byte) string {
55+
return containerIDRegex.FindString(string(data))
56+
}
57+
58+
// AgentProcessID returns the stable per-process ID for this agent.
59+
func AgentProcessID() string { return processID }
60+
1661
var (
1762
// Agent info and lifecycle metrics.
1863

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package metrics
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestParseCgroupForContainerID(t *testing.T) {
10+
// 64-hex fixtures taken from real-world cgroup paths.
11+
const k8sCri = "3bf3c5a2e4d8f9c0a1b2c3d4e5f6789012345678abcdef0123456789abcdef01"
12+
const docker = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"
13+
const criO = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
14+
15+
tests := []struct {
16+
name string
17+
data string
18+
want string
19+
}{
20+
{
21+
name: "cgroup v1 k8s containerd",
22+
data: "12:memory:/kubepods.slice/kubepods-burstable.slice/" +
23+
"kubepods-burstable-pod1234abcd.slice/cri-containerd-" + k8sCri + ".scope\n",
24+
want: k8sCri,
25+
},
26+
{
27+
name: "cgroup v2 unified containerd",
28+
data: "0::/kubepods.slice/kubepods-burstable.slice/" +
29+
"kubepods-burstable-pod1234abcd.slice/cri-containerd-" + k8sCri + ".scope\n",
30+
want: k8sCri,
31+
},
32+
{
33+
name: "cgroup v1 docker",
34+
data: "11:memory:/docker/" + docker + "\n",
35+
want: docker,
36+
},
37+
{
38+
name: "cgroup v1 cri-o",
39+
data: "1:name=systemd:/kubepods/burstable/pod<uid>/crio-" + criO + ".scope\n",
40+
want: criO,
41+
},
42+
{
43+
name: "multiple lines picks the first 64-hex token",
44+
data: "12:cpuset:/\n" +
45+
"11:memory:/docker/" + docker + "\n" +
46+
"0::/docker/" + docker + "\n",
47+
want: docker,
48+
},
49+
{
50+
name: "no container id (bare-metal / tests)",
51+
data: "0::/user.slice/user-1000.slice/session-1.scope\n",
52+
want: "",
53+
},
54+
{
55+
name: "empty file",
56+
data: "",
57+
want: "",
58+
},
59+
{
60+
name: "hex shorter than 64 is not matched (avoids partial shas)",
61+
data: "0::/system.slice/some-service-abc123def.scope\n",
62+
want: "",
63+
},
64+
}
65+
for _, tt := range tests {
66+
t.Run(tt.name, func(t *testing.T) {
67+
got := parseCgroupForContainerID([]byte(tt.data))
68+
assert.Equal(t, tt.want, got)
69+
})
70+
}
71+
}
72+
73+
func TestDetectContainerID_MissingFile(t *testing.T) {
74+
// Non-existent path — e.g., macOS dev or sandboxed environments.
75+
got := detectContainerID("/proc/does-not-exist/cgroup")
76+
assert.Empty(t, got, "missing file must return empty, not panic")
77+
}
78+
79+
func TestAgentProcessID_Stable(t *testing.T) {
80+
// Whatever initProcessID() chose at package init, AgentProcessID() must
81+
// return the same value on every call within the process lifetime.
82+
a := AgentProcessID()
83+
b := AgentProcessID()
84+
assert.NotEmpty(t, a)
85+
assert.Equal(t, a, b, "AgentProcessID must be stable across calls")
86+
}

pkg/tunnel/metrics/reexport.go

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,58 @@ import (
77

88
"github.com/prometheus/client_golang/prometheus"
99
dto "github.com/prometheus/client_model/go"
10-
"google.golang.org/protobuf/proto"
1110
)
1211

1312
const (
1413
// StaleResultTimeout is how long a pushed result remains valid.
1514
// Results older than this are skipped during collection.
1615
StaleResultTimeout = 60 * time.Second
1716

18-
labelTunnelNode = "tunnel_node"
19-
labelAgent = "agent"
20-
labelProjectID = "project_id"
17+
labelTunnelNode = "tunnel_node"
18+
labelAgent = "agent" // Deprecated alias for conn_id; retained for dashboard compatibility.
19+
labelConnID = "conn_id"
20+
labelAgentProcessID = "agent_process_id"
21+
labelProjectID = "project_id"
22+
23+
// connUptimeMetric is the first-party per-connection uptime metric emitted
24+
// by the ReexportCollector (computed from StoreResult.RegisteredAt). Unlike
25+
// tunnel_agent_uptime_seconds — which is re-exported from the agent and
26+
// reports the agent *process* uptime duplicated across every conn_id — this
27+
// metric reflects the lifetime of a single CONNECT-IP session.
28+
connUptimeMetric = "tunnel_connection_uptime_seconds"
2129
)
2230

31+
// targetLabelNames is the canonical order of the labels we inject on every
32+
// metric emitted by this collector (both re-exported agent metrics and the
33+
// first-party conn uptime gauge). Single source of truth so the connUptimeDesc
34+
// and per-result label-value slice stay in lock-step.
35+
var targetLabelNames = []string{
36+
labelTunnelNode,
37+
labelAgent,
38+
labelConnID,
39+
labelAgentProcessID,
40+
labelProjectID,
41+
}
42+
43+
func targetLabelValues(t StoreTarget) []string {
44+
return []string{
45+
t.TunnelNode,
46+
t.AgentName, // legacy "agent" value; today always equal to ConnID on the server side
47+
t.ConnID,
48+
t.AgentProcessID,
49+
t.ProjectID,
50+
}
51+
}
52+
2353
// ReexportCollector implements prometheus.Collector by iterating over pushed
24-
// agent metrics and re-emitting them with tunnel_node, agent, and project_id
25-
// labels injected. It should be registered with the tunnelproxy's Prometheus
26-
// registry so agent metrics appear on the tunnelproxy's /metrics endpoint.
54+
// agent metrics and re-emitting them with tunnel_node, agent, conn_id,
55+
// agent_process_id, and project_id labels injected. It should be registered
56+
// with the tunnelproxy's Prometheus registry so agent metrics appear on the
57+
// tunnelproxy's /metrics endpoint.
2758
type ReexportCollector struct {
28-
store *MetricsStore
29-
prefix string
59+
store *MetricsStore
60+
prefix string
61+
connUptimeDesc *prometheus.Desc
3062
}
3163

3264
// ReexportOption configures a ReexportCollector.
@@ -47,6 +79,12 @@ func NewReexportCollector(store *MetricsStore, opts ...ReexportOption) *Reexport
4779
for _, o := range opts {
4880
o(c)
4981
}
82+
c.connUptimeDesc = prometheus.NewDesc(
83+
c.prefix+connUptimeMetric,
84+
"Seconds since this tunnel connection was registered with the tunnelproxy.",
85+
targetLabelNames,
86+
nil,
87+
)
5088
return c
5189
}
5290

@@ -60,25 +98,34 @@ func (c *ReexportCollector) Describe(ch chan<- *prometheus.Desc) {}
6098
func (c *ReexportCollector) Collect(ch chan<- prometheus.Metric) {
6199
now := time.Now()
62100
c.store.ForEachResult(func(connID string, result *StoreResult) bool {
101+
values := targetLabelValues(result.Target)
102+
// Guard tolerates tests that populate store.results directly, bypassing
103+
// Register. In production Register always stamps RegisteredAt.
104+
if !result.RegisteredAt.IsZero() {
105+
ch <- prometheus.MustNewConstMetric(
106+
c.connUptimeDesc,
107+
prometheus.GaugeValue,
108+
now.Sub(result.RegisteredAt).Seconds(),
109+
values...,
110+
)
111+
}
63112
if now.Sub(result.PushedAt) > StaleResultTimeout {
64113
return true
65114
}
66-
c.collectResult(ch, result)
115+
c.collectResult(ch, result, values)
67116
return true
68117
})
69118
}
70119

71-
func (c *ReexportCollector) collectResult(ch chan<- prometheus.Metric, result *StoreResult) {
72-
extraLabels := []*dto.LabelPair{
73-
{Name: proto.String(labelTunnelNode), Value: proto.String(result.Target.TunnelNode)},
74-
{Name: proto.String(labelAgent), Value: proto.String(result.Target.AgentName)},
75-
{Name: proto.String(labelProjectID), Value: proto.String(result.Target.ProjectID)},
76-
}
77-
120+
func (c *ReexportCollector) collectResult(
121+
ch chan<- prometheus.Metric,
122+
result *StoreResult,
123+
targetValues []string,
124+
) {
78125
for name, family := range result.Families {
79126
prefixedName := c.prefix + name
80127
for _, m := range family.Metric {
81-
pm, err := c.toPrometheusMetric(prefixedName, family.GetType(), m, extraLabels)
128+
pm, err := c.toPrometheusMetric(prefixedName, family.GetType(), m, targetValues)
82129
if err != nil {
83130
slog.Debug("Skipping metric",
84131
slog.String("name", prefixedName),
@@ -95,20 +142,17 @@ func (c *ReexportCollector) toPrometheusMetric(
95142
name string,
96143
mtype dto.MetricType,
97144
m *dto.Metric,
98-
extraLabels []*dto.LabelPair,
145+
targetValues []string,
99146
) (prometheus.Metric, error) {
100-
// Copy labels to avoid mutating the protobuf message's backing array.
101147
existing := m.GetLabel()
102-
allLabels := make([]*dto.LabelPair, 0, len(existing)+len(extraLabels))
103-
allLabels = append(allLabels, existing...)
104-
allLabels = append(allLabels, extraLabels...)
105-
106-
labelNames := make([]string, len(allLabels))
107-
labelValues := make([]string, len(allLabels))
108-
for i, lp := range allLabels {
109-
labelNames[i] = lp.GetName()
110-
labelValues[i] = lp.GetValue()
148+
labelNames := make([]string, 0, len(existing)+len(targetLabelNames))
149+
labelValues := make([]string, 0, len(existing)+len(targetLabelNames))
150+
for _, lp := range existing {
151+
labelNames = append(labelNames, lp.GetName())
152+
labelValues = append(labelValues, lp.GetValue())
111153
}
154+
labelNames = append(labelNames, targetLabelNames...)
155+
labelValues = append(labelValues, targetValues...)
112156

113157
desc := prometheus.NewDesc(name, "Re-exported agent metric.", labelNames, nil)
114158

0 commit comments

Comments
 (0)