Skip to content

Commit

Permalink
cache: fix per-response caching and switch to pointers for requests a…
Browse files Browse the repository at this point in the history
…nd responses (#340)

* cache: fix per response caching and switch to pointers for response

Signed-off-by: Joshua Rutherford <joshua.rutherford@greymatter.io>

* cache: ensure cached discovery response logic is platform agnostic

Signed-off-by: Joshua Rutherford <joshua.rutherford@greymatter.io>

Co-authored-by: Joshua Rutherford <joshua.rutherford@greymatter.io>
  • Loading branch information
Joshua Rutherford and Joshua Rutherford committed Aug 12, 2020
1 parent 9c7e2b2 commit 9821e2b
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 308 deletions.
4 changes: 2 additions & 2 deletions build/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ echo Envoy log: ${ENVOY_LOG}
ENVOY_PID=$!

function cleanup() {
kill ${ENVOY_PID}
kill ${UPSTREAM_PID}
kill ${ENVOY_PID} ${UPSTREAM_PID}
wait ${ENVOY_PID} ${UPSTREAM_PID} 2> /dev/null || true
}
trap cleanup EXIT

Expand Down
74 changes: 37 additions & 37 deletions pkg/cache/v2/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache
import (
"context"
"fmt"
"sync/atomic"

discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
Expand All @@ -43,15 +44,15 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(Request) (value chan Response, cancel func())
CreateWatch(*Request) (value chan Response, cancel func())
}

// Cache is a generic config cache with a watcher.
type Cache interface {
ConfigWatcher

// Fetch implements the polling method of the config cache using a non-empty request.
Fetch(context.Context, Request) (Response, error)
Fetch(context.Context, *Request) (Response, error)
}

// Response is a wrapper around Envoy's DiscoveryResponse.
Expand All @@ -70,7 +71,7 @@ type Response interface {
// be included in the final Discovery Response.
type RawResponse struct {
// Request is the original request.
Request discovery.DiscoveryRequest
Request *discovery.DiscoveryRequest

// Version of the resources as tracked by the cache for the given type.
// Proxy responds with this version as an acknowledgement.
Expand All @@ -79,21 +80,16 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.Resource

// isResourceMarshaled indicates whether the resources have been marshaled.
// This is internally maintained by go-control-plane to prevent future
// duplication in marshaling efforts.
isResourceMarshaled bool

// marshaledResponse holds the serialized discovery response.
marshaledResponse *discovery.DiscoveryResponse
// marshaledResponse holds an atomic reference to the serialized discovery response.
marshaledResponse atomic.Value
}

var _ Response = &RawResponse{}

// PassthroughResponse is a pre constructed xDS response that need not go through marshalling transformations.
type PassthroughResponse struct {
// Request is the original request.
Request discovery.DiscoveryRequest
Request *discovery.DiscoveryRequest

// The discovery response that needs to be sent as is, without any marshalling transformations.
DiscoveryResponse *discovery.DiscoveryResponse
Expand All @@ -104,55 +100,59 @@ var _ Response = &PassthroughResponse{}
// GetDiscoveryResponse performs the marshalling the first time its called and uses the cached response subsequently.
// This is necessary because the marshalled response does not change across the calls.
// This caching behavior is important in high throughput scenarios because grpc marshalling has a cost and it drives the cpu utilization under load.
func (r RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
if r.isResourceMarshaled {
return r.marshaledResponse, nil
}
func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {

marshaledResponse := r.marshaledResponse.Load()

if marshaledResponse == nil {

marshaledResources := make([]*any.Any, len(r.Resources))
marshaledResources := make([]*any.Any, len(r.Resources))

for i, resource := range r.Resources {
marshaledResource, err := MarshalResource(resource)
if err != nil {
return nil, err
for i, resource := range r.Resources {
marshaledResource, err := MarshalResource(resource)
if err != nil {
return nil, err
}
marshaledResources[i] = &any.Any{
TypeUrl: r.Request.TypeUrl,
Value: marshaledResource,
}
}
marshaledResources[i] = &any.Any{
TypeUrl: r.Request.TypeUrl,
Value: marshaledResource,

marshaledResponse = &discovery.DiscoveryResponse{
VersionInfo: r.Version,
Resources: marshaledResources,
TypeUrl: r.Request.TypeUrl,
}
}

r.isResourceMarshaled = true
r.marshaledResponse.Store(marshaledResponse)
}

return &discovery.DiscoveryResponse{
VersionInfo: r.Version,
Resources: marshaledResources,
TypeUrl: r.Request.TypeUrl,
}, nil
return marshaledResponse.(*discovery.DiscoveryResponse), nil
}

// GetRequest returns the original Discovery Request.
func (r RawResponse) GetRequest() *discovery.DiscoveryRequest {
return &r.Request
func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest {
return r.Request
}

// GetVersion returns the response version.
func (r RawResponse) GetVersion() (string, error) {
func (r *RawResponse) GetVersion() (string, error) {
return r.Version, nil
}

// GetDiscoveryResponse returns the final passthrough Discovery Response.
func (r PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
return r.DiscoveryResponse, nil
}

// GetRequest returns the original Discovery Request
func (r PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
return &r.Request
func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
return r.Request
}

// GetVersion returns the response version.
func (r PassthroughResponse) GetVersion() (string, error) {
func (r *PassthroughResponse) GetVersion() (string, error) {
if r.DiscoveryResponse != nil {
return r.DiscoveryResponse.VersionInfo, nil
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/cache/v2/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
func TestResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}}
resp := cache.RawResponse{
Request: discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Resources: routes,
}
Expand All @@ -30,6 +30,10 @@ func TestResponseGetDiscoveryResponse(t *testing.T) {
assert.Equal(t, discoveryResponse.VersionInfo, resp.Version)
assert.Equal(t, len(discoveryResponse.Resources), 1)

cachedResponse, err := resp.GetDiscoveryResponse()
assert.Nil(t, err)
assert.Same(t, discoveryResponse, cachedResponse)

r := &route.RouteConfiguration{}
err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r)
assert.Nil(t, err)
Expand All @@ -46,7 +50,7 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
VersionInfo: "v",
}
resp := cache.PassthroughResponse{
Request: discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
DiscoveryResponse: dr,
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v2/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
}
}
}
value <- RawResponse{
Request: Request{TypeUrl: cache.typeURL},
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: strconv.FormatUint(cache.version, 10),
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (cache *LinearCache) DeleteResource(name string) error {
return nil
}

func (cache *LinearCache) CreateWatch(request Request) (chan Response, func()) {
func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) {
value := make(chan Response, 1)
if request.TypeUrl != cache.typeURL {
close(value)
Expand Down Expand Up @@ -207,7 +207,7 @@ func (cache *LinearCache) CreateWatch(request Request) (chan Response, func()) {
}
}

func (cache *LinearCache) Fetch(ctx context.Context, request Request) (Response, error) {
func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
return nil, errors.New("not implemented")
}

Expand Down
42 changes: 21 additions & 21 deletions pkg/cache/v2/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func mustBlock(t *testing.T, w <-chan Response) {

func TestLinearInitialResources(t *testing.T) {
c := NewLinearCache(testType, map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})
w, _ := c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType})
w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType})
verifyResponse(t, w, "0", 1)
w, _ = c.CreateWatch(Request{TypeUrl: testType})
w, _ = c.CreateWatch(&Request{TypeUrl: testType})
verifyResponse(t, w, "0", 2)
}

Expand All @@ -84,7 +84,7 @@ func TestLinearCornerCases(t *testing.T) {
t.Error("expected error on nil resource")
}
// create an incorrect type URL request
w, _ := c.CreateWatch(Request{TypeUrl: "test"})
w, _ := c.CreateWatch(&Request{TypeUrl: "test"})
select {
case _, more := <-w:
if more {
Expand All @@ -99,9 +99,9 @@ func TestLinearBasic(t *testing.T) {
c := NewLinearCache(testType, nil)

// Create watches before a resource is ready
w1, _ := c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
mustBlock(t, w1)
w, _ := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
mustBlock(t, w)
checkWatchCount(t, c, "a", 2)
checkWatchCount(t, c, "b", 1)
Expand All @@ -112,44 +112,44 @@ func TestLinearBasic(t *testing.T) {
verifyResponse(t, w, "1", 1)

// Request again, should get same response
w, _ = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)

// Add another element and update the first, response should be different
c.UpdateResource("b", testResource("b"))
c.UpdateResource("a", testResource("aa"))
w, _ = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
verifyResponse(t, w, "3", 1)
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
verifyResponse(t, w, "3", 2)
}

func TestLinearDeletion(t *testing.T) {
c := NewLinearCache(testType, map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})
w, _ := c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
c.DeleteResource("a")
verifyResponse(t, w, "1", 0)
checkWatchCount(t, c, "a", 0)
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
verifyResponse(t, w, "1", 1)
checkWatchCount(t, c, "b", 0)
c.DeleteResource("b")
w, _ = c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
verifyResponse(t, w, "2", 0)
checkWatchCount(t, c, "b", 0)
}

func TestLinearWatchTwo(t *testing.T) {
c := NewLinearCache(testType, map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})
w, _ := c.CreateWatch(Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
mustBlock(t, w)
w1, _ := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "0"})
w1, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
mustBlock(t, w1)
c.UpdateResource("a", testResource("aa"))
// should only get the modified resource
Expand All @@ -162,24 +162,24 @@ func TestLinearCancel(t *testing.T) {
c.UpdateResource("a", testResource("a"))

// cancel watch-all
w, cancel := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
w, cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
checkWatchCount(t, c, "a", 0)

// cancel watch for "a"
w, cancel = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
checkWatchCount(t, c, "a", 0)

// open four watches for "a" and "b" and two for all, cancel one of each, make sure the second one is unaffected
w, cancel = c.CreateWatch(Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
w2, cancel2 := c.CreateWatch(Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
w3, cancel3 := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
w4, cancel4 := c.CreateWatch(Request{TypeUrl: testType, VersionInfo: "1"})
w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
w2, cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
w3, cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
w4, cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
mustBlock(t, w)
mustBlock(t, w2)
mustBlock(t, w3)
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
} else {
id2 := fmt.Sprintf("%d", i-1)
t.Logf("request resources %q and %q", id, id2)
value, _ := c.CreateWatch(Request{
value, _ := c.CreateWatch(&Request{
// Only expect one to become stale
ResourceNames: []string{id, id2},
VersionInfo: "0",
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v2/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type MuxCache struct {

var _ Cache = &MuxCache{}

func (mux *MuxCache) CreateWatch(request Request) (chan Response, func()) {
key := mux.Classify(request)
func (mux *MuxCache) CreateWatch(request *Request) (chan Response, func()) {
key := mux.Classify(*request)
cache, exists := mux.Caches[key]
if !exists {
value := make(chan Response, 0)
Expand All @@ -45,6 +45,6 @@ func (mux *MuxCache) CreateWatch(request Request) (chan Response, func()) {
return cache.CreateWatch(request)
}

func (mux *MuxCache) Fetch(ctx context.Context, request Request) (Response, error) {
func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error) {
return nil, errors.New("not implemented")
}

0 comments on commit 9821e2b

Please sign in to comment.