Skip to content

Commit

Permalink
[#583][Delta] Ensure resources are properly sent again if envoy unsub…
Browse files Browse the repository at this point in the history
…scribes then subscribes again to a resource

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Aug 25, 2022
1 parent af7a06d commit ea51144
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
5 changes: 5 additions & 0 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState
// * detect the version change, and return the resource (as an update)
// * detect the resource deletion, and set it as removed in the response
streamState.GetKnownResources()[resource] = ""
} else {
// Clean-up the state version for this resource.
// This addresses https://github.com/envoyproxy/go-control-plane/issues/583, where a resource unsubscribed then subscribed again
// is not sent again while envoy expects it.
delete(streamState.GetKnownResources(), resource)
}
delete(sv, resource)
}
Expand Down
42 changes: 40 additions & 2 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/grpc"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
Expand Down Expand Up @@ -467,7 +468,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
},
}

validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) {
validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) string {
t.Helper()
select {
case response := <-replies:
Expand All @@ -480,8 +481,10 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
assert.ElementsMatch(t, names, expectedResources)
assert.ElementsMatch(t, response.RemovedResources, expectedRemovedResources)
}
return response.Nonce
case <-time.After(1 * time.Second):
t.Fatalf("got no response")
require.Fail(t, "got no response")
return ""
}
}

Expand Down Expand Up @@ -530,6 +533,41 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {

})

t.Run("unsubscribed resources are sent again if re-subscribed later on and the version has not changed", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
err := s.DeltaAggregatedResources(resp)
assert.NoError(t, err)
}()

resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{"endpoints0", "endpoints1"},
}
nonce := validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1"}, nil)

// Unsubscribe from endpoints0
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.EndpointType,
ResponseNonce: nonce,
ResourceNamesUnsubscribe: []string{"endpoints0"},
}
// No reply is expected here

// Subscribe again to endpoints0
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.EndpointType,
ResponseNonce: nonce,
ResourceNamesSubscribe: []string{"endpoints0"},
}
validateResponse(t, resp.sent, []string{"endpoints0"}, nil)
})

t.Run("* subscribtion/unsubscription support", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer close(resp.recv)
Expand Down

0 comments on commit ea51144

Please sign in to comment.