Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/core/v1alpha2/tunnel_agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ type TunnelAgentConnection struct {
// VNI is the 24-bit virtual network identifier used for this connection, if applicable.
// +optional
VNI *uint `json:"vni,omitempty,omitzero"`

// LastRXTimestamp is the last time a packet was received from the agent on
// this connection.
// +optional
LastRXTimestamp *metav1.Time `json:"lastRxTimestamp,omitempty,omitzero"`

// RXBytes is the total number of bytes received from the agent on this connection.
// +optional
RXBytes uint64 `json:"rxBytes,omitempty,omitzero"`

// TXBytes is the total number of bytes transmitted to the agent on this connection.
// +optional
TxBytes uint64 `json:"txBytes,omitempty,omitzero"`
}

// TunnelAgentStatus represents the status of a tunnel agent.
Expand Down
4 changes: 4 additions & 0 deletions api/core/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions api/generated/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

179 changes: 158 additions & 21 deletions pkg/apiserver/controllers/tunnel_agent_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
"log/slog"
"net/netip"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
controllerlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2"
Expand All @@ -26,7 +28,12 @@ import (
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/finalizers,verbs=update
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels,verbs=get;list;watch

const indexControllerOwnerUID = ".metadata.controllerOwnerUID"
const (
// must be longer than the relays own gc max silence
gcMaxSilence = 5 * time.Minute
gcCheckInterval = time.Minute
indexControllerOwnerUID = ".metadata.controllerOwnerUID"
)

type TunnelAgentReconciler struct {
client client.Client
Expand Down Expand Up @@ -64,7 +71,8 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request)

changed, err := r.releaseResourcesIfPresent(ctx, log, req.NamespacedName)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err)
log.Error(err, "Failed to release resources for TunnelAgent")
// not retryable, just log and continue so we don't block deletion.
}

// Refetch to avoid conflicts if we modified the object
Expand Down Expand Up @@ -154,6 +162,27 @@ func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
return err
}

// Run periodic orphaned connection cleanup
err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
ticker := time.NewTicker(gcCheckInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil

case <-ticker.C:
if err := r.PruneOrphanedConnections(ctx); err != nil {
slog.Warn("Failed to run orphaned connection cleanup", slog.Any("error", err))
}
}
}
}))
if err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})).
Complete(r)
Expand Down Expand Up @@ -234,6 +263,34 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations(
})
}

// releaseConnectionResources releases any resources held by a single connection.
// It attempts to release both the IP prefix and the VNI; it returns the first
// error encountered but will attempt both releases regardless.
func (r *TunnelAgentReconciler) releaseConnectionResources(addr string, vniPtr *uint) error {
var firstErr error

// Release overlay address/prefix (if set)
if addr != "" {
pfx, err := netip.ParsePrefix(addr)
if err != nil {
if firstErr == nil {
firstErr = fmt.Errorf("failed to parse address %q for release: %w", addr, err)
}
} else {
if err := r.agentIPAM.Release(pfx); err != nil && firstErr == nil {
firstErr = fmt.Errorf("failed to release address %q: %w", addr, err)
}
}
}

// Release VNI (if set)
if vniPtr != nil {
r.vniPool.Release(*vniPtr)
}

return firstErr
}

func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
ctx context.Context,
log logr.Logger,
Expand All @@ -257,28 +314,19 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
for i := range cur.Status.Connections {
conn := &cur.Status.Connections[i]

// Release overlay address/prefix (if set)
if conn.Address != "" {
pfx, err := netip.ParsePrefix(conn.Address)
if err != nil {
return fmt.Errorf("failed to parse address %q for release: %w", conn.Address, err)
}
if err := r.agentIPAM.Release(pfx); err != nil {
return fmt.Errorf("failed to release address %q: %w", conn.Address, err)
}
log.Info("Released overlay address", "connectionID", conn.ID, "address", conn.Address)
conn.Address = ""
changed = true
if conn.Address == "" && conn.VNI == nil {
continue
}

// Release VNI (if set)
if conn.VNI != nil {
vni := *conn.VNI
r.vniPool.Release(vni)
log.Info("Released VNI", "connectionID", conn.ID, "vni", vni)
conn.VNI = nil
changed = true
// Release any resources the connection holds.
if err := r.releaseConnectionResources(conn.Address, conn.VNI); err != nil {
return err
}
log.Info("Released resources for connection", "connectionID", conn.ID, "address", conn.Address, "vni", conn.VNI)

conn.Address = ""
conn.VNI = nil
changed = true
}

if changed {
Expand All @@ -304,3 +352,92 @@ func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner
}
return true, nil
}

// PruneOrphanedConnections prunes orphaned connections from TunnelAgent status
// (due to a relay unexpectedly shutting down). This is exposed for testing purposes.
func (r *TunnelAgentReconciler) PruneOrphanedConnections(ctx context.Context) error {
var agents corev1alpha2.TunnelAgentList
if err := r.client.List(ctx, &agents); err != nil {
return err
}

var firstErr error
for i := range agents.Items {
agent := &agents.Items[i]
key := client.ObjectKeyFromObject(agent)

err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var cur corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, key, &cur); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

// No connections to prune
if len(cur.Status.Connections) == 0 {
return nil
}

now := time.Now().UTC()
conns := make([]corev1alpha2.TunnelAgentConnection, 0, len(cur.Status.Connections))
updated := false

for j := range cur.Status.Connections {
conn := &cur.Status.Connections[j]

// Determine orphaned-ness
isOrphaned := false
switch {
case conn.LastRXTimestamp != nil:
isOrphaned = conn.LastRXTimestamp.Add(gcMaxSilence).Before(now)
case conn.ConnectedAt != nil:
isOrphaned = conn.ConnectedAt.Add(gcMaxSilence).Before(now)
default:
isOrphaned = false
}
if !isOrphaned {
conns = append(conns, *conn)
continue
}

slog.Info("Pruning orphaned connection from TunnelAgent",
slog.String("agent", agent.Name),
slog.String("connectionID", conn.ID),
slog.String("address", conn.Address))

// Release any resources the orphaned connection holds.
if err := r.releaseConnectionResources(conn.Address, conn.VNI); err != nil {
slog.Warn("Failed to release resources for orphaned connection",
slog.String("agent", agent.Name),
slog.String("connectionID", conn.ID),
slog.String("address", conn.Address),
slog.Any("error", err))
}

// Do not append to the new list: this prunes the connection.
updated = true
}
if !updated {
return nil
}

cur.Status.Connections = conns
if err := r.client.Status().Update(ctx, &cur); err != nil {
return err
}
return nil
})

if err != nil {
if firstErr == nil {
firstErr = err
}
slog.Warn("Failed pruning orphaned connections for agent",
slog.String("agent", agent.Name), slog.Any("error", err))
}
}

return firstErr
}
89 changes: 89 additions & 0 deletions pkg/apiserver/controllers/tunnel_agent_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers_test
import (
"net/netip"
"testing"
"time"

"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -145,6 +146,94 @@ func TestTunnelAgentReconciler_DeletionReleasesResourcesAndRemovesFinalizer(t *t
require.Equal(t, uint(1), reV, "expected released VNI to be available again")
}

func TestTunnelAgentPruneOrphanedConnections(t *testing.T) {
ctx := ctrl.LoggerInto(t.Context(), testLogr(t))

// Scheme
scheme := runtime.NewScheme()
require.NoError(t, corev1alpha2.Install(scheme))

// IPAM + VNI pool
systemULA := tunnet.NewULA(ctx, tunnet.SystemNetworkID)
agentIPAM, err := systemULA.IPAM(ctx, 96)
require.NoError(t, err)

vpool := vni.NewVNIPool()

// Allocate resources we'll assign to connections
pfxOrphaned, err := agentIPAM.Allocate()
require.NoError(t, err)
vOrphaned, err := vpool.Allocate()
require.NoError(t, err)
require.Equal(t, uint(1), vOrphaned)

pfxFresh, err := agentIPAM.Allocate()
require.NoError(t, err)
vFresh, err := vpool.Allocate()
require.NoError(t, err)
require.Equal(t, uint(2), vFresh)

// Times must be metav1.Time, not time.Time
now := time.Now().UTC()
orphaned := metav1.Time{Time: now.Add(-1 * time.Hour)} // orphaned -> should be pruned
fresh := metav1.Time{Time: now.Add(-1 * time.Minute)} // fresh -> should stay

agent := &corev1alpha2.TunnelAgent{
TypeMeta: metav1.TypeMeta{
Kind: "TunnelAgent",
APIVersion: "core.apoxy.dev/v1alpha2",
},
ObjectMeta: metav1.ObjectMeta{
Name: "agent-gc",
},
Spec: corev1alpha2.TunnelAgentSpec{
TunnelRef: corev1alpha2.TunnelRef{Name: "tun-any"},
},
Status: corev1alpha2.TunnelAgentStatus{
Connections: []corev1alpha2.TunnelAgentConnection{
{
ID: "conn-orphaned",
Address: pfxOrphaned.String(),
VNI: &vOrphaned,
LastRXTimestamp: &orphaned,
},
{
ID: "conn-fresh",
Address: pfxFresh.String(),
VNI: &vFresh,
LastRXTimestamp: &fresh,
},
},
},
}

c := fakeclient.NewClientBuilder().
WithScheme(scheme).
WithStatusSubresource(&corev1alpha2.Tunnel{}, &corev1alpha2.TunnelAgent{}).
WithObjects(agent).
Build()

r := controllers.NewTunnelAgentReconciler(c, agentIPAM, vpool)

// WHEN GC runs
require.NoError(t, r.PruneOrphanedConnections(ctx))

// THEN orphan pruned, fresh kept
var got corev1alpha2.TunnelAgent
require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got))
require.Len(t, got.Status.Connections, 1)
require.Equal(t, "conn-fresh", got.Status.Connections[0].ID)

// Released resources are available again
rePfx, err := agentIPAM.Allocate()
require.NoError(t, err)
require.Equal(t, pfxOrphaned, rePfx)

reV, err := vpool.Allocate()
require.NoError(t, err)
require.Equal(t, vOrphaned, reV)
}

func mkAgentWithEmptyConnection(name, tunnelName string) *corev1alpha2.TunnelAgent {
return &corev1alpha2.TunnelAgent{
TypeMeta: metav1.TypeMeta{Kind: "TunnelAgent", APIVersion: "core.apoxy.dev/v1alpha2"},
Expand Down
Loading