Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loader: clean up tcx bpf_links created by newer Cilium versions #31553

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/bpf/bpffs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package bpf

import (
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -64,6 +65,15 @@ func MkdirBPF(path string) error {
return os.MkdirAll(path, 0755)
}

// Remove deletes path, and everything beneath it, from the filesystem.
//
// A path that does not exist is not considered a failure: nil is returned in
// that case. Any other error is wrapped with the offending path and returned.
func Remove(path string) error {
	err := os.RemoveAll(path)
	if err == nil || errors.Is(err, os.ErrNotExist) {
		// Either the removal succeeded or there was nothing to remove. Return a
		// literal nil so callers never observe the ErrNotExist we promise to
		// ignore.
		return nil
	}
	return fmt.Errorf("removing bpffs directory at %s: %w", path, err)
}

func tcPathFromMountInfo(name string) string {
readMountInfo.Do(func() {
mountInfos, err := mountinfo.GetMountInfo()
Expand Down
14 changes: 13 additions & 1 deletion pkg/datapath/loader/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,19 @@ func (l *loader) reinitializeIPSec(ctx context.Context) error {
log.WithError(err).WithField(logfields.Interface, iface).Warn("Rpfilter could not be disabled, node to node encryption may fail")
}

finalize, err := replaceDatapath(ctx, iface, networkObj, progs, "")
device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
}

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: iface,
elf: networkObj,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), device),
},
)
if err != nil {
log.WithField(logfields.Interface, iface).WithError(err).Error("Load encryption network failed")
// collect errors, but keep trying replacing other interfaces.
Expand Down
70 changes: 63 additions & 7 deletions pkg/datapath/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,23 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
// missing all tail calls.

// Replace programs on cilium_host.
host, err := netlink.LinkByName(ep.InterfaceName())
if err != nil {
return fmt.Errorf("retrieving device %s: %w", ep.InterfaceName(), err)
}

progs := []progDefinition{
{progName: symbolToHostEp, direction: dirIngress},
{progName: symbolFromHostEp, direction: dirEgress},
}
finalize, err := replaceDatapath(ctx, ep.InterfaceName(), objPath, progs, "")
finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: ep.InterfaceName(),
elf: objPath,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), host),
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
Expand All @@ -406,7 +418,8 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
defer finalize()

// Replace program on cilium_net.
if _, err := netlink.LinkByName(defaults.SecondHostDevice); err != nil {
net, err := netlink.LinkByName(defaults.SecondHostDevice)
if err != nil {
log.WithError(err).WithField("device", defaults.SecondHostDevice).Error("Link does not exist")
return fmt.Errorf("device '%s' not found: %w", defaults.SecondHostDevice, err)
}
Expand All @@ -420,7 +433,14 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
{progName: symbolToHostEp, direction: dirIngress},
}

finalize, err = replaceDatapath(ctx, defaults.SecondHostDevice, secondDevObjPath, progs, "")
finalize, err = replaceDatapath(ctx,
replaceDatapathOptions{
device: defaults.SecondHostDevice,
elf: secondDevObjPath,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), net),
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
Expand All @@ -435,7 +455,8 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o

// Replace programs on physical devices.
for _, device := range option.Config.GetDevices() {
if _, err := netlink.LinkByName(device); err != nil {
iface, err := netlink.LinkByName(device)
if err != nil {
log.WithError(err).WithField("device", device).Warn("Link does not exist")
continue
}
Expand Down Expand Up @@ -465,7 +486,14 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
}
}

finalize, err := replaceDatapath(ctx, device, netdevObjPath, progs, "")
finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: device,
elf: netdevObjPath,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), iface),
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
Expand Down Expand Up @@ -514,7 +542,14 @@ func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, dirs
}
}

finalize, err := replaceDatapath(ctx, ep.InterfaceName(), objPath, progs, "")
finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: ep.InterfaceName(),
elf: objPath,
programs: progs,
linkDir: bpffsEndpointLinksDir(bpf.CiliumPath(), ep),
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
Expand Down Expand Up @@ -555,12 +590,24 @@ func (l *loader) replaceOverlayDatapath(ctx context.Context, cArgs []string, ifa
log.WithError(err).Fatal("failed to compile overlay programs")
}

device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
}

progs := []progDefinition{
{progName: symbolFromOverlay, direction: dirIngress},
{progName: symbolToOverlay, direction: dirEgress},
}

finalize, err := replaceDatapath(ctx, iface, overlayObj, progs, "")
finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: iface,
elf: overlayObj,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), device),
},
)
if err != nil {
log.WithField(logfields.Interface, iface).WithError(err).Fatal("Load overlay network failed")
}
Expand Down Expand Up @@ -692,6 +739,15 @@ func (l *loader) Unload(ep datapath.Endpoint) {
removeEndpointRoute(ep, *iputil.AddrToIPNet(ip))
}
}

// If Cilium and the kernel support tcx to attach TC programs to the
// endpoint's veth device, its bpf_link object is pinned to a per-endpoint
// bpffs directory. When the endpoint gets deleted, removing the whole
// directory cleans up any pinned maps and links.
bpffsPath := bpffsEndpointDir(bpf.CiliumPath(), ep)
if err := bpf.Remove(bpffsPath); err != nil {
log.WithError(err).WithField(logfields.EndpointID, ep.StringID())
}
}

// EndpointHash hashes the specified endpoint configuration with the current
Expand Down
21 changes: 18 additions & 3 deletions pkg/datapath/loader/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,18 @@ func (s *LoaderTestSuite) TestReload(c *C) {
{progName: symbolFromEndpoint, direction: dirIngress},
{progName: symbolToEndpoint, direction: dirEgress},
}
finalize, err := replaceDatapath(ctx, ep.InterfaceName(), objPath, progs, "")
opts := replaceDatapathOptions{
device: ep.InterfaceName(),
elf: objPath,
programs: progs,
linkDir: testutils.TempBPFFS(c),
}
finalize, err := replaceDatapath(ctx, opts)
c.Assert(err, IsNil)
finalize()

finalize, err = replaceDatapath(ctx, ep.InterfaceName(), objPath, progs, "")
finalize, err = replaceDatapath(ctx, opts)

c.Assert(err, IsNil)
finalize()
}
Expand Down Expand Up @@ -330,10 +337,18 @@ func BenchmarkReplaceDatapath(b *testing.B) {
}

objPath := fmt.Sprintf("%s/%s", dirInfo.Output, endpointObj)
linkDir := testutils.TempBPFFS(b)
progs := []progDefinition{{progName: symbolFromEndpoint, direction: dirIngress}}
b.ResetTimer()
for i := 0; i < b.N; i++ {
finalize, err := replaceDatapath(ctx, ep.InterfaceName(), objPath, progs, "")
finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: ep.InterfaceName(),
elf: objPath,
programs: progs,
linkDir: linkDir,
},
)
if err != nil {
b.Fatal(err)
}
Expand Down
64 changes: 45 additions & 19 deletions pkg/datapath/loader/netlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"strings"

"github.com/vishvananda/netlink"
Expand Down Expand Up @@ -61,6 +62,14 @@ type progDefinition struct {
direction string
}

// replaceDatapathOptions bundles the arguments accepted by replaceDatapath.
type replaceDatapathOptions struct {
	// device is the name of the netlink interface to attach to.
	device string

	// elf is the path to the object file to load.
	elf string

	// programs lists the programs to attach or replace.
	programs []progDefinition

	// xdpMode is the XDP driver mode. It only applies when attaching XDP
	// programs.
	xdpMode string

	// linkDir is the path to the bpffs directory holding bpf_links for the
	// device or endpoint.
	linkDir string
}

// replaceDatapath replaces the qdisc and BPF program for an endpoint or XDP program.
//
// When successful, returns a finalizer to allow the map cleanup operation to be
Expand All @@ -74,25 +83,29 @@ type progDefinition struct {
// For example, this is the case with from-netdev and to-netdev. If eth0:to-netdev
// gets its program and maps replaced and unpinned, its eth0:from-netdev counterpart
// will miss tail calls (and drop packets) until it has been replaced as well.
func replaceDatapath(ctx context.Context, ifName, objPath string, progs []progDefinition, xdpMode string) (_ func(), err error) {
func replaceDatapath(ctx context.Context, opts replaceDatapathOptions) (_ func(), err error) {
// Avoid unnecessarily loading a prog.
if err := ctx.Err(); err != nil {
return nil, err
}

link, err := netlink.LinkByName(ifName)
if opts.linkDir == "" {
return nil, errors.New("opts.linkDir not set in replaceDatapath")
}

link, err := netlink.LinkByName(opts.device)
if err != nil {
return nil, fmt.Errorf("getting interface %s by name: %w", ifName, err)
return nil, fmt.Errorf("getting interface %s by name: %w", opts.device, err)
}

l := log.WithField("device", ifName).WithField("objPath", objPath).
l := log.WithField("device", opts.device).WithField("objPath", opts.elf).
WithField("ifindex", link.Attrs().Index)

// Load the ELF from disk.
l.Debug("Loading CollectionSpec from ELF")
spec, err := bpf.LoadCollectionSpec(objPath)
spec, err := bpf.LoadCollectionSpec(opts.elf)
if err != nil {
return nil, fmt.Errorf("loading eBPF ELF: %w", err)
return nil, fmt.Errorf("loading eBPF ELF %s: %w", opts.elf, err)
}

revert := func() {
Expand All @@ -103,7 +116,7 @@ func replaceDatapath(ctx context.Context, ifName, objPath string, progs []progDe
}
}

for _, prog := range progs {
for _, prog := range opts.programs {
if spec.Programs[prog.progName] == nil {
return nil, fmt.Errorf("no program %s found in eBPF ELF", prog.progName)
}
Expand Down Expand Up @@ -154,14 +167,14 @@ func replaceDatapath(ctx context.Context, ifName, objPath string, progs []progDe
// bpffs in the process.
finalize := func() {}
pinPath := bpf.TCGlobalsPath()
opts := ebpf.CollectionOptions{
collOpts := ebpf.CollectionOptions{
Maps: ebpf.MapOptions{PinPath: pinPath},
}
if err := bpf.MkdirBPF(pinPath); err != nil {
return nil, fmt.Errorf("creating bpffs pin path: %w", err)
}
l.Debug("Loading Collection into kernel")
coll, err := bpf.LoadCollection(spec, opts)
coll, err := bpf.LoadCollection(spec, collOpts)
if errors.Is(err, ebpf.ErrMapIncompatible) {
// Temporarily rename bpffs pins of maps whose definitions have changed in
// a new version of a datapath ELF.
Expand All @@ -179,7 +192,7 @@ func replaceDatapath(ctx context.Context, ifName, objPath string, progs []progDe

// Retry loading the Collection after starting map migration.
l.Debug("Retrying loading Collection into kernel after map migration")
coll, err = bpf.LoadCollection(spec, opts)
coll, err = bpf.LoadCollection(spec, collOpts)
}
var ve *ebpf.VerifierError
if errors.As(err, &ve) {
Expand Down Expand Up @@ -221,19 +234,19 @@ func replaceDatapath(ctx context.Context, ifName, objPath string, progs []progDe

// Finally, attach the endpoint's tc or xdp entry points to allow traffic to
// flow in.
for _, prog := range progs {
for _, prog := range opts.programs {
scopedLog := l.WithField("progName", prog.progName).WithField("direction", prog.direction)
if xdpMode != "" {
linkDir := bpffsDeviceLinksDir(bpf.CiliumPath(), link)
if err := bpf.MkdirBPF(linkDir); err != nil {
return nil, fmt.Errorf("creating bpffs link dir for device %s: %w", link.Attrs().Name, err)
}

if err := bpf.MkdirBPF(opts.linkDir); err != nil {
return nil, fmt.Errorf("creating bpffs link dir for device %s: %w", link.Attrs().Name, err)
}

if opts.xdpMode != "" {
scopedLog.Debug("Attaching XDP program to interface")
err = attachXDPProgram(link, coll.Programs[prog.progName], prog.progName, linkDir, xdpConfigModeToFlag(xdpMode))
err = attachXDPProgram(link, coll.Programs[prog.progName], prog.progName, opts.linkDir, xdpConfigModeToFlag(opts.xdpMode))
} else {
scopedLog.Debug("Attaching TC program to interface")
err = attachTCProgram(link, coll.Programs[prog.progName], prog.progName, directionToParent(prog.direction))
err = attachTCProgram(link, coll.Programs[prog.progName], prog.progName, opts.linkDir, directionToParent(prog.direction))
}

if err != nil {
Expand Down Expand Up @@ -276,11 +289,24 @@ func resolveAndInsertCalls(coll *ebpf.Collection, mapName string, calls []ebpf.M
}

// attachTCProgram attaches the TC program 'prog' to link.
func attachTCProgram(link netlink.Link, prog *ebpf.Program, progName string, qdiscParent uint32) error {
func attachTCProgram(link netlink.Link, prog *ebpf.Program, progName, bpffsDir string, qdiscParent uint32) error {
if prog == nil {
return errors.New("cannot attach a nil program")
}

// Remove tcx bpf_links created by newer versions of Cilium. They cannot be
// overwritten by netlink-based tc attachments, as tcx is a separate hook
// altogether. Remove the tcx link first to avoid tc programs being run twice
// for every packet. This cannot be done seamlessly and will cause a small
// window of connection interruption.
pin := filepath.Join(bpffsDir, progName)
if err := os.Remove(pin); err == nil {
log.WithField("device", link.Attrs().Name).WithField("pinPath", pin).
Info("Removed tcx link before legacy tc downgrade, possible connectivity interruption")
} else if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("unpinning defunct link %s: %w", pin, err)
}

if err := replaceQdisc(link); err != nil {
return fmt.Errorf("replacing clsact qdisc for interface %s: %w", link.Attrs().Name, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/datapath/loader/netlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ func TestAttachRemoveTCProgram(t *testing.T) {

prog := mustTCProgram(t)

err = attachTCProgram(dummy, prog, "test", directionToParent(dirEgress))
bpffs := testutils.TempBPFFS(t)
err = attachTCProgram(dummy, prog, "test", bpffsDeviceLinksDir(bpffs, dummy), directionToParent(dirEgress))
require.NoError(t, err)

filters, err := netlink.FilterList(dummy, directionToParent(dirEgress))
Expand Down