Skip to content

Commit 8542795

Browse files
authored
[icxtunnel/controllers] remove relay address from list on shutdown (#81)
So that we don't leak non-existent relay addresses.
1 parent a8460df commit 8542795

File tree

4 files changed

+136
-1
lines changed

4 files changed

+136
-1
lines changed

pkg/tunnel/controllers/relay.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ type Relay interface {
2121
SetOnConnect(onConnect func(ctx context.Context, tunnelName, agentName string, conn Connection) error)
2222
// SetOnDisconnect sets a callback that is invoked when a connection is closed.
2323
SetOnDisconnect(onDisconnect func(ctx context.Context, agentName, id string) error)
24+
// SetOnShutdown sets a callback that is invoked when the relay is shutting down.
25+
SetOnShutdown(onShutdown func(ctx context.Context))
2426
}

pkg/tunnel/controllers/tunnel_reconciler.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package controllers
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"slices"
78

89
apierrors "k8s.io/apimachinery/pkg/api/errors"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/types"
1012
"k8s.io/client-go/util/retry"
1113
ctrl "sigs.k8s.io/controller-runtime"
1214
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -25,11 +27,13 @@ type TunnelReconciler struct {
2527
}
2628

2729
func NewTunnelReconciler(c client.Client, relay Relay, labelSelector string) *TunnelReconciler {
28-
return &TunnelReconciler{
30+
r := &TunnelReconciler{
2931
client: c,
3032
relay: relay,
3133
labelSelector: labelSelector,
3234
}
35+
relay.SetOnShutdown(r.RemoveRelayAddress)
36+
return r
3337
}
3438

3539
func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -102,3 +106,53 @@ func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error {
102106
For(&corev1alpha2.Tunnel{}, builder.WithPredicates(&predicate.ResourceVersionChangedPredicate{}, ls)).
103107
Complete(r)
104108
}
109+
110+
func (r *TunnelReconciler) RemoveRelayAddress(ctx context.Context) {
111+
// Build the same label selector we filter on during watch.
112+
lss, err := metav1.ParseToLabelSelector(r.labelSelector)
113+
if err != nil {
114+
slog.Error("Failed to parse label selector during shutdown cleanup", slog.Any("error", err))
115+
return
116+
}
117+
sel, err := metav1.LabelSelectorAsSelector(lss)
118+
if err != nil {
119+
slog.Error("Failed to build label selector during shutdown cleanup", slog.Any("error", err))
120+
return
121+
}
122+
123+
var list corev1alpha2.TunnelList
124+
if err := r.client.List(ctx, &list, &client.ListOptions{LabelSelector: sel}); err != nil {
125+
slog.Error("Failed to list tunnels during shutdown cleanup", slog.Any("error", err))
126+
return
127+
}
128+
129+
relayAddr := r.relay.Address().String()
130+
for _, t := range list.Items {
131+
// Skip if there's nothing to remove.
132+
if !slices.Contains(t.Status.Addresses, relayAddr) {
133+
continue
134+
}
135+
136+
key := types.NamespacedName{Namespace: t.Namespace, Name: t.Name}
137+
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
138+
var latest corev1alpha2.Tunnel
139+
if err := r.client.Get(ctx, key, &latest); err != nil {
140+
return err
141+
}
142+
143+
// Filter out this relay's address.
144+
filtered := latest.Status.Addresses[:0]
145+
for _, a := range latest.Status.Addresses {
146+
if a != relayAddr {
147+
filtered = append(filtered, a)
148+
}
149+
}
150+
latest.Status.Addresses = filtered
151+
152+
return r.client.Status().Update(ctx, &latest)
153+
})
154+
if err != nil {
155+
slog.Error("Failed to remove relay address from tunnel during shutdown cleanup", slog.Any("error", err), slog.String("tunnel", key.String()))
156+
}
157+
}
158+
}

pkg/tunnel/controllers/tunnel_reconciler_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func TestTunnelReconciler(t *testing.T) {
4343
relay.On("SetCredentials", "tun-1", "secret-token").Once()
4444
relay.On("SetRelayAddresses", "tun-1", mock.Anything).Once()
4545
relay.On("SetEgressGateway", mock.Anything).Return().Once()
46+
relay.On("SetOnShutdown", mock.Anything).Return().Once()
4647

4748
r := controllers.NewTunnelReconciler(c, relay, "")
4849

@@ -53,6 +54,59 @@ func TestTunnelReconciler(t *testing.T) {
5354
relay.AssertExpectations(t)
5455
}
5556

57+
func TestTunnelReconciler_OnShutdownRemovesAddress(t *testing.T) {
58+
scheme := runtime.NewScheme()
59+
require.NoError(t, corev1alpha2.Install(scheme))
60+
61+
relayAddr := netip.MustParseAddrPort("1.1.1.1:443")
62+
63+
tunnel := &corev1alpha2.Tunnel{
64+
ObjectMeta: metav1.ObjectMeta{Name: "tun-1"},
65+
Status: corev1alpha2.TunnelStatus{
66+
// Seed with our relay address plus another one to ensure only ours is removed.
67+
Addresses: []string{relayAddr.String(), "2.2.2.2:443"},
68+
},
69+
}
70+
71+
c := fakeclient.NewClientBuilder().
72+
WithScheme(scheme).
73+
WithStatusSubresource(&corev1alpha2.Tunnel{}).
74+
WithObjects(tunnel).
75+
Build()
76+
77+
relay := &mockRelay{}
78+
relay.On("Address").Return(relayAddr)
79+
80+
var onShutdown func(context.Context)
81+
relay.
82+
On("SetOnShutdown", mock.Anything).
83+
Run(func(args mock.Arguments) {
84+
onShutdown = args.Get(0).(func(context.Context))
85+
}).
86+
Return().
87+
Once()
88+
89+
// We don't need other relay expectations for this test.
90+
controllers.NewTunnelReconciler(c, relay, "")
91+
92+
// Sanity: ensure the tunnel initially contains the relay address.
93+
var before corev1alpha2.Tunnel
94+
require.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: "tun-1"}, &before))
95+
require.Contains(t, before.Status.Addresses, relayAddr.String())
96+
97+
// Invoke the captured shutdown hook.
98+
require.NotNil(t, onShutdown, "onShutdown should be captured from SetOnShutdown")
99+
onShutdown(t.Context())
100+
101+
// After shutdown, our relay address should be removed from the status.
102+
var after corev1alpha2.Tunnel
103+
require.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: "tun-1"}, &after))
104+
require.NotContains(t, after.Status.Addresses, relayAddr.String())
105+
require.ElementsMatch(t, []string{"2.2.2.2:443"}, after.Status.Addresses)
106+
107+
relay.AssertExpectations(t)
108+
}
109+
56110
func testLogr(t *testing.T) logr.Logger {
57111
if testing.Verbose() {
58112
l := stdr.New(log.New(os.Stdout, "", log.LstdFlags))
@@ -94,3 +148,7 @@ func (m *mockRelay) SetOnConnect(onConnect func(ctx context.Context, tunnelName,
94148
func (m *mockRelay) SetOnDisconnect(onDisconnect func(ctx context.Context, agentName, id string) error) {
95149
m.Called(onDisconnect)
96150
}
151+
152+
func (m *mockRelay) SetOnShutdown(onShutdown func(ctx context.Context)) {
153+
m.Called(onShutdown)
154+
}

pkg/tunnel/relay.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type Relay struct {
5050
agents *haxmap.Map[string, string] // map[connectionID]agentName
5151
onConnect func(ctx context.Context, tunnelName, agentName string, conn controllers.Connection) error
5252
onDisconnect func(ctx context.Context, agentName, id string) error
53+
onShutdown func(ctx context.Context)
5354
}
5455

5556
func NewRelay(name string, pc net.PacketConn, cert tls.Certificate, handler *icx.Handler, idHasher *hasher.Hasher, router router.Router) *Relay {
@@ -112,6 +113,14 @@ func (r *Relay) SetOnDisconnect(onDisconnect func(ctx context.Context, agentName
112113
r.onDisconnect = onDisconnect
113114
}
114115

116+
// SetOnShutdown sets a callback that is invoked when the relay is shutting down.
117+
func (r *Relay) SetOnShutdown(onShutdown func(context.Context)) {
118+
r.mu.Lock()
119+
defer r.mu.Unlock()
120+
121+
r.onShutdown = onShutdown
122+
}
123+
115124
// Start starts the relay.
116125
func (r *Relay) Start(ctx context.Context) error {
117126
ln, err := quic.ListenEarly(
@@ -178,6 +187,18 @@ func (r *Relay) Start(ctx context.Context) error {
178187
return err
179188
}
180189

190+
// Invoke shutdown callback if set.
191+
r.mu.Lock()
192+
onShutdown := r.onShutdown
193+
r.mu.Unlock()
194+
195+
if onShutdown != nil {
196+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
197+
defer cancel()
198+
199+
onShutdown(shutdownCtx)
200+
}
201+
181202
return nil
182203
}
183204

0 commit comments

Comments
 (0)