Skip to content

Commit

Permalink
pkg/endpoint: set Pod as the endpoint owner of a CiliumEndpoint
Browse files Browse the repository at this point in the history
As a CiliumEndpoint can't exist without a Pod, we should set the Pod as
an Owner of a particular endpoint.

This will allow CiliumEndpoints from being automatically GCed by K8s
once the Pod is gone.

Signed-off-by: André Martins <andre@cilium.io>
  • Loading branch information
aanm committed Apr 28, 2020
1 parent d9c7efc commit 60765fa
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 17 deletions.
14 changes: 8 additions & 6 deletions daemon/cmd/endpoint.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cilium/cilium/pkg/endpoint/regeneration"
"github.com/cilium/cilium/pkg/k8s"
k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/labelsfilter"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand Down Expand Up @@ -151,24 +152,24 @@ func NewPutEndpointIDHandler(d *Daemon) PutEndpointIDHandler {

// fetchK8sLabelsAndAnnotations wraps the k8s package to fetch and provide
// endpoint metadata. It implements endpoint.MetadataResolverCB.
func (d *Daemon) fetchK8sLabelsAndAnnotations(nsName, podName string) (labels.Labels, labels.Labels, map[string]string, error) {
func (d *Daemon) fetchK8sLabelsAndAnnotations(nsName, podName string) (*types.Pod, labels.Labels, labels.Labels, map[string]string, error) {
p, err := d.k8sWatcher.GetCachedPod(nsName, podName)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
ns, err := d.k8sWatcher.GetCachedNamespace(nsName)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

lbls, annotations, err := k8s.GetPodMetadata(ns, p)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

k8sLbls := labels.Map2Labels(lbls, labels.LabelSourceK8s)
identityLabels, infoLabels := labelsfilter.Filter(k8sLbls)
return identityLabels, infoLabels, annotations, nil
return p, identityLabels, infoLabels, annotations, nil
}

func invalidDataError(ep *endpoint.Endpoint, err error) (*endpoint.Endpoint, int, error) {
Expand Down Expand Up @@ -262,10 +263,11 @@ func (d *Daemon) createEndpoint(ctx context.Context, epTemplate *models.Endpoint
}

if ep.K8sNamespaceAndPodNameIsSet() && k8s.IsEnabled() {
identityLabels, info, _, err := d.fetchK8sLabelsAndAnnotations(ep.K8sNamespace, ep.K8sPodName)
pod, identityLabels, info, _, err := d.fetchK8sLabelsAndAnnotations(ep.K8sNamespace, ep.K8sPodName)
if err != nil {
ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels")
} else {
ep.SetPod(pod)
addLabels.MergeLabels(identityLabels)
infoLabels.MergeLabels(info)
}
Expand Down
9 changes: 1 addition & 8 deletions daemon/cmd/state.go
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"sync"

"github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/datapath/connector"
"github.com/cilium/cilium/pkg/endpoint"
Expand Down Expand Up @@ -92,13 +91,7 @@ func (d *Daemon) validateEndpoint(ep *endpoint.Endpoint) (valid bool, err error)
return
}

ep.UpdateVisibilityPolicy(func(ns, podName string) (proxyVisibility string, err error) {
_, _, annotations, err := d.fetchK8sLabelsAndAnnotations(ns, podName)
if err != nil {
return "", err
}
return annotations[annotation.ProxyVisibility], nil
})
ep.RunMetadataResolver(d.fetchK8sLabelsAndAnnotations)
}()
}

Expand Down
26 changes: 23 additions & 3 deletions pkg/endpoint/endpoint.go
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/identity/identitymanager"
ciliumio "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/labels"
pkgLabels "github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
Expand Down Expand Up @@ -211,6 +212,9 @@ type Endpoint struct {
// K8sNamespace is the Kubernetes namespace of the endpoint
K8sNamespace string

// pod
pod *types.Pod

// policyRevision is the policy revision this endpoint is currently on
// to modify this field please use endpoint.setPolicyRevision instead
policyRevision uint64
Expand Down Expand Up @@ -1047,6 +1051,21 @@ func (e *Endpoint) GetK8sNamespace() string {
return ns
}

// SetPod sets the pod related to this endpoint.
func (e *Endpoint) SetPod(pod *types.Pod) {
e.unconditionalLock()
e.pod = pod
e.unlock()
}

// GetPod retrieves the pod related to this endpoint
func (e *Endpoint) GetPod() *types.Pod {
e.unconditionalRLock()
pod := e.pod
e.runlock()
return pod
}

// SetK8sNamespace modifies the endpoint's pod name
func (e *Endpoint) SetK8sNamespace(name string) {
e.unconditionalLock()
Expand Down Expand Up @@ -1459,7 +1478,7 @@ func APICanModify(e *Endpoint) error {

// MetadataResolverCB provides an implementation for resolving the endpoint
// metadata for an endpoint such as the associated labels and annotations.
type MetadataResolverCB func(ns, podName string) (identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error)
type MetadataResolverCB func(ns, podName string) (pod *types.Pod, identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error)

// RunMetadataResolver starts a controller associated with the received
// endpoint which will periodically attempt to resolve the metadata for the
Expand All @@ -1481,13 +1500,14 @@ func (e *Endpoint) RunMetadataResolver(resolveMetadata MetadataResolverCB) {
controller.ControllerParams{
DoFunc: func(ctx context.Context) error {
ns, podName := e.GetK8sNamespace(), e.GetK8sPodName()
identityLabels, info, _, err := resolveMetadata(ns, podName)
pod, identityLabels, info, _, err := resolveMetadata(ns, podName)
if err != nil {
e.Logger(controllerName).WithError(err).Warning("Unable to fetch kubernetes labels")
return err
}
e.SetPod(pod)
e.UpdateVisibilityPolicy(func(_, _ string) (proxyVisibility string, err error) {
_, _, annotations, err := resolveMetadata(ns, podName)
_, _, _, annotations, err := resolveMetadata(ns, podName)
if err != nil {
return "", err
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/k8s/watchers/endpointsynchronizer.go
Expand Up @@ -135,11 +135,25 @@ func (epSync *EndpointSynchronizer) RunK8sCiliumEndpointSync(e *endpoint.Endpoin
// It's only an error if it exists but something else happened
switch {
case k8serrors.IsNotFound(err):
pod := e.GetPod()
if pod == nil {
scopedLog.Debug("Skipping CiliumEndpoint update because it has no k8s pod")
return nil
}

// We can't create localCEP directly, it must come from the k8s
// server via an API call.
cep := &cilium_v2.CiliumEndpoint{
ObjectMeta: meta_v1.ObjectMeta{
Name: podName,
OwnerReferences: []meta_v1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.GetObjectMeta().GetName(),
UID: pod.GetObjectMeta().GetUID(),
},
},
},
}
if mdl != nil {
Expand Down

0 comments on commit 60765fa

Please sign in to comment.