diff --git a/api/core/v1alpha2/tunnel_agent_types.go b/api/core/v1alpha2/tunnel_agent_types.go index 6ea3328..42b037b 100644 --- a/api/core/v1alpha2/tunnel_agent_types.go +++ b/api/core/v1alpha2/tunnel_agent_types.go @@ -58,6 +58,19 @@ type TunnelAgentConnection struct { // VNI is the 24-bit virtual network identifier used for this connection, if applicable. // +optional VNI *uint `json:"vni,omitempty,omitzero"` + + // LastRXTimestamp is the last time a packet was received from the agent on + // this connection. + // +optional + LastRXTimestamp *metav1.Time `json:"lastRxTimestamp,omitempty,omitzero"` + + // RXBytes is the total number of bytes received from the agent on this connection. + // +optional + RXBytes uint64 `json:"rxBytes,omitempty,omitzero"` + + // TXBytes is the total number of bytes transmitted to the agent on this connection. + // +optional + TxBytes uint64 `json:"txBytes,omitempty,omitzero"` } // TunnelAgentStatus represents the status of a tunnel agent. diff --git a/api/core/v1alpha2/zz_generated.deepcopy.go b/api/core/v1alpha2/zz_generated.deepcopy.go index 8ab0443..04dce1b 100644 --- a/api/core/v1alpha2/zz_generated.deepcopy.go +++ b/api/core/v1alpha2/zz_generated.deepcopy.go @@ -1414,6 +1414,10 @@ func (in *TunnelAgentConnection) DeepCopyInto(out *TunnelAgentConnection) { *out = new(uint) **out = **in } + if in.LastRXTimestamp != nil { + in, out := &in.LastRXTimestamp, &out.LastRXTimestamp + *out = (*in).DeepCopy() + } return } diff --git a/api/generated/zz_generated.openapi.go b/api/generated/zz_generated.openapi.go index 59d862b..48f178f 100644 --- a/api/generated/zz_generated.openapi.go +++ b/api/generated/zz_generated.openapi.go @@ -5358,6 +5358,26 @@ func schema_apoxy_api_core_v1alpha2_TunnelAgentConnection(ref common.ReferenceCa Format: "int32", }, }, + "lastRxTimestamp": { + SchemaProps: spec.SchemaProps{ + Description: "LastRXTimestamp is the last time a packet was received from the agent on this connection.", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + "rxBytes": { + SchemaProps: spec.SchemaProps{ + Description: "RXBytes is the total number of bytes received from the agent on this connection.", + Type: []string{"integer"}, + Format: "int64", + }, + }, + "txBytes": { + SchemaProps: spec.SchemaProps{ + Description: "TXBytes is the total number of bytes transmitted to the agent on this connection.", + Type: []string{"integer"}, + Format: "int64", + }, + }, }, Required: []string{"id"}, }, diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler.go b/pkg/apiserver/controllers/tunnel_agent_reconciler.go index 124c00a..31235da 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler.go @@ -3,6 +3,7 @@ package controllers import ( "context" "fmt" + "log/slog" "net/netip" "time" @@ -14,6 +15,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" controllerlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" @@ -26,7 +28,12 @@ import ( // +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/finalizers,verbs=update // +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels,verbs=get;list;watch -const indexControllerOwnerUID = ".metadata.controllerOwnerUID" +const ( + // must be longer than the relays own gc max silence + gcMaxSilence = 5 * time.Minute + gcCheckInterval = time.Minute + indexControllerOwnerUID = ".metadata.controllerOwnerUID" +) type TunnelAgentReconciler struct { client client.Client @@ -64,7 +71,8 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) changed, err := r.releaseResourcesIfPresent(ctx, log, req.NamespacedName) if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err) + log.Error(err, "Failed to release resources for TunnelAgent") + // not retryable, just log and continue so we don't block deletion. } // Refetch to avoid conflicts if we modified the object @@ -154,6 +162,27 @@ func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M return err } + // Run periodic orphaned connection cleanup + err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + ticker := time.NewTicker(gcCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + + case <-ticker.C: + if err := r.PruneOrphanedConnections(ctx); err != nil { + slog.Warn("Failed to run orphaned connection cleanup", slog.Any("error", err)) + } + } + } + })) + if err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})). Complete(r) @@ -234,6 +263,34 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( }) } +// releaseConnectionResources releases any resources held by a single connection. +// It attempts to release both the IP prefix and the VNI; it returns the first +// error encountered but will attempt both releases regardless. +func (r *TunnelAgentReconciler) releaseConnectionResources(addr string, vniPtr *uint) error { + var firstErr error + + // Release overlay address/prefix (if set) + if addr != "" { + pfx, err := netip.ParsePrefix(addr) + if err != nil { + if firstErr == nil { + firstErr = fmt.Errorf("failed to parse address %q for release: %w", addr, err) + } + } else { + if err := r.agentIPAM.Release(pfx); err != nil && firstErr == nil { + firstErr = fmt.Errorf("failed to release address %q: %w", addr, err) + } + } + } + + // Release VNI (if set) + if vniPtr != nil { + r.vniPool.Release(*vniPtr) + } + + return firstErr +} + func (r *TunnelAgentReconciler) releaseResourcesIfPresent( ctx context.Context, log logr.Logger, @@ -257,28 +314,19 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent( for i := range cur.Status.Connections { conn := &cur.Status.Connections[i] - // Release overlay address/prefix (if set) - if conn.Address != "" { - pfx, err := netip.ParsePrefix(conn.Address) - if err != nil { - return fmt.Errorf("failed to parse address %q for release: %w", conn.Address, err) - } - if err := r.agentIPAM.Release(pfx); err != nil { - return fmt.Errorf("failed to release address %q: %w", conn.Address, err) - } - log.Info("Released overlay address", "connectionID", conn.ID, "address", conn.Address) - conn.Address = "" - changed = true + if conn.Address == "" && conn.VNI == nil { + continue } - // Release VNI (if set) - if conn.VNI != nil { - vni := *conn.VNI - r.vniPool.Release(vni) - log.Info("Released VNI", "connectionID", conn.ID, "vni", vni) - conn.VNI = nil - changed = true + // Release any resources the connection holds. + if err := r.releaseConnectionResources(conn.Address, conn.VNI); err != nil { + return err } + log.Info("Released resources for connection", "connectionID", conn.ID, "address", conn.Address, "vni", conn.VNI) + + conn.Address = "" + conn.VNI = nil + changed = true } if changed { @@ -304,3 +352,92 @@ func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner } return true, nil } + +// PruneOrphanedConnections prunes orphaned connections from TunnelAgent status +// (due to a relay unexpectedly shutting down). This is exposed for testing purposes. +func (r *TunnelAgentReconciler) PruneOrphanedConnections(ctx context.Context) error { + var agents corev1alpha2.TunnelAgentList + if err := r.client.List(ctx, &agents); err != nil { + return err + } + + var firstErr error + for i := range agents.Items { + agent := &agents.Items[i] + key := client.ObjectKeyFromObject(agent) + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, key, &cur); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + // No connections to prune + if len(cur.Status.Connections) == 0 { + return nil + } + + now := time.Now().UTC() + conns := make([]corev1alpha2.TunnelAgentConnection, 0, len(cur.Status.Connections)) + updated := false + + for j := range cur.Status.Connections { + conn := &cur.Status.Connections[j] + + // Determine orphaned-ness + isOrphaned := false + switch { + case conn.LastRXTimestamp != nil: + isOrphaned = conn.LastRXTimestamp.Add(gcMaxSilence).Before(now) + case conn.ConnectedAt != nil: + isOrphaned = conn.ConnectedAt.Add(gcMaxSilence).Before(now) + default: + isOrphaned = false + } + if !isOrphaned { + conns = append(conns, *conn) + continue + } + + slog.Info("Pruning orphaned connection from TunnelAgent", + slog.String("agent", agent.Name), + slog.String("connectionID", conn.ID), + slog.String("address", conn.Address)) + + // Release any resources the orphaned connection holds. + if err := r.releaseConnectionResources(conn.Address, conn.VNI); err != nil { + slog.Warn("Failed to release resources for orphaned connection", + slog.String("agent", agent.Name), + slog.String("connectionID", conn.ID), + slog.String("address", conn.Address), + slog.Any("error", err)) + } + + // Do not append to the new list: this prunes the connection. + updated = true + } + if !updated { + return nil + } + + cur.Status.Connections = conns + if err := r.client.Status().Update(ctx, &cur); err != nil { + return err + } + return nil + }) + + if err != nil { + if firstErr == nil { + firstErr = err + } + slog.Warn("Failed pruning orphaned connections for agent", + slog.String("agent", agent.Name), slog.Any("error", err)) + } + } + + return firstErr +} diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go index 61c2271..e507986 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go @@ -3,6 +3,7 @@ package controllers_test import ( "net/netip" "testing" + "time" "github.com/stretchr/testify/require" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -145,6 +146,94 @@ func TestTunnelAgentReconciler_DeletionReleasesResourcesAndRemovesFinalizer(t *t require.Equal(t, uint(1), reV, "expected released VNI to be available again") } +func TestTunnelAgentPruneOrphanedConnections(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + // Scheme + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + // IPAM + VNI pool + systemULA := tunnet.NewULA(ctx, tunnet.SystemNetworkID) + agentIPAM, err := systemULA.IPAM(ctx, 96) + require.NoError(t, err) + + vpool := vni.NewVNIPool() + + // Allocate resources we'll assign to connections + pfxOrphaned, err := agentIPAM.Allocate() + require.NoError(t, err) + vOrphaned, err := vpool.Allocate() + require.NoError(t, err) + require.Equal(t, uint(1), vOrphaned) + + pfxFresh, err := agentIPAM.Allocate() + require.NoError(t, err) + vFresh, err := vpool.Allocate() + require.NoError(t, err) + require.Equal(t, uint(2), vFresh) + + // Times must be metav1.Time, not time.Time + now := time.Now().UTC() + orphaned := metav1.Time{Time: now.Add(-1 * time.Hour)} // orphaned -> should be pruned + fresh := metav1.Time{Time: now.Add(-1 * time.Minute)} // fresh -> should stay + + agent := &corev1alpha2.TunnelAgent{ + TypeMeta: metav1.TypeMeta{ + Kind: "TunnelAgent", + APIVersion: "core.apoxy.dev/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "agent-gc", + }, + Spec: corev1alpha2.TunnelAgentSpec{ + TunnelRef: corev1alpha2.TunnelRef{Name: "tun-any"}, + }, + Status: corev1alpha2.TunnelAgentStatus{ + Connections: []corev1alpha2.TunnelAgentConnection{ + { + ID: "conn-orphaned", + Address: pfxOrphaned.String(), + VNI: &vOrphaned, + LastRXTimestamp: &orphaned, + }, + { + ID: "conn-fresh", + Address: pfxFresh.String(), + VNI: &vFresh, + LastRXTimestamp: &fresh, + }, + }, + }, + } + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.Tunnel{}, &corev1alpha2.TunnelAgent{}). + WithObjects(agent). + Build() + + r := controllers.NewTunnelAgentReconciler(c, agentIPAM, vpool) + + // WHEN GC runs + require.NoError(t, r.PruneOrphanedConnections(ctx)) + + // THEN orphan pruned, fresh kept + var got corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got)) + require.Len(t, got.Status.Connections, 1) + require.Equal(t, "conn-fresh", got.Status.Connections[0].ID) + + // Released resources are available again + rePfx, err := agentIPAM.Allocate() + require.NoError(t, err) + require.Equal(t, pfxOrphaned, rePfx) + + reV, err := vpool.Allocate() + require.NoError(t, err) + require.Equal(t, vOrphaned, reV) +} + func mkAgentWithEmptyConnection(name, tunnelName string) *corev1alpha2.TunnelAgent { return &corev1alpha2.TunnelAgent{ TypeMeta: metav1.TypeMeta{Kind: "TunnelAgent", APIVersion: "core.apoxy.dev/v1alpha2"}, diff --git a/pkg/tunnel/connection.go b/pkg/tunnel/connection.go index 85eb196..c48b8a0 100644 --- a/pkg/tunnel/connection.go +++ b/pkg/tunnel/connection.go @@ -218,3 +218,31 @@ func (c *connection) SetOverlayAddress(addr string) error { func (c *connection) IncrementKeyEpoch() uint32 { return c.keyEpoch.Add(1) } + +// Stats returns a snapshot built from the currently configured VNI (if any). +func (c *connection) Stats() (controllers.ConnectionStats, bool) { + c.mu.Lock() + + if c.vni == nil || c.handler == nil { + c.mu.Unlock() + return controllers.ConnectionStats{}, false + } + c.mu.Unlock() + + vnet, ok := c.handler.GetVirtualNetwork(*c.vni) + if !ok || vnet == nil { + return controllers.ConnectionStats{}, false + } + + var lastRx time.Time + nano := vnet.Stats.LastRXUnixNano.Load() + if nano > 0 { + lastRx = time.Unix(0, nano) + } + + return controllers.ConnectionStats{ + RXBytes: vnet.Stats.RXBytes.Load(), + TXBytes: vnet.Stats.TXBytes.Load(), + LastRX: lastRx, + }, true +} diff --git a/pkg/tunnel/controllers/connection.go b/pkg/tunnel/controllers/connection.go index 35743a0..9674dd6 100644 --- a/pkg/tunnel/controllers/connection.go +++ b/pkg/tunnel/controllers/connection.go @@ -3,8 +3,20 @@ package controllers import ( "context" "io" + "time" ) +// ConnectionStats is a lightweight snapshot of connection counters. +type ConnectionStats struct { + // RXBytes is the total number of bytes received on this connection. + RXBytes uint64 + // TXBytes is the total number of bytes transmitted on this connection. + TXBytes uint64 + // LastRX is the last time a packet was received on this connection. + // The zero value indicates that no packets have been received. + LastRX time.Time +} + // Connection is a simple abstraction representing a connection from a TunnelAgent to a Relay. type Connection interface { io.Closer @@ -14,4 +26,6 @@ type Connection interface { SetOverlayAddress(addr string) error // Set the VNI assigned to this connection. SetVNI(ctx context.Context, vni uint) error + // Stats returns a snapshot of connection statistics. + Stats() (ConnectionStats, bool) } diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler.go b/pkg/tunnel/controllers/tunnel_agent_reconciler.go index 6a86722..bf2692f 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/alphadose/haxmap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -15,11 +16,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" controllerlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" ) +// How often to push connection stats to the TunnelAgent status. +// This is throttled to avoid overwhelming the API server. +const statsUpdateInterval = 30 * time.Second + +// Each relay gets its own finalizer on the TunnelAgent object. const tunnelRelayFinalizerTmpl = "tunnelrelay.apoxy.dev/%s-finalizer" type TunnelAgentReconciler struct { @@ -27,7 +34,8 @@ type TunnelAgentReconciler struct { relay Relay labelSelector string finalizer string - conns *haxmap.Map[string, Connection] + conns *haxmap.Map[string, Connection] // id -> connection + connAgent *haxmap.Map[string, string] // id -> agent name } func NewTunnelAgentReconciler(c client.Client, relay Relay, labelSelector string) *TunnelAgentReconciler { @@ -37,6 +45,7 @@ func NewTunnelAgentReconciler(c client.Client, relay Relay, labelSelector string relay: relay, finalizer: finalizer, conns: haxmap.New[string, Connection](), + connAgent: haxmap.New[string, string](), } relay.SetOnConnect(r.AddConnection) relay.SetOnDisconnect(r.RemoveConnection) @@ -108,6 +117,24 @@ func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { return fmt.Errorf("failed to create label selector predicate: %w", err) } + // Periodically push connection stats to the API server. + err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + ticker := time.NewTicker(statsUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + r.PushStatsOnce(ctx) + } + } + })) + if err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{}, ls)). Complete(r) @@ -117,6 +144,7 @@ func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *TunnelAgentReconciler) AddConnection(ctx context.Context, tunnelName, agentName string, conn Connection) error { // Track the connection in-memory. r.conns.Set(conn.ID(), conn) + r.connAgent.Set(conn.ID(), agentName) // Get the parent Tunnel object. var tunnel corev1alpha2.Tunnel @@ -211,6 +239,7 @@ func (r *TunnelAgentReconciler) RemoveConnection(ctx context.Context, agentName, slog.Warn("Failed to close connection", slog.String("id", id), slog.Any("error", err)) } } + r.connAgent.Del(id) // Remove from status.connections (by ID) if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -275,3 +304,62 @@ func (r *TunnelAgentReconciler) RemoveConnection(ctx context.Context, agentName, return nil }) } + +// pushStatsOnce performs a single stats sweep. +// This is exposed for testing purposes. +func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) { + // Snapshot live connection stats. + updatesByAgent := make(map[string][]corev1alpha2.TunnelAgentConnection) + r.conns.ForEach(func(id string, conn Connection) bool { + agentName, ok := r.connAgent.Get(id) + if !ok { + slog.Warn("Connection has no associated agent", slog.String("id", id)) + return true // shouldn't happen, but skip safely + } + + if s, ok := conn.Stats(); ok { + u := corev1alpha2.TunnelAgentConnection{ + ID: id, + RXBytes: s.RXBytes, + TxBytes: s.TXBytes, + } + if !s.LastRX.IsZero() { + t := metav1.NewTime(s.LastRX) + u.LastRXTimestamp = &t + } + updatesByAgent[agentName] = append(updatesByAgent[agentName], u) + } + return true + }) + + // Apply updates per agent with conflict retries. + for agentName, updates := range updatesByAgent { + _ = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil { + // If the object is gone, skip. + return client.IgnoreNotFound(err) + } + + // Build a quick lookup: connection ID -> status entry. + connByID := make(map[string]*corev1alpha2.TunnelAgentConnection, len(cur.Status.Connections)) + for i := range cur.Status.Connections { + c := &cur.Status.Connections[i] + connByID[c.ID] = c + } + + // Apply stats only for known connections. + for _, u := range updates { + if c := connByID[u.ID]; c != nil { + c.RXBytes = u.RXBytes + c.TxBytes = u.TxBytes + if u.LastRXTimestamp != nil { + c.LastRXTimestamp = u.LastRXTimestamp + } + } + } + + return r.client.Status().Update(ctx, &cur) + }) + } +} diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go index 516c94f..adbbd40 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go @@ -4,6 +4,7 @@ import ( "context" "net/netip" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -230,6 +231,57 @@ func TestTunnelAgentReconcile_SetsAddressAndVNI(t *testing.T) { relay.AssertExpectations(t) } +func TestTunnelAgentPushStatsOnce_UpdatesStatusForKnownConnection(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + tunnel := mkTunnel("tun-stats") + agent := mkAgent("tun-stats", "agent-stats") + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.Tunnel{}, &corev1alpha2.TunnelAgent{}). + WithObjects(tunnel, agent). + Build() + + relay := &mockRelay{} + relay.On("Name").Return("relay-stats") + relay.On("Address").Return(netip.MustParseAddrPort("203.0.113.30:443")) + relay.On("SetOnConnect", mock.Anything).Return().Once() + relay.On("SetOnDisconnect", mock.Anything).Return().Once() + + r := controllers.NewTunnelAgentReconciler(c, relay, "") + + conn := &mockConn{} + conn.On("ID").Return("conn-stat").Maybe() + + require.NoError(t, r.AddConnection(ctx, tunnel.Name, agent.Name, conn)) + + lastRX := time.Date(2025, 1, 2, 3, 4, 5, 0, time.UTC) + conn.On("Stats").Return(controllers.ConnectionStats{ + RXBytes: 1111, + TXBytes: 2222, + LastRX: lastRX, + }, true).Once() + + r.PushStatsOnce(ctx) + + var got corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got)) + require.Len(t, got.Status.Connections, 1) + entry := got.Status.Connections[0] + assert.Equal(t, "conn-stat", entry.ID) + assert.Equal(t, uint64(1111), entry.RXBytes) + assert.Equal(t, uint64(2222), entry.TxBytes) + require.NotNil(t, entry.LastRXTimestamp) + assert.True(t, entry.LastRXTimestamp.Time.Equal(lastRX)) + + relay.AssertExpectations(t) + conn.AssertExpectations(t) +} + func mkTunnel(tunnelName string) *corev1alpha2.Tunnel { return &corev1alpha2.Tunnel{ TypeMeta: metav1.TypeMeta{ @@ -278,6 +330,11 @@ func (m *mockConn) SetVNI(ctx context.Context, v uint) error { return args.Error(0) } +func (m *mockConn) Stats() (controllers.ConnectionStats, bool) { + args := m.Called() + return args.Get(0).(controllers.ConnectionStats), args.Bool(1) +} + func (m *mockConn) Close() error { args := m.Called() return args.Error(0)