diff --git a/gravity/endpoint_independence_test.go b/gravity/endpoint_independence_test.go index 6d426c4..312c146 100644 --- a/gravity/endpoint_independence_test.go +++ b/gravity/endpoint_independence_test.go @@ -1661,6 +1661,81 @@ func TestSelectStreamForPacket_MarksEndpointUnhealthy_TriggersPerEndpointReconne } } +// TestSelectStreamForPacket_UsesBoundTunnelWhenEndpointHealthIsStale verifies +// that response traffic can still use the endpoint it is already bound to when +// endpoint health has gone stale but the tunnel stream itself is still healthy. +// This covers the post-hello/post-tunnel reconnect window where +// refreshEndpointHealth() may mark every endpoint unhealthy before the data +// path has actually failed. +func TestSelectStreamForPacket_UsesBoundTunnelWhenEndpointHealthIsStale(t *testing.T) { + t.Parallel() + + g := newWritePacketTestClient(t, 2) + + g.endpoints[0].healthy.Store(false) + g.endpoints[1].healthy.Store(false) + g.streamManager.connectionHealth[0] = false + g.streamManager.connectionHealth[1] = false + g.streamManager.controlStreams[0] = &configurableMockStream{} + g.streamManager.controlStreams[1] = &configurableMockStream{} + + boundStream := &hardeningMockTunnelStream{} + setupWritePacketStreams(g, []*StreamInfo{ + {connIndex: 0, isHealthy: true, streamID: "ep0-t0", stream: boundStream, lastUsed: time.Now()}, + {connIndex: 1, isHealthy: false, streamID: "ep1-t0", stream: &hardeningMockTunnelStream{}, lastUsed: time.Now()}, + }) + + pkt := makeIPv6Packet() + preBindFlowToEndpoint(g, pkt, 0) + + stream, err := g.selectStreamForPacket(pkt) + if err != nil { + t.Fatalf("expected bound healthy tunnel fallback, got: %v", err) + } + if stream.connIndex != 0 { + t.Fatalf("expected bound stream from endpoint 0, got connIndex=%d", stream.connIndex) + } +} + +func TestSelectStreamForPacket_BoundTunnelFallbackRefreshesBindingTTL(t *testing.T) { + t.Parallel() + + g := newWritePacketTestClient(t, 2) + + g.endpoints[0].healthy.Store(false) + g.endpoints[1].healthy.Store(false) + g.streamManager.connectionHealth[0] = false + g.streamManager.connectionHealth[1] = false + g.streamManager.controlStreams[0] = &configurableMockStream{} + g.streamManager.controlStreams[1] = &configurableMockStream{} + + boundStream := &hardeningMockTunnelStream{} + setupWritePacketStreams(g, []*StreamInfo{ + {connIndex: 0, isHealthy: true, streamID: "ep0-t0", stream: boundStream, lastUsed: time.Now()}, + }) + + pkt := makeIPv6Packet() + preBindFlowToEndpoint(g, pkt, 0) + + key := ExtractFlowKey(pkt) + before := time.Now().Add(-2 * time.Second) + g.selector.mu.Lock() + g.selector.bindings[key].LastUsed = before + g.selector.mu.Unlock() + + _, err := g.selectStreamForPacket(pkt) + if err != nil { + t.Fatalf("expected bound healthy tunnel fallback, got: %v", err) + } + + g.selector.mu.RLock() + after := g.selector.bindings[key].LastUsed + g.selector.mu.RUnlock() + if !after.After(before) { + t.Fatalf("expected binding lastUsed to advance, before=%v after=%v", before, after) + } +} + // --- Category D: Safety Net Edge Cases --- // TestTriggerAllEndpointReconnections_ClosingClient verifies that diff --git a/gravity/grpc_client.go b/gravity/grpc_client.go index e361f6d..7be15a8 100644 --- a/gravity/grpc_client.go +++ b/gravity/grpc_client.go @@ -5313,6 +5313,10 @@ func (g *GravityClient) selectStreamForPacket(payload []byte) (*StreamInfo, erro for attempt := 0; attempt < len(endpoints); attempt++ { endpoint := selector.Select(payload, endpoints) if endpoint == nil { + if fallbackStream, fallbackURL, ok := g.selectBoundTunnelFallback(payload, selector); ok { + g.logger.Debug("selectStream: using bound endpoint %s despite unhealthy endpoint state because a healthy tunnel stream still exists", fallbackURL) + return fallbackStream, nil + } // Log endpoint health summary for debugging selector failures. var healthSummary []string for i, ep := range endpoints { @@ -5337,6 +5341,10 @@ func (g *GravityClient) selectStreamForPacket(payload []byte) (*StreamInfo, erro g.triggerEndpointReconnectByURL(endpoint.URL) g.wakePeerDiscovery() } + if fallbackStream, fallbackURL, ok := g.selectBoundTunnelFallback(payload, selector); ok { + g.logger.Debug("selectStream: using bound endpoint %s after selector exhausted healthy endpoint attempts because a healthy tunnel stream still exists", fallbackURL) + return fallbackStream, nil + } return nil, fmt.Errorf("no healthy tunnel streams on any endpoint") } @@ -5370,6 +5378,33 @@ func (g *GravityClient) selectStreamForPacket(payload []byte) (*StreamInfo, erro return stream, nil } +func (g *GravityClient) selectBoundTunnelFallback(payload []byte, selector *EndpointSelector) (*StreamInfo, string, bool) { + if selector == nil { + return nil, "", false + } + + key := ExtractFlowKey(payload) + now := time.Now() + + selector.mu.RLock() + binding, ok := selector.bindings[key] + selector.mu.RUnlock() + if !ok || binding == nil || binding.Endpoint == nil || now.Sub(binding.LastUsed) >= selector.ttl { + return nil, "", false + } + + stream, err := g.selectStreamForEndpoint(payload, binding.Endpoint.URL) + if err != nil { + return nil, binding.Endpoint.URL, false + } + selector.mu.Lock() + if current, ok := selector.bindings[key]; ok && current == binding { + current.LastUsed = now + } + selector.mu.Unlock() + return stream, binding.Endpoint.URL, true +} + func (g *GravityClient) selectStreamForEndpoint(payload []byte, endpointURL string) (*StreamInfo, error) { g.mu.RLock() candidateIndexes := append([]int(nil), g.endpointStreamIndices[endpointURL]...)