Skip to content

Commit c86de3a

Browse files
committed
feat: outbound latency test
1 parent 24ffb51 commit c86de3a

18 files changed

Lines changed: 755 additions & 100 deletions

.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ API_KEY = xxxxxxxx-yyyy-zzzz-mmmm-aaaaaaaaaaa
2222
# STARTUP_LOG_TAIL_SIZE = 200
2323
# STATS_UPDATE_INTERVAL_SECONDS = 10
2424
# STATS_CLEANUP_INTERVAL_SECONDS = 300
25+
# LATENCY_TEST_URL = https://www.gstatic.com/generate_204
26+
# LATENCY_TIMEOUT_SECONDS = 5
2527

2628
### WireGuard host NAT
2729
### Built-in routing enables runtime IPv4 forwarding and manages scoped nft NAT/forwarding rules.

backend/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Backend interface {
1818
UpdateUsersAndRestart(context.Context, []*common.User) error
1919
GetSysStats(context.Context) (*common.BackendStatsResponse, error)
2020
GetStats(context.Context, *common.StatRequest) (*common.StatResponse, error)
21+
GetOutboundsLatency(context.Context, *common.LatencyRequest) (*common.LatencyResponse, error)
2122
GetUserOnlineStats(context.Context, string) (*common.OnlineStatResponse, error)
2223
GetUserOnlineIpListStats(context.Context, string) (*common.StatsOnlineIpListResponse, error)
2324
}

backend/wireguard/latency.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package wireguard
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"os"
7+
"strings"
8+
"time"
9+
10+
"github.com/pasarguard/node/common"
11+
)
12+
13+
const wireGuardNATOutputInterfaceEnv = "PG_NODE_WG_NAT_OUTPUT_INTERFACE"
14+
15+
func (wg *WireGuard) latencyProbeInterface() string {
16+
if iface := strings.TrimSpace(os.Getenv(wireGuardNATOutputInterfaceEnv)); iface != "" {
17+
return iface
18+
}
19+
20+
wg.mu.RLock()
21+
defer wg.mu.RUnlock()
22+
if wg.config == nil {
23+
return ""
24+
}
25+
return strings.TrimSpace(wg.config.InterfaceName)
26+
}
27+
28+
func (wg *WireGuard) GetOutboundsLatency(ctx context.Context, request *common.LatencyRequest) (*common.LatencyResponse, error) {
29+
wg.mu.RLock()
30+
state := wg.state
31+
testURL := wg.cfg.LatencyTestURL
32+
timeoutSeconds := wg.cfg.LatencyTimeoutSeconds
33+
wg.mu.RUnlock()
34+
35+
if state != lifecycleRunning {
36+
return nil, errWireGuardNotStarted
37+
}
38+
39+
iface := wg.latencyProbeInterface()
40+
name := iface
41+
if name == "" {
42+
name = "wireguard"
43+
}
44+
45+
requestName := strings.TrimSpace(request.GetName())
46+
if requestName != "" && requestName != name {
47+
return &common.LatencyResponse{Latencies: []*common.Latency{}}, nil
48+
}
49+
50+
client := &http.Client{
51+
Timeout: time.Duration(timeoutSeconds) * time.Second,
52+
Transport: latencyHTTPTransport(iface),
53+
}
54+
55+
req, err := http.NewRequestWithContext(ctx, http.MethodHead, testURL, nil)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
start := time.Now()
61+
resp, err := client.Do(req)
62+
delay := time.Since(start).Milliseconds()
63+
now := time.Now().Unix()
64+
65+
alive := err == nil
66+
if resp != nil && resp.Body != nil {
67+
resp.Body.Close()
68+
}
69+
70+
latency := &common.Latency{
71+
Name: name,
72+
Alive: alive,
73+
Delay: delay,
74+
Link: testURL,
75+
LastTryTime: now,
76+
Source: "wireguard-probe",
77+
}
78+
if alive {
79+
latency.LastSeenTime = now
80+
}
81+
82+
return &common.LatencyResponse{Latencies: []*common.Latency{latency}}, nil
83+
}

backend/wireguard/latency_linux.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//go:build linux
2+
3+
package wireguard
4+
5+
import (
6+
"net"
7+
"net/http"
8+
"syscall"
9+
10+
"golang.org/x/sys/unix"
11+
)
12+
13+
func latencyHTTPTransport(iface string) *http.Transport {
14+
transport := http.DefaultTransport.(*http.Transport).Clone()
15+
if iface == "" {
16+
return transport
17+
}
18+
19+
dialer := &net.Dialer{
20+
Control: func(_, _ string, c syscall.RawConn) error {
21+
var controlErr error
22+
if err := c.Control(func(fd uintptr) {
23+
controlErr = unix.SetsockoptString(int(fd), unix.SOL_SOCKET, unix.SO_BINDTODEVICE, iface)
24+
}); err != nil {
25+
return err
26+
}
27+
return controlErr
28+
},
29+
}
30+
transport.DialContext = dialer.DialContext
31+
return transport
32+
}

backend/wireguard/latency_other.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
//go:build !linux
2+
3+
package wireguard
4+
5+
import "net/http"
6+
7+
func latencyHTTPTransport(_ string) *http.Transport {
8+
return http.DefaultTransport.(*http.Transport).Clone()
9+
}

backend/wireguard/latency_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package wireguard
2+
3+
import "testing"
4+
5+
func TestLatencyProbeInterfaceFallsBackToConfiguredInterface(t *testing.T) {
6+
t.Setenv(wireGuardNATOutputInterfaceEnv, "")
7+
8+
wg := &WireGuard{
9+
config: &Config{InterfaceName: "wg-test"},
10+
}
11+
12+
if got := wg.latencyProbeInterface(); got != "wg-test" {
13+
t.Fatalf("unexpected interface: got %s want wg-test", got)
14+
}
15+
}
16+
17+
func TestLatencyProbeInterfacePrefersNATEgressEnv(t *testing.T) {
18+
t.Setenv(wireGuardNATOutputInterfaceEnv, "eth9")
19+
20+
wg := &WireGuard{
21+
config: &Config{InterfaceName: "wg-test"},
22+
}
23+
24+
if got := wg.latencyProbeInterface(); got != "eth9" {
25+
t.Fatalf("unexpected interface: got %s want eth9", got)
26+
}
27+
}

backend/xray/config.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log"
77
"net"
8+
"net/netip"
89
"slices"
910
"sort"
1011
"strings"
@@ -575,7 +576,12 @@ func apiRuleSources() []string {
575576
return sources
576577
}
577578

578-
func (c *Config) ApplyAPI(apiPort int) (err error) {
579+
func loopbackListenAddress(port int) string {
580+
addr := netip.AddrPortFrom(netip.MustParseAddr("127.0.0.1"), uint16(port))
581+
return addr.String()
582+
}
583+
584+
func (c *Config) ApplyAPI(apiPort, metricPort int) (err error) {
579585
// Remove the existing inbound with the API_INBOUND tag
580586
for i, inbound := range c.InboundConfigs {
581587
if inbound.Tag == "API_INBOUND" {
@@ -590,6 +596,11 @@ func (c *Config) ApplyAPI(apiPort int) (err error) {
590596
Tag: apiTag,
591597
}
592598

599+
c.Metrics = map[string]any{
600+
"tag": "metric",
601+
"listen": loopbackListenAddress(metricPort),
602+
}
603+
593604
if c.RouterConfig == nil {
594605
c.RouterConfig = &conf.RouterConfig{}
595606
}

backend/xray/latency.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package xray
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"net/http"
9+
"net/netip"
10+
"sort"
11+
"time"
12+
13+
"github.com/pasarguard/node/common"
14+
)
15+
16+
type observatoryEntry struct {
17+
Alive bool `json:"alive"`
18+
Delay int64 `json:"delay"`
19+
OutboundTag string `json:"outbound_tag"`
20+
LastSeenTime int64 `json:"last_seen_time"`
21+
LastTryTime int64 `json:"last_try_time"`
22+
}
23+
24+
type debugVarsResponse struct {
25+
Observatory map[string]observatoryEntry `json:"observatory"`
26+
}
27+
28+
func (x *Xray) GetOutboundsLatency(ctx context.Context, request *common.LatencyRequest) (*common.LatencyResponse, error) {
29+
x.mu.RLock()
30+
started := x.core != nil && x.core.Started()
31+
metricPort := x.metricPort
32+
timeoutSeconds := x.cfg.LatencyTimeoutSeconds
33+
x.mu.RUnlock()
34+
35+
if !started {
36+
return nil, errors.New("xray not started")
37+
}
38+
39+
client := &http.Client{Timeout: time.Duration(timeoutSeconds) * time.Second}
40+
addr := netip.AddrPortFrom(netip.MustParseAddr("127.0.0.1"), uint16(metricPort))
41+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+addr.String()+"/debug/vars", nil)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
resp, err := client.Do(req)
47+
if err != nil {
48+
return nil, fmt.Errorf("read xray observatory metrics: %w", err)
49+
}
50+
defer resp.Body.Close()
51+
52+
if resp.StatusCode != http.StatusOK {
53+
return nil, fmt.Errorf("read xray observatory metrics: unexpected status %d", resp.StatusCode)
54+
}
55+
56+
var vars debugVarsResponse
57+
if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
58+
return nil, fmt.Errorf("decode xray observatory metrics: %w", err)
59+
}
60+
if len(vars.Observatory) == 0 {
61+
return nil, errors.New("xray outbound latency is not available")
62+
}
63+
64+
name := request.GetName()
65+
keys := make([]string, 0, len(vars.Observatory))
66+
for key := range vars.Observatory {
67+
if name != "" && key != name {
68+
continue
69+
}
70+
keys = append(keys, key)
71+
}
72+
sort.Strings(keys)
73+
74+
latencies := make([]*common.Latency, 0, len(keys))
75+
for _, key := range keys {
76+
entry := vars.Observatory[key]
77+
linkName := entry.OutboundTag
78+
if linkName == "" {
79+
linkName = key
80+
}
81+
latencies = append(latencies, &common.Latency{
82+
Name: key,
83+
Alive: entry.Alive,
84+
Delay: entry.Delay,
85+
Link: linkName,
86+
LastSeenTime: entry.LastSeenTime,
87+
LastTryTime: entry.LastTryTime,
88+
Source: "xray-observatory",
89+
})
90+
}
91+
92+
return &common.LatencyResponse{Latencies: latencies}, nil
93+
}

backend/xray/xray.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ type Xray struct {
1717
cfg *config.Config
1818
core *Core
1919
handler *api.XrayHandler
20+
metricPort int
2021
cancelFunc context.CancelFunc
2122
mu sync.RWMutex
2223
}
2324

24-
func New(ctx context.Context, xrayConfig *Config, users []*common.User, port int, cfg *config.Config) (*Xray, error) {
25+
func New(ctx context.Context, xrayConfig *Config, users []*common.User, apiPort, metricPort int, cfg *config.Config) (*Xray, error) {
2526
executableAbsolutePath, err := filepath.Abs(cfg.XrayExecutablePath)
2627
if err != nil {
2728
return nil, err
@@ -42,11 +43,12 @@ func New(ctx context.Context, xrayConfig *Config, users []*common.User, port int
4243
xray := &Xray{
4344
cancelFunc: xCancel,
4445
cfg: cfg,
46+
metricPort: metricPort,
4547
}
4648

4749
start := time.Now()
4850

49-
if err = xrayConfig.ApplyAPI(port); err != nil {
51+
if err = xrayConfig.ApplyAPI(apiPort, metricPort); err != nil {
5052
return nil, err
5153
}
5254

@@ -80,7 +82,7 @@ func New(ctx context.Context, xrayConfig *Config, users []*common.User, port int
8082

8183
xray.core = core
8284

83-
handler, err := api.NewXrayAPI(port)
85+
handler, err := api.NewXrayAPI(apiPort)
8486
if err != nil {
8587
xray.Shutdown()
8688
return nil, err

0 commit comments

Comments
 (0)