Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/core/v1alpha2/tunnel_agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ type TunnelAgentConnection struct {
// +optional
ConnectedAt *metav1.Time `json:"connectedAt,omitempty,omitzero"`

// Address is the overlay address of the agent assigned to this connection.
// Address is the overlay address/cidr of the agent assigned to this connection.
// +optional
Address string `json:"address,omitempty,omitzero"`

// RelayAddress is the address of the relay managing this connection.
// +optional
RelayAddress string `json:"relayAddress,omitempty,omitzero"`

// VNI is the 24-bit virtual network identifier used for this connection, if applicable.
// +optional
VNI *uint `json:"vni,omitempty,omitzero"`
}

// TunnelAgentStatus represents the status of a tunnel agent.
Expand Down
5 changes: 5 additions & 0 deletions api/core/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion api/generated/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/adrg/xdg v0.5.3
github.com/alphadose/haxmap v1.4.1
github.com/anatol/vmtest v0.0.0-20250318022921-2f32244e2f0f
github.com/apoxy-dev/icx v0.7.1
github.com/apoxy-dev/icx v0.7.2
github.com/avast/retry-go/v4 v4.6.1
github.com/bramvdbogaerde/go-scp v1.5.0
github.com/buraksezer/olric v0.5.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45 h1:SwPk1n/
github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45/go.mod h1:z5rtgIizc+/K27UtB0occwZgqg/mz3IqgyUJW8aubbI=
github.com/apoxy-dev/icx v0.7.1 h1:1uEvkyc2+IYHTvn8FMv/sbCOr8poJiQNmWLudGBuguY=
github.com/apoxy-dev/icx v0.7.1/go.mod h1:Muuk3bRXTp3YB5Xj+xHOGQ/T1xVxIKJuvmMfLBXhIN4=
github.com/apoxy-dev/icx v0.7.2 h1:6GqlqxkjwyEwaQBAJJ40+iM6D6w46IKmKWtE/43bCUk=
github.com/apoxy-dev/icx v0.7.2/go.mod h1:Muuk3bRXTp3YB5Xj+xHOGQ/T1xVxIKJuvmMfLBXhIN4=
github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e h1:10GIpiVyKoRgCyr0J2TvJtdn17bsFHN+ROWkeVJpcOU=
github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ=
github.com/apoxy-dev/upgrade-cli v0.0.0-20240213232412-a56c3a52fa0e h1:FBNxMQD93z2ththupB/BYKLEaMWaEr+G+sJWJqU2wC4=
Expand Down
310 changes: 310 additions & 0 deletions pkg/apiserver/controllers/tunnel_agent_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
package controllers

import (
"context"
"fmt"
"net/netip"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
controllerlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

corev1alpha2 "github.com/apoxy-dev/apoxy/api/core/v1alpha2"
tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net"
"github.com/apoxy-dev/apoxy/pkg/tunnel/vni"
)

// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnelagents/finalizers,verbs=update
// +kubebuilder:rbac:groups=core.apoxy.dev/v1alpha2,resources=tunnels,verbs=get;list;watch

type TunnelAgentReconciler struct {
client client.Client
agentIPAM tunnet.IPAM
vniPool *vni.VNIPool
}

func NewTunnelAgentReconciler(c client.Client, agentIPAM tunnet.IPAM, vniPool *vni.VNIPool) *TunnelAgentReconciler {
return &TunnelAgentReconciler{
client: c,
agentIPAM: agentIPAM,
vniPool: vniPool,
}
}

func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := controllerlog.FromContext(ctx, "name", req.Name)

var agent corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

// handle deletion
if !agent.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) {
changed, err := r.releaseResourcesIfPresent(ctx, log, req.NamespacedName)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err)
}

// releaseResourcesIfPresent potentially mutates the object, so we need
// to refetch it to avoid conflicts when we remove the finalizer.
if changed {
if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
}

// Remove finalizer
controllerutil.RemoveFinalizer(&agent, ApiServerFinalizer)
if err := r.client.Update(ctx, &agent); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

// ensure finalizer
if !controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) {
controllerutil.AddFinalizer(&agent, ApiServerFinalizer)
if err := r.client.Update(ctx, &agent); err != nil {
return ctrl.Result{}, err
}
}

// fetch owner Tunnel
tunnelName := agent.Spec.TunnelRef.Name
if tunnelName == "" {
// TODO: why would this happen? Should we mark the agent as failed.
log.Info("tunnelRef.name is empty; skipping")
return ctrl.Result{}, nil
}

var tunnel corev1alpha2.Tunnel
if err := r.client.Get(ctx, client.ObjectKey{Name: tunnelName}, &tunnel); err != nil {
if apierrors.IsNotFound(err) {
// TODO: why would this happen? Should we mark the agent as failed.
log.Info("Referenced Tunnel not found; skipping", "tunnelName", tunnelName)
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
return ctrl.Result{}, err
}

// ensure controller ownerRef agent -> tunnel
changed, err := r.ensureControllerOwner(&agent, &tunnel)
if err != nil {
return ctrl.Result{}, err
}
if changed {
if err := r.client.Update(ctx, &agent); err != nil {
return ctrl.Result{}, err
}
}

// Assign overlay addresses and VNIs for any connections missing them
if err := r.ensureConnectionAllocations(ctx, log, req.NamespacedName); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// Reconcile when spec generation changes OR when status (e.g., Connections) changes.
statusOrGenChanged := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj, ok1 := e.ObjectOld.(*corev1alpha2.TunnelAgent)
newObj, ok2 := e.ObjectNew.(*corev1alpha2.TunnelAgent)
if !ok1 || !ok2 {
return false
}
genChanged := oldObj.GetGeneration() != newObj.GetGeneration()
statusDiff := !equality.Semantic.DeepEqual(oldObj.Status, newObj.Status)
return genChanged || statusDiff
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&corev1alpha2.TunnelAgent{}, builder.WithPredicates(statusOrGenChanged)).
Complete(r)
}

func (r *TunnelAgentReconciler) ensureConnectionAllocations(
ctx context.Context,
log logr.Logger,
key client.ObjectKey,
) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var cur corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, key, &cur); err != nil {
return err
}

// Track newly made allocations so we can roll them back if Status().Update fails.
var newlyAllocatedPrefixes []netip.Prefix
var newlyAllocatedVNIs []uint

for i := range cur.Status.Connections {
conn := &cur.Status.Connections[i]

// Allocate overlay address if missing
if conn.Address == "" {
pfx, err := r.agentIPAM.Allocate()
if err != nil {
// rollback anything we grabbed this pass
for _, p := range newlyAllocatedPrefixes {
_ = r.agentIPAM.Release(p)
}
for _, v := range newlyAllocatedVNIs {
r.vniPool.Free(v)
}
return fmt.Errorf("failed to allocate address: %w", err)
}
conn.Address = pfx.String()
newlyAllocatedPrefixes = append(newlyAllocatedPrefixes, pfx)

log.Info("Allocated overlay address", "connectionID", conn.ID, "address", conn.Address)
}

// Allocate VNI if missing (nil means "unset"; zero can be valid but your pool won't return 0)
if conn.VNI == nil {
vni, err := r.vniPool.Allocate()
if err != nil {
// rollback anything we grabbed this pass
for _, p := range newlyAllocatedPrefixes {
_ = r.agentIPAM.Release(p)
}
for _, vni := range newlyAllocatedVNIs {
r.vniPool.Free(vni)
}
return fmt.Errorf("failed to allocate VNI: %w", err)
}
conn.VNI = &vni
newlyAllocatedVNIs = append(newlyAllocatedVNIs, vni)

log.Info("Allocated VNI", "connectionID", conn.ID, "vni", *conn.VNI)
}
}

if len(newlyAllocatedPrefixes) == 0 && len(newlyAllocatedVNIs) == 0 {
// nothing changed
return nil
}

// Commit to status; if it fails, release fresh allocations from this attempt.
if err := r.client.Status().Update(ctx, &cur); err != nil {
// rollback anything we grabbed this pass
for _, p := range newlyAllocatedPrefixes {
_ = r.agentIPAM.Release(p)
}
for _, vni := range newlyAllocatedVNIs {
r.vniPool.Free(vni)
}
return err
}

return nil
})
}

func (r *TunnelAgentReconciler) releaseResourcesIfPresent(
ctx context.Context,
log logr.Logger,
key client.ObjectKey,
) (bool, error) {
var changed bool
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Always work on a fresh copy to avoid write conflicts.
var cur corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, key, &cur); err != nil {
if apierrors.IsNotFound(err) {
return nil
}

return err
}

// If there are no connections, we’re done.
if len(cur.Status.Connections) == 0 {
return nil
}

// Free resources that are still recorded in status and clear the fields.
for i := range cur.Status.Connections {
conn := &cur.Status.Connections[i]

// Release overlay address/prefix (if set)
if conn.Address != "" {
pfx, err := netip.ParsePrefix(conn.Address)
if err != nil {
return fmt.Errorf("failed to parse address %q for release: %w", conn.Address, err)
}
if err := r.agentIPAM.Release(pfx); err != nil {
return fmt.Errorf("failed to release address %q: %w", conn.Address, err)
}
log.Info("Released overlay address", "connectionID", conn.ID, "address", conn.Address)
conn.Address = "" // clear in status
changed = true
}

// Release VNI (if set)
if conn.VNI != nil {
v := *conn.VNI
r.vniPool.Free(v)
log.Info("Released VNI", "connectionID", conn.ID, "vni", v)
conn.VNI = nil // clear in status
changed = true
}
}

// Commit to status.
if changed {
if err := r.client.Status().Update(ctx, &cur); err != nil {
return err
}
}

return nil
})

return changed, err
}

func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner client.Object) (bool, error) {
for _, or := range child.GetOwnerReferences() {
if or.UID == owner.GetUID() && or.Controller != nil && *or.Controller {
return false, nil
}
}

// Set controller reference (overwrites any existing controller owner)
if err := controllerutil.SetControllerReference(
owner,
child,
r.client.Scheme(),
); err != nil {
return false, err
}

return true, nil
}
Loading