Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions gravity/endpoint_independence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,81 @@ func TestSelectStreamForPacket_MarksEndpointUnhealthy_TriggersPerEndpointReconne
}
}

// TestSelectStreamForPacket_UsesBoundTunnelWhenEndpointHealthIsStale checks
// that a packet whose flow is already bound to an endpoint is still routed to
// that endpoint's tunnel stream when every endpoint is flagged unhealthy but
// the bound tunnel stream itself is marked healthy. This models the
// post-hello/post-tunnel reconnect window where refreshEndpointHealth() may
// mark every endpoint unhealthy before the data path has actually failed.
func TestSelectStreamForPacket_UsesBoundTunnelWhenEndpointHealthIsStale(t *testing.T) {
	t.Parallel()

	client := newWritePacketTestClient(t, 2)

	// Endpoint 0: endpoint and connection flagged unhealthy, control stream present.
	client.endpoints[0].healthy.Store(false)
	client.streamManager.connectionHealth[0] = false
	client.streamManager.controlStreams[0] = &configurableMockStream{}

	// Endpoint 1: same stale-health setup.
	client.endpoints[1].healthy.Store(false)
	client.streamManager.connectionHealth[1] = false
	client.streamManager.controlStreams[1] = &configurableMockStream{}

	// Only the tunnel stream on endpoint 0 is marked healthy.
	bound := &hardeningMockTunnelStream{}
	tunnels := []*StreamInfo{
		{connIndex: 0, isHealthy: true, streamID: "ep0-t0", stream: bound, lastUsed: time.Now()},
		{connIndex: 1, isHealthy: false, streamID: "ep1-t0", stream: &hardeningMockTunnelStream{}, lastUsed: time.Now()},
	}
	setupWritePacketStreams(client, tunnels)

	// Bind the packet's flow to endpoint 0 before selecting.
	packet := makeIPv6Packet()
	preBindFlowToEndpoint(client, packet, 0)

	selected, err := client.selectStreamForPacket(packet)
	switch {
	case err != nil:
		t.Fatalf("expected bound healthy tunnel fallback, got: %v", err)
	case selected.connIndex != 0:
		t.Fatalf("expected bound stream from endpoint 0, got connIndex=%d", selected.connIndex)
	}
}

// TestSelectStreamForPacket_BoundTunnelFallbackRefreshesBindingTTL checks that
// selecting a stream for a pre-bound flow, while every endpoint is flagged
// unhealthy but the bound tunnel stream is marked healthy, advances the flow
// binding's LastUsed timestamp.
func TestSelectStreamForPacket_BoundTunnelFallbackRefreshesBindingTTL(t *testing.T) {
	t.Parallel()

	client := newWritePacketTestClient(t, 2)

	// Endpoint 0: endpoint and connection flagged unhealthy, control stream present.
	client.endpoints[0].healthy.Store(false)
	client.streamManager.connectionHealth[0] = false
	client.streamManager.controlStreams[0] = &configurableMockStream{}

	// Endpoint 1: same stale-health setup.
	client.endpoints[1].healthy.Store(false)
	client.streamManager.connectionHealth[1] = false
	client.streamManager.controlStreams[1] = &configurableMockStream{}

	// A single healthy tunnel stream, registered on endpoint 0.
	bound := &hardeningMockTunnelStream{}
	setupWritePacketStreams(client, []*StreamInfo{
		{connIndex: 0, isHealthy: true, streamID: "ep0-t0", stream: bound, lastUsed: time.Now()},
	})

	packet := makeIPv6Packet()
	preBindFlowToEndpoint(client, packet, 0)

	flowKey := ExtractFlowKey(packet)

	// Accessors that touch the binding's LastUsed under the selector lock.
	storeLastUsed := func(ts time.Time) {
		client.selector.mu.Lock()
		client.selector.bindings[flowKey].LastUsed = ts
		client.selector.mu.Unlock()
	}
	loadLastUsed := func() time.Time {
		client.selector.mu.RLock()
		ts := client.selector.bindings[flowKey].LastUsed
		client.selector.mu.RUnlock()
		return ts
	}

	// Age the binding so that any refresh is observable.
	before := time.Now().Add(-2 * time.Second)
	storeLastUsed(before)

	if _, err := client.selectStreamForPacket(packet); err != nil {
		t.Fatalf("expected bound healthy tunnel fallback, got: %v", err)
	}

	after := loadLastUsed()
	if after.After(before) {
		return
	}
	t.Fatalf("expected binding lastUsed to advance, before=%v after=%v", before, after)
}

// --- Category D: Safety Net Edge Cases ---

// TestTriggerAllEndpointReconnections_ClosingClient verifies that
Expand Down
35 changes: 35 additions & 0 deletions gravity/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5313,6 +5313,10 @@ func (g *GravityClient) selectStreamForPacket(payload []byte) (*StreamInfo, erro
for attempt := 0; attempt < len(endpoints); attempt++ {
endpoint := selector.Select(payload, endpoints)
if endpoint == nil {
if fallbackStream, fallbackURL, ok := g.selectBoundTunnelFallback(payload, selector); ok {
g.logger.Debug("selectStream: using bound endpoint %s despite unhealthy endpoint state because a healthy tunnel stream still exists", fallbackURL)
return fallbackStream, nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
// Log endpoint health summary for debugging selector failures.
var healthSummary []string
for i, ep := range endpoints {
Expand All @@ -5337,6 +5341,10 @@ func (g *GravityClient) selectStreamForPacket(payload []byte) (*StreamInfo, erro
g.triggerEndpointReconnectByURL(endpoint.URL)
g.wakePeerDiscovery()
}
if fallbackStream, fallbackURL, ok := g.selectBoundTunnelFallback(payload, selector); ok {
g.logger.Debug("selectStream: using bound endpoint %s after selector exhausted healthy endpoint attempts because a healthy tunnel stream still exists", fallbackURL)
return fallbackStream, nil
}
return nil, fmt.Errorf("no healthy tunnel streams on any endpoint")
}

Expand Down Expand Up @@ -5370,6 +5378,33 @@ func (g *GravityClient) selectStreamForPacket(payload []byte) (*StreamInfo, erro
return stream, nil
}

func (g *GravityClient) selectBoundTunnelFallback(payload []byte, selector *EndpointSelector) (*StreamInfo, string, bool) {
if selector == nil {
return nil, "", false
}

key := ExtractFlowKey(payload)
now := time.Now()

selector.mu.RLock()
binding, ok := selector.bindings[key]
selector.mu.RUnlock()
if !ok || binding == nil || binding.Endpoint == nil || now.Sub(binding.LastUsed) >= selector.ttl {
return nil, "", false
}

stream, err := g.selectStreamForEndpoint(payload, binding.Endpoint.URL)
if err != nil {
return nil, binding.Endpoint.URL, false
}
selector.mu.Lock()
if current, ok := selector.bindings[key]; ok && current == binding {
current.LastUsed = now
}
selector.mu.Unlock()
return stream, binding.Endpoint.URL, true
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func (g *GravityClient) selectStreamForEndpoint(payload []byte, endpointURL string) (*StreamInfo, error) {
g.mu.RLock()
candidateIndexes := append([]int(nil), g.endpointStreamIndices[endpointURL]...)
Expand Down
Loading