pkg/endpoint: keep endpoint labels for their original sources
Fix two Cilium bugs related to label handling:

1. Addressed an issue during endpoint restoration where Cilium would incorrectly
   replace labels not sourced from Kubernetes. Previously, labels set on an
   endpoint outside of Kubernetes were wiped out upon restoration, as all labels
   were overwritten with those fetched from Kubernetes.

2. Resolved a bug that occurred when a user added or removed a label on a pod
   or namespace while the Cilium agent was not running. Upon restart, the
   affected endpoint failed to pick up these changes, leaving its labels out
   of sync with Kubernetes.

Signed-off-by: André Martins <andre@cilium.io>
aanm committed Dec 1, 2023
1 parent 82e8849 commit e43b759
Showing 4 changed files with 179 additions and 53 deletions.
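
Before the per-file diffs, a note on the mechanism both fixes rely on: Cilium tracks the source of every endpoint label (k8s, the local API, reserved, and so on), and after this commit a metadata resolution only replaces the labels owned by the source it is authoritative for. The following is a minimal, self-contained sketch of that source-scoped replacement; the Label/Labels types and replaceFromSource are illustrative stand-ins, not Cilium's actual labels package:

package main

import "fmt"

// Label is a simplified stand-in for Cilium's labels.Label: a value plus
// the source that owns it (e.g. "k8s", "cni", "reserved").
type Label struct {
	Value  string
	Source string
}

// Labels maps label keys to labels, loosely mirroring labels.Labels.
type Labels map[string]Label

// replaceFromSource replaces only the labels owned by source with those in
// replacement, keeping labels owned by every other source intact.
func (l Labels) replaceFromSource(source string, replacement Labels) Labels {
	out := Labels{}
	for k, lbl := range l {
		if lbl.Source != source {
			out[k] = lbl // labels from other sources survive
		}
	}
	for k, lbl := range replacement {
		out[k] = lbl
	}
	return out
}

func main() {
	current := Labels{
		"app":    {Value: "foo", Source: "k8s"},
		"custom": {Value: "bar", Source: "cni"}, // set outside of Kubernetes
	}
	// Labels freshly fetched from the pod after an agent restart; "app"
	// changed while the agent was down.
	fromK8s := Labels{"app": {Value: "foo-v2", Source: "k8s"}}

	// Replacing only k8s-sourced labels picks up the pod change *and*
	// keeps the cni-sourced label; replacing everything (the old
	// behaviour) would have dropped "custom".
	fmt.Println(current.replaceFromSource("k8s", fromK8s))
}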
daemon/cmd/endpoint.go (18 additions, 14 deletions)
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"maps"
 	"net"
 	"net/http"
 	"runtime"
@@ -411,16 +412,16 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
 		return invalidDataError(ep, err)
 	}
 
-	addLabels := labels.NewLabelsFromModel(epTemplate.Labels)
+	apiLabels := labels.NewLabelsFromModel(epTemplate.Labels)
 	infoLabels := labels.NewLabelsFromModel([]string{})
 
-	if len(addLabels) > 0 {
-		if lbls := addLabels.FindReserved(); lbls != nil {
+	if len(apiLabels) > 0 {
+		if lbls := apiLabels.FindReserved(); lbls != nil {
 			return invalidDataError(ep, fmt.Errorf("not allowed to add reserved labels: %s", lbls))
 		}
 
-		addLabels, _ = labelsfilter.Filter(addLabels)
-		if len(addLabels) == 0 {
+		apiLabels, _ = labelsfilter.Filter(apiLabels)
+		if len(apiLabels) == 0 {
 			return invalidDataError(ep, fmt.Errorf("no valid labels provided"))
 		}
 	}
@@ -430,15 +431,17 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
 	d.endpointCreations.NewCreateRequest(ep, cancel)
 	defer d.endpointCreations.EndCreateRequest(ep)
 
+	identityLbls := maps.Clone(apiLabels)
+
 	if ep.K8sNamespaceAndPodNameIsSet() && d.clientset.IsEnabled() {
-		pod, cp, identityLabels, info, annotations, err := d.fetchK8sMetadataForEndpoint(ep.K8sNamespace, ep.K8sPodName)
+		pod, cp, k8sIdentityLbls, k8sInfoLbls, annotations, err := d.fetchK8sMetadataForEndpoint(ep.K8sNamespace, ep.K8sPodName)
 		if err != nil {
 			ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels")
 		} else {
 			ep.SetPod(pod)
 			ep.SetK8sMetadata(cp)
-			addLabels.MergeLabels(identityLabels)
-			infoLabels.MergeLabels(info)
+			identityLbls.MergeLabels(k8sIdentityLbls)
+			infoLabels.MergeLabels(k8sInfoLbls)
 			if _, ok := annotations[bandwidth.IngressBandwidth]; ok {
 				log.WithFields(logrus.Fields{
 					logfields.K8sPodName: epTemplate.K8sNamespace + "/" + epTemplate.K8sPodName,
@@ -458,11 +461,11 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
 
 	// The following docs describe the cases where the init identity is used:
 	// http://docs.cilium.io/en/latest/policy/lifecycle/#init-identity
-	if len(addLabels) == 0 {
+	if len(identityLbls) == 0 {
 		// If the endpoint has no labels, give the endpoint a special identity with
 		// label reserved:init so we can generate a custom policy for it until we
 		// get its actual identity.
-		addLabels = labels.Labels{
+		identityLbls = labels.Labels{
 			labels.IDNameInit: labels.NewLabel(labels.IDNameInit, "", labels.LabelSourceReserved),
 		}
 	}
@@ -474,8 +477,8 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
 	if ep.K8sNamespaceAndPodNameIsSet() && d.clientset.IsEnabled() {
 		// If there are labels, but no pod namespace, then it's
 		// likely that there are no k8s labels at all. Resolve.
-		if _, k8sLabelsConfigured := addLabels[k8sConst.PodNamespaceLabel]; !k8sLabelsConfigured {
-			ep.RunMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
+		if _, k8sLabelsConfigured := identityLbls[k8sConst.PodNamespaceLabel]; !k8sLabelsConfigured {
+			ep.RunMetadataResolver(false, false, apiLabels, d.bwManager, d.fetchK8sMetadataForEndpoint)
 		}
 	}
 
@@ -495,9 +498,10 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
 		// since the pod event handler would not find the endpoint for that pod
 		// in the endpoint manager. Thus, we will fetch the labels again
 		// and update the endpoint with these labels.
-		ep.RunMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
+		// Wait for the regeneration to be triggered before continuing.
+		regenTriggered = ep.RunMetadataResolver(false, true, apiLabels, d.bwManager, d.fetchK8sMetadataForEndpoint)
 	} else {
-		regenTriggered = ep.UpdateLabels(ctx, labels.LabelSourceAny, addLabels, infoLabels, true)
+		regenTriggered = ep.UpdateLabels(ctx, labels.LabelSourceAny, identityLbls, infoLabels, true)
 	}
 
 	select {
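
A note on the maps.Clone added above: MergeLabels mutates its receiver, so identityLbls must be a copy if apiLabels is to stay pristine — apiLabels is later handed to RunMetadataResolver as the base label set. A small stand-alone sketch of the aliasing hazard, using a plain Go map rather than Cilium's labels.Labels:

package main

import (
	"fmt"
	"maps"
)

func main() {
	// Without a clone, "identity" and "api" alias the same map, so a
	// merge into one pollutes the other.
	api := map[string]string{"from-api": "true"}
	identity := api
	identity["k8s:app"] = "foo"
	fmt.Println(api) // map[from-api:true k8s:app:foo] — mutated

	// With maps.Clone (Go 1.21+), the base set stays untouched, which is
	// what createEndpoint needs: apiLabels must still be intact when it
	// is passed to RunMetadataResolver after identityLbls has absorbed
	// the labels fetched from Kubernetes.
	api2 := map[string]string{"from-api": "true"}
	identity2 := maps.Clone(api2)
	identity2["k8s:app"] = "foo"
	fmt.Println(api2) // map[from-api:true]
}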
daemon/cmd/state.go (3 additions, 3 deletions)
@@ -92,7 +92,7 @@ func (d *Daemon) validateEndpoint(ep *endpoint.Endpoint) (valid bool, err error)
 		// which the endpoint manager will begin processing the events off the
 		// queue.
 		ep.InitEventQueue()
-		ep.RunMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
+		ep.RunRestoredMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
 	}
 
 	if err := ep.ValidateConnectorPlumbing(checkLink); err != nil {
@@ -338,7 +338,7 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi
 
 	if ep.IsHost() {
 		log.WithField(logfields.EndpointID, ep.ID).Info("Successfully restored endpoint. Scheduling regeneration")
-		if err := ep.RegenerateAfterRestore(endpointsRegenerator); err != nil {
+		if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.bwManager, d.fetchK8sMetadataForEndpoint); err != nil {
 			log.WithField(logfields.EndpointID, ep.ID).WithError(err).Debug("error regenerating restored host endpoint")
 			epRegenerated <- false
 		} else {
@@ -356,7 +356,7 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi
 		}
 		log.WithField(logfields.EndpointID, ep.ID).Info("Successfully restored endpoint. Scheduling regeneration")
 		go func(ep *endpoint.Endpoint, epRegenerated chan<- bool) {
-			if err := ep.RegenerateAfterRestore(endpointsRegenerator); err != nil {
+			if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.bwManager, d.fetchK8sMetadataForEndpoint); err != nil {
				log.WithField(logfields.EndpointID, ep.ID).WithError(err).Debug("error regenerating during restore")
				epRegenerated <- false
				return
pkg/endpoint/endpoint.go (150 additions, 35 deletions)
@@ -1634,6 +1634,100 @@ func (e *Endpoint) APICanModifyConfig(n models.ConfigurationMap) error {
 	return nil
 }
 
+// metadataResolver will resolve the endpoint's metadata from a metadata
+// resolver.
+//
+//   - restoredEndpoint - should be set to 'true' if the endpoint is being
+//     restored.
+//
+//   - blocking - will block this function until the endpoint receives a new
+//     security identity, and it is regenerated. If 'false', this
+//     operation will be done in the background and 'regenTriggered'
+//     will always be 'false'.
+//
+//   - bwm - the bandwidth manager used to update the bandwidth policy for this
+//     endpoint.
+//
+//   - resolveMetadata - the metadata resolver that will be used to retrieve this
+//     endpoint's metadata.
+func (e *Endpoint) metadataResolver(ctx context.Context,
+	restoredEndpoint, blocking bool,
+	baseLabels labels.Labels,
+	bwm bandwidth.Manager,
+	resolveMetadata MetadataResolverCB) (regenTriggered bool, err error) {
+	if !e.K8sNamespaceAndPodNameIsSet() {
+		e.Logger(resolveLabels).Debug("Namespace and Pod are not set")
+		return false, nil
+	}
+
+	// copy the base labels into this local variable
+	// so that we don't override 'baseLabels'.
+	controllerBaseLabels := labels.NewFrom(baseLabels)
+
+	ns, podName := e.GetK8sNamespace(), e.GetK8sPodName()
+
+	pod, cp, identityLabels, info, _, err := resolveMetadata(ns, podName)
+	if err != nil {
+		e.Logger(resolveLabels).WithError(err).Warning("Unable to fetch kubernetes labels")
+		// If we were unable to fetch the k8s endpoints then
+		// we will mark the endpoint with the init identity.
+		if !restoredEndpoint {
+			// Only mark the endpoint with the 'init' identity if we are not
+			// restoring the endpoint from a restart.
+			identityLabels := labels.Labels{
+				labels.IDNameInit: labels.NewLabel(labels.IDNameInit, "", labels.LabelSourceReserved),
+			}
+			regenTriggered := e.UpdateLabels(ctx, labels.LabelSourceAny, identityLabels, nil, true)
+			if blocking {
+				return regenTriggered, err
+			}
+		}
+		return false, err
+	}
+
+	// Merge the labels retrieved from the 'resolveMetadata' into the base
+	// labels.
+	controllerBaseLabels.MergeLabels(identityLabels)
+
+	e.SetPod(pod)
+	e.SetK8sMetadata(cp)
+	e.UpdateNoTrackRules(func(_, _ string) (noTrackPort string, err error) {
+		po, _, _, _, _, err := resolveMetadata(ns, podName)
+		if err != nil {
+			return "", err
+		}
+		value, _ := annotation.Get(po, annotation.NoTrack, annotation.NoTrackAlias)
+		return value, nil
+	})
+	e.UpdateVisibilityPolicy(func(_, _ string) (proxyVisibility string, err error) {
+		po, _, _, _, _, err := resolveMetadata(ns, podName)
+		if err != nil {
+			return "", err
+		}
+		value, _ := annotation.Get(po, annotation.ProxyVisibility, annotation.ProxyVisibilityAlias)
+		return value, nil
+	})
+	e.UpdateBandwidthPolicy(bwm, func(ns, podName string) (bandwidthEgress string, err error) {
+		_, _, _, _, annotations, err := resolveMetadata(ns, podName)
+		if err != nil {
+			return "", err
+		}
+		return annotations[bandwidth.EgressBandwidth], nil
+	})
+
+	// If 'baseLabels' is empty then 'controllerBaseLabels' only contains
+	// labels from k8s. Thus, we should only replace the labels that have
+	// 'k8s' as their source, otherwise we risk replacing labels that were
+	// added from other sources.
+	source := labels.LabelSourceK8s
+	if len(baseLabels) != 0 {
+		source = labels.LabelSourceAny
+	}
+	regenTriggered = e.UpdateLabels(ctx, source, controllerBaseLabels, info, blocking)
+
+	return regenTriggered, nil
+}
+
 // MetadataResolverCB provides an implementation for resolving the endpoint
 // metadata for an endpoint such as the associated labels and annotations.
 type MetadataResolverCB func(ns, podName string) (pod *slim_corev1.Pod, _ []slim_corev1.ContainerPort, identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error)
@@ -1644,10 +1738,30 @@ type MetadataResolverCB func(ns, podName string) (pod *slim_corev1.Pod, _ []slim
 // either the first successful metadata resolution or when the endpoint is
 // removed.
 //
+// baseLabels contains the list of labels used as "base" for the endpoint.
+// The labels retrieved from 'MetadataResolverCB' will be merged into the
+// baseLabels and put into the endpoint.
+// If this list is empty, the labels set on the endpoint will be
+// replaced by the labels returned by 'MetadataResolverCB' as long as their
+// source matches the source of the labels already present on the endpoint.
+//
+// restoredEndpoint should be set to 'true' if the endpoint is being restored.
+// If this is set to false and the resolver is unable to retrieve the endpoint
+// labels from k8s, the endpoint will be set with the 'init' identity.
+//
+// blocking - will block this function until the endpoint receives a new
+// security identity, and it is regenerated. If 'false', this
+// operation will be done in the background and 'regenTriggered'
+// will always be 'false'.
+//
 // This assumes that after the initial successful resolution, other mechanisms
 // will handle updates (such as pkg/k8s/watchers informers).
-func (e *Endpoint) RunMetadataResolver(bwm bandwidth.Manager, resolveMetadata MetadataResolverCB) {
-	done := make(chan struct{})
+func (e *Endpoint) RunMetadataResolver(restoredEndpoint, blocking bool, baseLabels labels.Labels, bwm bandwidth.Manager, resolveMetadata MetadataResolverCB) (regenTriggered bool) {
+	var regenTriggeredCh chan bool
+	if blocking {
+		regenTriggeredCh = make(chan bool)
+	}
+	done := make(chan bool)
 	controllerName := resolveLabels + "-" + e.GetK8sNamespaceAndPodName()
 	go func() {
 		select {
@@ -1662,49 +1776,50 @@ func (e *Endpoint) RunMetadataResolver(bwm bandwidth.Manager, resolveMetadata Me
 		controller.ControllerParams{
 			Group: resolveLabelsControllerGroup,
 			DoFunc: func(ctx context.Context) error {
-				if !e.K8sNamespaceAndPodNameIsSet() {
-					e.Logger(resolveLabels).Debug("Namespace and Pod are not set")
-					return nil
+				regenTriggered, err := e.metadataResolver(ctx, restoredEndpoint, blocking, baseLabels, bwm, resolveMetadata)
+				if blocking {
+					select {
+					// Check if the channel was closed.
+					case <-regenTriggeredCh:
+					case <-e.aliveCtx.Done():
+					case regenTriggeredCh <- regenTriggered:
+						close(regenTriggeredCh)
+					}
 				}
-				ns, podName := e.GetK8sNamespace(), e.GetK8sPodName()
-
-				pod, cp, identityLabels, info, _, err := resolveMetadata(ns, podName)
 				if err != nil {
-					e.Logger(resolveLabels).WithError(err).Warning("Unable to fetch kubernetes labels")
 					return err
 				}
-				e.SetPod(pod)
-				e.SetK8sMetadata(cp)
-				e.UpdateNoTrackRules(func(_, _ string) (noTrackPort string, err error) {
-					po, _, _, _, _, err := resolveMetadata(ns, podName)
-					if err != nil {
-						return "", err
-					}
-					value, _ := annotation.Get(po, annotation.NoTrack, annotation.NoTrackAlias)
-					return value, nil
-				})
-				e.UpdateVisibilityPolicy(func(_, _ string) (proxyVisibility string, err error) {
-					po, _, _, _, _, err := resolveMetadata(ns, podName)
-					if err != nil {
-						return "", err
-					}
-					value, _ := annotation.Get(po, annotation.ProxyVisibility, annotation.ProxyVisibilityAlias)
-					return value, nil
-				})
-				e.UpdateBandwidthPolicy(bwm, func(ns, podName string) (bandwidthEgress string, err error) {
-					_, _, _, _, annotations, err := resolveMetadata(ns, podName)
-					if err != nil {
-						return "", err
-					}
-					return annotations[bandwidth.EgressBandwidth], nil
-				})
-				e.UpdateLabels(ctx, labels.LabelSourceAny, identityLabels, info, true)
 				close(done)
 				return nil
 			},
 			Context: e.aliveCtx,
 		},
 	)
+
+	// If the caller wants this function to be blocking while resolving
+	// identities / regenerating then we will wait for the first result of
+	// `e.metadataResolver` before returning.
+	if blocking {
+		select {
+		case regenTriggered, ok := <-regenTriggeredCh:
+			return regenTriggered && ok
+		case <-e.aliveCtx.Done():
+			return false
+		}
+	}
+	return false
 }
 
+// RunRestoredMetadataResolver starts a controller associated with the received
+// endpoint which will periodically attempt to resolve the metadata for the
+// endpoint and update the endpoint with the related metadata. It stops
+// resolving after either the first successful metadata resolution or when the
+// endpoint is removed.
+//
+// This assumes that after the initial successful resolution, other mechanisms
+// will handle updates (such as pkg/k8s/watchers informers).
+func (e *Endpoint) RunRestoredMetadataResolver(bwm bandwidth.Manager, resolveMetadata MetadataResolverCB) {
+	e.RunMetadataResolver(true, false, nil, bwm, resolveMetadata)
+}
+
 // ModifyIdentityLabels changes the custom and orchestration identity labels of an endpoint.
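
The channel handshake in the new RunMetadataResolver is subtle: the controller may run DoFunc several times, the blocking caller must receive exactly the first result, and nothing may deadlock if the endpoint goes away first. Below is a small, runnable model of that pattern; the names are illustrative, and only the shape of the select mirrors the code above:

package main

import (
	"context"
	"fmt"
	"time"
)

// deliverOnce hands the first result to a blocked caller, then closes the
// channel so later retries of the same worker fall through immediately.
// A plain context stands in for the endpoint's aliveCtx.
func deliverOnce(ctx context.Context, ch chan bool, result bool) {
	select {
	case <-ch: // channel already closed by an earlier run: nothing to do
	case <-ctx.Done(): // endpoint is going away: nobody is waiting
	case ch <- result: // a caller is blocked: hand over the result...
		close(ch) // ...and close so the next run cannot send twice
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan bool)

	// Simulate the controller running DoFunc twice: the first run
	// delivers the result, the retry hits the closed-channel case.
	go func() {
		deliverOnce(ctx, ch, true)
		deliverOnce(ctx, ch, false)
	}()

	// The blocking caller waits for the first result, like the
	// 'if blocking' tail of RunMetadataResolver.
	select {
	case regenTriggered, ok := <-ch:
		fmt.Println("regen triggered:", regenTriggered && ok)
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}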
pkg/endpoint/restore.go (8 additions, 1 deletion)
@@ -22,6 +22,7 @@ import (
 	"github.com/cilium/cilium/api/v1/models"
 	"github.com/cilium/cilium/pkg/common"
 	"github.com/cilium/cilium/pkg/controller"
+	"github.com/cilium/cilium/pkg/datapath/linux/bandwidth"
 	"github.com/cilium/cilium/pkg/endpoint/regeneration"
 	"github.com/cilium/cilium/pkg/fqdn"
 	"github.com/cilium/cilium/pkg/fqdn/restore"
@@ -190,14 +191,20 @@ func partitionEPDirNamesByRestoreStatus(eptsDirNames []string) (complete []strin
 // RegenerateAfterRestore performs the following operations on the specified
 // Endpoint:
 // * allocates an identity for the Endpoint
+// * fetches the latest labels from the pod.
 // * regenerates the endpoint
 // Returns an error if any operation fails while trying to perform the above
 // operations.
-func (e *Endpoint) RegenerateAfterRestore(regenerator *Regenerator) error {
+func (e *Endpoint) RegenerateAfterRestore(regenerator *Regenerator, bwm bandwidth.Manager, resolveMetadata MetadataResolverCB) error {
 	if err := e.restoreIdentity(regenerator); err != nil {
 		return err
 	}
 
+	// Now that we have restored the endpoint's identity, run the metadata
+	// resolver so that we can fetch the latest labels from the pod for this
+	// endpoint.
+	e.RunRestoredMetadataResolver(bwm, resolveMetadata)
+
 	scopedLog := log.WithField(logfields.EndpointID, e.ID)
 
 	regenerationMetadata := &regeneration.ExternalRegenerationMetadata{
