Skip to content

Commit

Permalink
garp: Switch processor to use cilium EndpointManager
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Pashmfouroush <mark@isovalent.com>
  • Loading branch information
markpash authored and joestringer committed Jun 15, 2023
1 parent e10961d commit 16a5ba1
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 200 deletions.
19 changes: 0 additions & 19 deletions pkg/datapath/garp/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,8 @@ package garp

import (
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/k8s/utils"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
)

const (
Expand All @@ -34,24 +27,12 @@ func (def Config) Flags(flags *pflag.FlagSet) {
flags.Bool(EnableL2PodAnnouncements, def.EnableL2PodAnnouncements, "Enable announcing Pod IPs with Gratuitous ARP")
}

var localPodsCell = cell.ProvidePrivate(func(lc hive.Lifecycle, c k8sClient.Clientset) (resource.Resource[*corev1.Pod], error) {
if !c.IsEnabled() {
return nil, nil
}

lw := utils.ListerWatcherWithFields(utils.ListerWatcherFromTyped[*corev1.PodList](
c.CoreV1().Pods("")), fields.OneTermEqualSelector("spec.nodeName", nodeTypes.GetName()),
)
return resource.New[*corev1.Pod](lc, lw), nil
})

// Cell processes k8s pod events for the local node and determines if a
// Gratuitous ARP packet needs to be sent.
var Cell = cell.Module(
"l2-pod-announcements-garp",
"GARP processor sends gratuitous ARP packets for local pods",

localPodsCell,
cell.Provide(newGARPSender),

// This cell can't have a default config, it's entirely env dependent.
Expand Down
136 changes: 38 additions & 98 deletions pkg/datapath/garp/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,150 +4,90 @@
package garp

import (
"context"
"net/netip"

"github.com/cilium/workerpool"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"

"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
)

type Processor interface {
Start(hive.HookContext) error
Stop(hive.HookContext) error
}

type processorParams struct {
cell.In

Logger logrus.FieldLogger
Lifecycle hive.Lifecycle
Pods resource.Resource[*corev1.Pod]
GARPSender Sender
Config Config
Logger logrus.FieldLogger
EndpointManager endpointmanager.EndpointManager
GARPSender Sender
Config Config
}

func newGARPProcessor(p processorParams) Processor {
func newGARPProcessor(p processorParams) *processor {
if !p.Config.EnableL2PodAnnouncements {
return nil
}

if p.Pods == nil {
return nil
}

gp := &processor{
log: p.Logger,
pods: p.Pods,
garpSender: p.GARPSender,
podIPsState: make(map[resource.Key]netip.Addr),
endpointIPs: make(map[uint16]netip.Addr),
}

p.Lifecycle.Append(gp)
if p.EndpointManager != nil {
p.EndpointManager.Subscribe(gp)
}

p.Logger.Info("initialised gratuitous arp processor")

return gp
}

type processor struct {
wp *workerpool.WorkerPool
mu lock.Mutex

log logrus.FieldLogger
pods resource.Resource[*corev1.Pod]
garpSender Sender

podIPsState map[resource.Key]netip.Addr
}

func (gp *processor) Start(hive.HookContext) error {
gp.wp = workerpool.New(1)
gp.wp.Submit("GARPProcessorLoop", gp.run)
return nil
}

func (gp *processor) Stop(hive.HookContext) error {
gp.wp.Close()
return nil
endpointIPs map[uint16]netip.Addr
}

func (gp *processor) run(ctx context.Context) error {
pods := gp.pods.Events(ctx)

for pods != nil {
event, ok := <-pods
if !ok {
pods = nil
continue
}
var _ endpointmanager.Subscriber = &processor{}

if event.Kind == resource.Upsert {
if err := gp.upsert(&event); err != nil {
event.Done(err)
continue
}
}
// EndpointCreated implements endpointmanager.Subscriber
func (gp *processor) EndpointCreated(ep *endpoint.Endpoint) {
gp.mu.Lock()
defer gp.mu.Unlock()

if event.Kind == resource.Delete {
delete(gp.podIPsState, event.Key)
}

event.Done(nil)
newIP := ep.IPv4
if newIP.IsUnspecified() {
return
}

return nil
}

func (gp *processor) upsert(event *resource.Event[*corev1.Pod]) error {
if event.Object.Status.PodIPs == nil {
return nil
}

newIP := getPodIPv4(event.Object.Status.PodIPs)
if !newIP.IsValid() {
return nil
}

oldIP, ok := gp.podIPsState[event.Key]
oldIP, ok := gp.endpointIPs[ep.ID]
if ok && oldIP == newIP {
return nil
return
}

gp.podIPsState[event.Key] = newIP
gp.endpointIPs[ep.ID] = newIP

if err := gp.garpSender.Send(newIP); err != nil {
return err
}

gp.log.WithFields(logrus.Fields{
logfields.K8sPodName: event.Key.Name,
log := gp.log.WithFields(logrus.Fields{
logfields.K8sPodName: ep.K8sPodName,
logfields.IPAddr: newIP,
}).Debug("pod upsert gratuitous arp sent")
})

return nil
if err := gp.garpSender.Send(newIP); err != nil {
log.WithError(err).Warn("Failed to send gratuitous arp")
} else {
log.Debug("pod upsert gratuitous arp sent")
}
}

// getPodIPv4 returns the IPv4 address from the given Pod IPs, if
// available.
func getPodIPv4(podIPs []corev1.PodIP) netip.Addr {
for _, podIP := range podIPs {
ip, err := netip.ParseAddr(podIP.IP)
if err != nil {
continue
}

ip = ip.Unmap()
if ip.Is4() {
// Valid v4 address found, return it.
return ip
}
}
// EndpointDeleted implements endpointmanager.Subscriber
func (gp *processor) EndpointDeleted(ep *endpoint.Endpoint, conf endpoint.DeleteConfig) {
gp.mu.Lock()
defer gp.mu.Unlock()

// No valid v4 address found, return the zero value.
return netip.Addr{}
delete(gp.endpointIPs, ep.ID)
}
109 changes: 26 additions & 83 deletions pkg/datapath/garp/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ import (
"time"

. "github.com/cilium/checkmate"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientv1 "k8s.io/client-go/applyconfigurations/core/v1"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
)

// fakeSender mocks the GARP Sender, allowing for a feedback channel.
Expand All @@ -35,33 +32,24 @@ func (s *garpSuite) TestProcessorCell(c *C) {

// These allow us to inspect the state of the processor cell.
garpSent := make(chan netip.Addr)
var processorState *map[resource.Key]netip.Addr

_, cs := k8sClient.NewFakeClientset()
var processorState *processor

h := hive.New(cell.Module(
"test-garp-processor-cell",
"TestProcessorCell",

// Provide the mock k8s client cell.
cell.Provide(func() k8sClient.Clientset { return cs }),
cell.Config(Config{}),

// Provide the mock GARP Sender cell, passing in feedback
// channel.
cell.Provide(func() Sender { return &fakeSender{sent: garpSent} }),

cell.Config(Config{}),

localPodsCell,
cell.Provide(func() endpointmanager.EndpointManager { return nil }),
cell.Provide(newGARPProcessor),

// Force invocation.
cell.Invoke(func(ep Processor) {
e, _ := ep.(*processor)
c.Assert(e, NotNil)
// Here we keep a reference to the internal pod IP state in
// the parent scope so we can inspect later.
processorState = &e.podIPsState
cell.Invoke(func(p *processor) {
// Here we keep a reference to processor for inspection.
processorState = p
}),
))

Expand Down Expand Up @@ -91,87 +79,42 @@ func (s *garpSuite) TestProcessorCell(c *C) {

// checkState is a helper function that asserts that the GARP
// processor state matches the given desired state.
checkState := func(desired map[string]string) {
c.Assert(*processorState, HasLen, len(desired))
desiredState := make(map[resource.Key]netip.Addr)
for name, ip := range desired {
desiredState[resource.Key{Name: name, Namespace: "default"}] = netip.MustParseAddr(ip)
checkState := func(desired map[uint16]string) {
c.Assert(processorState.endpointIPs, HasLen, len(desired))
desiredState := make(map[uint16]netip.Addr)
for id, ip := range desired {
desiredState[id] = netip.MustParseAddr(ip)
}
c.Assert(*processorState, checker.DeepEquals, desiredState)
c.Assert(processorState.endpointIPs, checker.DeepEquals, desiredState)
}

// Create a Pod. This should sent a GARP packet, and should present
// an item in the state.
podOne := makePod(c, cs, "pod-1", "1.2.3.4")
// Create an endpoint. This should sent a GARP packet, and should
// present an item in the state.
go processorState.EndpointCreated(&endpoint.Endpoint{ID: 1, IPv4: netip.MustParseAddr("1.2.3.4")})
garpEvent := getGARPEvent()
c.Assert(garpEvent, NotNil) // GARP packet sent
c.Assert(garpEvent.String(), Equals, "1.2.3.4")
checkState(map[string]string{"pod-1": "1.2.3.4"})
checkState(map[uint16]string{1: "1.2.3.4"})

// Update the previous Pod with the same IP. This should not send
// Update the previous endpoint with the same IP. This should not send
// any GARP packets or change the state.
_ = updatePodIP(c, cs, podOne, "1.2.3.4")
go processorState.EndpointCreated(&endpoint.Endpoint{ID: 1, IPv4: netip.MustParseAddr("1.2.3.4")})
garpEvent = getGARPEvent()
c.Assert(garpEvent, IsNil) // NO GARP packet sent
checkState(map[string]string{"pod-1": "1.2.3.4"})
checkState(map[uint16]string{1: "1.2.3.4"})

// Update the previous Pod with a new IP. This should send a new
// Update the previous endpoint with a new IP. This should send a new
// GARP packet and the state should reflect the new IP.
_ = updatePodIP(c, cs, podOne, "4.3.2.1")
go processorState.EndpointCreated(&endpoint.Endpoint{ID: 1, IPv4: netip.MustParseAddr("4.3.2.1")})
garpEvent = getGARPEvent()
c.Assert(garpEvent, NotNil) // GARP packet sent
c.Assert(garpEvent.String(), Equals, "4.3.2.1")
checkState(map[string]string{"pod-1": "4.3.2.1"})
checkState(map[uint16]string{1: "4.3.2.1"})

// Delete the previous Pod. This should not send any GARP packets,
// and the pod should no longer be present in the state.
deletePod(c, cs, "pod-1")
go processorState.EndpointDeleted(&endpoint.Endpoint{ID: 1, IPv4: netip.MustParseAddr("4.3.2.1")}, endpoint.DeleteConfig{})
garpEvent = getGARPEvent()
c.Assert(garpEvent, IsNil) // NO GARP packet sent
checkState(map[string]string{})
}

func (s *garpSuite) TestPodIPv4ParseFunc(c *C) {
c.Assert(getPodIPv4([]corev1.PodIP{}), Equals, netip.Addr{})
c.Assert(getPodIPv4([]corev1.PodIP{{IP: "::1"}}), Equals, netip.Addr{})
c.Assert(getPodIPv4([]corev1.PodIP{{IP: "::1"}, {IP: "1.2.3.4"}}), Equals, netip.MustParseAddr("1.2.3.4"))
c.Assert(getPodIPv4([]corev1.PodIP{{IP: "1.2.3.4"}, {IP: "::1"}}), Equals, netip.MustParseAddr("1.2.3.4"))
c.Assert(getPodIPv4([]corev1.PodIP{{IP: "1.2.3.4"}}), Equals, netip.MustParseAddr("1.2.3.4"))
}

// makePod makes a test pod with the provided IP.
func makePod(c *C, cs k8sClient.Clientset, name string, ip string) *corev1.Pod {
addr := netip.MustParseAddr(ip).String()

podDefinition := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
Status: corev1.PodStatus{PodIP: addr, PodIPs: []corev1.PodIP{{IP: addr}}},
}

pod, err := cs.CoreV1().Pods("default").Create(context.Background(), &podDefinition, metav1.CreateOptions{})
c.Assert(err, IsNil)

return pod
}

// updatePodIP updates the status field of given pod with a new IP and applies it.
func updatePodIP(c *C, cs k8sClient.Clientset, pod *corev1.Pod, ip string) *corev1.Pod {
addr := netip.MustParseAddr(ip).String()

extractedPod, err := clientv1.ExtractPod(pod, "")
c.Assert(err, IsNil)

podStatusApply := clientv1.PodStatus().WithPodIP(addr).WithPodIPs(clientv1.PodIP().WithIP(addr))
podApply := extractedPod.WithStatus(podStatusApply)

updated, err := cs.CoreV1().Pods("default").ApplyStatus(context.Background(), podApply, metav1.ApplyOptions{})
c.Assert(err, IsNil)

return updated
}

// deletePod deletes the given pod.
func deletePod(c *C, cs k8sClient.Clientset, pod string) {
err := cs.CoreV1().Pods("default").Delete(context.Background(), "pod-1", metav1.DeleteOptions{})
c.Assert(err, IsNil)
checkState(map[uint16]string{})
}

0 comments on commit 16a5ba1

Please sign in to comment.