Orchestrator implementation #53

Merged: 32 commits, May 5, 2020
Commits
124d83e
Return Resource on cache Fetch
Apr 8, 2020
8c07603
Orchestrator implementation
Apr 8, 2020
ccf3a75
Merge branch 'master' into orchestrator
Apr 8, 2020
8fbde42
Pass by reference for cache watch, export MarshalResources
Apr 13, 2020
ee54b65
Add downstream, upstream response map wrappers
Apr 13, 2020
3b505db
Changes and fixes to orchestrator workflow
Apr 13, 2020
071e8e5
Add unit tests
Apr 13, 2020
0e853a8
Add missing testdata
Apr 13, 2020
6234b8a
Link TODO with issues
Apr 13, 2020
fad0c13
Fanout response to watches in parallel
Apr 13, 2020
7723d2c
Send response only if req/resp versions differ
Apr 14, 2020
78ecf04
Remove sleep
Apr 15, 2020
2f15c34
Merge master
Apr 15, 2020
cc6284e
Merge master (#2)
jyotimahapatra Apr 21, 2020
24d87a6
DeleteRequest API (#59)
Apr 23, 2020
aefa63b
grpc graceful shutdown (#64)
jyotimahapatra Apr 24, 2020
3672040
Fix all the things
Apr 24, 2020
4c05a5a
Merge branch 'master' into orchestrator
Apr 24, 2020
f302e8c
lint
Apr 24, 2020
60f7a22
Fix cache test
Apr 24, 2020
97c91e7
Add TODOs, feedback
Apr 25, 2020
aeef7ca
Use sync.Map
Apr 29, 2020
5315967
s/shutdown/shutdownUpstream
Apr 30, 2020
d3d1e3b
add timeout
Apr 30, 2020
6bcaf46
add wg
Apr 30, 2020
30246e7
Reduce timeout, don't close downstream channels
May 1, 2020
ecb0a28
fanout with default, buffered downstream chans
May 4, 2020
144d156
Remove timeout
May 4, 2020
a36fc60
Document upstream/downstream files
May 5, 2020
d08ddcc
Heavily document watchUpstreams, s/watches/watchers
May 5, 2020
4e4e0cc
assert sync map keys
May 5, 2020
57ff78e
comment on default
May 5, 2020
7 changes: 5 additions & 2 deletions internal/app/cache/cache.go
@@ -113,7 +113,7 @@ func (c *cache) Fetch(key string) (*Resource, error) {
func (c *cache) SetResponse(key string, resp v2.DiscoveryResponse) (map[*v2.DiscoveryRequest]bool, error) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
marshaledResources, err := marshalResources(resp.Resources)
marshaledResources, err := MarshalResources(resp.Resources)
if err != nil {
return nil, fmt.Errorf("failed to marshal resources for key: %s, err %v", key, err)
}
@@ -126,6 +126,7 @@ func (c *cache) SetResponse(key string, resp v2.DiscoveryResponse) (map[*v2.Disc
resource := Resource{
Resp: response,
ExpirationTime: c.getExpirationTime(time.Now()),
Requests: make(map[*v2.DiscoveryRequest]bool),
}
c.cache.Add(key, resource)
return nil, nil
@@ -193,7 +194,9 @@ func (c *cache) getExpirationTime(currentTime time.Time) time.Time {
return time.Time{}
}

func marshalResources(resources []*any.Any) ([]gcp_types.MarshaledResource, error) {
// MarshalResources converts the raw xDS discovery resources into a serialized
// form accepted by go-control-plane.
func MarshalResources(resources []*any.Any) ([]gcp_types.MarshaledResource, error) {
var marshaledResources []gcp_types.MarshaledResource
for _, resource := range resources {
marshaledResource, err := gcp.MarshalResource(resource)
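For context on the rename from marshalResources to the exported MarshalResources, a rough usage sketch of the helper (hypothetical caller inside the xds-relay module; the ClusterLoadAssignment value is made up for illustration):

package main

import (
	"fmt"

	v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/any"

	"github.com/envoyproxy/xds-relay/internal/app/cache"
)

func main() {
	// Wrap a discovery resource in an Any, as it would appear in a
	// DiscoveryResponse coming from the origin server.
	cla := &v2.ClusterLoadAssignment{ClusterName: "example-cluster"}
	res, err := ptypes.MarshalAny(cla)
	if err != nil {
		panic(err)
	}

	// Serialize into the form accepted by go-control-plane.
	marshaled, err := cache.MarshalResources([]*any.Any{res})
	if err != nil {
		panic(err)
	}
	fmt.Printf("serialized %d resource(s)\n", len(marshaled))
}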
3 changes: 2 additions & 1 deletion internal/app/cache/cache_test.go
@@ -74,7 +74,8 @@ var testResponse = Response{
}

var testResource = Resource{
Resp: &testResponse,
Resp: &testResponse,
Requests: make(map[*v2.DiscoveryRequest]bool),
}

func TestAddRequestAndFetch(t *testing.T) {
69 changes: 69 additions & 0 deletions internal/app/orchestrator/downstream.go
@@ -0,0 +1,69 @@
package orchestrator

import (
"sync"

gcp "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
)

// downstreamResponseMap is a map of downstream xDS client requests to response
// channels.
type downstreamResponseMap struct {
mu sync.RWMutex
responseChannels map[*gcp.Request]chan gcp.Response
}

func newDownstreamResponseMap() downstreamResponseMap {
return downstreamResponseMap{
responseChannels: make(map[*gcp.Request]chan gcp.Response),
}
}

// createChannel initializes a new channel for a request if it doesn't already
// exist.
func (d *downstreamResponseMap) createChannel(req *gcp.Request) chan gcp.Response {
d.mu.Lock()
defer d.mu.Unlock()
if _, ok := d.responseChannels[req]; !ok {
d.responseChannels[req] = make(chan gcp.Response, 1)
}
return d.responseChannels[req]
}

// get retrieves the channel where responses are set for the specified request.
func (d *downstreamResponseMap) get(req *gcp.Request) (chan gcp.Response, bool) {
d.mu.RLock()
defer d.mu.RUnlock()
channel, ok := d.responseChannels[req]
return channel, ok
}

// delete removes the response channel and request entry from the map.
// Note: We don't close the response channel prior to deletion because there
// can be separate go routines that are still attempting to write to the
// channel. We rely on garbage collection to clean up and close outstanding
// response channels once the go routines finish writing to them.
func (d *downstreamResponseMap) delete(req *gcp.Request) chan gcp.Response {
d.mu.Lock()
defer d.mu.Unlock()
if channel, ok := d.responseChannels[req]; ok {
delete(d.responseChannels, req)
return channel
}
return nil
}

// deleteAll removes all response channels and request entries from the map.
// Note: We don't close the response channel prior to deletion because there
// can be separate go routines that are still attempting to write to the
// channel. We rely on garbage collection to clean up and close outstanding
// response channels once the go routines finish writing to them.
func (d *downstreamResponseMap) deleteAll(watches map[*gcp.Request]bool) {
d.mu.Lock()
defer d.mu.Unlock()
for watch := range watches {
if d.responseChannels[watch] != nil {
delete(d.responseChannels, watch)
}
}
}
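As a quick orientation for reviewers, a minimal sketch (hypothetical, same-package helper; the request value is made up) of how this map is exercised over a watch's lifetime:

package orchestrator

import (
	gcp "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
)

func exampleDownstreamLifecycle() {
	watches := newDownstreamResponseMap()

	// CreateWatch: the downstream request gets a buffered response channel.
	req := &gcp.Request{TypeUrl: "type.googleapis.com/envoy.api.v2.Listener"}
	ch := watches.createChannel(req)

	// Fanout: look up the channel and push the latest response.
	if c, ok := watches.get(req); ok {
		c <- gcp.Response{}
	}
	resp := <-ch // the downstream consumer drains it
	_ = resp

	// Cancel or cache eviction: remove the entry. The channel is left open
	// and reclaimed by the garbage collector once writers finish.
	watches.delete(req)
}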
227 changes: 214 additions & 13 deletions internal/app/orchestrator/orchestrator.go
@@ -9,6 +9,7 @@ package orchestrator
import (
"context"
"fmt"
"sync"
"time"

bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
@@ -24,6 +25,10 @@ import (

const (
component = "orchestrator"

// unaggregatedPrefix is the prefix used to label discovery requests that
// could not be successfully mapped to an aggregation rule.
unaggregatedPrefix = "unaggregated_"
)

// Orchestrator has the following responsibilities:
@@ -48,6 +53,10 @@ const (
// more details.
type Orchestrator interface {
gcp.Cache

// This is called by the main shutdown handler and tests to clean up
// open channels.
shutdown(ctx context.Context)
}

type orchestrator struct {
@@ -56,31 +65,42 @@ type orchestrator struct {
upstreamClient upstream.Client

logger log.Logger

downstreamResponseMap downstreamResponseMap
upstreamResponseMap upstreamResponseMap
}

// New instantiates the mapper, cache, upstream client components necessary for
// the orchestrator to operate and returns an instance of the instantiated
// orchestrator.
func New(ctx context.Context,
func New(
ctx context.Context,
l log.Logger,
mapper mapper.Mapper,
upstreamClient upstream.Client,
cacheConfig *bootstrapv1.Cache) Orchestrator {
cacheConfig *bootstrapv1.Cache,
) Orchestrator {
orchestrator := &orchestrator{
logger: l.Named(component),
mapper: mapper,
upstreamClient: upstreamClient,
logger: l.Named(component),
mapper: mapper,
upstreamClient: upstreamClient,
downstreamResponseMap: newDownstreamResponseMap(),
upstreamResponseMap: newUpstreamResponseMap(),
}

// Initialize cache.
cache, err := cache.NewCache(int(cacheConfig.MaxEntries),
cache, err := cache.NewCache(
int(cacheConfig.MaxEntries),
orchestrator.onCacheEvicted,
time.Duration(cacheConfig.Ttl.Nanos)*time.Nanosecond)
time.Duration(cacheConfig.Ttl.Nanos)*time.Nanosecond,
)
if err != nil {
orchestrator.logger.With("error", err).Panic(ctx, "failed to initialize cache")
}
orchestrator.cache = cache

go orchestrator.shutdown(ctx)

return orchestrator
}

@@ -95,16 +115,197 @@ func New(ctx context.Context,
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
func (c *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) {
// TODO implement.
return nil, nil
func (o *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) {
ctx := context.Background()

// If this is the first time we're seeing the request from the
// downstream client, initialize a channel to feed future responses.
responseChannel := o.downstreamResponseMap.createChannel(&req)

aggregatedKey, err := o.mapper.GetKey(req)
if err != nil {
// Can't map the request to an aggregated key. Log and continue to
// propagate the response upstream without aggregation.
o.logger.With("err", err).With("req node", req.GetNode()).Warn(ctx, "failed to map to aggregated key")
// Mimic the aggregated key.
// TODO (https://github.com/envoyproxy/xds-relay/issues/56). This key
// needs to be made more granular to uniquely identify a request.
aggregatedKey = fmt.Sprintf("%s%s_%s", unaggregatedPrefix, req.GetNode().GetId(), req.GetTypeUrl())
}

// Register the watch for future responses.
err = o.cache.AddRequest(aggregatedKey, &req)
if err != nil {
// If we fail to register the watch, we need to kill this stream by
// closing the response channel.
o.logger.With("err", err).With("key", aggregatedKey).With(
"req node", req.GetNode()).Error(ctx, "failed to add watch")
closedChannel := o.downstreamResponseMap.delete(&req)
return closedChannel, nil
}

// Check if we have a cached response first.
cached, err := o.cache.Fetch(aggregatedKey)
if err != nil {
// Log, and continue to propagate the response upstream.
o.logger.With("err", err).With("key", aggregatedKey).Warn(ctx, "failed to fetch aggregated key")
}

if cached != nil && cached.Resp != nil && cached.Resp.Raw.GetVersionInfo() != req.GetVersionInfo() {
// If we have a cached response and the version is different,
// immediately push the result to the response channel.
go func() { responseChannel <- convertToGcpResponse(cached.Resp, req) }()
}

// Check if we have an upstream stream open for this aggregated key. If not,
// open a stream with the representative request.
if !o.upstreamResponseMap.exists(aggregatedKey) {
upstreamResponseChan, shutdown, err := o.upstreamClient.OpenStream(req)
if err != nil {
// TODO implement retry/back-off logic on error scenario.
// https://github.com/envoyproxy/xds-relay/issues/68
o.logger.With("err", err).With("key", aggregatedKey).Error(ctx, "Failed to open stream to origin server")
} else {
respChannel, upstreamOpenedPreviously := o.upstreamResponseMap.add(aggregatedKey, upstreamResponseChan)
if upstreamOpenedPreviously {
// A stream was opened previously due to a race between
// concurrent downstreams for the same aggregated key, between
// exists and add operations. In this event, simply close the
// slower stream and return the existing one.
shutdown()
} else {
// Spin up a go routine to watch for upstream responses.
// One routine is opened per aggregate key.
go o.watchUpstream(ctx, aggregatedKey, respChannel.response, respChannel.done, shutdown)
}
}
}

return responseChannel, o.onCancelWatch(aggregatedKey, &req)
}
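To make the contract concrete, a rough consumer-side sketch (hypothetical node and type URL values; not part of this PR):

package orchestrator

import (
	"time"

	core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	gcp "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
)

func exampleConsumeWatch(orch Orchestrator) {
	req := gcp.Request{
		TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
		Node:    &core.Node{Id: "sidecar-a", Cluster: "example"},
	}

	respCh, cancel := orch.CreateWatch(req)
	if cancel != nil {
		// cancel removes the downstream channel and deletes the request
		// from the cache; the consumer may call it more than once.
		defer cancel()
	}

	select {
	case resp := <-respCh:
		_ = resp // forward the discovery response to the xDS client
	case <-time.After(5 * time.Second):
		// Neither a cached nor an upstream response arrived in time.
	}
}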

// Fetch implements the polling method of the config cache using a non-empty request.
func (c *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.Response, error) {
func (o *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.Response, error) {
return nil, fmt.Errorf("Not implemented")
}

func (c *orchestrator) onCacheEvicted(key string, resource cache.Resource) {
// TODO implement.
// watchUpstream is intended to be called in a goroutine, to receive incoming
// responses and fan out to downstream clients.
func (o *orchestrator) watchUpstream(
ctx context.Context,
aggregatedKey string,
responseChannel <-chan *discovery.DiscoveryResponse,
done <-chan bool,
shutdownUpstream func(),
) {
for {
select {
case x, more := <-responseChannel:
if !more {
// A problem occurred fetching the response upstream; log and retry.
// TODO implement retry/back-off logic on error scenario.
// https://github.com/envoyproxy/xds-relay/issues/68
o.logger.With("key", aggregatedKey).Error(ctx, "upstream error")
return
}
// Cache the response.
_, err := o.cache.SetResponse(aggregatedKey, *x)
if err != nil {
// TODO if set fails, we may need to retry upstream as well.
// Currently the fallback is to rely on a future response, but
// that probably isn't ideal.
// https://github.com/envoyproxy/xds-relay/issues/70
//
// If we fail to cache the new response, log and return the old one.
o.logger.With("err", err).With("key", aggregatedKey).
Error(ctx, "Failed to cache the response")
}

// Get downstream watches and fan out.
// We retrieve from cache rather than directly fanning out the
// newly received response because the cache does additional
// resource serialization.
cached, err := o.cache.Fetch(aggregatedKey)
if err != nil {
o.logger.With("err", err).With("key", aggregatedKey).Error(ctx, "cache fetch failed")
// Can't do anything because we don't know who the watches
// are. Drop the response.
} else {
if cached == nil || cached.Resp == nil {
// If cache is empty, there is nothing to fan out.
// Error. Sanity check. Shouldn't ever reach this since we
// just set the response, but it's a rare scenario that can
// happen if the cache TTL is set very short.
o.logger.With("key", aggregatedKey).Error(ctx, "attempted to fan out with no cached response")
} else {
// Golden path.
o.fanout(cached.Resp, cached.Requests, aggregatedKey)
}
}
case <-done:
// Exit when signaled that the stream has closed.
shutdownUpstream()
return
}
}
}

// fanout pushes the response to the response channels of all open downstream
// watches in parallel.
func (o *orchestrator) fanout(resp *cache.Response, watchers map[*gcp.Request]bool, aggregatedKey string) {
var wg sync.WaitGroup
for watch := range watchers {
wg.Add(1)
go func(watch *gcp.Request) {
defer wg.Done()
if channel, ok := o.downstreamResponseMap.get(watch); ok {
select {
case channel <- convertToGcpResponse(resp, *watch):
break
default:
(Inline review thread on this default case:)

@lita (May 5, 2020):
How often is this expected to happen? It looks like this channel is buffered at 1. If there is a spam of updates, maybe it is fine to drop?

Member (Author):
There's more discussion for this here.

The idea behind this was that a downstream receiver could be blocked for any reason. Initially I went with a timeout, but in discussions with @jyotimahapatra we realized that it would imply the slowest sidecar would block new updates from being sent to other sidecars, due to the use of WaitGroups. We can't get rid of WaitGroups because more recent updates might get overridden by a slower update if updates were bursty.

The buffer is necessary in order to give the receiver time to initialize. I realized in this test that the second watch (for the same aggregated key) would fall to the default case if the channel was unbuffered.

If there is a spam of updates, I think it's reasonable to drop, since only one of those updates will be applied downstream anyway, but we do lose the guarantee that it is the "latest" update. We could move back to the configured timeout model if this is a concern, but I put some of the downsides above.

Member (Author):
My thoughts are to move forward with the default for now. I'll document this. If it is a problem in practice, then we can revisit the timeout model or other approaches.

Contributor:
Sounds good to me.

@lita (May 5, 2020):
Yeah, in the future you can do what Envoymanager does and drop the oldest and enqueue the newest rather than logging an error. That has been working quite well for us, since none of the configurations requires ordering and we send the state of the world. We always just want the latest update.

o.logger.With("key", aggregatedKey).With("node ID", watch.GetNode().GetId()).
Error(context.Background(), "channel blocked during fanout")
}
}
}(watch)
}
// Wait for all fanouts to complete.
wg.Wait()
}
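Following up on the review thread above, a minimal sketch (hypothetical helper, not part of this PR) of the "drop the oldest, enqueue the newest" alternative for the size-1 buffered channel:

package orchestrator

import (
	gcp "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
)

// trySendLatest evicts whatever is sitting in the size-1 buffer and queues
// the newest response, instead of falling through to default and logging.
// Assumes a single active producer per channel, as in fanout.
func trySendLatest(ch chan gcp.Response, resp gcp.Response) {
	for {
		select {
		case ch <- resp:
			return // buffer had room; the newest response is queued
		default:
			select {
			case <-ch: // drop the stale response occupying the buffer
			default: // receiver drained it in the meantime; retry the send
			}
		}
	}
}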

// onCacheEvicted is called when the cache evicts a response due to TTL or
// other reasons. When this happens, we need to clean up open streams.
// We shut down both the downstream watches and the upstream stream.
func (o *orchestrator) onCacheEvicted(key string, resource cache.Resource) {
// TODO Potential for improvements here to handle the thundering herd
// problem: https://github.com/envoyproxy/xds-relay/issues/71
o.downstreamResponseMap.deleteAll(resource.Requests)
o.upstreamResponseMap.delete(key)
}

// onCancelWatch cleans up the cached watch when called.
func (o *orchestrator) onCancelWatch(aggregatedKey string, req *gcp.Request) func() {
return func() {
o.downstreamResponseMap.delete(req)
if err := o.cache.DeleteRequest(aggregatedKey, req); err != nil {
o.logger.With("key", aggregatedKey).With("err", err).Warn(context.Background(), "Failed to delete from cache")
}
}
}

// shutdown closes all upstream connections when ctx.Done is called.
func (o *orchestrator) shutdown(ctx context.Context) {
<-ctx.Done()
o.upstreamResponseMap.deleteAll()
}

// convertToGcpResponse constructs the go-control-plane response from the
// cached response.
func convertToGcpResponse(resp *cache.Response, req gcp.Request) gcp.Response {
return gcp.Response{
Request: req,
Version: resp.Raw.GetVersionInfo(),
ResourceMarshaled: true,
MarshaledResources: resp.MarshaledResources,
}
}