Skip to content

Commit d81cec5

Browse files
committed
[tunnel] Refactor for multicluster support
1 parent f53b2ab commit d81cec5

File tree

3 files changed

+126
-23
lines changed

3 files changed

+126
-23
lines changed

cmd/tunnelproxy/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,20 +162,21 @@ func main() {
162162
log.Infof("Using network ID from K8S_POD_UID: %s", *networkID)
163163
}
164164

165+
clientGetter := &tunnel.SingleClusterClientGetter{Client: mgr.GetClient()}
165166
srv, err := tunnel.NewTunnelServer(
166-
mgr.GetClient(),
167+
clientGetter,
167168
jwtValidator,
168169
r,
169170
tunnel.WithExternalAddrs(extIPv4Prefix),
170-
tunnel.WithLabelSelector(*tunnelNodeSelector),
171171
tunnel.WithPublicAddr(*publicAddr),
172172
tunnel.WithIPAMv4(tunnet.NewIPAMv4(gCtx)),
173173
)
174174
if err != nil {
175175
log.Fatalf("Failed to create tunnel server: %v", err)
176176
}
177-
if err := srv.SetupWithManager(mgr); err != nil {
178-
log.Fatalf("Unable to setup Tunnel Proxy server: %v", err)
177+
reconciler := tunnel.NewTunnelNodeReconciler(mgr.GetClient(), srv, *tunnelNodeSelector)
178+
if err := reconciler.SetupWithManager(mgr); err != nil {
179+
log.Fatalf("Unable to setup TunnelNode reconciler: %v", err)
179180
}
180181

181182
if err := tunnelproxy.NewProxyTunnelReconciler(

pkg/tunnel/server.go

Lines changed: 117 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
ctrl "sigs.k8s.io/controller-runtime"
3030
"sigs.k8s.io/controller-runtime/pkg/builder"
3131
"sigs.k8s.io/controller-runtime/pkg/client"
32+
"sigs.k8s.io/controller-runtime/pkg/handler"
3233
"sigs.k8s.io/controller-runtime/pkg/log"
3334
"sigs.k8s.io/controller-runtime/pkg/predicate"
3435
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -149,11 +150,33 @@ func (c *conn) String() string {
149150
return fmt.Sprintf("%s [%s]: %v %v", c.obj.Name, c.connID, c.addrv4, c.addrv6)
150151
}
151152

152-
type TunnelServer struct {
153-
client.Client
153+
// ClientGetter is an interface for obtaining a Kubernetes client.
154+
// This abstraction allows the TunnelServer to work with both single-cluster
155+
// and multi-cluster setups (e.g., multicluster-runtime).
156+
type ClientGetter interface {
157+
// GetClient returns a client for the given context.
158+
// In single-cluster mode, clusterName can be empty.
159+
// In multi-cluster mode, clusterName identifies the target cluster.
160+
GetClient(ctx context.Context, clusterName string) (client.Client, error)
161+
}
162+
163+
// SingleClusterClientGetter wraps a single client.Client for use with ClientGetter.
164+
type SingleClusterClientGetter struct {
165+
Client client.Client
166+
}
154167

168+
// GetClient returns the wrapped client, ignoring the clusterName.
169+
func (s *SingleClusterClientGetter) GetClient(ctx context.Context, clusterName string) (client.Client, error) {
170+
return s.Client, nil
171+
}
172+
173+
// TunnelServer manages QUIC tunnel connections and routes traffic via CONNECT-IP.
174+
// It is designed to be used with a separate TunnelNodeReconciler that handles
175+
// Kubernetes reconciliation, making it compatible with multicluster-runtime.
176+
type TunnelServer struct {
155177
options *tunnelServerOptions
156178

179+
clientGetter ClientGetter
157180
jwtValidator token.JWTValidator
158181
ln *quic.EarlyListener
159182
router router.Router
@@ -165,10 +188,19 @@ type TunnelServer struct {
165188
conns *haxmap.Map[string, *conn]
166189
}
167190

191+
// TunnelNodeReconciler reconciles TunnelNode objects and delegates connection
192+
// management to a TunnelServer. This reconciler can be used with multicluster-runtime
193+
// by using the EngageWithManager method instead of SetupWithManager.
194+
type TunnelNodeReconciler struct {
195+
client client.Client
196+
server *TunnelServer
197+
labelSelector string
198+
}
199+
168200
// NewTunnelServer creates a new server proxy that routes traffic via
169201
// QUIC tunnels.
170202
func NewTunnelServer(
171-
client client.Client,
203+
clientGetter ClientGetter,
172204
v token.JWTValidator,
173205
r router.Router,
174206
opts ...TunnelServerOption,
@@ -179,10 +211,9 @@ func NewTunnelServer(
179211
}
180212

181213
s := &TunnelServer{
182-
Client: client,
183-
184214
options: options,
185215

216+
clientGetter: clientGetter,
186217
jwtValidator: v,
187218
router: r,
188219

@@ -193,8 +224,21 @@ func NewTunnelServer(
193224
return s, nil
194225
}
195226

196-
func (t *TunnelServer) SetupWithManager(mgr ctrl.Manager) error {
197-
lss, err := metav1.ParseToLabelSelector(t.options.selector)
227+
// NewTunnelNodeReconciler creates a new reconciler for TunnelNode objects.
228+
// The reconciler delegates connection management to the provided TunnelServer.
229+
// For multicluster-runtime, use NewTunnelNodeReconcilerWithClientGetter instead.
230+
func NewTunnelNodeReconciler(c client.Client, server *TunnelServer, labelSelector string) *TunnelNodeReconciler {
231+
return &TunnelNodeReconciler{
232+
client: c,
233+
server: server,
234+
labelSelector: labelSelector,
235+
}
236+
}
237+
238+
// SetupWithManager sets up the reconciler with a standard controller-runtime manager.
239+
// For multicluster-runtime compatibility, use EngageWithManager instead.
240+
func (r *TunnelNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
241+
lss, err := metav1.ParseToLabelSelector(r.labelSelector)
198242
if err != nil {
199243
return fmt.Errorf("failed to parse label selector: %w", err)
200244
}
@@ -209,7 +253,50 @@ func (t *TunnelServer) SetupWithManager(mgr ctrl.Manager) error {
209253
ls,
210254
),
211255
).
212-
Complete(reconcile.Func(t.reconcile)) // Using this contraption to keep reconcile method private.
256+
Complete(r)
257+
}
258+
259+
// EngageWithManager returns a builder that can be used with multicluster-runtime.
260+
// The caller is responsible for completing the builder with a cluster-aware handler.
261+
//
262+
// Example usage with multicluster-runtime:
263+
//
264+
// import mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
265+
//
266+
// reconciler := tunnel.NewTunnelNodeReconciler(server, labelSelector)
267+
// err := mcbuilder.ControllerManagedBy(mgr).
268+
// For(&corev1alpha.TunnelNode{}, reconciler.Predicates()...).
269+
// Complete(reconciler)
270+
func (r *TunnelNodeReconciler) EngageWithManager(mgr ctrl.Manager) *builder.Builder {
271+
return ctrl.NewControllerManagedBy(mgr).
272+
For(&corev1alpha.TunnelNode{})
273+
}
274+
275+
// Predicates returns the predicates to use when setting up the controller.
276+
// This is useful when using EngageWithManager or multicluster-runtime.
277+
func (r *TunnelNodeReconciler) Predicates() []predicate.Predicate {
278+
preds := []predicate.Predicate{
279+
&predicate.ResourceVersionChangedPredicate{},
280+
}
281+
if r.labelSelector != "" {
282+
lss, err := metav1.ParseToLabelSelector(r.labelSelector)
283+
if err == nil {
284+
if ls, err := predicate.LabelSelectorPredicate(*lss); err == nil {
285+
preds = append(preds, ls)
286+
}
287+
}
288+
}
289+
return preds
290+
}
291+
292+
// EventHandler returns an event handler that enqueues reconcile requests.
293+
// This is useful when setting up watches in multicluster scenarios.
294+
func (r *TunnelNodeReconciler) EventHandler() handler.EventHandler {
295+
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
296+
return []reconcile.Request{
297+
{NamespacedName: types.NamespacedName{Name: obj.GetName()}},
298+
}
299+
})
213300
}
214301

215302
func (t *TunnelServer) Start(ctx context.Context) error {
@@ -452,9 +539,12 @@ func (t *TunnelServer) makeSingleConnectHandler(ctx context.Context, qConn quic.
452539
if len(t.options.extAddrs) > 0 && t.options.extAddrs[0].IsValid() {
453540
agent.PrivateAddress = t.options.extAddrs[0].Addr().String()
454541
}
455-
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
542+
c, err := t.clientGetter.GetClient(r.Context(), "")
543+
if err != nil {
544+
logger.Error("Failed to get client", slog.Any("error", err))
545+
} else if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
456546
upd := &corev1alpha.TunnelNode{}
457-
if err := t.Get(r.Context(), types.NamespacedName{Name: tn.Name}, upd); apierrors.IsNotFound(err) {
547+
if err := c.Get(r.Context(), types.NamespacedName{Name: tn.Name}, upd); apierrors.IsNotFound(err) {
458548
logger.Warn("Node not found while adding agent")
459549
return errors.New("node not found")
460550
} else if err != nil {
@@ -464,7 +554,7 @@ func (t *TunnelServer) makeSingleConnectHandler(ctx context.Context, qConn quic.
464554

465555
upsertAgentStatus(&upd.Status, agent)
466556

467-
return t.Status().Update(r.Context(), upd)
557+
return c.Status().Update(r.Context(), upd)
468558
}); err != nil {
469559
logger.Error("Failed to update agent status", slog.Any("error", err))
470560
}
@@ -510,10 +600,13 @@ func (t *TunnelServer) makeSingleConnectHandler(ctx context.Context, qConn quic.
510600

511601
t.conns.Del(connID)
512602

513-
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
603+
cleanupClient, cleanupErr := t.clientGetter.GetClient(context.Background(), "")
604+
if cleanupErr != nil {
605+
logger.Error("Failed to get client for cleanup", slog.Any("error", cleanupErr))
606+
} else if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
514607
upd := &corev1alpha.TunnelNode{}
515608
nn := types.NamespacedName{Name: tn.Name}
516-
if err := t.Get(context.Background(), nn, upd); apierrors.IsNotFound(err) {
609+
if err := cleanupClient.Get(context.Background(), nn, upd); apierrors.IsNotFound(err) {
517610
logger.Warn("Node not found")
518611
return errors.New("node not found")
519612
} else if err != nil {
@@ -528,7 +621,7 @@ func (t *TunnelServer) makeSingleConnectHandler(ctx context.Context, qConn quic.
528621
}
529622
}
530623

531-
return t.Status().Update(context.Background(), upd)
624+
return cleanupClient.Status().Update(context.Background(), upd)
532625
}); err != nil {
533626
logger.Error("Failed to update agent status", slog.Any("error", err))
534627
}
@@ -537,11 +630,18 @@ func (t *TunnelServer) makeSingleConnectHandler(ctx context.Context, qConn quic.
537630
}
538631
}
539632

540-
func (t *TunnelServer) reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
633+
// Reconcile implements reconcile.Reconciler for TunnelNodeReconciler.
634+
func (r *TunnelNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
635+
return r.server.reconcileTunnelNode(ctx, r.client, request)
636+
}
637+
638+
// reconcileTunnelNode handles the reconciliation logic for a TunnelNode.
639+
// This method is called by the TunnelNodeReconciler with the appropriate client.
640+
func (t *TunnelServer) reconcileTunnelNode(ctx context.Context, c client.Client, request reconcile.Request) (reconcile.Result, error) {
541641
defer metrics.TunnelNodesManaged.Set(float64(t.tunnels.Len()))
542642

543643
node := &corev1alpha.TunnelNode{}
544-
if err := t.Get(ctx, request.NamespacedName, node); apierrors.IsNotFound(err) {
644+
if err := c.Get(ctx, request.NamespacedName, node); apierrors.IsNotFound(err) {
545645
return reconcile.Result{}, client.IgnoreNotFound(err)
546646
} else if err != nil {
547647
return reconcile.Result{}, fmt.Errorf("failed to get TunnelNode: %w", err)
@@ -569,7 +669,7 @@ func (t *TunnelServer) reconcile(ctx context.Context, request reconcile.Request)
569669
updated = true
570670
}
571671
if updated {
572-
if err := t.Status().Update(ctx, node); err != nil {
672+
if err := c.Status().Update(ctx, node); err != nil {
573673
return reconcile.Result{}, fmt.Errorf("failed to update TunnelNode status: %w", err)
574674
}
575675
}

pkg/tunnel/tunnel_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,9 @@ func TestTunnelEndToEnd_UserModeClient(t *testing.T) {
100100
serverRouter, err := router.NewNetlinkRouter()
101101
require.NoError(t, err)
102102

103+
clientGetter := &tunnel.SingleClusterClientGetter{Client: kubeClient}
103104
server, err := tunnel.NewTunnelServer(
104-
kubeClient,
105+
clientGetter,
105106
jwtValidator,
106107
serverRouter,
107108
tunnel.WithCertPath(filepath.Join(certsDir, "server.crt")),
@@ -309,8 +310,9 @@ func TestTunnelEndToEnd_KernelModeClient(t *testing.T) {
309310
)
310311
require.NoError(t, err)
311312

313+
clientGetter := &tunnel.SingleClusterClientGetter{Client: kubeClient}
312314
server, err := tunnel.NewTunnelServer(
313-
kubeClient,
315+
clientGetter,
314316
jwtValidator,
315317
serverRouter,
316318
tunnel.WithCertPath(filepath.Join(certsDir, "server.crt")),

0 commit comments

Comments
 (0)