Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#583][Sotw] Ensure resources are properly sent again if envoy unsubscribes then subscribes again to a resource #585

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
46 changes: 43 additions & 3 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
Expand All @@ -37,6 +36,27 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// ClientState provides additional data on the client knowledge for the type matching the request
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
Comment on lines +39 to +42
Copy link

@atollena atollena Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment would benefit from emphasizing that this state is per resource type.

Suggested change
// ClientState provides additional data on the client knowledge for the type matching the request
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
// ClientState stores the server view of the client state for a given resource type.
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated on parent PR #584, not yet rebased on this branch

type ClientState interface {
// GetKnownResources returns the list of resources the clients has ACKed and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
GetKnownResources() map[string]string
Copy link

@atollena atollena Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't GetAckedResources be more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to this name. I have not pushed this branch yet, but I updated #584


// GetSubscribedResources returns the list of resources currently subscribed to by the client for the type.
// For delta it keeps track across requests
// For sotw it is a normalized view of the request resources
GetSubscribedResources() map[string]struct{}

// IsWildcard returns whether the client has a wildcard watch.
// This considers subtilities related to the current migration of wildcard definition within the protocol.
IsWildcard() bool
}

// ConfigWatcher requests watches for configuration resources by a node, last
// applied version identifier, and resource names hint. The watch should send
// the responses when they are ready. The watch can be canceled by the
Expand All @@ -50,7 +70,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
CreateWatch(*Request, ClientState, chan Response) (cancel func())

// CreateDeltaWatch returns a new open incremental xDS watch.
//
Expand All @@ -59,7 +79,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func())
}

// ConfigFetcher fetches configuration resources from cache
Expand All @@ -85,6 +105,9 @@ type Response interface {
// Get the version in the Response.
GetVersion() (string, error)

// Get the list of resources part of the response without having to cast resources
GetResourceNames() []string

// Get the context provided during response creation.
GetContext() context.Context
}
Expand Down Expand Up @@ -122,6 +145,9 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// Names of the resources included in the response
ResourceNames []string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Expand Down Expand Up @@ -171,6 +197,9 @@ type PassthroughResponse struct {
// The discovery response that needs to be sent as is, without any marshaling transformations.
DiscoveryResponse *discovery.DiscoveryResponse

// Names of the resources set in the response
ResourceNames []string

ctx context.Context
}

Expand Down Expand Up @@ -268,6 +297,12 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil
}

// GetResourceNames returns the list of resources returned within the response
// without having to decode the resources
func (r *RawResponse) GetResourceNames() []string {
return r.ResourceNames
}

// GetRequest returns the original Discovery Request.
func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest {
return r.Request
Expand Down Expand Up @@ -330,6 +365,11 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon
return r.DiscoveryResponse, nil
}

// GetResourceNames returns the list of resources included within the response
func (r *PassthroughResponse) GetResourceNames() []string {
return r.ResourceNames
}

// GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response.
func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
return r.DeltaDiscoveryResponse, nil
Expand Down
15 changes: 7 additions & 8 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// groups together resource-related arguments for the createDeltaResponse function
Expand All @@ -28,7 +27,7 @@ type resourceContainer struct {
systemVersion string
}

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
Expand All @@ -37,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetResourceVersions()) == 0 {
if len(state.GetKnownResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
Expand All @@ -46,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
prevVersion, found := state.GetKnownResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetResourceVersions() {
for name := range state.GetKnownResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
nextVersionMap = make(map[string]string, len(state.GetSubscribedResources()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
for name := range state.GetSubscribedResources() {
prevVersion, found := state.GetKnownResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
Expand Down
65 changes: 33 additions & 32 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package cache_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
Expand All @@ -35,13 +35,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, nil)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(true, nil), watches[typ])
}, &state, watches[typ])
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
Expand Down Expand Up @@ -69,15 +70,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
state.GetSubscribedResources()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
}, &state, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -112,6 +113,7 @@ func TestDeltaRemoveResources(t *testing.T) {
watches := make(map[string]chan cache.DeltaResponse)
streams := make(map[string]*stream.StreamState)

// At this stage the cache is empty, so a watch is opened
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, make(map[string]string))
Expand All @@ -123,23 +125,25 @@ func TestDeltaRemoveResources(t *testing.T) {
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
}, streams[typ], watches[typ])
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
t.Fatal(err)
}
snapshot := fixture.snapshot()
snapshot.Resources[types.Endpoint] = cache.NewResources(fixture.version, []types.Resource{
testEndpoint,
resource.MakeEndpoint("otherCluster", 8080),
})
require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot))

for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetResourceVersions(nextVersionMap)
case <-time.After(time.Second):
t.Fatal("failed to receive a snapshot response")
require.Fail(t, "failed to receive a snapshot response")
}
})
}
Expand All @@ -152,35 +156,30 @@ func TestDeltaRemoveResources(t *testing.T) {
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
TypeUrl: typ,
ResponseNonce: "nonce",
}, streams[typ], watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes))
}
assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version")

// set a partially versioned snapshot with no endpoints
// set a partially versioned snapshot with only one endpoint
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
t.Fatal(err)
}
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{
testEndpoint, // this cluster is not changed, we do not expect it back in "resources"
})
require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2))

// validate response for endpoints
select {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assert.Empty(t, out.(*cache.RawDeltaResponse).Resources)
assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources)
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) {
t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap)
}
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change")
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
assert.Fail(t, "failed to receive snapshot response")
}
}

Expand All @@ -203,13 +202,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Fatalf("snapshot failed: %s", err)
}
} else {
state := stream.NewStreamState(false, make(map[string]string))
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, &state, responses)

defer cancel()
}
Expand All @@ -226,14 +226,14 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, state, watchCh)
}, &state, watchCh)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
Expand Down Expand Up @@ -269,13 +269,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, make(map[string]string))
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, &state, responses)

// Cancel the watch
cancel()
Expand Down
Loading