Skip to content

Commit

Permalink
endpointmanager: fix bpf policy pressure getting stuck.
Browse files Browse the repository at this point in the history
Currently the policy map pressure gauge is only updated when a new
pressure value that is higher than the current one is set. This means
that the metric can only ever go up, so when maps are shrunk (e.g. after
doing a cilium fqdn cache clean) the metric never goes down.

This changes the behavior of the metric to maintain a map of map
pressure values. When the trigger is invoked, it iterates all values and
finds the max - updating the map_pressure gauge for policymaps to the
max value. Endpoints that are shut down have their values removed.

Signed-off-by: Tom Hadlaw <tom.hadlaw@isovalent.com>
  • Loading branch information
tommyp1ckles authored and aanm committed Nov 1, 2023
1 parent 904ceb3 commit 28ce005
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 15 deletions.
11 changes: 9 additions & 2 deletions pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,14 +1043,21 @@ func (e *Endpoint) SkipStateClean() {

// PolicyMapPressureEvent represents an event for a policymap pressure metric
// update that is sent via the policyMapPressureUpdater interface.
//
// NOTE(review): this view shows two declarations of the type back to back;
// the single-field one-liner looks like the pre-change form kept by the diff
// rendering — confirm against the actual file.
type PolicyMapPressureEvent struct{ Value float64 }
type PolicyMapPressureEvent struct {
// Value is the reported pressure. updatePolicyMapPressureMetric fills it with
// the number of realized policy map entries divided by the map's MaxEntries().
Value float64
// EndpointID is the ID of the endpoint the value belongs to (set from e.ID).
EndpointID uint16
}
// policyMapPressureUpdater is the sink an Endpoint reports its policy map
// pressure to, and the hook it uses to withdraw that report when it is closed.
type policyMapPressureUpdater interface {
// Update receives a pressure event for a single endpoint.
Update(PolicyMapPressureEvent)
// Remove is called with the endpoint's ID from Endpoint.Close.
Remove(uint16)
}

// updatePolicyMapPressureMetric reports this endpoint's policy map pressure to
// e.PolicyMapPressureUpdater. The pressure is the number of entries in the
// realized policy map divided by the policy map's MaxEntries().
//
// NOTE(review): unlike Close, there is no nil check on
// e.PolicyMapPressureUpdater here — confirm it is always set before this runs.
func (e *Endpoint) updatePolicyMapPressureMetric() {
// NOTE(review): a MaxEntries() of 0 would make this float division yield
// +Inf or NaN — presumably impossible for a real map; verify.
value := float64(e.realizedPolicy.GetPolicyMap().Len()) / float64(e.policyMap.MaxEntries())
// NOTE(review): the unkeyed single-value literal below looks like the
// pre-change call that the diff view kept next to its replacement — confirm.
e.PolicyMapPressureUpdater.Update(PolicyMapPressureEvent{value})
e.PolicyMapPressureUpdater.Update(PolicyMapPressureEvent{
Value: value,
EndpointID: e.ID,
})
}

func (e *Endpoint) deletePolicyKey(keyToDelete policy.Key, incremental bool) bool {
Expand Down
7 changes: 7 additions & 0 deletions pkg/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,16 @@ func (e *Endpoint) InitEndpointScope(parent cell.Scope) {
}

// Close releases per-endpoint reporting state while holding the endpoint
// mutex: it invokes the health reporter's close callback, if one is set, and
// then withdraws this endpoint's entry from the policy map pressure updater,
// if one is set.
//
// NOTE(review): newPolicyMapPressure can return a nil *policyMapPressure. If
// such a pointer is stored in the PolicyMapPressureUpdater interface field,
// the nil check below still passes — confirm callers assign an untyped nil.
func (e *Endpoint) Close() {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if closeReporter := e.closeHealthReporter; closeReporter != nil {
		closeReporter()
	}

	if updater := e.PolicyMapPressureUpdater; updater != nil {
		updater.Remove(e.ID)
	}
}

type namedPortsGetter interface {
Expand Down
52 changes: 39 additions & 13 deletions pkg/endpointmanager/policymap_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,56 @@
package endpointmanager

import (
"sync/atomic"
"golang.org/x/exp/maps"

"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/policymap"
"github.com/cilium/cilium/pkg/metrics"
"github.com/cilium/cilium/pkg/time"
"github.com/cilium/cilium/pkg/trigger"
)

// Update upserts the endpoint ID and updates the max endpoint policy map pressure metric.
// The event's value is stored under its endpoint ID while the lock is held;
// the gauge itself is not written here but by the trigger's callback
// (p.update), which this method fires once the lock has been released.
func (p *policyMapPressure) Update(ev endpoint.PolicyMapPressureEvent) {
val := ev.Value
p.Lock()
p.current[ev.EndpointID] = val
p.Unlock()

log.WithField(logfields.Value, val).Debug("EndpointManager policymap received event")

// NOTE(review): the Load/Store block below treats p.current as an atomic
// pointer rather than a map; it looks like the pre-change "only ever raise
// the value" logic kept by the diff view next to its replacement — confirm.
cur := p.current.Load()
if cur == nil || val > *cur {
p.current.Store(&val)
p.trigger.Trigger()
}
p.trigger.Trigger()
}

// Remove drops the recorded policy map pressure for the endpoint with the
// given ID and then fires the trigger so the exported value is recomputed.
// Call it when an endpoint goes away so its last reading stops being counted.
func (p *policyMapPressure) Remove(id uint16) {
	// Keep the critical section limited to the map mutation; the trigger is
	// fired only after the lock has been released.
	func() {
		p.Lock()
		defer p.Unlock()
		delete(p.current, id)
	}()

	p.trigger.Trigger()
}

// policyMapPressureMinInterval is the MinInterval handed to the trigger that
// refreshes the policymap pressure gauge. It is a variable rather than a
// constant so that tests can override it.
var policyMapPressureMinInterval = 10 * time.Second

func newPolicyMapPressure() *policyMapPressure {
if !metrics.BPFMapPressure {
return nil
}

p := new(policyMapPressure)
p.gauge = metrics.NewBPFMapPressureGauge(policymap.MapName+"*", policymap.PressureMetricThreshold)
p.current = make(map[uint16]float64)

var err error
p.trigger, err = trigger.NewTrigger(trigger.Parameters{
// It seems like 10s is a small enough window of time where the user
// can still reasonably react to a rising BPF map pressure. Keep it
// below the default Prometheus scrape interval of 15s anyway.
MinInterval: 10 * time.Second,
MinInterval: policyMapPressureMinInterval,
TriggerFunc: func([]string) { p.update() },
Name: "endpointmanager-policymap-max-size-metrics",
})
Expand All @@ -57,22 +71,34 @@ func (mgr *policyMapPressure) update() {
return
}

if value := mgr.current.Load(); value != nil {
mgr.gauge.Set(*value)
mgr.RLock()
max := float64(0)
for _, value := range maps.Values(mgr.current) {
if value > max {
max = value
}
}
mgr.RUnlock()
mgr.gauge.Set(max)
}

// metricsGauge is the only gauge capability policyMapPressure relies on.
// Holding the gauge behind this interface lets tests substitute a fake.
type metricsGauge interface {
// Set records value as the gauge's current reading.
Set(value float64)
}

// policyMapPressure implements the endpoint package's policyMapPressureUpdater
// interface to provide the endpoint's policymap pressure metric. It only
// exports the maximum policymap pressure from all endpoints within the
// EndpointManager to reduce cardinality of the metric.
type policyMapPressure struct {
// current holds the current maximum policymap pressure value that is
// pushed into gauge via trigger..
current atomic.Pointer[float64]
lock.RWMutex

// current holds the current maximum policymap pressure values by endpoint ID
// that is pushed into gauge via trigger..
current map[uint16]float64

// gauge is the gauge metric.
gauge *metrics.GaugeWithThreshold
gauge metricsGauge

// trigger handles exporting / updating the gauge with the value in current
// on an interval.
Expand Down
46 changes: 46 additions & 0 deletions pkg/endpointmanager/policymap_pressure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package endpointmanager

import (
	"math"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/cilium/cilium/pkg/endpoint"
)

// TestPolicyMapPressure checks that the exported policymap pressure follows
// the maximum across endpoints in both directions: it rises when an endpoint
// reports a higher value and falls again once that endpoint is removed.
func TestPolicyMapPressure(t *testing.T) {
	assert := assert.New(t)

	// Drop the trigger's rate limit so updates propagate immediately, and put
	// the package-level setting back afterwards so later tests in this package
	// are not affected by it.
	prevInterval := policyMapPressureMinInterval
	policyMapPressureMinInterval = 0
	t.Cleanup(func() { policyMapPressureMinInterval = prevInterval })

	p := newPolicyMapPressure()
	if p == nil {
		// newPolicyMapPressure returns nil when metrics.BPFMapPressure is off;
		// fail with a clear message instead of a nil-pointer panic.
		t.Fatal("newPolicyMapPressure returned nil; is metrics.BPFMapPressure disabled?")
	}

	gauge := &fakeGague{}
	p.gauge = gauge
	assert.Equal(float64(0), gauge.value())

	// The gauge is written from the trigger's callback, so poll until the
	// expected value shows up rather than reading it once.
	assertMetricEq := func(expected float64) {
		assert.Eventually(func() bool {
			return gauge.value() == expected
		}, time.Second, 1*time.Millisecond)
	}

	p.Update(endpoint.PolicyMapPressureEvent{
		EndpointID: 1,
		Value:      .5,
	})
	assertMetricEq(.5)

	p.Update(endpoint.PolicyMapPressureEvent{
		EndpointID: 2,
		Value:      1,
	})
	assertMetricEq(1)

	// Removing the endpoint that held the maximum must lower the metric.
	p.Remove(2)
	assertMetricEq(.5)
}

// fakeGague is a metricsGauge test double that remembers the last value set.
// The value is kept as its IEEE-754 bit pattern in an atomic so that Set
// (called from the trigger callback) and value (called from the polling test
// code) do not race. The zero value reads as 0.
type fakeGague struct {
	bits atomic.Uint64
}

// Set records value as the most recent gauge reading.
func (f *fakeGague) Set(value float64) {
	f.bits.Store(math.Float64bits(value))
}

// value returns the most recent reading passed to Set, or 0 if none was.
func (f *fakeGague) value() float64 {
	return math.Float64frombits(f.bits.Load())
}

0 comments on commit 28ce005

Please sign in to comment.