Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pod as an owner of a CiliumEndpoint and remove useless Delete #11195

Merged
merged 4 commits into from May 1, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 10 additions & 6 deletions daemon/cmd/endpoint.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cilium/cilium/pkg/endpoint/regeneration"
"github.com/cilium/cilium/pkg/k8s"
k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/labelsfilter"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand Down Expand Up @@ -151,24 +152,26 @@ func NewPutEndpointIDHandler(d *Daemon) PutEndpointIDHandler {

// fetchK8sLabelsAndAnnotations wraps the k8s package to fetch and provide
// endpoint metadata. It implements endpoint.MetadataResolverCB.
func (d *Daemon) fetchK8sLabelsAndAnnotations(nsName, podName string) (labels.Labels, labels.Labels, map[string]string, error) {
// The returned pod is deepcopied which means the its fields can be written
// into.
func (d *Daemon) fetchK8sLabelsAndAnnotations(nsName, podName string) (*types.Pod, labels.Labels, labels.Labels, map[string]string, error) {
aanm marked this conversation as resolved.
Show resolved Hide resolved
p, err := d.k8sWatcher.GetCachedPod(nsName, podName)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
ns, err := d.k8sWatcher.GetCachedNamespace(nsName)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

lbls, annotations, err := k8s.GetPodMetadata(ns, p)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

k8sLbls := labels.Map2Labels(lbls, labels.LabelSourceK8s)
identityLabels, infoLabels := labelsfilter.Filter(k8sLbls)
return identityLabels, infoLabels, annotations, nil
return p, identityLabels, infoLabels, annotations, nil
}

func invalidDataError(ep *endpoint.Endpoint, err error) (*endpoint.Endpoint, int, error) {
Expand Down Expand Up @@ -262,10 +265,11 @@ func (d *Daemon) createEndpoint(ctx context.Context, epTemplate *models.Endpoint
}

if ep.K8sNamespaceAndPodNameIsSet() && k8s.IsEnabled() {
identityLabels, info, _, err := d.fetchK8sLabelsAndAnnotations(ep.K8sNamespace, ep.K8sPodName)
pod, identityLabels, info, _, err := d.fetchK8sLabelsAndAnnotations(ep.K8sNamespace, ep.K8sPodName)
if err != nil {
ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels")
} else {
ep.SetPod(pod)
addLabels.MergeLabels(identityLabels)
infoLabels.MergeLabels(info)
}
Expand Down
9 changes: 1 addition & 8 deletions daemon/cmd/state.go
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"sync"

"github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/datapath/connector"
"github.com/cilium/cilium/pkg/endpoint"
Expand Down Expand Up @@ -92,13 +91,7 @@ func (d *Daemon) validateEndpoint(ep *endpoint.Endpoint) (valid bool, err error)
return
}

ep.UpdateVisibilityPolicy(func(ns, podName string) (proxyVisibility string, err error) {
_, _, annotations, err := d.fetchK8sLabelsAndAnnotations(ns, podName)
if err != nil {
return "", err
}
return annotations[annotation.ProxyVisibility], nil
})
ep.RunMetadataResolver(d.fetchK8sLabelsAndAnnotations)
}()
}

Expand Down
26 changes: 23 additions & 3 deletions pkg/endpoint/endpoint.go
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/identity/identitymanager"
ciliumio "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/labels"
pkgLabels "github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
Expand Down Expand Up @@ -211,6 +212,9 @@ type Endpoint struct {
// K8sNamespace is the Kubernetes namespace of the endpoint
K8sNamespace string

// pod
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💎

pod *types.Pod

// policyRevision is the policy revision this endpoint is currently on
// to modify this field please use endpoint.setPolicyRevision instead
policyRevision uint64
Expand Down Expand Up @@ -1047,6 +1051,21 @@ func (e *Endpoint) GetK8sNamespace() string {
return ns
}

// SetPod sets the pod related to this endpoint.
func (e *Endpoint) SetPod(pod *types.Pod) {
e.unconditionalLock()
e.pod = pod
e.unlock()
}

// GetPod retrieves the pod related to this endpoint
func (e *Endpoint) GetPod() *types.Pod {
e.unconditionalRLock()
pod := e.pod
e.runlock()
return pod
}

// SetK8sNamespace modifies the endpoint's pod name
func (e *Endpoint) SetK8sNamespace(name string) {
e.unconditionalLock()
Expand Down Expand Up @@ -1459,7 +1478,7 @@ func APICanModify(e *Endpoint) error {

// MetadataResolverCB provides an implementation for resolving the endpoint
// metadata for an endpoint such as the associated labels and annotations.
type MetadataResolverCB func(ns, podName string) (identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error)
type MetadataResolverCB func(ns, podName string) (pod *types.Pod, identityLabels labels.Labels, infoLabels labels.Labels, annotations map[string]string, err error)

// RunMetadataResolver starts a controller associated with the received
// endpoint which will periodically attempt to resolve the metadata for the
Expand All @@ -1481,13 +1500,14 @@ func (e *Endpoint) RunMetadataResolver(resolveMetadata MetadataResolverCB) {
controller.ControllerParams{
DoFunc: func(ctx context.Context) error {
ns, podName := e.GetK8sNamespace(), e.GetK8sPodName()
identityLabels, info, _, err := resolveMetadata(ns, podName)
pod, identityLabels, info, _, err := resolveMetadata(ns, podName)
if err != nil {
e.Logger(controllerName).WithError(err).Warning("Unable to fetch kubernetes labels")
return err
}
e.SetPod(pod)
e.UpdateVisibilityPolicy(func(_, _ string) (proxyVisibility string, err error) {
_, _, annotations, err := resolveMetadata(ns, podName)
_, _, _, annotations, err := resolveMetadata(ns, podName)
if err != nil {
return "", err
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/k8s/watchers/endpointsynchronizer.go
Expand Up @@ -135,11 +135,25 @@ func (epSync *EndpointSynchronizer) RunK8sCiliumEndpointSync(e *endpoint.Endpoin
// It's only an error if it exists but something else happened
switch {
case k8serrors.IsNotFound(err):
pod := e.GetPod()
if pod == nil {
scopedLog.Debug("Skipping CiliumEndpoint update because it has no k8s pod")
return nil
}

// We can't create localCEP directly, it must come from the k8s
// server via an API call.
cep := &cilium_v2.CiliumEndpoint{
ObjectMeta: meta_v1.ObjectMeta{
Name: podName,
OwnerReferences: []meta_v1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.GetObjectMeta().GetName(),
UID: pod.GetObjectMeta().GetUID(),
},
},
},
}
if mdl != nil {
Expand Down