Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions _examples/tunnel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: core.apoxy.dev/v1alpha2
kind: Tunnel
metadata:
name: example
spec:
egressGateway:
enabled: true
76 changes: 36 additions & 40 deletions pkg/apiserver/controllers/tunnel_agent_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
controllerlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -28,6 +26,8 @@ import (
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/finalizers,verbs=update
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels,verbs=get;list;watch

const indexControllerOwnerUID = ".metadata.controllerOwnerUID"

type TunnelAgentReconciler struct {
client client.Client
agentIPAM tunnet.IPAM
Expand All @@ -45,6 +45,8 @@ func NewTunnelAgentReconciler(c client.Client, agentIPAM tunnet.IPAM, vniPool *v
func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := controllerlog.FromContext(ctx, "name", req.Name)

log.Info("Reconciling TunnelAgent")

var agent corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -53,16 +55,19 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

// handle deletion
// Handle deletion
if !agent.DeletionTimestamp.IsZero() {
log.Info("Handling deletion of TunnelAgent")

if controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) {
log.Info("Releasing resources for TunnelAgent")

changed, err := r.releaseResourcesIfPresent(ctx, log, req.NamespacedName)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err)
}

// releaseResourcesIfPresent potentially mutates the object, so we need
// to refetch it to avoid conflicts when we remove the finalizer.
// Refetch to avoid conflicts if we modified the object
if changed {
if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -72,6 +77,8 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

log.Info("Removing finalizer from TunnelAgent")

// Remove finalizer
controllerutil.RemoveFinalizer(&agent, ApiServerFinalizer)
if err := r.client.Update(ctx, &agent); err != nil {
Expand All @@ -82,33 +89,34 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// ensure finalizer
// Ensure finalizer
if !controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) {
controllerutil.AddFinalizer(&agent, ApiServerFinalizer)
if err := r.client.Update(ctx, &agent); err != nil {
return ctrl.Result{}, err
}
}

// fetch owner Tunnel
// Fetch owner Tunnel
tunnelName := agent.Spec.TunnelRef.Name
if tunnelName == "" {
// TODO: why would this happen? Should we mark the agent as failed.
log.Info("tunnelRef.name is empty; skipping")
return ctrl.Result{}, nil
}

log.Info("Fetching owner Tunnel", "tunnelName", tunnelName)

var tunnel corev1alpha2.Tunnel
if err := r.client.Get(ctx, client.ObjectKey{Name: tunnelName}, &tunnel); err != nil {
if apierrors.IsNotFound(err) {
// TODO: why would this happen? Should we mark the agent as failed.
log.Info("Referenced Tunnel not found; skipping", "tunnelName", tunnelName)
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
return ctrl.Result{}, err
}

// ensure controller ownerRef agent -> tunnel
// Ensure controller ownerRef agent -> tunnel
changed, err := r.ensureControllerOwner(&agent, &tunnel)
if err != nil {
return ctrl.Result{}, err
Expand All @@ -128,24 +136,26 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// Reconcile when spec generation changes OR when status (e.g., Connections) changes.
statusOrGenChanged := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj, ok1 := e.ObjectOld.(*corev1alpha2.TunnelAgent)
newObj, ok2 := e.ObjectNew.(*corev1alpha2.TunnelAgent)
if !ok1 || !ok2 {
return false
// Cache index to quickly look up TunnelAgents by their controller owner UID.
if err := mgr.GetFieldIndexer().IndexField(
ctx,
&corev1alpha2.TunnelAgent{},
indexControllerOwnerUID,
func(obj client.Object) []string {
ta := obj.(*corev1alpha2.TunnelAgent)
for _, or := range ta.GetOwnerReferences() {
if or.Controller != nil && *or.Controller {
return []string{string(or.UID)}
}
}
genChanged := oldObj.GetGeneration() != newObj.GetGeneration()
statusDiff := !equality.Semantic.DeepEqual(oldObj.Status, newObj.Status)
return genChanged || statusDiff
return nil
},
); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(statusOrGenChanged)).
For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})).
Complete(r)
}

Expand Down Expand Up @@ -182,11 +192,10 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations(
}
conn.Address = pfx.String()
newlyAllocatedPrefixes = append(newlyAllocatedPrefixes, pfx)

log.Info("Allocated overlay address", "connectionID", conn.ID, "address", conn.Address)
}

// Allocate VNI if missing (nil means "unset"; zero can be valid but your pool won't return 0)
// Allocate VNI if missing
if conn.VNI == nil {
vni, err := r.vniPool.Allocate()
if err != nil {
Expand All @@ -201,13 +210,11 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations(
}
conn.VNI = &vni
newlyAllocatedVNIs = append(newlyAllocatedVNIs, vni)

log.Info("Allocated VNI", "connectionID", conn.ID, "vni", *conn.VNI)
}
}

if len(newlyAllocatedPrefixes) == 0 && len(newlyAllocatedVNIs) == 0 {
// nothing changed
return nil
}

Expand All @@ -234,13 +241,11 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
) (bool, error) {
var changed bool
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Always work on a fresh copy to avoid write conflicts.
var cur corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, key, &cur); err != nil {
if apierrors.IsNotFound(err) {
return nil
}

return err
}

Expand All @@ -249,7 +254,6 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
return nil
}

// Free resources that are still recorded in status and clear the fields.
for i := range cur.Status.Connections {
conn := &cur.Status.Connections[i]

Expand All @@ -263,7 +267,7 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
return fmt.Errorf("failed to release address %q: %w", conn.Address, err)
}
log.Info("Released overlay address", "connectionID", conn.ID, "address", conn.Address)
conn.Address = "" // clear in status
conn.Address = ""
changed = true
}

Expand All @@ -272,18 +276,16 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
vni := *conn.VNI
r.vniPool.Release(vni)
log.Info("Released VNI", "connectionID", conn.ID, "vni", vni)
conn.VNI = nil // clear in status
conn.VNI = nil
changed = true
}
}

// Commit to status.
if changed {
if err := r.client.Status().Update(ctx, &cur); err != nil {
return err
}
}

return nil
})

Expand All @@ -297,14 +299,8 @@ func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner
}
}

// Set controller reference (overwrites any existing controller owner)
if err := controllerutil.SetControllerReference(
owner,
child,
r.client.Scheme(),
); err != nil {
if err := controllerutil.SetControllerReference(owner, child, r.client.Scheme()); err != nil {
return false, err
}

return true, nil
}
47 changes: 42 additions & 5 deletions pkg/apiserver/controllers/tunnel_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/base64"
"io"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -39,13 +40,49 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}

return ctrl.Result{}, err
}

// handle deletion
// Handle deletion.
if !tunnel.DeletionTimestamp.IsZero() {
log.Info("Handling deletion of Tunnel")

if controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) {
// Manually implement garbage collection of controller-owned TunnelAgents.
// This is due to us not using the built in gc controller from k8s.io/controller-manager.

// List controller-owned TunnelAgents by indexed controller owner UID.
var agents corev1alpha2.TunnelAgentList
if err := r.client.List(
ctx,
&agents,
client.MatchingFields{indexControllerOwnerUID: string(tunnel.GetUID())},
); err != nil {
return ctrl.Result{}, err
}

// Kick off deletion for any children that still exist.
stillPresent := false
for i := range agents.Items {
a := &agents.Items[i]
stillPresent = true
if a.DeletionTimestamp.IsZero() {
if err := r.client.Delete(ctx, a); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
}
}

// If any child remains (possibly terminating due to its own finalizers),
// requeue and keep the parent's finalizer to emulate foreground deletion.
if stillPresent {
log.Info("Waiting for controller-owned TunnelAgents to terminate", "remaining", len(agents.Items))
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}

// No children remain → remove the parent's finalizer.
log.Info("All controller-owned TunnelAgents gone; removing Tunnel finalizer")

// Remove finalizer
controllerutil.RemoveFinalizer(&tunnel, ApiServerFinalizer)
if err := r.client.Update(ctx, &tunnel); err != nil {
Expand All @@ -56,15 +93,15 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}

// ensure finalizer
// Ensure finalizer.
if !controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) {
controllerutil.AddFinalizer(&tunnel, ApiServerFinalizer)
if err := r.client.Update(ctx, &tunnel); err != nil {
return ctrl.Result{}, err
}
}

// ensure bearer token
// Ensure bearer token in status.
if tunnel.Status.Credentials == nil || tunnel.Status.Credentials.Token == "" {
log.Info("Generating new bearer token for Tunnel")

Expand All @@ -88,7 +125,7 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1alpha2.Tunnel{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
For(&corev1alpha2.Tunnel{}, builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{})).
Complete(r)
}

Expand Down
Loading