Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions gravity/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ type GravityClient struct {
peerCycleInterval time.Duration
lastCycleTime atomic.Int64 // unix timestamp of last cycle
peerDiscoveryWake chan struct{} // non-blocking signal to wake the discovery loop immediately
peerDiscoveryDisabled bool
discoveryCtx context.Context
discoveryCancel context.CancelFunc
discoveryDone chan struct{}
Expand Down Expand Up @@ -835,6 +836,7 @@ func (g *GravityClient) startMultiEndpoint() error {
g.mu.Unlock()
return ErrNoGravityFound
}
g.peerDiscoveryDisabled = allGravityURLsAreDirectIPs(urls)
g.mu.Unlock()

g.logger.Info("multi-endpoint mode: connecting to %d Gravity servers: %v", len(urls), urls)
Expand Down Expand Up @@ -1190,6 +1192,50 @@ func (g *GravityClient) resolveGravityURLs() []string {
return urls
}

func allGravityURLsAreDirectIPs(urls []string) bool {
hasURL := false
for _, raw := range urls {
raw = strings.TrimSpace(raw)
if raw == "" {
continue
}
hasURL = true
if !isDirectIPGravityURL(raw) {
return false
}
}
return hasURL
}

func isDirectIPGravityURL(raw string) bool {
raw = strings.TrimSpace(raw)
if raw == "" {
return false
}

if ip := net.ParseIP(strings.Trim(raw, "[]")); ip != nil {
return true
}

parseable := raw
if strings.HasPrefix(parseable, "grpc://") {
parseable = "https://" + parseable[len("grpc://"):]
} else if !strings.Contains(parseable, "://") {
parseable = "https://" + parseable
}

u, err := url.Parse(parseable)
if err != nil {
return false
}

host := u.Hostname()
if host == "" {
return false
}
return net.ParseIP(host) != nil
}

// tunnelLivenessMonitor periodically sends keepalive probes on tunnel streams
// and checks whether they're actually carrying data. It detects zombie streams
// where the gRPC connection is alive (keepalive passes, control stream active)
Expand Down Expand Up @@ -5046,6 +5092,10 @@ func (g *GravityClient) startPeerDiscovery() {
if g.discoveryResolveFunc == nil {
return
}
if g.peerDiscoveryDisabled {
g.logger.Debug("peer discovery: disabled for direct IP Gravity URLs")
return
}

g.discoveryMu.Lock()
defer g.discoveryMu.Unlock()
Expand Down
75 changes: 75 additions & 0 deletions gravity/peer_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,58 @@ func TestResolveGravityURLs_MaxPeersZeroIncludesAll(t *testing.T) {
}
}

func TestAllGravityURLsAreDirectIPs(t *testing.T) {
tests := []struct {
name string
urls []string
want bool
}{
{
name: "single IPv4 URL",
urls: []string{"grpc://10.0.0.1:443"},
want: true,
},
{
name: "multiple IPv4 URLs",
urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"},
want: true,
},
{
name: "bracketed IPv6 URL",
urls: []string{"grpc://[fd15:d710::1]:443"},
want: true,
},
{
name: "hostnames are not direct IPs",
urls: []string{"grpc://gravity.example.com:443"},
want: false,
},
{
name: "mixed hostname and IP keeps discovery enabled",
urls: []string{"grpc://10.0.0.1:443", "grpc://gravity.example.com:443"},
want: false,
},
{
name: "empty input",
urls: []string{"", " "},
want: false,
},
{
name: "host port without scheme",
urls: []string{"10.0.0.1:443"},
want: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := allGravityURLsAreDirectIPs(tt.urls); got != tt.want {
t.Fatalf("allGravityURLsAreDirectIPs(%#v) = %v, want %v", tt.urls, got, tt.want)
}
})
}
}

// ---------- pickRandomURL ----------

func TestPickRandomURL_SingleElement(t *testing.T) {
Expand Down Expand Up @@ -819,6 +871,29 @@ func TestCleanup_CancelsPeerDiscoveryLoop(t *testing.T) {
}
}

func TestStartPeerDiscovery_DisabledForDirectIPGravityURLs(t *testing.T) {
g := newTestGravityClient([]string{"grpc://10.0.0.1:443"}, []string{"grpc://10.0.0.1:443"})
defer g.cancel()

g.peerDiscoveryWake = make(chan struct{}, 1)
g.discoveryResolveFunc = func() []string {
t.Fatal("discovery resolver should not be called for direct IP Gravity URLs")
return nil
}
g.peerDiscoveryDisabled = allGravityURLsAreDirectIPs(g.gravityURLs)

g.startPeerDiscovery()

g.discoveryMu.Lock()
discoveryCtx := g.discoveryCtx
discoveryDone := g.discoveryDone
g.discoveryMu.Unlock()

if discoveryCtx != nil || discoveryDone != nil {
t.Fatal("expected peer discovery loop not to start for direct IP Gravity URLs")
}
}

func TestCleanup_CancelsBlockedPeerDiscoveryResolve(t *testing.T) {
g := newTestGravityClient([]string{
"grpc://g1.example.com",
Expand Down
Loading