
endpoint: don't hold the endpoint lock while generating policy #26242

Merged 6 commits on Jun 26, 2023
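Note on motivation, restated from the diff comments in pkg/endpoint/bpf.go below: policy generation needs the ipcache to make progress, and the ipcache in turn calls endpoint.ApplyPolicyMapChanges(), which takes the endpoint lock. Holding the endpoint lock while generating policy is therefore a lock-order inversion. A minimal Go sketch of the hazard; every name here is an illustrative stand-in, not Cilium's actual API:

package main

import "sync"

var (
    endpointMu sync.Mutex // stands in for the endpoint lock
    ipcacheMu  sync.Mutex // stands in for the ipcache lock
)

// Old regeneration path: endpoint lock held while policy generation
// consults the ipcache.
func regenerateHoldingEndpointLock() {
    endpointMu.Lock()
    defer endpointMu.Unlock()
    ipcacheMu.Lock() // endpoint lock -> ipcache lock
    defer ipcacheMu.Unlock()
}

// IPCache update path: ipcache lock held while pushing updates into the
// endpoint (ApplyPolicyMapChanges).
func ipcacheUpdate() {
    ipcacheMu.Lock()
    defer ipcacheMu.Unlock()
    endpointMu.Lock() // ipcache lock -> endpoint lock: inverted order
    defer endpointMu.Unlock()
}

func main() {
    // Run sequentially this terminates; run concurrently, each goroutine can
    // grab its first lock and block forever on the second. The fix in this
    // PR: compute policy before taking the endpoint lock.
    regenerateHoldingEndpointLock()
    ipcacheUpdate()
}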
4 changes: 1 addition & 3 deletions daemon/cmd/endpoint.go
@@ -425,9 +425,7 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
 		ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels")
 	} else {
 		ep.SetPod(pod)
-		if err := ep.SetK8sMetadata(cp); err != nil {
-			return invalidDataError(ep, fmt.Errorf("Invalid ContainerPorts %v: %s", cp, err))
-		}
+		ep.SetK8sMetadata(cp)
 		addLabels.MergeLabels(identityLabels)
 		infoLabels.MergeLabels(info)
 		if _, ok := annotations[bandwidth.IngressBandwidth]; ok {
4 changes: 2 additions & 2 deletions daemon/cmd/state.go
@@ -395,7 +395,7 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState) (resto
 func (d *Daemon) allocateIPsLocked(ep *endpoint.Endpoint) (err error) {
 	if option.Config.EnableIPv6 && ep.IPv6.IsValid() {
 		ipv6Pool := ipam.PoolOrDefault(ep.IPv6IPAMPool)
-		_, err = d.ipam.AllocateIPWithoutSyncUpstream(ep.IPv6.AsSlice(), ep.HumanStringLocked()+" [restored]", ipv6Pool)
+		_, err = d.ipam.AllocateIPWithoutSyncUpstream(ep.IPv6.AsSlice(), ep.HumanString()+" [restored]", ipv6Pool)
 		if err != nil {
 			return fmt.Errorf("unable to reallocate %s IPv6 address: %w", ep.IPv6, err)
 		}
@@ -409,7 +409,7 @@ func (d *Daemon) allocateIPsLocked(ep *endpoint.Endpoint) (err error) {
 	if option.Config.EnableIPv4 && ep.IPv4.IsValid() {
 		ipv4Pool := ipam.PoolOrDefault(ep.IPv4IPAMPool)
-		_, err = d.ipam.AllocateIPWithoutSyncUpstream(ep.IPv4.AsSlice(), ep.HumanStringLocked()+" [restored]", ipv4Pool)
+		_, err = d.ipam.AllocateIPWithoutSyncUpstream(ep.IPv4.AsSlice(), ep.HumanString()+" [restored]", ipv4Pool)
 		switch {
 		// We only check for BypassIPAllocUponRestore for IPv4 because we
 		// assume that this flag is only turned on for IPv4-only IPAM modes
14 changes: 10 additions & 4 deletions pkg/endpoint/api.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cilium/cilium/pkg/mac"
 	"github.com/cilium/cilium/pkg/option"
 	"github.com/cilium/cilium/pkg/policy"
+	"github.com/cilium/cilium/pkg/types"
 	"github.com/cilium/cilium/pkg/u8proto"
 )
@@ -143,7 +144,7 @@ func (e *Endpoint) getModelEndpointIdentitiersRLocked() *models.EndpointIdentifi
 		ContainerName:    e.containerName,
 		DockerEndpointID: e.dockerEndpointID,
 		DockerNetworkID:  e.dockerNetworkID,
-		PodName:          e.getK8sNamespaceAndPodName(),
+		PodName:          e.GetK8sNamespaceAndPodName(),
 		K8sPodName:       e.K8sPodName,
 		K8sNamespace:     e.K8sNamespace,
 	}
@@ -298,10 +299,12 @@ func (e *Endpoint) GetHealthModel() *models.EndpointHealth {
 }

 // getNamedPortsModel returns the endpoint's NamedPorts object.
-//
-// Must be called with e.mutex RLock()ed.
 func (e *Endpoint) getNamedPortsModel() (np models.NamedPorts) {
-	k8sPorts := e.k8sPorts
+	var k8sPorts types.NamedPortMap
+	if p := e.k8sPorts.Load(); p != nil {
+		k8sPorts = *p
+	}

 	// keep named ports ordered to avoid the unnecessary updates to
 	// kube-apiserver
 	names := make([]string, 0, len(k8sPorts))
@@ -461,6 +464,9 @@ func (e *Endpoint) policyStatus() models.EndpointPolicyEnabled {
 // purposes should a caller choose to try to regenerate this endpoint, as well
 // as an error if the Endpoint is being deleted, since there is no point in
 // changing an Endpoint if it is going to be deleted.
+//
+// Before adding any new fields here, check to see if they are assumed to be mutable after
+// endpoint creation!
 func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionState bool) (string, error) {
 	var (
 		changed bool
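With k8sPorts behind an atomic.Pointer, getNamedPortsModel above no longer needs e.mutex: writers publish a freshly built, never-mutated map, and readers take a snapshot with Load(). A self-contained sketch of the pattern, with simplified types standing in for the real cilium packages:

package main

import (
    "fmt"
    "sync/atomic"
)

type NamedPortMap map[string]uint16

type endpoint struct {
    // The map value is never mutated after Store; writers build a fresh
    // map and publish it, so readers need no lock at all.
    k8sPorts atomic.Pointer[NamedPortMap]
}

func (e *endpoint) setPorts(ports NamedPortMap) {
    e.k8sPorts.Store(&ports) // publish a new, complete map
}

func (e *endpoint) getPorts() NamedPortMap {
    if p := e.k8sPorts.Load(); p != nil {
        return *p // map header copy; contents are read-only by convention
    }
    return nil // a nil Load doubles as "metadata never set"
}

func main() {
    var e endpoint
    fmt.Println(e.getPorts() == nil) // true: no metadata yet
    e.setPorts(NamedPortMap{"http": 80})
    fmt.Println(e.getPorts()["http"]) // 80
}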
31 changes: 18 additions & 13 deletions pkg/endpoint/bpf.go
@@ -735,8 +735,18 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
 	stats := &regenContext.Stats
 	datapathRegenCtxt := regenContext.datapathRegenerationContext

+	// regenerate policy without holding the lock.
+	// This is because policy generation needs the ipcache to make progress, and the ipcache needs to call
+	// endpoint.ApplyPolicyMapChanges()
+	stats.policyCalculation.Start()
+	policyResult, err := e.regeneratePolicy()
+	stats.policyCalculation.End(err == nil)
+	if err != nil {
+		return false, fmt.Errorf("unable to regenerate policy for '%s': %w", e.StringID(), err)
+	}
+
 	stats.waitingForLock.Start()
-	err := e.lockAlive()
+	err = e.lockAlive()
 	stats.waitingForLock.End(err == nil)
 	if err != nil {
 		return false, err
@@ -769,6 +779,12 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
 		close(datapathRegenCtxt.ctCleaned)
 	}

+	// Set the computed policy as the "incoming" policy. This can fail if
+	// the endpoint's security identity changed during or after policy calculation.
+	if err := e.setDesiredPolicy(policyResult); err != nil {
+		return false, err
+	}
+
 	// We cannot obtain the rules while e.mutex is held, because obtaining
 	// fresh DNSRules requires the IPCache lock (which must not be taken while
 	// holding e.mutex to avoid deadlocks). Therefore, rules are obtained
@@ -777,12 +793,6 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul

 	// If dry mode is enabled, no further changes to BPF maps are performed
 	if option.Config.DryMode {
-
-		// Compute policy for this endpoint.
-		if err = e.regeneratePolicy(); err != nil {
-			return false, fmt.Errorf("Unable to regenerate policy: %s", err)
-		}
-
 		_ = e.updateAndOverrideEndpointOptions(nil)

 		// Dry mode needs Network Policy Updates, but the proxy wait group must
@@ -819,12 +829,6 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
 	// Only generate & populate policy map if a security identity is set up for
 	// this endpoint.
 	if e.SecurityIdentity != nil {
-		stats.policyCalculation.Start()
-		err = e.regeneratePolicy()
-		stats.policyCalculation.End(err == nil)
-		if err != nil {
-			return false, fmt.Errorf("unable to regenerate policy for '%s': %s", e.StringID(), err)
-		}

 		_ = e.updateAndOverrideEndpointOptions(nil)

@@ -855,6 +859,7 @@ func (e *Endpoint) runPreCompilationSteps(regenContext *regenerationContext, rul
 	//
 	// Do this before updating the bpf policy maps below, so that the proxy listeners have a chance to be
 	// ready when new traffic is redirected to them.
+	// note: unlike regeneratePolicy, updateNetworkPolicy requires the endpoint read lock
 	stats.proxyPolicyCalculation.Start()
 	err, networkPolicyRevertFunc := e.updateNetworkPolicy(datapathRegenCtxt.proxyWaitGroup)
 	stats.proxyPolicyCalculation.End(err == nil)
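The bpf.go reordering above is a compute-then-commit protocol: run the expensive policy calculation with no endpoint lock held, then take the lock and install the result, where setDesiredPolicy can reject it if the security identity changed in the meantime. A hedged sketch of that shape; the types, fields, and revalidation details here are invented for illustration:

package main

import (
    "errors"
    "sync"
)

type policyResult struct {
    forIdentity uint32 // identity snapshot the result was computed for
}

type endpoint struct {
    mu       sync.RWMutex
    identity uint32
    desired  *policyResult
}

// computePolicy stands in for the expensive, lock-free calculation.
func computePolicy(id uint32) *policyResult {
    return &policyResult{forIdentity: id}
}

func (e *endpoint) regenerate() error {
    // 1. Snapshot the input without holding the lock for long.
    e.mu.RLock()
    id := e.identity
    e.mu.RUnlock()

    // 2. Expensive work, no endpoint lock held: callers that need the lock
    //    (e.g. the ipcache pushing policy map changes) can still progress.
    result := computePolicy(id)

    // 3. Commit under the lock, revalidating the input.
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.identity != result.forIdentity {
        return errors.New("security identity changed during policy calculation")
    }
    e.desired = result
    return nil
}

func main() {
    e := &endpoint{identity: 42}
    _ = e.regenerate()
}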
94 changes: 40 additions & 54 deletions pkg/endpoint/endpoint.go
@@ -126,18 +126,21 @@ type Endpoint struct {
 	// mutex protects write operations to this endpoint structure
 	mutex lock.RWMutex

-	// containerName is the name given to the endpoint by the container runtime
+	// containerName is the name given to the endpoint by the container runtime.
+	// Mutable, must be read with the endpoint lock!
 	containerName string

-	// containerID is the container ID that docker has assigned to the endpoint
+	// containerID is the container ID that docker has assigned to the endpoint.
+	// Mutable, must be read with the endpoint lock!
 	containerID string

 	// dockerNetworkID is the network ID of the libnetwork network if the
 	// endpoint is a docker managed container which uses libnetwork
 	dockerNetworkID string

 	// dockerEndpointID is the Docker network endpoint ID if managed by
-	// libnetwork
+	// libnetwork.
+	// immutable.
 	dockerEndpointID string

 	// ifName is the name of the host facing interface (veth pair) which
@@ -227,27 +230,28 @@ type Endpoint struct {
 	// compiled and installed.
 	bpfHeaderfileHash string

-	// K8sPodName is the Kubernetes pod name of the endpoint
+	// K8sPodName is the Kubernetes pod name of the endpoint.
+	// Immutable after Endpoint creation.
 	K8sPodName string

-	// K8sNamespace is the Kubernetes namespace of the endpoint
+	// K8sNamespace is the Kubernetes namespace of the endpoint.
+	// Immutable after Endpoint creation.
 	K8sNamespace string

 	// pod
-	pod *slim_corev1.Pod
+	pod atomic.Pointer[slim_corev1.Pod]

 	// k8sPorts contains container ports associated in the pod.
 	// It is used to enforce k8s network policies with port names.
-	k8sPorts types.NamedPortMap
+	k8sPorts atomic.Pointer[types.NamedPortMap]

 	// logLimiter rate limits potentially repeating warning logs
 	logLimiter logging.Limiter

-	// k8sPortsSet keep track when k8sPorts was set at least one time.
-	hasK8sMetadata bool
-
 	// policyRevision is the policy revision this endpoint is currently on
-	// to modify this field please use endpoint.setPolicyRevision instead
+	// to modify this field please use endpoint.setPolicyRevision instead.
+	//
+	// To write, both ep.mutex and ep.buildMutex must be held.
 	policyRevision uint64

 	// policyRevisionSignals contains a map of PolicyRevision signals that
@@ -270,7 +274,8 @@ type Endpoint struct {
 	proxyStatistics map[string]*models.ProxyStatistics

 	// nextPolicyRevision is the policy revision that the endpoint has
-	// updated to and that will become effective with the next regenerate
+	// updated to and that will become effective with the next regenerate.
+	// Must hold the endpoint mutex *and* buildMutex to write, and either to read.
 	nextPolicyRevision uint64

 	// forcePolicyCompute full endpoint policy recomputation
@@ -301,7 +306,8 @@ type Endpoint struct {
 	// realizedRedirects maps the ID of each proxy redirect that has been
 	// successfully added into a proxy for this endpoint, to the redirect's
 	// proxy port number.
-	// You must hold Endpoint.mutex to read or write it.
+	// You must hold Endpoint.mutex AND Endpoint.buildMutex to write to it,
+	// and either (or both) of those locks to read from it.
 	realizedRedirects map[string]uint16

 	// ctCleaned indicates whether the conntrack table has already been
@@ -314,8 +320,13 @@ type Endpoint struct {
 	// for all endpoints that have the same Identity.
 	selectorPolicy policy.SelectorPolicy

 	// desiredPolicy is the policy calculated during regeneration. After
-	// successful regeneration, it is copied to realizedPolicy
+	// successful regeneration, it is copied to realizedPolicy.
+	// To write, both ep.mutex and ep.buildMutex must be held.
 	desiredPolicy *policy.EndpointPolicy

+	// realizedPolicy is the policy that has most recently been applied.
+	// ep.mutex must be held.
 	realizedPolicy *policy.EndpointPolicy

 	visibilityPolicy *policy.VisibilityPolicy
@@ -1176,43 +1187,26 @@ func (e *Endpoint) leaveLocked(proxyWaitGroup *completion.WaitGroup, conf Delete
 // GetK8sNamespace returns the name of the pod if the endpoint represents a
 // Kubernetes pod
 func (e *Endpoint) GetK8sNamespace() string {
-	e.unconditionalRLock()
+	// const after creation
 	ns := e.K8sNamespace
-	e.runlock()
 	return ns
 }

 // SetPod sets the pod related to this endpoint.
 func (e *Endpoint) SetPod(pod *slim_corev1.Pod) {
-	e.unconditionalLock()
-	e.pod = pod
-	e.unlock()
+	e.pod.Store(pod)
 }

 // GetPod retrieves the pod related to this endpoint
 func (e *Endpoint) GetPod() *slim_corev1.Pod {
-	e.unconditionalRLock()
-	pod := e.pod
-	e.runlock()
-	return pod
-}
-
-// SetK8sNamespace modifies the endpoint's pod name
-func (e *Endpoint) SetK8sNamespace(name string) {
-	e.unconditionalLock()
-	e.K8sNamespace = name
-	e.UpdateLogger(map[string]interface{}{
-		logfields.K8sPodName: e.getK8sNamespaceAndPodName(),
-	})
-	e.unlock()
+	return e.pod.Load()
 }

 // SetK8sMetadata sets the k8s container ports specified by kubernetes.
 // Note that once put in place, the new k8sPorts is never changed,
 // so that the map can be used concurrently without keeping locks.
-// Reading the 'e.k8sPorts' member (the "map pointer") *itself* requires the endpoint lock!
-func (e *Endpoint) SetK8sMetadata(containerPorts []slim_corev1.ContainerPort) error {
+// Can't really error out as that might break backwards compatibility.
+func (e *Endpoint) SetK8sMetadata(containerPorts []slim_corev1.ContainerPort) {
 	k8sPorts := make(types.NamedPortMap, len(containerPorts))
 	for _, cp := range containerPorts {
 		if cp.Name == "" {
@@ -1224,39 +1218,27 @@ func (e *Endpoint) SetK8sMetadata(containerPorts []slim_corev1.ContainerPort) er
 			continue
 		}
 	}
-	if len(k8sPorts) == 0 {
-		k8sPorts = nil // nil map with no storage
-	}
-	e.mutex.Lock()
-	e.hasK8sMetadata = true
-	e.k8sPorts = k8sPorts
-	e.mutex.Unlock()
-	return nil
+	e.k8sPorts.Store(&k8sPorts)
 }

 // GetK8sPorts returns the k8sPorts, which must not be modified by the caller
-func (e *Endpoint) GetK8sPorts() (k8sPorts types.NamedPortMap, err error) {
-	err = e.rlockAlive()
-	if err != nil {
-		return nil, err
+func (e *Endpoint) GetK8sPorts() (k8sPorts types.NamedPortMap) {
+	if p := e.k8sPorts.Load(); p != nil {
+		k8sPorts = *p
 	}
-	k8sPorts = e.k8sPorts
-	e.mutex.RUnlock()
-	return k8sPorts, nil
+	return k8sPorts
 }

 // HaveK8sMetadata returns true once hasK8sMetadata was set
 func (e *Endpoint) HaveK8sMetadata() (metadataSet bool) {
-	e.mutex.RLock()
-	metadataSet = e.hasK8sMetadata
-	e.mutex.RUnlock()
-	return
+	p := e.k8sPorts.Load()
+	return p != nil
 }

 // K8sNamespaceAndPodNameIsSet returns true if the pod name is set
 func (e *Endpoint) K8sNamespaceAndPodNameIsSet() bool {
 	e.unconditionalLock()
-	podName := e.getK8sNamespaceAndPodName()
+	podName := e.GetK8sNamespaceAndPodName()
 	e.unlock()
 	return podName != "" && podName != "/"
 }
@@ -2000,6 +1982,10 @@ func (e *Endpoint) identityLabelsChanged(ctx context.Context, myChangeRev int) (
 // SetPolicyRevision sets the endpoint's policy revision with the given
 // revision.
 func (e *Endpoint) SetPolicyRevision(rev uint64) {
+	// Wait for any in-progress regenerations to finish.
+	e.buildMutex.Lock()
+	defer e.buildMutex.Unlock()
+
 	if err := e.lockAlive(); err != nil {
 		return
 	}
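Several of the updated field comments (policyRevision, nextPolicyRevision, realizedRedirects, desiredPolicy) describe one locking discipline: writers must hold both the endpoint mutex and buildMutex, so a reader holding either lock alone cannot race a writer. A small sketch of why that works, with invented names:

package main

import (
    "fmt"
    "sync"
)

type endpoint struct {
    mu      sync.RWMutex // endpoint state lock
    buildMu sync.Mutex   // serializes regenerations
    rev     uint64
}

// setRevision: every writer holds BOTH locks, as SetPolicyRevision now does.
func (e *endpoint) setRevision(rev uint64) {
    e.buildMu.Lock() // wait for any in-progress regeneration
    defer e.buildMu.Unlock()
    e.mu.Lock()
    e.rev = rev
    e.mu.Unlock()
}

// revisionRLocked: a reader holding only e.mu is safe, because any
// concurrent writer would also need e.mu.
func (e *endpoint) revisionRLocked() uint64 { // caller holds e.mu
    return e.rev
}

// revisionBuildLocked: a reader on the regeneration path holding only
// buildMu is equally safe, because writers also need buildMu.
func (e *endpoint) revisionBuildLocked() uint64 { // caller holds e.buildMu
    return e.rev
}

func main() {
    e := &endpoint{}
    e.setRevision(7)
    e.mu.RLock()
    fmt.Println(e.revisionRLocked()) // 7
    e.mu.RUnlock()
}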