Wait until ACK before sending additional pushes
Fixes: istio#25685

At large scale, Envoy suffers from an overload of XDS pushes, and there is
no backpressure in the system. As a result, other control planes, such as any
based on go-control-plane, outperform Istio in config update propagation
under load.

This change adds a backpressure mechanism to ensure we do not push more
configs than Envoy can handle. By slowing down the pushes, the
propagation time of new configurations actually decreases. We do this by
taking note of, but not sending, any push request whose TypeUrl has
an un-ACKed request in flight. When we get an ACK, if there is a pending
push request we immediately trigger it. This effectively means that
in a high-churn environment, each proxy will always have exactly one
outstanding push per type, and when the ACK is received we will
immediately send a new update.
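
A minimal, self-contained sketch of this queue-and-flush idea (not the code in this diff; the `conn` and `PushRequest` types below are simplified stand-ins for illustration):

```go
package main

import "fmt"

// PushRequest is a simplified stand-in for the real push request type.
type PushRequest struct{ Full bool }

// conn tracks, per TypeUrl, whether a push is awaiting an ACK and any push
// request that arrived while we were waiting.
type conn struct {
	inFlight map[string]bool
	blocked  map[string]*PushRequest
}

// push sends immediately if the type has no un-ACKed push outstanding;
// otherwise it records the request so it can be sent as soon as the ACK arrives.
func (c *conn) push(typeURL string, req *PushRequest) {
	if c.inFlight[typeURL] {
		c.blocked[typeURL] = req // coalesce: at most one pending push per type
		fmt.Println("queued push for", typeURL)
		return
	}
	c.inFlight[typeURL] = true
	fmt.Println("sent push for", typeURL)
}

// ack marks the type as synced and immediately sends any pending push.
func (c *conn) ack(typeURL string) {
	c.inFlight[typeURL] = false
	if req, ok := c.blocked[typeURL]; ok {
		delete(c.blocked, typeURL)
		c.push(typeURL, req)
	}
}

func main() {
	c := &conn{inFlight: map[string]bool{}, blocked: map[string]*PushRequest{}}
	const cds = "type.googleapis.com/envoy.config.cluster.v3.Cluster"
	c.push(cds, &PushRequest{Full: true}) // sent
	c.push(cds, &PushRequest{Full: true}) // queued: previous push not yet ACKed
	c.ack(cds)                            // ACK arrives, the queued push is sent immediately
}
```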

This PR is co-authored by Steve, who did a huge amount of work in
developing this into the state it is today, as well as finding and
testing the problem. See istio#27563 for
much of this work.

Co-Authored-By: Steven Dake sdake@ibm.com
howardjohn committed Oct 27, 2020
1 parent 0de069c commit f9ef09d
Showing 6 changed files with 183 additions and 12 deletions.
14 changes: 14 additions & 0 deletions pilot/pkg/features/pilot.go
@@ -386,4 +386,18 @@ var (

WorkloadEntryHealthChecks = env.RegisterBoolVar("PILOT_ENABLE_WORKLOAD_ENTRY_HEALTHCHECKS", false,
"Enables automatic health checks of WorkloadEntries based on the config provided in the associated WorkloadGroup").Get()

EnableFlowControl = env.RegisterBoolVar(
"PILOT_ENABLE_FLOW_CONTROL",
false,
"If enabled, pilot will wait for the completion of a receive operation before"+
"executing a push operation. This is a form of flow control and is useful in"+
"environments with high rates of push requests to each gateway. By default,"+
"this is false.").Get()

FlowControlTimeout = env.RegisterDurationVar(
"PILOT_FLOW_CONTROL_TIMEOUT",
15*time.Second,
"If set, the max amount of time to delay a push by. Depends on PILOT_ENABLE_FLOW_CONTROL.",
).Get()
)
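
For illustration, a hedged sketch of how these two settings are meant to interact when deciding whether to push (only the environment variable names above come from this change; the helper below is hypothetical and simplified):

```go
package main

import (
	"fmt"
	"time"
)

// decidePush mirrors the intended gating: with flow control enabled, delay a
// push while the previous push for the same type is un-ACKed, but force it
// through once the last send is older than the flow-control timeout.
func decidePush(flowControlEnabled, synced bool, lastSent time.Time, timeout time.Duration) string {
	timedOut := time.Since(lastSent) > timeout
	switch {
	case !flowControlEnabled:
		return "push now (flow control disabled)"
	case synced:
		return "push now (previous push was ACKed)"
	case timedOut:
		return "push now (failsafe: waited longer than PILOT_FLOW_CONTROL_TIMEOUT)"
	default:
		return "queue the push until the ACK arrives"
	}
}

func main() {
	fmt.Println(decidePush(true, false, time.Now().Add(-1*time.Second), 15*time.Second))  // queue
	fmt.Println(decidePush(true, false, time.Now().Add(-20*time.Second), 15*time.Second)) // failsafe push
	fmt.Println(decidePush(false, false, time.Now(), 15*time.Second))                     // push now
}
```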
3 changes: 3 additions & 0 deletions pilot/pkg/model/context.go
@@ -270,6 +270,9 @@ type WatchedResource struct {
// NonceAcked is the last acked message.
NonceAcked string

// NonceNacked is the last nacked message. This is reset following a successful ACK
NonceNacked string

// LastSent tracks the time of the generated push, to determine the time it takes the client to ack.
LastSent time.Time

86 changes: 74 additions & 12 deletions pilot/pkg/xds/ads.go
@@ -85,6 +85,11 @@ type Connection struct {

// stop can be used to end the connection manually via debug endpoints. Only to be used for testing.
stop chan struct{}

// blockedPushes is a map of TypeUrl to push request. This is set when we attempt to push to a busy Envoy
// (last push not ACKed). When we get an ACK from Envoy, if the type is populated here, we will trigger
// the push.
blockedPushes map[string]*model.PushRequest
}

// Event represents a config or registry event that results in a push.
@@ -98,11 +103,12 @@ type Event struct {

func newConnection(peerAddr string, stream DiscoveryStream) *Connection {
return &Connection{
pushChannel: make(chan *Event),
stop: make(chan struct{}),
PeerAddr: peerAddr,
Connect: time.Now(),
stream: stream,
pushChannel: make(chan *Event),
stop: make(chan struct{}),
PeerAddr: peerAddr,
Connect: time.Now(),
stream: stream,
blockedPushes: map[string]*model.PushRequest{},
}
}

@@ -180,13 +186,28 @@ func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *C
s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
}

if !s.shouldRespond(con, req) {
shouldRespond := s.shouldRespond(con, req)

con.proxy.Lock()
request, haveBlockedPush := con.blockedPushes[req.TypeUrl]
delete(con.blockedPushes, req.TypeUrl)
con.proxy.Unlock()

if shouldRespond {
// This is a request, trigger a full push for this type
request = &model.PushRequest{Full: true}
} else if !haveBlockedPush {
// This is an ACK, no delayed push
// Return immediately, no action needed
return nil
} else {
// we have a blocked push which we will use
adsLog.Debugf("%s: DEQUEUE for node:%s", v3.GetShortType(req.TypeUrl), con.proxy.ID)
}

push := s.globalPushContext()

return s.pushXds(con, push, versionInfo(), con.Watched(req.TypeUrl), &model.PushRequest{Full: true})
return s.pushXds(con, push, versionInfo(), con.Watched(req.TypeUrl), request)
}

// StreamAggregatedResources implements the ADS interface.
@@ -294,6 +315,9 @@ func (s *DiscoveryServer) shouldRespond(con *Connection, request *discovery.Disc
if s.InternalGen != nil {
s.InternalGen.OnNack(con.proxy, request)
}
con.proxy.Lock()
con.proxy.WatchedResources[request.TypeUrl].NonceNacked = request.ResponseNonce
con.proxy.Unlock()
return false
}

@@ -336,6 +360,10 @@ func (s *DiscoveryServer) shouldRespond(con *Connection, request *discovery.Disc
adsLog.Debugf("ADS:%s: REQ %s Expired nonce received %s, sent %s", stype,
con.ConID, request.ResponseNonce, previousInfo.NonceSent)
xdsExpiredNonce.With(typeTag.Value(v3.GetMetricType(request.TypeUrl))).Increment()
con.proxy.Lock()
con.proxy.WatchedResources[request.TypeUrl].NonceNacked = ""
con.proxy.WatchedResources[request.TypeUrl].LastRequest = request
con.proxy.Unlock()
return false
}

@@ -345,6 +373,7 @@ func (s *DiscoveryServer) shouldRespond(con *Connection, request *discovery.Disc
previousResources := con.proxy.WatchedResources[request.TypeUrl].ResourceNames
con.proxy.WatchedResources[request.TypeUrl].VersionAcked = request.VersionInfo
con.proxy.WatchedResources[request.TypeUrl].NonceAcked = request.ResponseNonce
con.proxy.WatchedResources[request.TypeUrl].NonceNacked = ""
con.proxy.WatchedResources[request.TypeUrl].ResourceNames = request.ResourceNames
con.proxy.WatchedResources[request.TypeUrl].LastRequest = request
con.proxy.Unlock()
@@ -589,9 +618,30 @@ func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
for _, w := range getPushResources(con.proxy.WatchedResources) {
err := s.pushXds(con, pushRequest.Push, currentVersion, w, pushRequest)
if err != nil {
return err
synced, timeout := con.Synced(w.TypeUrl)
if features.EnableFlowControl && !synced && timeout {
// We are not synced, but we have been stuck for too long. We will trigger the push anyways to
// avoid any scenario where this may deadlock.
// This can possibly be removed in the future if we find this never causes issues
totalDelayedPushTimeouts.With(typeTag.Value(v3.GetMetricType(w.TypeUrl))).Increment()
adsLog.Warnf("%s: QUEUE TIMEOUT for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
}

if !features.EnableFlowControl || synced || timeout {
err := s.pushXds(con, pushRequest.Push, currentVersion, w, pushRequest)
if err != nil {
return err
}
} else {
// The type is not yet synced. Instead of pushing now, which may overload Envoy,
// we will wait until the last push is ACKed and trigger the push. See
// https://github.com/istio/istio/issues/25685 for details on the performance
// impact of sending pushes before Envoy ACKs.
totalDelayedPushes.With(typeTag.Value(v3.GetMetricType(w.TypeUrl))).Increment()
adsLog.Debugf("%s: QUEUE for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
con.proxy.Lock()
con.blockedPushes[w.TypeUrl] = con.blockedPushes[w.TypeUrl].Merge(pushEv.pushRequest)
con.proxy.Unlock()
}
}
if pushRequest.Full {
@@ -775,7 +825,8 @@ func (s *DiscoveryServer) removeCon(conID string) {
// Send with timeout
func (conn *Connection) send(res *discovery.DiscoveryResponse) error {
errChan := make(chan error, 1)
// hardcoded for now - not sure if we need a setting

// sendTimeout may be modified via environment
t := time.NewTimer(sendTimeout)
go func() {
start := time.Now()
@@ -786,7 +837,6 @@ func (conn *Connection) send(res *discovery.DiscoveryResponse) error {

select {
case <-t.C:
// TODO: wait for ACK
adsLog.Infof("Timeout writing %s", conn.ConID)
xdsResponseWriteTimeouts.Increment()
return status.Errorf(codes.DeadlineExceeded, "timeout sending")
@@ -817,6 +867,18 @@ func (conn *Connection) send(res *discovery.DiscoveryResponse) error {
}
}

// nolint
// Synced checks if the type has been synced, meaning the most recent push was ACKed
func (conn *Connection) Synced(typeUrl string) (bool, bool) {
conn.proxy.RLock()
defer conn.proxy.RUnlock()
acked := conn.proxy.WatchedResources[typeUrl].NonceAcked
sent := conn.proxy.WatchedResources[typeUrl].NonceSent
nacked := conn.proxy.WatchedResources[typeUrl].NonceNacked != ""
sendTime := conn.proxy.WatchedResources[typeUrl].LastSent
return nacked || acked == sent, time.Since(sendTime) > features.FlowControlTimeout
}

// nolint
func (conn *Connection) NonceAcked(typeUrl string) string {
conn.proxy.RLock()
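
In `pushConnection` above, a queued request is merged into any request already pending for that type (`con.blockedPushes[w.TypeUrl].Merge(pushEv.pushRequest)`), so a burst of pushes collapses into a single combined push once the ACK arrives. A toy sketch of that coalescing, using a deliberately simplified request type rather than Istio's actual `PushRequest.Merge`:

```go
package main

import "fmt"

// request is a simplified stand-in for a push request: a full-push flag plus
// the set of configs that changed.
type request struct {
	full           bool
	configsUpdated map[string]bool
}

// merge combines a pending request with a new one, keeping the union of
// updated configs and requiring a full push if either side did. A nil
// receiver means nothing was pending yet.
func (r *request) merge(other *request) *request {
	if r == nil {
		return other
	}
	merged := &request{full: r.full || other.full, configsUpdated: map[string]bool{}}
	for k := range r.configsUpdated {
		merged.configsUpdated[k] = true
	}
	for k := range other.configsUpdated {
		merged.configsUpdated[k] = true
	}
	return merged
}

func main() {
	var pending *request
	pending = pending.merge(&request{configsUpdated: map[string]bool{"svc-a": true}})
	pending = pending.merge(&request{configsUpdated: map[string]bool{"svc-b": true}})
	// When the ACK arrives, a single push covering both updates is sent.
	fmt.Printf("full=%v configs=%v\n", pending.full, pending.configsUpdated)
}
```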
65 changes: 65 additions & 0 deletions pilot/pkg/xds/ads_test.go
@@ -22,9 +22,11 @@ import (

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/genproto/googleapis/rpc/status"

mesh "istio.io/api/mesh/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/util/sets"
"istio.io/istio/pilot/pkg/xds"
@@ -40,6 +42,7 @@ import (
"istio.io/istio/pkg/security"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/tests/util"
"istio.io/pkg/log"
)

const (
@@ -756,6 +759,68 @@ func TestEnvoyRDSProtocolError(t *testing.T) {
})
}

func TestBlockedPush(t *testing.T) {
original := features.EnableFlowControl
t.Cleanup(func() {
features.EnableFlowControl = original
})
t.Run("flow control enabled", func(t *testing.T) {
features.EnableFlowControl = true
log.FindScope("ads").SetOutputLevel(log.DebugLevel)
features.EnableFlowControl = true
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.ClusterType)
ads.RequestResponseAck(nil)
// Send push, get a response but do not ACK it
xds.AdsPushAll(s.Discovery)
res := ads.ExpectResponse()

// Another push results in no response as we are blocked
xds.AdsPushAll(s.Discovery)
ads.ExpectNoResponse()

// ACK, unblocking the previous push
ads.Request(&discovery.DiscoveryRequest{ResponseNonce: res.Nonce})
ads.ExpectResponse()
})
t.Run("flow control enabled NACK", func(t *testing.T) {
log.FindScope("ads").SetOutputLevel(log.DebugLevel)
features.EnableFlowControl = true
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.ClusterType)
ads.RequestResponseAck(nil)
// Send push, get a response and NACK it
xds.AdsPushAll(s.Discovery)
res := ads.ExpectResponse()
ads.Request(&discovery.DiscoveryRequest{ResponseNonce: res.Nonce, ErrorDetail: &status.Status{Message: "Test request NACK"}})

// Another push results in a response, since a NACKed push does not count as blocked
xds.AdsPushAll(s.Discovery)
ads.ExpectResponse()

// ACK the old push; nothing is queued, so no further response is expected
ads.Request(&discovery.DiscoveryRequest{ResponseNonce: res.Nonce})
ads.ExpectNoResponse()
})
t.Run("flow control disabled", func(t *testing.T) {
features.EnableFlowControl = false
s := xds.NewFakeDiscoveryServer(t, xds.FakeOptions{})
ads := s.ConnectADS().WithType(v3.ClusterType)
ads.RequestResponseAck(nil)
// Send push, get a response but do not ACK it
xds.AdsPushAll(s.Discovery)
res := ads.ExpectResponse()

// Another push results in a response, since flow control is disabled and we do not wait for the ACK
xds.AdsPushAll(s.Discovery)
ads.ExpectResponse()

// ACK; nothing was blocked, so no further response is expected
ads.Request(&discovery.DiscoveryRequest{ResponseNonce: res.Nonce})
ads.ExpectNoResponse()
})
}

func TestEnvoyRDSUpdatedRouteRequest(t *testing.T) {
expectRoutes := func(resp *discovery.DiscoveryResponse, expected ...string) {
t.Helper()
17 changes: 17 additions & 0 deletions pilot/pkg/xds/monitoring.go
@@ -65,6 +65,21 @@ var (
monitoring.WithLabels(typeTag),
)

// Number of delayed pushes. Currently this happens only when the last push has not been ACKed
totalDelayedPushes = monitoring.NewSum(
"pilot_xds_delayed_pushes_total",
"Total number of XDS pushes that are delayed.",
monitoring.WithLabels(typeTag),
)

// Number of delayed pushes that we pushed prematurely as a failsafe.
// This indicates that either the failsafe timeout is too aggressive or there is a deadlock
totalDelayedPushTimeouts = monitoring.NewSum(
"pilot_xds_delayed_push_timeouts_total",
"Total number of XDS pushes that are delayed and timed out",
monitoring.WithLabels(typeTag),
)

xdsExpiredNonce = monitoring.NewSum(
"pilot_xds_expired_nonce",
"Total number of XDS requests with an expired nonce.",
@@ -235,5 +250,7 @@ func init() {
inboundUpdates,
pushTriggers,
sendTime,
totalDelayedPushes,
totalDelayedPushTimeouts,
)
}
10 changes: 10 additions & 0 deletions releasenotes/notes/backpressure.yaml
@@ -0,0 +1,10 @@
apiVersion: release-notes/v2
kind: feature
area: networking
issue:
- https://github.com/istio/istio/issues/25685
releaseNotes:
- |
**Added** support for backpressure on XDS pushes to avoid overloading Envoy during periods of high configuration
churn. This is disabled by default and can be enabled by setting the PILOT_ENABLE_FLOW_CONTROL environment variable in Istiod.
