From 8930ea860222bb337aac8ca8e789ff057004d9a5 Mon Sep 17 00:00:00 2001 From: Damian Peckett Date: Thu, 4 Sep 2025 11:09:48 +0200 Subject: [PATCH 1/3] [tunnelv2] add controllers for new types --- api/core/v1alpha2/tunnel_agent_types.go | 6 +- api/core/v1alpha2/zz_generated.deepcopy.go | 5 + api/generated/zz_generated.openapi.go | 9 +- .../controllers/tunnel_agent_reconciler.go | 317 ++++++++++++++++++ .../tunnel_agent_reconciler_test.go | 93 +++++ .../controllers/tunnel_reconciler.go | 122 +++++++ .../controllers/tunnel_reconciler_test.go | 77 +++++ pkg/apiserver/controllers/tunnelnode.go | 8 +- pkg/tunnel/controllers/connection.go | 14 + pkg/tunnel/controllers/relay.go | 13 + .../controllers/tunnel_agent_reconciler.go | 235 +++++++++++++ .../tunnel_agent_reconciler_test.go | 235 +++++++++++++ pkg/tunnel/controllers/tunnel_reconciler.go | 62 ++++ .../controllers/tunnel_reconciler_test.go | 76 +++++ pkg/tunnel/kex/server.go | 7 +- pkg/tunnel/{kex/vni_pool.go => vni/pool.go} | 6 +- 16 files changed, 1273 insertions(+), 12 deletions(-) create mode 100644 pkg/apiserver/controllers/tunnel_agent_reconciler.go create mode 100644 pkg/apiserver/controllers/tunnel_agent_reconciler_test.go create mode 100644 pkg/apiserver/controllers/tunnel_reconciler.go create mode 100644 pkg/apiserver/controllers/tunnel_reconciler_test.go create mode 100644 pkg/tunnel/controllers/connection.go create mode 100644 pkg/tunnel/controllers/relay.go create mode 100644 pkg/tunnel/controllers/tunnel_agent_reconciler.go create mode 100644 pkg/tunnel/controllers/tunnel_agent_reconciler_test.go create mode 100644 pkg/tunnel/controllers/tunnel_reconciler.go create mode 100644 pkg/tunnel/controllers/tunnel_reconciler_test.go rename pkg/tunnel/{kex/vni_pool.go => vni/pool.go} (91%) diff --git a/api/core/v1alpha2/tunnel_agent_types.go b/api/core/v1alpha2/tunnel_agent_types.go index c7ab0b4..eca59f6 100644 --- a/api/core/v1alpha2/tunnel_agent_types.go +++ b/api/core/v1alpha2/tunnel_agent_types.go @@ -47,13 +47,17 @@ type TunnelAgentConnection struct { // +optional ConnectedAt *metav1.Time `json:"connectedAt,omitempty,omitzero"` - // Address is the overlay address of the agent assigned to this connection. + // Address is the overlay address/cidr of the agent assigned to this connection. // +optional Address string `json:"address,omitempty,omitzero"` // RelayAddress is the address of the relay managing this connection. // +optional RelayAddress string `json:"relayAddress,omitempty,omitzero"` + + // VNI is the 24-bit virtual network identifier used for this connection, if applicable. + // +optional + VNI *int `json:"vni,omitempty,omitzero"` } // TunnelAgentStatus represents the status of a tunnel agent. diff --git a/api/core/v1alpha2/zz_generated.deepcopy.go b/api/core/v1alpha2/zz_generated.deepcopy.go index 6ab0631..725c6e3 100644 --- a/api/core/v1alpha2/zz_generated.deepcopy.go +++ b/api/core/v1alpha2/zz_generated.deepcopy.go @@ -103,6 +103,11 @@ func (in *TunnelAgentConnection) DeepCopyInto(out *TunnelAgentConnection) { in, out := &in.ConnectedAt, &out.ConnectedAt *out = (*in).DeepCopy() } + if in.VNI != nil { + in, out := &in.VNI, &out.VNI + *out = new(int) + **out = **in + } return } diff --git a/api/generated/zz_generated.openapi.go b/api/generated/zz_generated.openapi.go index a12c631..ddbb774 100644 --- a/api/generated/zz_generated.openapi.go +++ b/api/generated/zz_generated.openapi.go @@ -2917,7 +2917,7 @@ func schema_apoxy_api_core_v1alpha2_TunnelAgentConnection(ref common.ReferenceCa }, "address": { SchemaProps: spec.SchemaProps{ - Description: "Address is the overlay address of the agent assigned to this connection.", + Description: "Address is the overlay address/cidr of the agent assigned to this connection.", Type: []string{"string"}, Format: "", }, @@ -2929,6 +2929,13 @@ func schema_apoxy_api_core_v1alpha2_TunnelAgentConnection(ref common.ReferenceCa Format: "", }, }, + "vni": { + SchemaProps: spec.SchemaProps{ + Description: "VNI is the 24-bit virtual network identifier used for this connection, if applicable.", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, Required: []string{"id"}, }, diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler.go b/pkg/apiserver/controllers/tunnel_agent_reconciler.go new file mode 100644 index 0000000..88d8401 --- /dev/null +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler.go @@ -0,0 +1,317 @@ +package controllers + +import ( + "context" + "fmt" + "net/netip" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + controllerlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" + tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net" + "github.com/apoxy-dev/apoxy/pkg/tunnel/vni" +) + +const ( + indexByTunnelRef = "spec.tunnelRef.name" +) + +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/finalizers,verbs=update +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels,verbs=get;list;watch + +type TunnelAgentReconciler struct { + client client.Client + agentIPAM tunnet.IPAM + vniPool *vni.VNIPool +} + +func NewTunnelAgentReconciler(c client.Client, agentIPAM tunnet.IPAM, vniPool *vni.VNIPool) *TunnelAgentReconciler { + return &TunnelAgentReconciler{ + client: c, + agentIPAM: agentIPAM, + vniPool: vniPool, + } +} + +func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := controllerlog.FromContext(ctx, "name", req.Name) + + var agent corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // handle deletion + if !agent.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) { + if err := r.releaseResourcesIfPresent(log, &agent); err != nil { + log.Error(err, "failed to release resources; will retry") + return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err) + } + + // Remove finalizer + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + controllerutil.RemoveFinalizer(&cur, ApiServerFinalizer) + return r.client.Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil + } + + // ensure finalizer + if !controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) { + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + controllerutil.AddFinalizer(&cur, ApiServerFinalizer) + return r.client.Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + } + + // fetch owner Tunnel + tunnelName := agent.Spec.TunnelRef.Name + if tunnelName == "" { + log.Info("tunnelRef.name is empty; skipping") + return ctrl.Result{}, nil + } + + var tunnel corev1alpha2.Tunnel + if err := r.client.Get(ctx, client.ObjectKey{Name: tunnelName}, &tunnel); err != nil { + if apierrors.IsNotFound(err) { + log.Info("Referenced Tunnel not found; will retry", "tunnel", tunnelName) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + return ctrl.Result{}, err + } + + // ensure controller ownerRef agent -> tunnel (retry on conflict) + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + changed, ensureErr := r.ensureControllerOwner(&cur, &tunnel) + if ensureErr != nil { + return ensureErr + } + if !changed { + return nil + } + return r.client.Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + + // Assign overlay addresses and VNIs for any connections missing them + if err := r.ensureConnectionAllocations(ctx, log, req.NamespacedName); err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { + // field index + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1alpha2.TunnelAgent{}, indexByTunnelRef, + func(obj client.Object) []string { + ta := obj.(*corev1alpha2.TunnelAgent) + if ta.Spec.TunnelRef.Name == "" { + return nil + } + return []string{ta.Spec.TunnelRef.Name} + }); err != nil { + return fmt.Errorf("index TunnelAgents by TunnelRef: %w", err) + } + + // map Tunnel -> its agents + mapTunnelToAgents := handler.TypedEnqueueRequestsFromMapFunc[*corev1alpha2.Tunnel](func(ctx context.Context, t *corev1alpha2.Tunnel) []reconcile.Request { + var list corev1alpha2.TunnelAgentList + if err := mgr.GetClient().List(ctx, &list, client.MatchingFields{indexByTunnelRef: t.Name}); err != nil { + return nil + } + reqs := make([]reconcile.Request, 0, len(list.Items)) + for _, ta := range list.Items { + reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKey{Name: ta.Name}}) + } + return reqs + }) + + // Reconcile when spec generation changes OR when status (e.g., Connections) changes. + statusChanged := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj, ok1 := e.ObjectOld.(*corev1alpha2.TunnelAgent) + newObj, ok2 := e.ObjectNew.(*corev1alpha2.TunnelAgent) + if !ok1 || !ok2 { + return false + } + genChanged := oldObj.GetGeneration() != newObj.GetGeneration() + statusDiff := !equality.Semantic.DeepEqual(oldObj.Status, newObj.Status) + return genChanged || statusDiff + }, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(statusChanged)). + WatchesRawSource( + source.Kind(mgr.GetCache(), &corev1alpha2.Tunnel{}, mapTunnelToAgents), + ). + Complete(r) +} + +func (r *TunnelAgentReconciler) ensureConnectionAllocations( + ctx context.Context, + log logr.Logger, + key client.ObjectKey, +) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if getErr := r.client.Get(ctx, key, &cur); getErr != nil { + return getErr + } + + // Track newly made allocations so we can roll them back if Status().Update fails. + var newlyAllocatedPrefixes []netip.Prefix + var newlyAllocatedVNIs []uint32 + + needsUpdate := false + addrAssigned := 0 + vniAssigned := 0 + + for i := range cur.Status.Connections { + conn := &cur.Status.Connections[i] + + // Allocate overlay address if missing + if conn.Address == "" { + pfx, ipErr := r.agentIPAM.Allocate() + if ipErr != nil { + // rollback anything we grabbed this pass + for _, p := range newlyAllocatedPrefixes { + _ = r.agentIPAM.Release(p) + } + for _, v := range newlyAllocatedVNIs { + r.vniPool.Free(v) + } + log.Error(ipErr, "failed to allocate address") + return fmt.Errorf("failed to allocate address: %w", ipErr) + } + conn.Address = pfx.String() + newlyAllocatedPrefixes = append(newlyAllocatedPrefixes, pfx) + addrAssigned++ + needsUpdate = true + } + + // Allocate VNI if missing (nil means "unset"; zero can be valid but your pool won't return 0) + if conn.VNI == nil { + v, vErr := r.vniPool.Allocate() + if vErr != nil { + // rollback anything we grabbed this pass + for _, p := range newlyAllocatedPrefixes { + _ = r.agentIPAM.Release(p) + } + for _, nv := range newlyAllocatedVNIs { + r.vniPool.Free(nv) + } + log.Error(vErr, "failed to allocate VNI") + return fmt.Errorf("failed to allocate VNI: %w", vErr) + } + vInt := int(v) // status uses *int; vniPool returns uint32 + conn.VNI = &vInt + newlyAllocatedVNIs = append(newlyAllocatedVNIs, v) + vniAssigned++ + needsUpdate = true + } + } + + if !needsUpdate { + log.Info("no connections missing address or VNI") + return nil + } + + // Commit to status; if it fails, release fresh allocations from this attempt. + if updErr := r.client.Status().Update(ctx, &cur); updErr != nil { + for _, p := range newlyAllocatedPrefixes { + _ = r.agentIPAM.Release(p) + } + for _, v := range newlyAllocatedVNIs { + r.vniPool.Free(v) + } + log.Error(updErr, "status update failed; released newly allocated resources", + "addresses", addrAssigned, "vnis", vniAssigned) + return updErr + } + + log.Info("assigned resources to connections", + "addresses", addrAssigned, "vnis", vniAssigned) + return nil + }) +} + +func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner client.Object) (bool, error) { + for _, or := range child.GetOwnerReferences() { + if or.UID == owner.GetUID() && or.Controller != nil && *or.Controller { + return false, nil + } + } + + // Set controller reference (overwrites any existing controller owner) + if err := controllerutil.SetControllerReference( + owner, + child, + r.client.Scheme(), + ); err != nil { + return false, err + } + + return true, nil +} + +func (r *TunnelAgentReconciler) releaseResourcesIfPresent(log logr.Logger, agent *corev1alpha2.TunnelAgent) error { + for _, conn := range agent.Status.Connections { + // Release overlay address/prefix + if conn.Address != "" { + if pfx, err := netip.ParsePrefix(conn.Address); err == nil { + if relErr := r.agentIPAM.Release(pfx); relErr != nil { + log.Error(relErr, "failed to release prefix", "address", conn.Address) + return relErr + } + } else { + log.Error(fmt.Errorf("unrecognized address format"), "skipping address release", "address", conn.Address) + } + } + + // Release VNI + if conn.VNI != nil { + r.vniPool.Free(uint32(*conn.VNI)) + } + } + return nil +} diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go new file mode 100644 index 0000000..1f6d2ea --- /dev/null +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go @@ -0,0 +1,93 @@ +package controllers_test + +import ( + "net/netip" + "testing" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" + "github.com/apoxy-dev/apoxy/pkg/apiserver/controllers" + tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net" + "github.com/apoxy-dev/apoxy/pkg/tunnel/vni" +) + +func TestTunnelAgentReconciler(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + // Scheme + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + systemULA := tunnet.NewULA(ctx, tunnet.SystemNetworkID) + // Agent allocations are still prefixes (e.g., /96) + agentIPAM, err := systemULA.IPAM(ctx, 96) + require.NoError(t, err) + + // Given: a Tunnel and a TunnelAgent with one connection missing Address + tun := mkTunnel("tun-happy") + agent := mkAgentWithEmptyConnection("agent-happy", "tun-happy") + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.Tunnel{}, &corev1alpha2.TunnelAgent{}). + WithObjects(tun, agent). + Build() + + r := controllers.NewTunnelAgentReconciler(c, agentIPAM, vni.NewVNIPool()) + + // When + _, err = r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: agent.Name}}) + require.NoError(t, err) + + // Then + var got corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got)) + + // Finalizer added + require.True(t, controllerutil.ContainsFinalizer(&got, controllers.ApiServerFinalizer)) + + // Owner reference to parent Tunnel + require.Len(t, got.OwnerReferences, 1) + require.Equal(t, tun.Name, got.OwnerReferences[0].Name) + + // Address assigned in status for the connection that was empty + require.Len(t, got.Status.Connections, 1) + require.NotEmpty(t, got.Status.Connections[0].Address, "expected Address to be set on the connection") + + if pfx, err := netip.ParsePrefix(got.Status.Connections[0].Address); err == nil { + require.True(t, pfx.IsValid(), "allocated overlay prefix should be valid") + } else { + t.Fatalf("allocated Address is not a valid prefix: %v", err) + } + + // VNI assigned in status for the connection that was empty + require.NotNil(t, got.Status.Connections[0].VNI, "expected VNI to be set on the connection") + require.Greater(t, *got.Status.Connections[0].VNI, 0) +} + +func mkAgentWithEmptyConnection(name, tunnelName string) *corev1alpha2.TunnelAgent { + return &corev1alpha2.TunnelAgent{ + TypeMeta: metav1.TypeMeta{Kind: "TunnelAgent", APIVersion: "core.apoxy.dev/v1alpha2"}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: corev1alpha2.TunnelAgentSpec{ + TunnelRef: corev1alpha2.TunnelRef{Name: tunnelName}, + }, + Status: corev1alpha2.TunnelAgentStatus{ + Connections: []corev1alpha2.TunnelAgentConnection{ + { + ID: "conn-1", + RelayAddress: "relay-1", + }, + }, + }, + } +} diff --git a/pkg/apiserver/controllers/tunnel_reconciler.go b/pkg/apiserver/controllers/tunnel_reconciler.go new file mode 100644 index 0000000..a3d48bb --- /dev/null +++ b/pkg/apiserver/controllers/tunnel_reconciler.go @@ -0,0 +1,122 @@ +package controllers + +import ( + "context" + "crypto/rand" + "encoding/base64" + "io" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + controllerlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" +) + +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels/finalizers,verbs=update + +const tokenLength = 32 // 32 bytes -> 43 char base64 string + +type TunnelReconciler struct { + client client.Client +} + +func NewTunnelReconciler(c client.Client) *TunnelReconciler { + return &TunnelReconciler{client: c} +} + +func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := controllerlog.FromContext(ctx, "name", req.Name) + + var tunnel corev1alpha2.Tunnel + if err := r.client.Get(ctx, req.NamespacedName, &tunnel); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // handle deletion + if !tunnel.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) { + log.Info("handling Tunnel deletion") + + // Remove finalizer + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.Tunnel + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + controllerutil.RemoveFinalizer(&cur, ApiServerFinalizer) + return r.client.Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil + } + + // ensure finalizer + if !controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) { + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.Tunnel + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + controllerutil.AddFinalizer(&cur, ApiServerFinalizer) + return r.client.Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + } + + // ensure bearer token + if tunnel.Status.Credentials == nil || tunnel.Status.Credentials.Token == "" { + token, err := generateBearerToken(tokenLength) + if err != nil { + return ctrl.Result{}, err + } + + log.Info("Generating new bearer token for Tunnel") + + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.Tunnel + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + if cur.Status.Credentials == nil { + cur.Status.Credentials = &corev1alpha2.TunnelCredentials{} + } + cur.Status.Credentials.Token = token + return r.client.Status().Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + } + + log.Info("Tunnel reconciled successfully") + return ctrl.Result{}, nil +} + +func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1alpha2.Tunnel{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Complete(r) +} + +// generateBearerToken creates a random, base64-url-encoded token of n bytes. +func generateBearerToken(n int) (string, error) { + b := make([]byte, n) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return "", err + } + // URL-safe base64 without padding + return base64.RawURLEncoding.EncodeToString(b), nil +} diff --git a/pkg/apiserver/controllers/tunnel_reconciler_test.go b/pkg/apiserver/controllers/tunnel_reconciler_test.go new file mode 100644 index 0000000..1f2a45d --- /dev/null +++ b/pkg/apiserver/controllers/tunnel_reconciler_test.go @@ -0,0 +1,77 @@ +package controllers_test + +import ( + "log" + "os" + "testing" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + ctrl "sigs.k8s.io/controller-runtime" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" + "github.com/apoxy-dev/apoxy/pkg/apiserver/controllers" +) + +func TestTunnelReconciler(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + // Scheme + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + // Given a Tunnel without credentials + tun := mkTunnel("tun-happy") + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.Tunnel{}). + WithObjects(tun). + Build() + + r := controllers.NewTunnelReconciler(c) + + // When + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: tun.Name}}) + require.NoError(t, err) + + // Then + var got corev1alpha2.Tunnel + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: tun.Name}, &got)) + + // Finalizer added + assert.True(t, controllerutil.ContainsFinalizer(&got, controllers.ApiServerFinalizer)) + + // Token created in status + require.NotNil(t, got.Status.Credentials) + assert.NotEmpty(t, got.Status.Credentials.Token, "expected an opaque bearer token to be generated") +} + +func testLogr(t *testing.T) logr.Logger { + if testing.Verbose() { + l := stdr.New(log.New(os.Stdout, "", log.LstdFlags)) + return l + } + return logr.Discard() +} + +func mkTunnel(name string) *corev1alpha2.Tunnel { + return &corev1alpha2.Tunnel{ + TypeMeta: metav1.TypeMeta{ + Kind: "Tunnel", + APIVersion: "core.apoxy.dev/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } +} diff --git a/pkg/apiserver/controllers/tunnelnode.go b/pkg/apiserver/controllers/tunnelnode.go index 2ca78d9..4ef0399 100644 --- a/pkg/apiserver/controllers/tunnelnode.go +++ b/pkg/apiserver/controllers/tunnelnode.go @@ -21,7 +21,7 @@ import ( corev1alpha "github.com/apoxy-dev/apoxy/api/core/v1alpha" ) -const apiserverFinalizer = "apiserver.apoxy.dev/finalizer" +const ApiServerFinalizer = "apiserver.apoxy.dev/finalizer" // TunnelNodeReconciler implements a basic garbage collector for dead/orphaned // TunnelNode objects. @@ -138,7 +138,7 @@ func (r *TunnelNodeReconciler) Reconcile(ctx context.Context, req reconcile.Requ } } - controllerutil.RemoveFinalizer(tn, apiserverFinalizer) + controllerutil.RemoveFinalizer(tn, ApiServerFinalizer) if err := r.Update(ctx, tn); err != nil { return ctrl.Result{}, err } @@ -146,9 +146,9 @@ func (r *TunnelNodeReconciler) Reconcile(ctx context.Context, req reconcile.Requ return ctrl.Result{}, nil // Deleted } - if !controllerutil.ContainsFinalizer(tn, apiserverFinalizer) { + if !controllerutil.ContainsFinalizer(tn, ApiServerFinalizer) { log.Info("Adding finalizer to Proxy") - controllerutil.AddFinalizer(tn, apiserverFinalizer) + controllerutil.AddFinalizer(tn, ApiServerFinalizer) if err := r.Update(ctx, tn); err != nil { return ctrl.Result{}, err } diff --git a/pkg/tunnel/controllers/connection.go b/pkg/tunnel/controllers/connection.go new file mode 100644 index 0000000..b5a5663 --- /dev/null +++ b/pkg/tunnel/controllers/connection.go @@ -0,0 +1,14 @@ +package controllers + +import "io" + +// Connection is a simple abstraction representing a connection from a TunnelAgent to a Relay. +type Connection interface { + io.Closer + // ID is the unique identifier of the connection. + ID() string + // Set the overlay address/cidr assigned to this connection. + SetAddress(addr string) + // Set the VNI assigned to this connection. + SetVNI(vni uint32) +} diff --git a/pkg/tunnel/controllers/relay.go b/pkg/tunnel/controllers/relay.go new file mode 100644 index 0000000..801b807 --- /dev/null +++ b/pkg/tunnel/controllers/relay.go @@ -0,0 +1,13 @@ +package controllers + +import "net/netip" + +// Relay is a simple abstraction representing a relay server that TunnelAgents connect to. +type Relay interface { + // Name is the name of the relay. + Name() string + // Address is the address of the relay. + Address() netip.AddrPort + // SetCredentials sets the authentication token used by agents to authenticate with the relay. + SetCredentials(tunnelName, token string) +} diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler.go b/pkg/tunnel/controllers/tunnel_agent_reconciler.go new file mode 100644 index 0000000..bb20b45 --- /dev/null +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler.go @@ -0,0 +1,235 @@ +package controllers + +import ( + "context" + "fmt" + + "github.com/alphadose/haxmap" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" +) + +const tunnelRelayFinalizerTmpl = "tunnelrelay.apoxy.dev/%s/finalizer" + +type TunnelAgentReconciler struct { + client client.Client + relay Relay + labelSelector string + finalizer string + conns *haxmap.Map[string, Connection] +} + +func NewTunnelAgentReconciler(c client.Client, relay Relay, labelSelector string) *TunnelAgentReconciler { + finalizer := fmt.Sprintf(tunnelRelayFinalizerTmpl, relay.Name()) + return &TunnelAgentReconciler{ + client: c, + relay: relay, + finalizer: finalizer, + conns: haxmap.New[string, Connection](), + } +} + +func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var agent corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // handle deletion + if !agent.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(&agent, r.finalizer) { + // Close all connections associated with this relay. + for _, c := range agent.Status.Connections { + if conn, ok := r.conns.Get(c.ID); ok { + _ = conn.Close() + r.conns.Del(c.ID) + } + } + + // Remove our finalizer + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { + return getErr + } + controllerutil.RemoveFinalizer(&cur, r.finalizer) + return r.client.Update(ctx, &cur) + }); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil + } + + // Propagate status → live connection: set overlay address and VNI when populated by the other reconciler. + for _, sc := range agent.Status.Connections { + if conn, ok := r.conns.Get(sc.ID); ok { + if sc.Address != "" { + conn.SetAddress(sc.Address) + } + if sc.VNI != nil { + conn.SetVNI(uint32(*sc.VNI)) + } + } + } + + return ctrl.Result{}, nil +} + +func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { + lss, err := metav1.ParseToLabelSelector(r.labelSelector) + if err != nil { + return fmt.Errorf("failed to parse label selector: %w", err) + } + + ls, err := predicate.LabelSelectorPredicate(*lss) + if err != nil { + return fmt.Errorf("failed to create label selector predicate: %w", err) + } + + // React to status-only updates (connections added/changed), spec/generation changes, and deletion toggles. + statusOrGenChanged := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj, ok1 := e.ObjectOld.(*corev1alpha2.TunnelAgent) + newObj, ok2 := e.ObjectNew.(*corev1alpha2.TunnelAgent) + if !ok1 || !ok2 { + return false + } + return !equality.Semantic.DeepEqual(oldObj.Status.Connections, newObj.Status.Connections) || + oldObj.GetGeneration() != newObj.GetGeneration() || + (oldObj.DeletionTimestamp.IsZero() != newObj.DeletionTimestamp.IsZero()) + }, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(ls, statusOrGenChanged)). + Complete(r) +} + +// AddConnection registers a new active connection for the given agent. +func (r *TunnelAgentReconciler) AddConnection(ctx context.Context, agentName string, conn Connection) error { + // Track the connection in-memory. + r.conns.Set(conn.ID(), conn) + + // Upsert connection in status (first), so we truly have a connection before adding the finalizer. + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil { + return err + } + + now := metav1.Now() + entry := corev1alpha2.TunnelAgentConnection{ + ID: conn.ID(), + ConnectedAt: &now, + RelayAddress: r.relay.Address().String(), + } + + found := false + for i := range cur.Status.Connections { + if cur.Status.Connections[i].ID == entry.ID { + cur.Status.Connections[i] = entry + found = true + break + } + } + if !found { + cur.Status.Connections = append(cur.Status.Connections, entry) + } + + return r.client.Status().Update(ctx, &cur) + }); err != nil { + return err + } + + // Add the finalizer ONLY IF we have at least one connection for this relay. + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil { + return err + } + + if controllerutil.ContainsFinalizer(&cur, r.finalizer) { + return nil + } + + hasRelayConn := false + for _, c := range cur.Status.Connections { + if _, ok := r.conns.Get(c.ID); ok { + hasRelayConn = true + break + } + } + + if hasRelayConn { + controllerutil.AddFinalizer(&cur, r.finalizer) + return r.client.Update(ctx, &cur) + } + + return nil + }) +} + +// RemoveConnection deregisters a connection from the given agent by its ID. +func (r *TunnelAgentReconciler) RemoveConnection(ctx context.Context, agentName, id string) error { + // Drop from in-memory map. + r.conns.Del(id) + + // Remove from status.connections (by ID) + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil { + return err + } + + newConns := make([]corev1alpha2.TunnelAgentConnection, 0, len(cur.Status.Connections)) + for _, c := range cur.Status.Connections { + if c.ID != id { + newConns = append(newConns, c) + } + } + cur.Status.Connections = newConns + + return r.client.Status().Update(ctx, &cur) + }); err != nil { + return err + } + + // If no connections remain for THIS relay, remove our relay-scoped finalizer. + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil { + return err + } + + hasRelayConn := false + for _, c := range cur.Status.Connections { + if _, ok := r.conns.Get(c.ID); ok { + hasRelayConn = true + break + } + } + + if !hasRelayConn && controllerutil.ContainsFinalizer(&cur, r.finalizer) { + controllerutil.RemoveFinalizer(&cur, r.finalizer) + return r.client.Update(ctx, &cur) + } + return nil + }) +} diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go new file mode 100644 index 0000000..e0167b3 --- /dev/null +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go @@ -0,0 +1,235 @@ +package controllers_test + +import ( + "net/netip" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + ctrl "sigs.k8s.io/controller-runtime" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" + "github.com/apoxy-dev/apoxy/pkg/tunnel/controllers" +) + +func TestTunnelAgentReconciler_AddConnection(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + agent := mkAgent("agent-1") + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.TunnelAgent{}). + WithObjects(agent). + Build() + + relay := &mockRelay{} + relay.On("Name").Return("relay-a") + relay.On("Address").Return(netip.MustParseAddrPort("203.0.113.10:443")) + + r := controllers.NewTunnelAgentReconciler(c, relay, "") + + conn := &mockConn{} + conn.On("ID").Return("conn-123") + + require.NoError(t, r.AddConnection(ctx, agent.Name, conn)) + + var got corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got)) + + require.Len(t, got.Status.Connections, 1) + entry := got.Status.Connections[0] + assert.Equal(t, "conn-123", entry.ID) + // Address and VNI should NOT be set by AddConnection (populated by the other reconciler). + assert.Equal(t, "", entry.Address) + assert.Nil(t, entry.VNI) + assert.Equal(t, relay.Address().String(), entry.RelayAddress) + + finalizer := "tunnelrelay.apoxy.dev/" + relay.Name() + "/finalizer" + assert.True(t, controllerutil.ContainsFinalizer(&got, finalizer)) + + conn.AssertExpectations(t) +} + +func TestTunnelAgentReconciler_RemoveConnection(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + agent := mkAgent("agent-2") + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.TunnelAgent{}). + WithObjects(agent). + Build() + + relay := &mockRelay{} + relay.On("Name").Return("relay-a") + relay.On("Address").Return(netip.MustParseAddrPort("203.0.113.20:443")) + r := controllers.NewTunnelAgentReconciler(c, relay, "") + finalizer := "tunnelrelay.apoxy.dev/" + relay.Name() + "/finalizer" + + // Two mock conns + conn1 := &mockConn{} + conn1.On("ID").Return("c1").Maybe() + + conn2 := &mockConn{} + conn2.On("ID").Return("c2").Maybe() + + require.NoError(t, r.AddConnection(ctx, agent.Name, conn1)) + require.NoError(t, r.AddConnection(ctx, agent.Name, conn2)) + + // Remove first + require.NoError(t, r.RemoveConnection(ctx, agent.Name, "c1")) + var got corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got)) + assert.Len(t, got.Status.Connections, 1) + assert.True(t, controllerutil.ContainsFinalizer(&got, finalizer)) + + // Remove second + require.NoError(t, r.RemoveConnection(ctx, agent.Name, "c2")) + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got)) + assert.Empty(t, got.Status.Connections) + assert.False(t, controllerutil.ContainsFinalizer(&got, finalizer)) +} + +func TestTunnelAgentReconciler_ClosesConnections(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + agent := mkAgent("agent-3") + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.TunnelAgent{}). + WithObjects(agent). + Build() + + relay := &mockRelay{} + relay.On("Name").Return("relay-z") + relay.On("Address").Return(netip.MustParseAddrPort("198.51.100.7:8443")) + r := controllers.NewTunnelAgentReconciler(c, relay, "") + finalizer := "tunnelrelay.apoxy.dev/" + relay.Name() + "/finalizer" + + // Mock conn that should be closed + conn := &mockConn{} + conn.On("ID").Return("close-me").Maybe() + conn.On("Close").Return(nil).Once() + + // Add connection -> status + in-memory tracking + finalizer + require.NoError(t, r.AddConnection(ctx, agent.Name, conn)) + + // Ensure finalizer exists before deletion + var cur corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &cur)) + require.True(t, controllerutil.ContainsFinalizer(&cur, finalizer), "expected finalizer before deletion") + + // Simulate deletion properly: client.Delete sets deletionTimestamp (object sticks around due to finalizer) + require.NoError(t, c.Delete(ctx, &cur)) + + // Reconcile deletion (should close conn, drop from map, and remove finalizer) + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: agent.Name}}) + require.NoError(t, err) + + // After reconcile, object may be fully gone already (OK) or still present sans finalizer. + var after corev1alpha2.TunnelAgent + err = c.Get(ctx, types.NamespacedName{Name: agent.Name}, &after) + if !apierrors.IsNotFound(err) { + require.NoError(t, err) + assert.False(t, controllerutil.ContainsFinalizer(&after, finalizer), "finalizer should be removed on deletion") + } + + // Ensure Close() was called + conn.AssertExpectations(t) +} + +func TestTunnelAgentReconcile_SetsAddressAndVNI(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + agent := mkAgent("agent-4") + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.TunnelAgent{}). + WithObjects(agent). + Build() + + relay := &mockRelay{} + relay.On("Name").Return("relay-x") + relay.On("Address").Return(netip.MustParseAddrPort("192.0.2.77:7443")) + r := controllers.NewTunnelAgentReconciler(c, relay, "") + + // Create a live connection tracked by the reconciler. + conn := &mockConn{} + conn.On("ID").Return("live-1") + + require.NoError(t, r.AddConnection(ctx, agent.Name, conn)) + + // Simulate the OTHER reconciler filling status.address & vni + var cur corev1alpha2.TunnelAgent + require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &cur)) + require.Len(t, cur.Status.Connections, 1) + cur.Status.Connections[0].Address = "10.123.0.5/32" + v := 4242 + cur.Status.Connections[0].VNI = &v + require.NoError(t, c.Status().Update(ctx, &cur)) + + // Expect our live connection to receive SetAddress + SetVNI on reconcile + conn.On("SetAddress", "10.123.0.5/32").Return().Once() + conn.On("SetVNI", uint32(4242)).Return().Once() + + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: agent.Name}}) + require.NoError(t, err) + + conn.AssertExpectations(t) +} + +func mkAgent(name string) *corev1alpha2.TunnelAgent { + return &corev1alpha2.TunnelAgent{ + TypeMeta: metav1.TypeMeta{ + Kind: "TunnelAgent", + APIVersion: "core.apoxy.dev/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } +} + +type mockConn struct { + mock.Mock +} + +func (m *mockConn) ID() string { + args := m.Called() + return args.String(0) +} + +func (m *mockConn) SetAddress(addr string) { + m.Called(addr) +} + +func (m *mockConn) SetVNI(v uint32) { + m.Called(v) +} + +func (m *mockConn) Close() error { + args := m.Called() + return args.Error(0) +} diff --git a/pkg/tunnel/controllers/tunnel_reconciler.go b/pkg/tunnel/controllers/tunnel_reconciler.go new file mode 100644 index 0000000..14bf7c1 --- /dev/null +++ b/pkg/tunnel/controllers/tunnel_reconciler.go @@ -0,0 +1,62 @@ +package controllers + +import ( + "context" + "fmt" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" +) + +type TunnelReconciler struct { + client client.Client + relay Relay + labelSelector string +} + +func NewTunnelReconciler(c client.Client, relay Relay, labelSelector string) *TunnelReconciler { + return &TunnelReconciler{ + client: c, + relay: relay, + labelSelector: labelSelector, + } +} + +func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var tunnel corev1alpha2.Tunnel + if err := r.client.Get(ctx, req.NamespacedName, &tunnel); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // Update relay credentials if they have changed. + if tunnel.Status.Credentials != nil { + r.relay.SetCredentials(tunnel.Name, tunnel.Status.Credentials.Token) + } + + return ctrl.Result{}, nil +} + +func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error { + lss, err := metav1.ParseToLabelSelector(r.labelSelector) + if err != nil { + return fmt.Errorf("failed to parse label selector: %w", err) + } + + ls, err := predicate.LabelSelectorPredicate(*lss) + if err != nil { + return fmt.Errorf("failed to create label selector predicate: %w", err) + } + + return ctrl.NewControllerManagedBy(mgr). + For(&corev1alpha2.Tunnel{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}, ls)). + Complete(r) +} diff --git a/pkg/tunnel/controllers/tunnel_reconciler_test.go b/pkg/tunnel/controllers/tunnel_reconciler_test.go new file mode 100644 index 0000000..681c8cf --- /dev/null +++ b/pkg/tunnel/controllers/tunnel_reconciler_test.go @@ -0,0 +1,76 @@ +package controllers_test + +import ( + "log" + "net/netip" + "os" + "testing" + + "github.com/apoxy-dev/apoxy/pkg/tunnel/controllers" + "github.com/go-logr/logr" + "github.com/go-logr/stdr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + + corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" +) + +func TestTunnelReconciler(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + tunnel := &corev1alpha2.Tunnel{ + ObjectMeta: metav1.ObjectMeta{Name: "tun-1"}, + Status: corev1alpha2.TunnelStatus{ + Credentials: &corev1alpha2.TunnelCredentials{Token: "secret-token"}, + }, + } + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.Tunnel{}). + WithObjects(tunnel). + Build() + + relay := &mockRelay{} + relay.On("SetCredentials", "tun-1", "secret-token").Once() + + r := controllers.NewTunnelReconciler(c, relay, "") + + _, err := r.Reconcile(ctrl.LoggerInto(t.Context(), testLogr(t)), + ctrl.Request{NamespacedName: types.NamespacedName{Name: "tun-1"}}) + require.NoError(t, err) + + relay.AssertExpectations(t) +} + +func testLogr(t *testing.T) logr.Logger { + if testing.Verbose() { + l := stdr.New(log.New(os.Stdout, "", log.LstdFlags)) + return l + } + return logr.Discard() +} + +type mockRelay struct { + mock.Mock +} + +func (m *mockRelay) Name() string { + args := m.Called() + return args.String(0) +} + +func (m *mockRelay) Address() netip.AddrPort { + args := m.Called() + return args.Get(0).(netip.AddrPort) +} + +func (m *mockRelay) SetCredentials(tunnelName, token string) { + m.Called(tunnelName, token) +} diff --git a/pkg/tunnel/kex/server.go b/pkg/tunnel/kex/server.go index 3878e28..ddfe124 100644 --- a/pkg/tunnel/kex/server.go +++ b/pkg/tunnel/kex/server.go @@ -18,6 +18,7 @@ import ( tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net" "github.com/apoxy-dev/apoxy/pkg/tunnel/token" + "github.com/apoxy-dev/apoxy/pkg/tunnel/vni" ) type virtualNetwork struct { @@ -31,8 +32,8 @@ type virtualNetwork struct { type Server struct { handler *icx.Handler validator token.JWTValidator - vniPool *VNIPool // todo: persist this? - ipam tunnet.IPAM // todo: persist this? + vniPool *vni.VNIPool // todo: persist this? + ipam tunnet.IPAM // todo: persist this? networks sync.Map keyLifespan time.Duration } @@ -41,7 +42,7 @@ func NewServer(ctx context.Context, handler *icx.Handler, validator token.JWTVal return &Server{ handler: handler, validator: validator, - vniPool: NewVNIPool(), + vniPool: vni.NewVNIPool(), ipam: tunnet.NewIPAMv4(ctx), keyLifespan: keyLifespan, } diff --git a/pkg/tunnel/kex/vni_pool.go b/pkg/tunnel/vni/pool.go similarity index 91% rename from pkg/tunnel/kex/vni_pool.go rename to pkg/tunnel/vni/pool.go index 0d8272a..02270b9 100644 --- a/pkg/tunnel/kex/vni_pool.go +++ b/pkg/tunnel/vni/pool.go @@ -1,4 +1,4 @@ -package kex +package vni import ( "fmt" @@ -11,7 +11,7 @@ const ( maxVNI = 1 << 24 // 24-bit space ) -// TODO: support for some kind of persistent bitmap datastructure. +// TODO: support for some kind of persistent bitmap datastructure (sqlite3?). type VNIPool struct { mu sync.Mutex pool bitmap.Bitmap @@ -27,7 +27,7 @@ func (v *VNIPool) Allocate() (uint32, error) { v.mu.Lock() defer v.mu.Unlock() - vni, err := v.pool.FirstZero(0) + vni, err := v.pool.FirstZero(1) if err != nil || vni >= maxVNI { return 0, fmt.Errorf("no available virtual network IDs") } From 0b6a58dedfeaf16d74dcfe68e3aedc20ba68f237 Mon Sep 17 00:00:00 2001 From: Damian Peckett Date: Thu, 4 Sep 2025 17:35:55 +0200 Subject: [PATCH 2/3] [tunnelv2] implement connection for icx --- api/core/v1alpha2/tunnel_agent_types.go | 2 +- api/core/v1alpha2/zz_generated.deepcopy.go | 2 +- go.mod | 2 +- go.sum | 2 + .../controllers/tunnel_agent_reconciler.go | 7 +- .../tunnel_agent_reconciler_test.go | 2 +- pkg/tunnel/adapter/connection.go | 162 ++++++++++++++++++ pkg/tunnel/controllers/connection.go | 6 +- pkg/tunnel/controllers/relay.go | 2 +- .../controllers/tunnel_agent_reconciler.go | 9 +- .../tunnel_agent_reconciler_test.go | 18 +- pkg/tunnel/vni/pool.go | 8 +- 12 files changed, 196 insertions(+), 26 deletions(-) create mode 100644 pkg/tunnel/adapter/connection.go diff --git a/api/core/v1alpha2/tunnel_agent_types.go b/api/core/v1alpha2/tunnel_agent_types.go index eca59f6..bca9bdc 100644 --- a/api/core/v1alpha2/tunnel_agent_types.go +++ b/api/core/v1alpha2/tunnel_agent_types.go @@ -57,7 +57,7 @@ type TunnelAgentConnection struct { // VNI is the 24-bit virtual network identifier used for this connection, if applicable. // +optional - VNI *int `json:"vni,omitempty,omitzero"` + VNI *uint `json:"vni,omitempty,omitzero"` } // TunnelAgentStatus represents the status of a tunnel agent. diff --git a/api/core/v1alpha2/zz_generated.deepcopy.go b/api/core/v1alpha2/zz_generated.deepcopy.go index 725c6e3..f138c47 100644 --- a/api/core/v1alpha2/zz_generated.deepcopy.go +++ b/api/core/v1alpha2/zz_generated.deepcopy.go @@ -105,7 +105,7 @@ func (in *TunnelAgentConnection) DeepCopyInto(out *TunnelAgentConnection) { } if in.VNI != nil { in, out := &in.VNI, &out.VNI - *out = new(int) + *out = new(uint) **out = **in } return diff --git a/go.mod b/go.mod index 1a3f9f8..01ccb6e 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/adrg/xdg v0.5.3 github.com/alphadose/haxmap v1.4.1 github.com/anatol/vmtest v0.0.0-20250318022921-2f32244e2f0f - github.com/apoxy-dev/icx v0.7.1 + github.com/apoxy-dev/icx v0.7.2 github.com/avast/retry-go/v4 v4.6.1 github.com/bramvdbogaerde/go-scp v1.5.0 github.com/buraksezer/olric v0.5.6 diff --git a/go.sum b/go.sum index 8db3db7..94d568e 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45 h1:SwPk1n/ github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45/go.mod h1:z5rtgIizc+/K27UtB0occwZgqg/mz3IqgyUJW8aubbI= github.com/apoxy-dev/icx v0.7.1 h1:1uEvkyc2+IYHTvn8FMv/sbCOr8poJiQNmWLudGBuguY= github.com/apoxy-dev/icx v0.7.1/go.mod h1:Muuk3bRXTp3YB5Xj+xHOGQ/T1xVxIKJuvmMfLBXhIN4= +github.com/apoxy-dev/icx v0.7.2 h1:6GqlqxkjwyEwaQBAJJ40+iM6D6w46IKmKWtE/43bCUk= +github.com/apoxy-dev/icx v0.7.2/go.mod h1:Muuk3bRXTp3YB5Xj+xHOGQ/T1xVxIKJuvmMfLBXhIN4= github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e h1:10GIpiVyKoRgCyr0J2TvJtdn17bsFHN+ROWkeVJpcOU= github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ= github.com/apoxy-dev/upgrade-cli v0.0.0-20240213232412-a56c3a52fa0e h1:FBNxMQD93z2ththupB/BYKLEaMWaEr+G+sJWJqU2wC4= diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler.go b/pkg/apiserver/controllers/tunnel_agent_reconciler.go index 88d8401..9fa367f 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler.go @@ -200,7 +200,7 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( // Track newly made allocations so we can roll them back if Status().Update fails. var newlyAllocatedPrefixes []netip.Prefix - var newlyAllocatedVNIs []uint32 + var newlyAllocatedVNIs []uint needsUpdate := false addrAssigned := 0 @@ -243,8 +243,7 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( log.Error(vErr, "failed to allocate VNI") return fmt.Errorf("failed to allocate VNI: %w", vErr) } - vInt := int(v) // status uses *int; vniPool returns uint32 - conn.VNI = &vInt + conn.VNI = &v newlyAllocatedVNIs = append(newlyAllocatedVNIs, v) vniAssigned++ needsUpdate = true @@ -310,7 +309,7 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent(log logr.Logger, agent // Release VNI if conn.VNI != nil { - r.vniPool.Free(uint32(*conn.VNI)) + r.vniPool.Free(*conn.VNI) } } return nil diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go index 1f6d2ea..b2a19db 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go @@ -69,7 +69,7 @@ func TestTunnelAgentReconciler(t *testing.T) { // VNI assigned in status for the connection that was empty require.NotNil(t, got.Status.Connections[0].VNI, "expected VNI to be set on the connection") - require.Greater(t, *got.Status.Connections[0].VNI, 0) + require.Equal(t, *got.Status.Connections[0].VNI, uint(1)) } func mkAgentWithEmptyConnection(name, tunnelName string) *corev1alpha2.TunnelAgent { diff --git a/pkg/tunnel/adapter/connection.go b/pkg/tunnel/adapter/connection.go new file mode 100644 index 0000000..76a3b2c --- /dev/null +++ b/pkg/tunnel/adapter/connection.go @@ -0,0 +1,162 @@ +package adapter + +import ( + "crypto/sha256" + "fmt" + "net/netip" + "sync" + + "github.com/apoxy-dev/icx" + "gvisor.dev/gvisor/pkg/tcpip" +) + +// Connection is a connection like abstraction over an icx virtual network. +type Connection struct { + mu sync.Mutex + handler *icx.Handler + localAddr *netip.AddrPort + remoteAddr *netip.AddrPort + vni *uint + overlayAddr *netip.Prefix +} + +// NewConnection creates a new Connection instance. +func NewConnection(handler *icx.Handler, localAddr, remoteAddr *netip.AddrPort) *Connection { + return &Connection{ + handler: handler, + localAddr: localAddr, + remoteAddr: remoteAddr, + } +} + +func (c *Connection) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.vni != nil { + if err := c.handler.RemoveVirtualNetwork(*c.vni); err != nil { + return err + } + c.vni = nil + c.overlayAddr = nil + } + return nil +} + +// ID is the unique identifier of the connection. +func (c *Connection) ID() string { + c.mu.Lock() + defer c.mu.Unlock() + + encode := func(ap *netip.AddrPort) []byte { + if ap == nil { + return []byte{0} // marker for nil + } + var b []byte + addr := ap.Addr() + if addr.Is4() { + b = append(b, 4) // family marker + a := addr.As4() + b = append(b, a[:]...) + } else { + b = append(b, 6) // family marker + a := addr.As16() + b = append(b, a[:]...) + } + p := ap.Port() + b = append(b, byte(p>>8), byte(p)) // big-endian port + return b + } + + left := encode(c.localAddr) + right := encode(c.remoteAddr) + + // Sort the pair to make the ID bidirectional + var data []byte + if string(left) < string(right) { + data = append(left, right...) + } else { + data = append(right, left...) + } + + sum := sha256.Sum256(data) + return fmt.Sprintf("%x", sum[:16]) +} + +// Set the VNI assigned to this connection. +func (c *Connection) SetVNI(vni uint) error { + c.mu.Lock() + defer c.mu.Unlock() + + // No change + if c.vni != nil && *c.vni == vni { + return nil + } + + // Remove existing VNI if set + if c.vni != nil { + if err := c.handler.RemoveVirtualNetwork(*c.vni); err != nil { + return err + } + c.vni = nil + } + + // Add new VNI + var addrs []netip.Prefix + if c.overlayAddr != nil { + addrs = []netip.Prefix{*c.overlayAddr} + } + + if err := c.handler.AddVirtualNetwork(vni, toFullAddress(c.remoteAddr), addrs); err != nil { + return fmt.Errorf("failed to add virtual network %d: %w", vni, err) + } + c.vni = &vni + + return nil +} + +// Set the overlay address/cidr assigned to this connection. +func (c *Connection) SetOverlayAddress(addr string) error { + c.mu.Lock() + defer c.mu.Unlock() + + p, err := netip.ParsePrefix(addr) + if err != nil { + return fmt.Errorf("failed to parse overlay address %q: %w", addr, err) + } + + // No change + if c.overlayAddr != nil && (*c.overlayAddr).String() == p.String() { + return nil + } + c.overlayAddr = &p + + // If a VNI is active, update its allowed prefixes in-place. + if c.vni != nil { + if err := c.handler.UpdateVirtualNetworkAddrs(*c.vni, []netip.Prefix{p}); err != nil { + return fmt.Errorf("failed to update virtual network %d with address %q: %w", *c.vni, addr, err) + } + } + + return nil +} + +func toFullAddress(addrPort *netip.AddrPort) *tcpip.FullAddress { + if addrPort == nil { + return nil + } + + if addrPort.Addr().Is4() { + addrv4 := addrPort.Addr().As4() + return &tcpip.FullAddress{ + Addr: tcpip.AddrFrom4Slice(addrv4[:]), + Port: uint16(addrPort.Port()), + } + } else { + addrv6 := addrPort.Addr().As16() + return &tcpip.FullAddress{ + Addr: tcpip.AddrFrom16Slice(addrv6[:]), + Port: uint16(addrPort.Port()), + } + } +} diff --git a/pkg/tunnel/controllers/connection.go b/pkg/tunnel/controllers/connection.go index b5a5663..37044d2 100644 --- a/pkg/tunnel/controllers/connection.go +++ b/pkg/tunnel/controllers/connection.go @@ -7,8 +7,8 @@ type Connection interface { io.Closer // ID is the unique identifier of the connection. ID() string - // Set the overlay address/cidr assigned to this connection. - SetAddress(addr string) + // Set the overlay address/prefix assigned to this connection. + SetOverlayAddress(addr string) error // Set the VNI assigned to this connection. - SetVNI(vni uint32) + SetVNI(vni uint) error } diff --git a/pkg/tunnel/controllers/relay.go b/pkg/tunnel/controllers/relay.go index 801b807..1dbc003 100644 --- a/pkg/tunnel/controllers/relay.go +++ b/pkg/tunnel/controllers/relay.go @@ -6,7 +6,7 @@ import "net/netip" type Relay interface { // Name is the name of the relay. Name() string - // Address is the address of the relay. + // Address is the underlay address of the relay. Address() netip.AddrPort // SetCredentials sets the authentication token used by agents to authenticate with the relay. SetCredentials(tunnelName, token string) diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler.go b/pkg/tunnel/controllers/tunnel_agent_reconciler.go index bb20b45..d408aa9 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler.go @@ -79,10 +79,15 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) for _, sc := range agent.Status.Connections { if conn, ok := r.conns.Get(sc.ID); ok { if sc.Address != "" { - conn.SetAddress(sc.Address) + if err := conn.SetOverlayAddress(sc.Address); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to set overlay address for connection %q: %w", sc.ID, err) + } } + if sc.VNI != nil { - conn.SetVNI(uint32(*sc.VNI)) + if err := conn.SetVNI(uint(*sc.VNI)); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to set VNI for connection %q: %w", sc.ID, err) + } } } } diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go index e0167b3..e62757a 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go @@ -186,13 +186,13 @@ func TestTunnelAgentReconcile_SetsAddressAndVNI(t *testing.T) { require.NoError(t, c.Get(ctx, types.NamespacedName{Name: agent.Name}, &cur)) require.Len(t, cur.Status.Connections, 1) cur.Status.Connections[0].Address = "10.123.0.5/32" - v := 4242 + v := uint(4242) cur.Status.Connections[0].VNI = &v require.NoError(t, c.Status().Update(ctx, &cur)) - // Expect our live connection to receive SetAddress + SetVNI on reconcile - conn.On("SetAddress", "10.123.0.5/32").Return().Once() - conn.On("SetVNI", uint32(4242)).Return().Once() + // Expect our live connection to receive SetOverlayAddress + SetVNI on reconcile + conn.On("SetOverlayAddress", "10.123.0.5/32").Return(nil).Once() + conn.On("SetVNI", uint(4242)).Return(nil).Once() _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: agent.Name}}) require.NoError(t, err) @@ -221,12 +221,14 @@ func (m *mockConn) ID() string { return args.String(0) } -func (m *mockConn) SetAddress(addr string) { - m.Called(addr) +func (m *mockConn) SetOverlayAddress(addr string) error { + args := m.Called(addr) + return args.Error(0) } -func (m *mockConn) SetVNI(v uint32) { - m.Called(v) +func (m *mockConn) SetVNI(v uint) error { + args := m.Called(v) + return args.Error(0) } func (m *mockConn) Close() error { diff --git a/pkg/tunnel/vni/pool.go b/pkg/tunnel/vni/pool.go index 02270b9..a61b749 100644 --- a/pkg/tunnel/vni/pool.go +++ b/pkg/tunnel/vni/pool.go @@ -23,7 +23,7 @@ func NewVNIPool() *VNIPool { } } -func (v *VNIPool) Allocate() (uint32, error) { +func (v *VNIPool) Allocate() (uint, error) { v.mu.Lock() defer v.mu.Unlock() @@ -32,14 +32,14 @@ func (v *VNIPool) Allocate() (uint32, error) { return 0, fmt.Errorf("no available virtual network IDs") } v.pool.Add(vni) - return vni, nil + return uint(vni), nil } -func (v *VNIPool) Free(vni uint32) { +func (v *VNIPool) Free(vni uint) { if vni >= maxVNI { return } v.mu.Lock() defer v.mu.Unlock() - v.pool.Remove(vni) + v.pool.Remove(uint32(vni)) } From 19fbf4ec6d3a0a07f005cfc8fa5cb52274c1fd6f Mon Sep 17 00:00:00 2001 From: Damian Peckett Date: Fri, 5 Sep 2025 12:42:32 +0200 Subject: [PATCH 3/3] [tunnelv2] simplify controllers somewhat --- .../controllers/tunnel_agent_reconciler.go | 246 +++++++++--------- .../tunnel_agent_reconciler_test.go | 73 ++++++ .../controllers/tunnel_reconciler.go | 46 +--- .../controllers/tunnel_agent_reconciler.go | 21 +- pkg/tunnel/controllers/tunnel_reconciler.go | 1 + 5 files changed, 215 insertions(+), 172 deletions(-) diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler.go b/pkg/apiserver/controllers/tunnel_agent_reconciler.go index 9fa367f..c588978 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler.go @@ -15,21 +15,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" controllerlog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2" tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net" "github.com/apoxy-dev/apoxy/pkg/tunnel/vni" ) -const ( - indexByTunnelRef = "spec.tunnelRef.name" -) - // +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/status,verbs=get;update;patch // +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/finalizers,verbs=update @@ -63,36 +56,36 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) // handle deletion if !agent.DeletionTimestamp.IsZero() { if controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) { - if err := r.releaseResourcesIfPresent(log, &agent); err != nil { - log.Error(err, "failed to release resources; will retry") + changed, err := r.releaseResourcesIfPresent(ctx, log, req.NamespacedName) + if err != nil { return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err) } - // Remove finalizer - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.TunnelAgent - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr + // releaseResourcesIfPresent potentially mutates the object, so we need + // to refetch it to avoid conflicts when we remove the finalizer. + if changed { + if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err } - controllerutil.RemoveFinalizer(&cur, ApiServerFinalizer) - return r.client.Update(ctx, &cur) - }); err != nil { + } + + // Remove finalizer + controllerutil.RemoveFinalizer(&agent, ApiServerFinalizer) + if err := r.client.Update(ctx, &agent); err != nil { return ctrl.Result{}, err } } + return ctrl.Result{}, nil } // ensure finalizer if !controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) { - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.TunnelAgent - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr - } - controllerutil.AddFinalizer(&cur, ApiServerFinalizer) - return r.client.Update(ctx, &cur) - }); err != nil { + controllerutil.AddFinalizer(&agent, ApiServerFinalizer) + if err := r.client.Update(ctx, &agent); err != nil { return ctrl.Result{}, err } } @@ -100,6 +93,7 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) // fetch owner Tunnel tunnelName := agent.Spec.TunnelRef.Name if tunnelName == "" { + // TODO: why would this happen? Should we mark the agent as failed. log.Info("tunnelRef.name is empty; skipping") return ctrl.Result{}, nil } @@ -107,29 +101,23 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) var tunnel corev1alpha2.Tunnel if err := r.client.Get(ctx, client.ObjectKey{Name: tunnelName}, &tunnel); err != nil { if apierrors.IsNotFound(err) { - log.Info("Referenced Tunnel not found; will retry", "tunnel", tunnelName) - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + // TODO: why would this happen? Should we mark the agent as failed. + log.Info("Referenced Tunnel not found; skipping", "tunnelName", tunnelName) + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } return ctrl.Result{}, err } - // ensure controller ownerRef agent -> tunnel (retry on conflict) - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.TunnelAgent - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr - } - changed, ensureErr := r.ensureControllerOwner(&cur, &tunnel) - if ensureErr != nil { - return ensureErr - } - if !changed { - return nil - } - return r.client.Update(ctx, &cur) - }); err != nil { + // ensure controller ownerRef agent -> tunnel + changed, err := r.ensureControllerOwner(&agent, &tunnel) + if err != nil { return ctrl.Result{}, err } + if changed { + if err := r.client.Update(ctx, &agent); err != nil { + return ctrl.Result{}, err + } + } // Assign overlay addresses and VNIs for any connections missing them if err := r.ensureConnectionAllocations(ctx, log, req.NamespacedName); err != nil { @@ -139,34 +127,11 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } -func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { - // field index - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1alpha2.TunnelAgent{}, indexByTunnelRef, - func(obj client.Object) []string { - ta := obj.(*corev1alpha2.TunnelAgent) - if ta.Spec.TunnelRef.Name == "" { - return nil - } - return []string{ta.Spec.TunnelRef.Name} - }); err != nil { - return fmt.Errorf("index TunnelAgents by TunnelRef: %w", err) - } - - // map Tunnel -> its agents - mapTunnelToAgents := handler.TypedEnqueueRequestsFromMapFunc[*corev1alpha2.Tunnel](func(ctx context.Context, t *corev1alpha2.Tunnel) []reconcile.Request { - var list corev1alpha2.TunnelAgentList - if err := mgr.GetClient().List(ctx, &list, client.MatchingFields{indexByTunnelRef: t.Name}); err != nil { - return nil - } - reqs := make([]reconcile.Request, 0, len(list.Items)) - for _, ta := range list.Items { - reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKey{Name: ta.Name}}) - } - return reqs - }) - +func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // Reconcile when spec generation changes OR when status (e.g., Connections) changes. - statusChanged := predicate.Funcs{ + statusOrGenChanged := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, UpdateFunc: func(e event.UpdateEvent) bool { oldObj, ok1 := e.ObjectOld.(*corev1alpha2.TunnelAgent) newObj, ok2 := e.ObjectNew.(*corev1alpha2.TunnelAgent) @@ -180,10 +145,7 @@ func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). - For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(statusChanged)). - WatchesRawSource( - source.Kind(mgr.GetCache(), &corev1alpha2.Tunnel{}, mapTunnelToAgents), - ). + For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(statusOrGenChanged)). Complete(r) } @@ -194,25 +156,21 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( ) error { return retry.RetryOnConflict(retry.DefaultBackoff, func() error { var cur corev1alpha2.TunnelAgent - if getErr := r.client.Get(ctx, key, &cur); getErr != nil { - return getErr + if err := r.client.Get(ctx, key, &cur); err != nil { + return err } // Track newly made allocations so we can roll them back if Status().Update fails. var newlyAllocatedPrefixes []netip.Prefix var newlyAllocatedVNIs []uint - needsUpdate := false - addrAssigned := 0 - vniAssigned := 0 - for i := range cur.Status.Connections { conn := &cur.Status.Connections[i] // Allocate overlay address if missing if conn.Address == "" { - pfx, ipErr := r.agentIPAM.Allocate() - if ipErr != nil { + pfx, err := r.agentIPAM.Allocate() + if err != nil { // rollback anything we grabbed this pass for _, p := range newlyAllocatedPrefixes { _ = r.agentIPAM.Release(p) @@ -220,58 +178,116 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( for _, v := range newlyAllocatedVNIs { r.vniPool.Free(v) } - log.Error(ipErr, "failed to allocate address") - return fmt.Errorf("failed to allocate address: %w", ipErr) + return fmt.Errorf("failed to allocate address: %w", err) } conn.Address = pfx.String() newlyAllocatedPrefixes = append(newlyAllocatedPrefixes, pfx) - addrAssigned++ - needsUpdate = true + + log.Info("Allocated overlay address", "connectionID", conn.ID, "address", conn.Address) } // Allocate VNI if missing (nil means "unset"; zero can be valid but your pool won't return 0) if conn.VNI == nil { - v, vErr := r.vniPool.Allocate() - if vErr != nil { + vni, err := r.vniPool.Allocate() + if err != nil { // rollback anything we grabbed this pass for _, p := range newlyAllocatedPrefixes { _ = r.agentIPAM.Release(p) } - for _, nv := range newlyAllocatedVNIs { - r.vniPool.Free(nv) + for _, vni := range newlyAllocatedVNIs { + r.vniPool.Free(vni) } - log.Error(vErr, "failed to allocate VNI") - return fmt.Errorf("failed to allocate VNI: %w", vErr) + return fmt.Errorf("failed to allocate VNI: %w", err) } - conn.VNI = &v - newlyAllocatedVNIs = append(newlyAllocatedVNIs, v) - vniAssigned++ - needsUpdate = true + conn.VNI = &vni + newlyAllocatedVNIs = append(newlyAllocatedVNIs, vni) + + log.Info("Allocated VNI", "connectionID", conn.ID, "vni", *conn.VNI) } } - if !needsUpdate { - log.Info("no connections missing address or VNI") + if len(newlyAllocatedPrefixes) == 0 && len(newlyAllocatedVNIs) == 0 { + // nothing changed return nil } // Commit to status; if it fails, release fresh allocations from this attempt. - if updErr := r.client.Status().Update(ctx, &cur); updErr != nil { + if err := r.client.Status().Update(ctx, &cur); err != nil { + // rollback anything we grabbed this pass for _, p := range newlyAllocatedPrefixes { _ = r.agentIPAM.Release(p) } - for _, v := range newlyAllocatedVNIs { + for _, vni := range newlyAllocatedVNIs { + r.vniPool.Free(vni) + } + return err + } + + return nil + }) +} + +func (r *TunnelAgentReconciler) releaseResourcesIfPresent( + ctx context.Context, + log logr.Logger, + key client.ObjectKey, +) (bool, error) { + var changed bool + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // Always work on a fresh copy to avoid write conflicts. + var cur corev1alpha2.TunnelAgent + if err := r.client.Get(ctx, key, &cur); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + + return err + } + + // If there are no connections, we’re done. + if len(cur.Status.Connections) == 0 { + return nil + } + + // Free resources that are still recorded in status and clear the fields. + for i := range cur.Status.Connections { + conn := &cur.Status.Connections[i] + + // Release overlay address/prefix (if set) + if conn.Address != "" { + pfx, err := netip.ParsePrefix(conn.Address) + if err != nil { + return fmt.Errorf("failed to parse address %q for release: %w", conn.Address, err) + } + if err := r.agentIPAM.Release(pfx); err != nil { + return fmt.Errorf("failed to release address %q: %w", conn.Address, err) + } + log.Info("Released overlay address", "connectionID", conn.ID, "address", conn.Address) + conn.Address = "" // clear in status + changed = true + } + + // Release VNI (if set) + if conn.VNI != nil { + v := *conn.VNI r.vniPool.Free(v) + log.Info("Released VNI", "connectionID", conn.ID, "vni", v) + conn.VNI = nil // clear in status + changed = true + } + } + + // Commit to status. + if changed { + if err := r.client.Status().Update(ctx, &cur); err != nil { + return err } - log.Error(updErr, "status update failed; released newly allocated resources", - "addresses", addrAssigned, "vnis", vniAssigned) - return updErr } - log.Info("assigned resources to connections", - "addresses", addrAssigned, "vnis", vniAssigned) return nil }) + + return changed, err } func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner client.Object) (bool, error) { @@ -292,25 +308,3 @@ func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner return true, nil } - -func (r *TunnelAgentReconciler) releaseResourcesIfPresent(log logr.Logger, agent *corev1alpha2.TunnelAgent) error { - for _, conn := range agent.Status.Connections { - // Release overlay address/prefix - if conn.Address != "" { - if pfx, err := netip.ParsePrefix(conn.Address); err == nil { - if relErr := r.agentIPAM.Release(pfx); relErr != nil { - log.Error(relErr, "failed to release prefix", "address", conn.Address) - return relErr - } - } else { - log.Error(fmt.Errorf("unrecognized address format"), "skipping address release", "address", conn.Address) - } - } - - // Release VNI - if conn.VNI != nil { - r.vniPool.Free(*conn.VNI) - } - } - return nil -} diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go index b2a19db..61c2271 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -72,6 +73,78 @@ func TestTunnelAgentReconciler(t *testing.T) { require.Equal(t, *got.Status.Connections[0].VNI, uint(1)) } +func TestTunnelAgentReconciler_DeletionReleasesResourcesAndRemovesFinalizer(t *testing.T) { + ctx := ctrl.LoggerInto(t.Context(), testLogr(t)) + + // Scheme + scheme := runtime.NewScheme() + require.NoError(t, corev1alpha2.Install(scheme)) + + // IPAM + VNI pool + systemULA := tunnet.NewULA(ctx, tunnet.SystemNetworkID) + agentIPAM, err := systemULA.IPAM(ctx, 96) + require.NoError(t, err) + + vpool := vni.NewVNIPool() + + // Pre-allocate a prefix and VNI that we'll pretend belong to the agent + pfx, err := agentIPAM.Allocate() + require.NoError(t, err) + v, err := vpool.Allocate() + require.NoError(t, err) + require.Equal(t, uint(1), v, "sanity check VNI allocation order") + + // Agent marked for deletion, with finalizer and those resources in status + agent := &corev1alpha2.TunnelAgent{ + TypeMeta: metav1.TypeMeta{Kind: "TunnelAgent", APIVersion: "core.apoxy.dev/v1alpha2"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "agent-deleting", + Finalizers: []string{controllers.ApiServerFinalizer}, + DeletionTimestamp: &metav1.Time{Time: metav1.Now().Time}, + }, + Spec: corev1alpha2.TunnelAgentSpec{ + TunnelRef: corev1alpha2.TunnelRef{Name: "tun-any"}, + }, + Status: corev1alpha2.TunnelAgentStatus{ + Connections: []corev1alpha2.TunnelAgentConnection{ + { + ID: "conn-1", + RelayAddress: "relay-1", + Address: pfx.String(), + VNI: &v, + }, + }, + }, + } + + c := fakeclient.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&corev1alpha2.Tunnel{}, &corev1alpha2.TunnelAgent{}). + WithObjects(agent). + Build() + + r := controllers.NewTunnelAgentReconciler(c, agentIPAM, vpool) + + // When: reconcile deletion + _, err = r.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Name: agent.Name}}) + require.NoError(t, err) + + // Then: the object should be fully deleted by the API server once the finalizer is removed + var got corev1alpha2.TunnelAgent + err = c.Get(ctx, types.NamespacedName{Name: agent.Name}, &got) + require.Error(t, err) + require.True(t, apierrors.IsNotFound(err), "expected agent to be deleted after finalizer removal") + + // And: resources were released back to their pools (same prefix/VNI available again) + rePfx, err := agentIPAM.Allocate() + require.NoError(t, err) + require.Equal(t, pfx, rePfx, "expected released prefix to be available again") + + reV, err := vpool.Allocate() + require.NoError(t, err) + require.Equal(t, uint(1), reV, "expected released VNI to be available again") +} + func mkAgentWithEmptyConnection(name, tunnelName string) *corev1alpha2.TunnelAgent { return &corev1alpha2.TunnelAgent{ TypeMeta: metav1.TypeMeta{Kind: "TunnelAgent", APIVersion: "core.apoxy.dev/v1alpha2"}, diff --git a/pkg/apiserver/controllers/tunnel_reconciler.go b/pkg/apiserver/controllers/tunnel_reconciler.go index a3d48bb..6e16414 100644 --- a/pkg/apiserver/controllers/tunnel_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_reconciler.go @@ -7,7 +7,6 @@ import ( "io" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,68 +39,50 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } + return ctrl.Result{}, err } // handle deletion if !tunnel.DeletionTimestamp.IsZero() { if controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) { - log.Info("handling Tunnel deletion") - // Remove finalizer - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.Tunnel - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr - } - controllerutil.RemoveFinalizer(&cur, ApiServerFinalizer) - return r.client.Update(ctx, &cur) - }); err != nil { + controllerutil.RemoveFinalizer(&tunnel, ApiServerFinalizer) + if err := r.client.Update(ctx, &tunnel); err != nil { return ctrl.Result{}, err } } + return ctrl.Result{}, nil } // ensure finalizer if !controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) { - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.Tunnel - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr - } - controllerutil.AddFinalizer(&cur, ApiServerFinalizer) - return r.client.Update(ctx, &cur) - }); err != nil { + controllerutil.AddFinalizer(&tunnel, ApiServerFinalizer) + if err := r.client.Update(ctx, &tunnel); err != nil { return ctrl.Result{}, err } } // ensure bearer token if tunnel.Status.Credentials == nil || tunnel.Status.Credentials.Token == "" { + log.Info("Generating new bearer token for Tunnel") + token, err := generateBearerToken(tokenLength) if err != nil { return ctrl.Result{}, err } - log.Info("Generating new bearer token for Tunnel") + if tunnel.Status.Credentials == nil { + tunnel.Status.Credentials = &corev1alpha2.TunnelCredentials{} + } + tunnel.Status.Credentials.Token = token - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.Tunnel - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr - } - if cur.Status.Credentials == nil { - cur.Status.Credentials = &corev1alpha2.TunnelCredentials{} - } - cur.Status.Credentials.Token = token - return r.client.Status().Update(ctx, &cur) - }); err != nil { + if err := r.client.Status().Update(ctx, &tunnel); err != nil { return ctrl.Result{}, err } } - log.Info("Tunnel reconciled successfully") return ctrl.Result{}, nil } @@ -117,6 +98,5 @@ func generateBearerToken(n int) (string, error) { if _, err := io.ReadFull(rand.Reader, b); err != nil { return "", err } - // URL-safe base64 without padding return base64.RawURLEncoding.EncodeToString(b), nil } diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler.go b/pkg/tunnel/controllers/tunnel_agent_reconciler.go index d408aa9..66852b3 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler.go @@ -46,6 +46,7 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } + return ctrl.Result{}, err } @@ -60,15 +61,9 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } - // Remove our finalizer - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - var cur corev1alpha2.TunnelAgent - if getErr := r.client.Get(ctx, req.NamespacedName, &cur); getErr != nil { - return getErr - } - controllerutil.RemoveFinalizer(&cur, r.finalizer) - return r.client.Update(ctx, &cur) - }); err != nil { + // Remove finalizer + controllerutil.RemoveFinalizer(&agent, r.finalizer) + if err := r.client.Update(ctx, &agent); err != nil { return ctrl.Result{}, err } } @@ -106,7 +101,7 @@ func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { return fmt.Errorf("failed to create label selector predicate: %w", err) } - // React to status-only updates (connections added/changed), spec/generation changes, and deletion toggles. + // Reconcile when spec generation changes OR when status (e.g., Connections) changes. statusOrGenChanged := predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { return true }, DeleteFunc: func(e event.DeleteEvent) bool { return true }, @@ -116,9 +111,9 @@ func (r *TunnelAgentReconciler) SetupWithManager(mgr ctrl.Manager) error { if !ok1 || !ok2 { return false } - return !equality.Semantic.DeepEqual(oldObj.Status.Connections, newObj.Status.Connections) || - oldObj.GetGeneration() != newObj.GetGeneration() || - (oldObj.DeletionTimestamp.IsZero() != newObj.DeletionTimestamp.IsZero()) + genChanged := oldObj.GetGeneration() != newObj.GetGeneration() + statusDiff := !equality.Semantic.DeepEqual(oldObj.Status, newObj.Status) + return genChanged || statusDiff }, } diff --git a/pkg/tunnel/controllers/tunnel_reconciler.go b/pkg/tunnel/controllers/tunnel_reconciler.go index 14bf7c1..8f6d334 100644 --- a/pkg/tunnel/controllers/tunnel_reconciler.go +++ b/pkg/tunnel/controllers/tunnel_reconciler.go @@ -34,6 +34,7 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } + return ctrl.Result{}, err }