Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix labels synchronization issues on Cilium #29248

Merged
merged 5 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cilium-health/launch/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func LaunchAsEndpoint(baseCtx context.Context,
// Give the endpoint a security identity
ctx, cancel := context.WithTimeout(baseCtx, LaunchTime)
defer cancel()
ep.UpdateLabels(ctx, labels.LabelHealth, nil, true)
ep.UpdateLabels(ctx, labels.LabelSourceAny, labels.LabelHealth, nil, true)

// Initialize the health client to talk to this instance.
client := &Client{host: "http://" + net.JoinHostPort(healthIP.String(), strconv.Itoa(option.Config.ClusterHealthPort))}
Expand Down
34 changes: 19 additions & 15 deletions daemon/cmd/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"net"
"net/http"
"runtime"
Expand Down Expand Up @@ -411,16 +412,16 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
return invalidDataError(ep, err)
}

addLabels := labels.NewLabelsFromModel(epTemplate.Labels)
apiLabels := labels.NewLabelsFromModel(epTemplate.Labels)
infoLabels := labels.NewLabelsFromModel([]string{})

if len(addLabels) > 0 {
if lbls := addLabels.FindReserved(); lbls != nil {
if len(apiLabels) > 0 {
if lbls := apiLabels.FindReserved(); lbls != nil {
return invalidDataError(ep, fmt.Errorf("not allowed to add reserved labels: %s", lbls))
}

addLabels, _ = labelsfilter.Filter(addLabels)
if len(addLabels) == 0 {
apiLabels, _ = labelsfilter.Filter(apiLabels)
if len(apiLabels) == 0 {
return invalidDataError(ep, fmt.Errorf("no valid labels provided"))
}
}
Expand All @@ -430,15 +431,17 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
d.endpointCreations.NewCreateRequest(ep, cancel)
defer d.endpointCreations.EndCreateRequest(ep)

identityLbls := maps.Clone(apiLabels)

if ep.K8sNamespaceAndPodNameIsSet() && d.clientset.IsEnabled() {
pod, cp, identityLabels, info, annotations, err := d.fetchK8sMetadataForEndpoint(ep.K8sNamespace, ep.K8sPodName)
pod, cp, k8sIdentityLbls, k8sInfoLbls, annotations, err := d.fetchK8sMetadataForEndpoint(ep.K8sNamespace, ep.K8sPodName)
if err != nil {
ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels")
} else {
ep.SetPod(pod)
ep.SetK8sMetadata(cp)
addLabels.MergeLabels(identityLabels)
infoLabels.MergeLabels(info)
identityLbls.MergeLabels(k8sIdentityLbls)
infoLabels.MergeLabels(k8sInfoLbls)
if _, ok := annotations[bandwidth.IngressBandwidth]; ok {
log.WithFields(logrus.Fields{
logfields.K8sPodName: epTemplate.K8sNamespace + "/" + epTemplate.K8sPodName,
Expand All @@ -458,11 +461,11 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e

// The following docs describe the cases where the init identity is used:
// http://docs.cilium.io/en/latest/policy/lifecycle/#init-identity
if len(addLabels) == 0 {
if len(identityLbls) == 0 {
// If the endpoint has no labels, give the endpoint a special identity with
// label reserved:init so we can generate a custom policy for it until we
// get its actual identity.
addLabels = labels.Labels{
identityLbls = labels.Labels{
labels.IDNameInit: labels.NewLabel(labels.IDNameInit, "", labels.LabelSourceReserved),
}
}
Expand All @@ -474,8 +477,8 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
if ep.K8sNamespaceAndPodNameIsSet() && d.clientset.IsEnabled() {
// If there are labels, but no pod namespace, then it's
// likely that there are no k8s labels at all. Resolve.
if _, k8sLabelsConfigured := addLabels[k8sConst.PodNamespaceLabel]; !k8sLabelsConfigured {
ep.RunMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
if _, k8sLabelsConfigured := identityLbls[k8sConst.PodNamespaceLabel]; !k8sLabelsConfigured {
ep.RunMetadataResolver(false, false, apiLabels, d.bwManager, d.fetchK8sMetadataForEndpoint)
}
}

Expand All @@ -495,9 +498,10 @@ func (d *Daemon) createEndpoint(ctx context.Context, owner regeneration.Owner, e
// since the pod event handler would not find the endpoint for that pod
// in the endpoint manager. Thus, we will fetch the labels again
// and update the endpoint with these labels.
ep.RunMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
// Wait for the regeneration to be triggered before continuing.
regenTriggered = ep.RunMetadataResolver(false, true, apiLabels, d.bwManager, d.fetchK8sMetadataForEndpoint)
} else {
regenTriggered = ep.UpdateLabels(ctx, addLabels, infoLabels, true)
regenTriggered = ep.UpdateLabels(ctx, labels.LabelSourceAny, identityLbls, infoLabels, true)
}

select {
Expand Down Expand Up @@ -989,7 +993,7 @@ func (d *Daemon) modifyEndpointIdentityLabelsFromAPI(id string, add, del labels.
return PatchEndpointIDInvalidCode, err
}

if err := ep.ModifyIdentityLabels(addLabels, delLabels); err != nil {
if err := ep.ModifyIdentityLabels(labels.LabelSourceAny, addLabels, delLabels); err != nil {
return PatchEndpointIDLabelsNotFoundCode, err
}

Expand Down
6 changes: 3 additions & 3 deletions daemon/cmd/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *Daemon) validateEndpoint(ep *endpoint.Endpoint) (valid bool, err error)
// which the endpoint manager will begin processing the events off the
// queue.
ep.InitEventQueue()
ep.RunMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
ep.RunRestoredMetadataResolver(d.bwManager, d.fetchK8sMetadataForEndpoint)
}

if err := ep.ValidateConnectorPlumbing(checkLink); err != nil {
Expand Down Expand Up @@ -338,7 +338,7 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi

if ep.IsHost() {
log.WithField(logfields.EndpointID, ep.ID).Info("Successfully restored endpoint. Scheduling regeneration")
if err := ep.RegenerateAfterRestore(endpointsRegenerator); err != nil {
if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.bwManager, d.fetchK8sMetadataForEndpoint); err != nil {
log.WithField(logfields.EndpointID, ep.ID).WithError(err).Debug("error regenerating restored host endpoint")
epRegenerated <- false
} else {
Expand All @@ -356,7 +356,7 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi
}
log.WithField(logfields.EndpointID, ep.ID).Info("Successfully restored endpoint. Scheduling regeneration")
go func(ep *endpoint.Endpoint, epRegenerated chan<- bool) {
if err := ep.RegenerateAfterRestore(endpointsRegenerator); err != nil {
if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.bwManager, d.fetchK8sMetadataForEndpoint); err != nil {
log.WithField(logfields.EndpointID, ep.ID).WithError(err).Debug("error regenerating during restore")
epRegenerated <- false
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/endpoint/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ func (e *Endpoint) ProcessChangeRequest(newEp *Endpoint, validPatchTransitionSta
// no need to set changed here
}

e.replaceInformationLabels(newEp.OpLabels.OrchestrationInfo)
rev := e.replaceIdentityLabels(newEp.OpLabels.IdentityLabels())
e.replaceInformationLabels(labels.LabelSourceAny, newEp.OpLabels.OrchestrationInfo)
rev := e.replaceIdentityLabels(labels.LabelSourceAny, newEp.OpLabels.IdentityLabels())
if rev != 0 {
// Run as a goroutine since the runIdentityResolver needs to get the lock
go e.runIdentityResolver(e.aliveCtx, rev, false)
Expand Down
Loading
Loading