diff --git a/daemon/cmd/daemon.go b/daemon/cmd/daemon.go index 1d9a438906fb..36dfea19ed1e 100644 --- a/daemon/cmd/daemon.go +++ b/daemon/cmd/daemon.go @@ -49,7 +49,6 @@ import ( "github.com/cilium/cilium/pkg/ipam" "github.com/cilium/cilium/pkg/ipcache" "github.com/cilium/cilium/pkg/k8s" - "github.com/cilium/cilium/pkg/k8s/endpointsynchronizer" "github.com/cilium/cilium/pkg/k8s/watchers" "github.com/cilium/cilium/pkg/loadbalancer" "github.com/cilium/cilium/pkg/lock" @@ -333,9 +332,7 @@ func NewDaemon(ctx context.Context, dp datapath.Datapath) (*Daemon, *endpointRes ipcache.IdentityAllocator = d.identityAllocator proxy.Allocator = d.identityAllocator - d.endpointManager = endpointmanager.NewEndpointManager(&endpointsynchronizer.EndpointSynchronizer{ - Allocator: d.identityAllocator, - }) + d.endpointManager = endpointmanager.NewEndpointManager(&watchers.EndpointSynchronizer{}) d.endpointManager.InitMetrics() d.k8sWatcher = watchers.NewK8sWatcher( diff --git a/daemon/cmd/endpoint.go b/daemon/cmd/endpoint.go index 70c5489bb4b4..3c65da0240e6 100644 --- a/daemon/cmd/endpoint.go +++ b/daemon/cmd/endpoint.go @@ -30,6 +30,7 @@ import ( "github.com/cilium/cilium/pkg/endpoint/regeneration" "github.com/cilium/cilium/pkg/k8s" k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io" + "github.com/cilium/cilium/pkg/k8s/types" "github.com/cilium/cilium/pkg/labels" "github.com/cilium/cilium/pkg/labelsfilter" "github.com/cilium/cilium/pkg/logging/logfields" @@ -151,24 +152,26 @@ func NewPutEndpointIDHandler(d *Daemon) PutEndpointIDHandler { // fetchK8sLabelsAndAnnotations wraps the k8s package to fetch and provide // endpoint metadata. It implements endpoint.MetadataResolverCB. -func (d *Daemon) fetchK8sLabelsAndAnnotations(nsName, podName string) (labels.Labels, labels.Labels, map[string]string, error) { +// The returned pod is deepcopied which means the its fields can be written +// into. +func (d *Daemon) fetchK8sLabelsAndAnnotations(nsName, podName string) (*types.Pod, labels.Labels, labels.Labels, map[string]string, error) { p, err := d.k8sWatcher.GetCachedPod(nsName, podName) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } ns, err := d.k8sWatcher.GetCachedNamespace(nsName) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } lbls, annotations, err := k8s.GetPodMetadata(ns, p) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } k8sLbls := labels.Map2Labels(lbls, labels.LabelSourceK8s) identityLabels, infoLabels := labelsfilter.Filter(k8sLbls) - return identityLabels, infoLabels, annotations, nil + return p, identityLabels, infoLabels, annotations, nil } func invalidDataError(ep *endpoint.Endpoint, err error) (*endpoint.Endpoint, int, error) { @@ -262,10 +265,11 @@ func (d *Daemon) createEndpoint(ctx context.Context, epTemplate *models.Endpoint } if ep.K8sNamespaceAndPodNameIsSet() && k8s.IsEnabled() { - identityLabels, info, _, err := d.fetchK8sLabelsAndAnnotations(ep.K8sNamespace, ep.K8sPodName) + pod, identityLabels, info, _, err := d.fetchK8sLabelsAndAnnotations(ep.K8sNamespace, ep.K8sPodName) if err != nil { ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels") } else { + ep.SetPod(pod) addLabels.MergeLabels(identityLabels) infoLabels.MergeLabels(info) } diff --git a/daemon/cmd/state.go b/daemon/cmd/state.go index c9c6dbc100a7..951350da9038 100644 --- a/daemon/cmd/state.go +++ b/daemon/cmd/state.go @@ -22,7 +22,6 @@ import ( "os" "sync" - "github.com/cilium/cilium/pkg/annotation" "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/datapath/connector" "github.com/cilium/cilium/pkg/endpoint" @@ -92,13 +91,7 @@ func (d *Daemon) validateEndpoint(ep *endpoint.Endpoint) (valid bool, err error) return } - ep.UpdateVisibilityPolicy(func(ns, podName string) (proxyVisibility string, err error) { - _, _, annotations, err := d.fetchK8sLabelsAndAnnotations(ns, podName) - if err != nil { - return "", err - } - return annotations[annotation.ProxyVisibility], nil - }) + ep.RunMetadataResolver(d.fetchK8sLabelsAndAnnotations) }() } diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index f6c36f65bb7f..dae71d4c3c3b 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -40,6 +40,7 @@ import ( "github.com/cilium/cilium/pkg/identity/cache" "github.com/cilium/cilium/pkg/identity/identitymanager" ciliumio "github.com/cilium/cilium/pkg/k8s/apis/cilium.io" + "github.com/cilium/cilium/pkg/k8s/types" "github.com/cilium/cilium/pkg/labels" pkgLabels "github.com/cilium/cilium/pkg/labels" "github.com/cilium/cilium/pkg/lock" @@ -211,6 +212,9 @@ type Endpoint struct { // K8sNamespace is the Kubernetes namespace of the endpoint K8sNamespace string + // pod + pod *types.Pod + // policyRevision is the policy revision this endpoint is currently on // to modify this field please use endpoint.setPolicyRevision instead policyRevision uint64 @@ -1047,6 +1051,21 @@ func (e *Endpoint) GetK8sNamespace() string { return ns } +// SetPod sets the pod related to this endpoint. +func (e *Endpoint) SetPod(pod *types.Pod) { + e.unconditionalLock() + e.pod = pod + e.unlock() +} + +// GetPod retrieves the pod related to this endpoint +func (e *Endpoint) GetPod() *types.Pod { + e.unconditionalRLock() + pod := e.pod + e.runlock() + return pod +} + // SetK8sNamespace modifies the endpoint's pod name func (e *Endpoint) SetK8sNamespace(name string) { e.unconditionalLock() @@ -1459,7 +1478,7 @@ func APICanModify(e *Endpoint) error { // MetadataResolverCB provides an implementation for resolving the endpoint // metadata for an endpoint such as the associated labels and annotations. -type MetadataResolverCB func(ns, podName string) (identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error) +type MetadataResolverCB func(ns, podName string) (pod *types.Pod, identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error) // RunMetadataResolver starts a controller associated with the received // endpoint which will periodically attempt to resolve the metadata for the @@ -1481,13 +1500,14 @@ func (e *Endpoint) RunMetadataResolver(resolveMetadata MetadataResolverCB) { controller.ControllerParams{ DoFunc: func(ctx context.Context) error { ns, podName := e.GetK8sNamespace(), e.GetK8sPodName() - identityLabels, info, _, err := resolveMetadata(ns, podName) + pod, identityLabels, info, _, err := resolveMetadata(ns, podName) if err != nil { e.Logger(controllerName).WithError(err).Warning("Unable to fetch kubernetes labels") return err } + e.SetPod(pod) e.UpdateVisibilityPolicy(func(_, _ string) (proxyVisibility string, err error) { - _, _, annotations, err := resolveMetadata(ns, podName) + _, _, _, annotations, err := resolveMetadata(ns, podName) if err != nil { return "", err } diff --git a/pkg/k8s/endpointsynchronizer/logfields.go b/pkg/k8s/endpointsynchronizer/logfields.go deleted file mode 100644 index 5011a2ba6813..000000000000 --- a/pkg/k8s/endpointsynchronizer/logfields.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2016-2018 Authors of Cilium -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package endpointsynchronizer - -// logging field definitions -const ( - // subsysEndpointSync is the value for logfields.LogSubsys - subsysEndpointSync = "endpointsynchronizer" -) diff --git a/pkg/k8s/endpointsynchronizer/cep.go b/pkg/k8s/watchers/endpointsynchronizer.go similarity index 85% rename from pkg/k8s/endpointsynchronizer/cep.go rename to pkg/k8s/watchers/endpointsynchronizer.go index 87a877a1a249..2dd65f9cfc77 100644 --- a/pkg/k8s/endpointsynchronizer/cep.go +++ b/pkg/k8s/watchers/endpointsynchronizer.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package endpointsynchronizer +package watchers import ( "context" @@ -23,7 +23,6 @@ import ( "github.com/cilium/cilium/pkg/controller" "github.com/cilium/cilium/pkg/endpoint" - "github.com/cilium/cilium/pkg/identity/cache" "github.com/cilium/cilium/pkg/k8s" cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" k8sversion "github.com/cilium/cilium/pkg/k8s/version" @@ -36,13 +35,14 @@ import ( "k8s.io/apimachinery/pkg/types" ) +const ( + // subsysEndpointSync is the value for logfields.LogSubsys + subsysEndpointSync = "endpointsynchronizer" +) + // EndpointSynchronizer currently is an empty type, which wraps around syncing // of CiliumEndpoint resources. -// TODO - see whether folding the global variables below into this function -// is cleaner. -type EndpointSynchronizer struct { - Allocator *cache.CachingIdentityAllocator -} +type EndpointSynchronizer struct{} // RunK8sCiliumEndpointSync starts a controller that synchronizes the endpoint // to the corresponding k8s CiliumEndpoint CRD. It is expected that each CEP @@ -130,34 +130,51 @@ func (epSync *EndpointSynchronizer) RunK8sCiliumEndpointSync(e *endpoint.Endpoin return nil } - scopedLog.Debug("Deleting CEP during an initialization") - err := ciliumClient.CiliumEndpoints(namespace).Delete(ctx, podName, meta_v1.DeleteOptions{}) + scopedLog.Debug("Getting CEP during an initialization") + localCEP, err = ciliumClient.CiliumEndpoints(namespace).Get(ctx, podName, meta_v1.GetOptions{}) // It's only an error if it exists but something else happened - if err != nil && !k8serrors.IsNotFound(err) { - scopedLog.WithError(err).Warn("Error deleting CEP") - return err - } + switch { + case k8serrors.IsNotFound(err): + pod := e.GetPod() + if pod == nil { + scopedLog.Debug("Skipping CiliumEndpoint update because it has no k8s pod") + return nil + } - // We can't create localCEP directly, it must come from the k8s - // server via an API call. - cep := &cilium_v2.CiliumEndpoint{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: podName, - }, - } - if mdl != nil { - cep.Status = *mdl - } - localCEP, err = ciliumClient.CiliumEndpoints(namespace).Create(ctx, cep, meta_v1.CreateOptions{}) - if err != nil { - scopedLog.WithError(err).Error("Cannot create CEP") + // We can't create localCEP directly, it must come from the k8s + // server via an API call. + cep := &cilium_v2.CiliumEndpoint{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: podName, + OwnerReferences: []meta_v1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: pod.GetObjectMeta().GetName(), + UID: pod.GetObjectMeta().GetUID(), + }, + }, + }, + } + if mdl != nil { + cep.Status = *mdl + } + localCEP, err = ciliumClient.CiliumEndpoints(namespace).Create(ctx, cep, meta_v1.CreateOptions{}) + if err != nil { + scopedLog.WithError(err).Error("Cannot create CEP") + return err + } + + // We have successfully created the CEP and can return. Subsequent + // runs will update using localCEP. + needInit = false + return nil + case err != nil: + scopedLog.WithError(err).Warn("Error getting CEP") return err - } + default: - // We have successfully created the CEP and can return. Subsequent - // runs will update using localCEP. - needInit = false - return nil + } } // We have no localCEP copy. We need to fetch it for updates, below.