Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/k8s: add pod IP event change #16190

Merged
merged 1 commit into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ generate-k8s-api: ## Generate Cilium k8s API client, deepcopy and deepequal Go s
ipam:types\
alibabacloud:types\
k8s:types\
k8s:utils\
maps:ctmap\
maps:encrypt\
maps:eppolicymap\
Expand Down
3 changes: 3 additions & 0 deletions pkg/k8s/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ func (in *CiliumEndpoint) DeepEqual(other *CiliumEndpoint) bool {

return in.deepEqual(other)
}

// +deepequal-gen=true
type IPSlice []string
20 changes: 20 additions & 0 deletions pkg/k8s/types/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pkg/k8s/types/zz_generated.deepequal.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 61 additions & 26 deletions pkg/k8s/watchers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cilium/cilium/pkg/k8s/informer"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
k8sTypes "github.com/cilium/cilium/pkg/k8s/types"
k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/labels"
Expand Down Expand Up @@ -231,10 +232,6 @@ func (k *K8sWatcher) addK8sPodV1(pod *slim_corev1.Pod) error {
if len(podIPs) > 0 {
err = k.updatePodHostData(nil, pod, nil, podIPs)

// There might be duplicate callbacks here since this function is also
// called from updateK8sPodV1, the consumer will need to handle the duplicate
// events accordingly.
// GH issue #13136.
if option.Config.EnableLocalRedirectPolicy {
k.redirectPolicyManager.OnAddPod(pod)
}
Expand All @@ -253,6 +250,17 @@ func (k *K8sWatcher) updateK8sPodV1(oldK8sPod, newK8sPod *slim_corev1.Pod) error
return nil
}

logger := log.WithFields(logrus.Fields{
logfields.K8sPodName: newK8sPod.ObjectMeta.Name,
logfields.K8sNamespace: newK8sPod.ObjectMeta.Namespace,
"new-podIP": newK8sPod.Status.PodIP,
"new-podIPs": newK8sPod.Status.PodIPs,
"new-hostIP": newK8sPod.Status.PodIP,
"old-podIP": oldK8sPod.Status.PodIP,
"old-podIPs": oldK8sPod.Status.PodIPs,
"old-hostIP": oldK8sPod.Status.PodIP,
})

// In Kubernetes Jobs, Pods can be left in Kubernetes until the Job
// is deleted. If the Job is never deleted, Cilium will never receive a Pod
// delete event, causing the IP to be left in the ipcache.
Expand All @@ -263,10 +271,19 @@ func (k *K8sWatcher) updateK8sPodV1(oldK8sPod, newK8sPod *slim_corev1.Pod) error
return k.deleteK8sPodV1(newK8sPod)
}

// The pod IP can never change, it can only switch from unassigned to
// assigned
// Process IP updates
k.addK8sPodV1(newK8sPod)
if newK8sPod.Spec.HostNetwork {
aditighag marked this conversation as resolved.
Show resolved Hide resolved
logger.Debug("Pod is using host networking")
return nil
}

oldPodIPs := k8sUtils.ValidIPs(oldK8sPod.Status)
newPodIPs := k8sUtils.ValidIPs(newK8sPod.Status)
err := k.updatePodHostData(oldK8sPod, newK8sPod, oldPodIPs, newPodIPs)
aditighag marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
logger.WithError(err).Warning("Unable to update ipcache map entry on pod update")
return err
}

// Check annotation updates.
oldAnno := oldK8sPod.ObjectMeta.Annotations
Expand Down Expand Up @@ -584,6 +601,7 @@ func (k *K8sWatcher) upsertHostPortMapping(oldPod, newPod *slim_corev1.Pod, oldP
if !option.Config.EnableHostPort {
return nil
}

var svcsAdded []loadbalancer.L3n4Addr

logger := log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -674,24 +692,29 @@ func (k *K8sWatcher) deleteHostPortMapping(pod *slim_corev1.Pod, podIPs []string
return nil
}

func (k *K8sWatcher) updatePodHostData(oldPod, newPod *slim_corev1.Pod, oldPodIPs, newPodIPs []string) error {
func (k *K8sWatcher) updatePodHostData(oldPod, newPod *slim_corev1.Pod, oldPodIPs, newPodIPs k8sTypes.IPSlice) error {
var namedPortsChanged bool

ipSliceEqual := oldPodIPs != nil && oldPodIPs.DeepEqual(&newPodIPs)

defer func() {
// delete all IPs that were not added regardless if the insertion of the
// entry in the ipcache map was successful or not because we will not
// receive any other event with these old IP addresses.
for _, oldPodIP := range oldPodIPs {
var found bool
for _, newPodIP := range newPodIPs {
if newPodIP == oldPodIP {
found = true
break
if !ipSliceEqual {
// delete all IPs that were not added regardless if the insertion of the
// entry in the ipcache map was successful or not because we will not
// receive any other event with these old IP addresses.
for _, oldPodIP := range oldPodIPs {
var found bool
for _, newPodIP := range newPodIPs {
if newPodIP == oldPodIP {
found = true
break
}
}
}
if !found {
npc := ipcache.IPIdentityCache.Delete(oldPodIP, source.Kubernetes)
if npc {
namedPortsChanged = true
if !found {
npc := ipcache.IPIdentityCache.Delete(oldPodIP, source.Kubernetes)
if npc {
namedPortsChanged = true
}
}
}
}
Expand All @@ -702,9 +725,21 @@ func (k *K8sWatcher) updatePodHostData(oldPod, newPod *slim_corev1.Pod, oldPodIP
}
}()

err := k.upsertHostPortMapping(oldPod, newPod, oldPodIPs, newPodIPs)
if err != nil {
return fmt.Errorf("cannot upsert hostPort for PodIPs: %s", newPodIPs)
specEqual := oldPod != nil && newPod.Spec.DeepEqual(&oldPod.Spec)
hostIPEqual := oldPod != nil && newPod.Status.HostIP != oldPod.Status.HostIP

// only upsert HostPort Mapping if spec or ip slice is different
if !specEqual || !ipSliceEqual {
err := k.upsertHostPortMapping(oldPod, newPod, oldPodIPs, newPodIPs)
if err != nil {
return fmt.Errorf("cannot upsert hostPort for PodIPs: %s", newPodIPs)
aditighag marked this conversation as resolved.
Show resolved Hide resolved
}
}

// is spec and hostIPs are the same there no need to perform the remaining
// operations
if specEqual && hostIPEqual {
return nil
}

hostIP := net.ParseIP(newPod.Status.HostIP)
Expand Down