Skip to content

Commit 3f2a2eb

Browse files
dilyevskyclaude
andcommitted
[tunnel] Add /ping endpoint and latency-based endpoint selection
Add a lightweight /ping endpoint to the tunnelproxy HTTP/3 server for latency probing without authentication overhead. Update the latency selector to make a full HTTP/3 round-trip to /ping instead of only measuring QUIC handshake time, giving a more representative latency measurement for endpoint selection. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 60f646b commit 3f2a2eb

File tree

8 files changed

+555
-7
lines changed

8 files changed

+555
-7
lines changed

pkg/cmd/tunnel/cmd.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ var (
6464
tunnelNodeForceConflicts bool
6565
// noTUI disables the TUI interface.
6666
noTUI bool
67+
// endpointSelection is the endpoint selection strategy.
68+
endpointSelection string
6769
)
6870

6971
var createCmd = &cobra.Command{
@@ -323,6 +325,8 @@ func init() {
323325
tunnelRunCmd.Flags().StringVar(&healthAddr, "health-addr", ":8080", "Listen address for health endpoint (default: :8080).")
324326
tunnelRunCmd.Flags().StringVar(&metricsAddr, "metrics-addr", ":8081", "Listen address for metrics endpoint (default: :8081).")
325327
tunnelRunCmd.Flags().BoolVar(&noTUI, "no-tui", false, "Disable TUI interface.")
328+
tunnelRunCmd.Flags().StringVar(&endpointSelection, "endpoint-selection", "latency",
329+
"Endpoint selection strategy: 'latency' (default) or 'random'")
326330

327331
tunnelCmd.AddCommand(createCmd)
328332
tunnelCmd.AddCommand(getCmd)

pkg/cmd/tunnel/run.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/apoxy-dev/apoxy/pkg/net/dns"
4343
"github.com/apoxy-dev/apoxy/pkg/tunnel"
4444
"github.com/apoxy-dev/apoxy/pkg/tunnel/connection"
45+
"github.com/apoxy-dev/apoxy/pkg/tunnel/endpointselect"
4546

4647
configv1alpha1 "github.com/apoxy-dev/apoxy/api/config/v1alpha1"
4748
corev1alpha "github.com/apoxy-dev/apoxy/api/core/v1alpha"
@@ -105,6 +106,9 @@ type tunnelNodeReconciler struct {
105106
tunMu sync.RWMutex
106107
tunDialerWorkers []*tunConn
107108

109+
// Endpoint selector for choosing tunnel server addresses.
110+
endpointSelector endpointselect.Selector
111+
108112
// Dial parameters protected by dialMu
109113
dialMu sync.RWMutex
110114
tunnelUID uuid.UUID
@@ -140,6 +144,20 @@ var tunnelRunCmd = &cobra.Command{
140144
preserveDefaultGwDsts = append(preserveDefaultGwDsts, dst)
141145
}
142146

147+
// Parse and create endpoint selector.
148+
strategy, err := endpointselect.ParseStrategy(endpointSelection)
149+
if err != nil {
150+
return fmt.Errorf("unable to parse endpoint selection strategy: %w", err)
151+
}
152+
selectorOpts := []endpointselect.Option{}
153+
if insecureSkipVerify {
154+
selectorOpts = append(selectorOpts, endpointselect.WithInsecureSkipVerify(true))
155+
}
156+
selector, err := endpointselect.NewSelector(strategy, selectorOpts...)
157+
if err != nil {
158+
return fmt.Errorf("unable to create endpoint selector: %w", err)
159+
}
160+
143161
cmd.SilenceUsage = true
144162

145163
cfg, err := config.Load()
@@ -173,9 +191,10 @@ var tunnelRunCmd = &cobra.Command{
173191
}
174192

175193
tun := &tunnelNodeReconciler{
176-
scheme: scheme,
177-
cfg: cfg,
178-
a3y: a3y,
194+
scheme: scheme,
195+
cfg: cfg,
196+
a3y: a3y,
197+
endpointSelector: selector,
179198

180199
tunDialerWorkers: make([]*tunConn, 0),
181200
}
@@ -412,9 +431,18 @@ func (t *tunnelNodeReconciler) reconcile(ctx context.Context, req ctrl.Request)
412431
return ctrl.Result{
413432
RequeueAfter: time.Second,
414433
}, nil
434+
} else if len(tunnelNode.Status.Addresses) == 1 {
435+
srvAddr = tunnelNode.Status.Addresses[0]
415436
} else {
416-
// TODO: Pick unused address at random if available.
417-
srvAddr = tunnelNode.Status.Addresses[rand.Intn(len(tunnelNode.Status.Addresses))]
437+
// Use endpoint selector to choose the best endpoint.
438+
selected, err := t.endpointSelector.Select(ctx, tunnelNode.Status.Addresses)
439+
if err != nil {
440+
log.Warn("Endpoint selection failed, using random",
441+
slog.Any("error", err))
442+
srvAddr = tunnelNode.Status.Addresses[rand.Intn(len(tunnelNode.Status.Addresses))]
443+
} else {
444+
srvAddr = selected
445+
}
418446
}
419447
} else {
420448
apiServerHost := "localhost"
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package endpointselect
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"errors"
7+
"fmt"
8+
"log/slog"
9+
"net"
10+
"net/http"
11+
"sort"
12+
"sync"
13+
"time"
14+
15+
"github.com/quic-go/quic-go"
16+
"github.com/quic-go/quic-go/http3"
17+
)
18+
19+
const (
20+
// DefaultProbeTimeout is the default timeout for each endpoint probe.
21+
DefaultProbeTimeout = 3 * time.Second
22+
// DefaultMaxConcurrent is the default maximum number of concurrent probes.
23+
DefaultMaxConcurrent = 10
24+
)
25+
26+
// Option configures a LatencySelector.
27+
type Option func(*latencyOptions)
28+
29+
type latencyOptions struct {
30+
probeTimeout time.Duration
31+
maxConcurrent int
32+
insecureSkip bool
33+
}
34+
35+
// WithProbeTimeout sets the timeout for each endpoint probe.
36+
func WithProbeTimeout(timeout time.Duration) Option {
37+
return func(o *latencyOptions) {
38+
o.probeTimeout = timeout
39+
}
40+
}
41+
42+
// WithMaxConcurrent sets the maximum number of concurrent probes.
43+
func WithMaxConcurrent(max int) Option {
44+
return func(o *latencyOptions) {
45+
o.maxConcurrent = max
46+
}
47+
}
48+
49+
// WithInsecureSkipVerify sets whether to skip TLS certificate verification.
50+
func WithInsecureSkipVerify(skip bool) Option {
51+
return func(o *latencyOptions) {
52+
o.insecureSkip = skip
53+
}
54+
}
55+
56+
// LatencySelector selects endpoints based on QUIC handshake latency.
57+
type LatencySelector struct {
58+
opts latencyOptions
59+
}
60+
61+
// NewLatencySelector creates a new LatencySelector.
62+
func NewLatencySelector(opts ...Option) *LatencySelector {
63+
options := latencyOptions{
64+
probeTimeout: DefaultProbeTimeout,
65+
maxConcurrent: DefaultMaxConcurrent,
66+
}
67+
for _, opt := range opts {
68+
opt(&options)
69+
}
70+
return &LatencySelector{opts: options}
71+
}
72+
73+
// Select returns the endpoint with the lowest latency.
74+
func (s *LatencySelector) Select(ctx context.Context, endpoints []string) (string, error) {
75+
addr, _, err := s.SelectWithResults(ctx, endpoints)
76+
return addr, err
77+
}
78+
79+
// SelectWithResults returns the endpoint with the lowest latency along with all probe results.
80+
func (s *LatencySelector) SelectWithResults(ctx context.Context, endpoints []string) (string, []ProbeResult, error) {
81+
if len(endpoints) == 0 {
82+
return "", nil, errors.New("no endpoints provided")
83+
}
84+
if len(endpoints) == 1 {
85+
return endpoints[0], []ProbeResult{{
86+
Addr: endpoints[0],
87+
ProbedAt: time.Now(),
88+
}}, nil
89+
}
90+
91+
results := s.probeAll(ctx, endpoints)
92+
93+
// Sort by latency (errors go to the end).
94+
sort.Slice(results, func(i, j int) bool {
95+
// Errors go to the end.
96+
if results[i].Error != nil && results[j].Error != nil {
97+
return false
98+
}
99+
if results[i].Error != nil {
100+
return false
101+
}
102+
if results[j].Error != nil {
103+
return true
104+
}
105+
return results[i].Latency < results[j].Latency
106+
})
107+
108+
// Find the first successful result.
109+
for _, r := range results {
110+
if r.Error == nil {
111+
slog.Info("Selected endpoint based on latency",
112+
slog.String("addr", r.Addr),
113+
slog.Duration("latency", r.Latency))
114+
return r.Addr, results, nil
115+
}
116+
}
117+
118+
// All probes failed - return error with details.
119+
return "", results, errors.New("all endpoint probes failed")
120+
}
121+
122+
// probeAll probes all endpoints concurrently and returns the results.
123+
func (s *LatencySelector) probeAll(ctx context.Context, endpoints []string) []ProbeResult {
124+
results := make([]ProbeResult, len(endpoints))
125+
var wg sync.WaitGroup
126+
127+
// Semaphore to limit concurrent probes.
128+
sem := make(chan struct{}, s.opts.maxConcurrent)
129+
130+
for i, endpoint := range endpoints {
131+
wg.Add(1)
132+
go func(idx int, addr string) {
133+
defer wg.Done()
134+
135+
// Acquire semaphore.
136+
select {
137+
case sem <- struct{}{}:
138+
defer func() { <-sem }()
139+
case <-ctx.Done():
140+
results[idx] = ProbeResult{
141+
Addr: addr,
142+
Error: ctx.Err(),
143+
ProbedAt: time.Now(),
144+
}
145+
return
146+
}
147+
148+
results[idx] = s.probe(ctx, addr)
149+
}(i, endpoint)
150+
}
151+
152+
wg.Wait()
153+
return results
154+
}
155+
156+
// probe measures the round-trip latency to a single endpoint by making
157+
// an HTTP/3 request to the /ping endpoint.
158+
func (s *LatencySelector) probe(ctx context.Context, addr string) ProbeResult {
159+
result := ProbeResult{
160+
Addr: addr,
161+
ProbedAt: time.Now(),
162+
}
163+
164+
probeCtx, cancel := context.WithTimeout(ctx, s.opts.probeTimeout)
165+
defer cancel()
166+
167+
// Extract hostname from address for TLS ServerName.
168+
serverName := "proxy"
169+
if host, _, err := net.SplitHostPort(addr); err == nil && net.ParseIP(host) == nil {
170+
serverName = host
171+
}
172+
173+
tlsConfig := &tls.Config{
174+
ServerName: serverName,
175+
NextProtos: []string{http3.NextProtoH3},
176+
InsecureSkipVerify: s.opts.insecureSkip,
177+
}
178+
179+
quicConfig := &quic.Config{
180+
EnableDatagrams: true,
181+
InitialPacketSize: 1350,
182+
}
183+
184+
start := time.Now()
185+
186+
// Dial QUIC connection.
187+
qConn, err := quic.DialAddr(probeCtx, addr, tlsConfig, quicConfig)
188+
if err != nil {
189+
result.Error = err
190+
slog.Debug("Endpoint probe failed (QUIC dial)",
191+
slog.String("addr", addr),
192+
slog.Any("error", err))
193+
return result
194+
}
195+
defer qConn.CloseWithError(0, "probe complete")
196+
197+
// Make HTTP/3 request to /ping endpoint.
198+
tr := &http3.Transport{EnableDatagrams: true}
199+
hConn := tr.NewClientConn(qConn)
200+
201+
req, err := http.NewRequestWithContext(probeCtx, "GET", "https://proxy/ping", nil)
202+
if err != nil {
203+
result.Error = err
204+
slog.Debug("Endpoint probe failed (request creation)",
205+
slog.String("addr", addr),
206+
slog.Any("error", err))
207+
return result
208+
}
209+
210+
resp, err := hConn.RoundTrip(req)
211+
if err != nil {
212+
result.Error = err
213+
slog.Debug("Endpoint probe failed (HTTP/3 request)",
214+
slog.String("addr", addr),
215+
slog.Any("error", err))
216+
return result
217+
}
218+
defer resp.Body.Close()
219+
220+
if resp.StatusCode != http.StatusOK {
221+
result.Error = fmt.Errorf("ping returned status %d", resp.StatusCode)
222+
slog.Debug("Endpoint probe failed (bad status)",
223+
slog.String("addr", addr),
224+
slog.Int("status", resp.StatusCode))
225+
return result
226+
}
227+
228+
result.Latency = time.Since(start)
229+
230+
slog.Debug("Endpoint probe succeeded",
231+
slog.String("addr", addr),
232+
slog.Duration("latency", result.Latency))
233+
234+
return result
235+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package endpointselect
2+
3+
import (
4+
"context"
5+
"errors"
6+
"math/rand"
7+
"time"
8+
)
9+
10+
// RandomSelector selects a random endpoint from the list.
11+
type RandomSelector struct{}
12+
13+
// NewRandomSelector creates a new RandomSelector.
14+
func NewRandomSelector() *RandomSelector {
15+
return &RandomSelector{}
16+
}
17+
18+
// Select returns a random endpoint from the list.
19+
func (s *RandomSelector) Select(ctx context.Context, endpoints []string) (string, error) {
20+
addr, _, err := s.SelectWithResults(ctx, endpoints)
21+
return addr, err
22+
}
23+
24+
// SelectWithResults returns a random endpoint along with a single result entry.
25+
func (s *RandomSelector) SelectWithResults(ctx context.Context, endpoints []string) (string, []ProbeResult, error) {
26+
if len(endpoints) == 0 {
27+
return "", nil, errors.New("no endpoints provided")
28+
}
29+
30+
selected := endpoints[rand.Intn(len(endpoints))]
31+
result := []ProbeResult{{
32+
Addr: selected,
33+
ProbedAt: time.Now(),
34+
}}
35+
36+
return selected, result, nil
37+
}

0 commit comments

Comments
 (0)