Skip to content

Commit

Permalink
loader: refactor replaceDatapath to loadDatapath
Browse files Browse the repository at this point in the history
This unblocks #29333. See #32468 for more context.

This commit refactors replaceDatapath() to loadDatapath() by factoring device attachment
out of the function into the caller. The main reasons are flexibility and transparency.
replaceDatapath() was called from many places and needed to do a lot. This change is the
first step towards handing individual callers an object representing the actual bpf object handles,
so they can correctly manage its lifecycle. In the future, ebpf.LoadAndAssign will be used
for better readability.

Some callers attach the same program to multiple interfaces, some attach multiple programs
(ingress/egress) to the same interface, and some use a mixture of both. This has caused
loops to creep into replaceDatapath, giving it many arguments and many overall
responsibilities, and making it hard to reason about.

Major changes made in this commit:
- lifted attach{SKB,XDP}Program out of the function, into all callers, making them
  call attach* methods explicitly
- removed `replaceDatapathOptions`
- reduced the window in which a potential 'rollback' can happen (see code comments), due to
  the risks involved and because it was never correct to begin with.
- removed a few points where context cancellations are obeyed, to be continued in a
  subsequent commit

Fixes: #32468

Signed-off-by: Timo Beckers <timo@isovalent.com>
  • Loading branch information
ti-mo committed May 14, 2024
1 parent 2fef226 commit b657680
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 252 deletions.
40 changes: 20 additions & 20 deletions pkg/datapath/loader/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ func (l *loader) reinitializeIPSec(ctx context.Context) error {
return nil
}

progs := []progDefinition{{progName: symbolFromNetwork, direction: dirIngress}}
coll, finalize, err := loadDatapath(ctx, networkObj)
if err != nil {
return fmt.Errorf("loading %s: %w", networkObj, err)
}
defer coll.Close()

var errs error
for _, iface := range interfaces {
if err := connector.DisableRpFilter(l.sysctl, iface); err != nil {
Expand All @@ -209,33 +214,28 @@ func (l *loader) reinitializeIPSec(ctx context.Context) error {

device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
errs = errors.Join(errs, fmt.Errorf("retrieving device %s: %w", iface, err))
continue
}

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: iface,
elf: networkObj,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), device),
tcx: option.Config.EnableTCX,
},
)
if err != nil {
log.WithField(logfields.Interface, iface).WithError(err).Error("Load encryption network failed")
// collect errors, but keep trying replacing other interfaces.
errs = errors.Join(errs, err)
} else {
log.WithField(logfields.Interface, iface).Info("Encryption network program (re)loaded")
// Defer map removal until all interfaces' progs have been replaced.
defer finalize()
if err := attachSKBProgram(device, coll.Programs[symbolFromNetwork], symbolFromNetwork,
bpffsDeviceLinksDir(bpf.CiliumPath(), device), netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {

// Collect errors, keep attaching to other interfaces.
errs = errors.Join(errs, fmt.Errorf("interface %s: %w", iface, err))
continue
}

log.WithField(logfields.Interface, iface).Info("Encryption network program (re)loaded")
}

if errs != nil {
return fmt.Errorf("failed to load encryption program: %w", errs)
}

// Defer map removal until all interfaces' progs have been replaced.
finalize()

return nil
}

Expand Down Expand Up @@ -455,7 +455,7 @@ func (l *loader) Reinitialize(ctx context.Context, tunnelConfig tunnel.Config, d
}

if err := l.reinitializeOverlay(ctx, tunnelConfig); err != nil {
return err
log.WithError(err).Fatal("Failed to initialize overlay device")
}

if err := l.nodeHandler.NodeConfigurationChanged(l.localNodeConfig); err != nil {
Expand Down
199 changes: 78 additions & 121 deletions pkg/datapath/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,85 +405,58 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
// correct to begin with. Tail call maps were always reused when possible,
// causing control flow to transition through invalid states as new tail calls
// were sequentially upserted into the array.
//
// Take care not to call replaceDatapath() twice for a single ELF/interface.
// Map migration should only be run once per ELF, otherwise cilium_calls_*
// created by prior loads will be unpinned, causing them to be emptied,
// missing all tail calls.

// Replace programs on cilium_host.
host, err := netlink.LinkByName(ep.InterfaceName())
if err != nil {
return fmt.Errorf("retrieving device %s: %w", ep.InterfaceName(), err)
}

progs := []progDefinition{
{progName: symbolToHostEp, direction: dirIngress},
{progName: symbolFromHostEp, direction: dirEgress},
}
finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: ep.InterfaceName(),
elf: objPath,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), host),
tcx: option.Config.EnableTCX,
},
)
coll, finalize, err := loadDatapath(ctx, objPath)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
logfields.Veth: ep.InterfaceName(),
})
// Don't log an error here if the context was canceled or timed out;
// this log message should only represent failures with respect to
// loading the program.
if ctx.Err() == nil {
scopedLog.WithError(err).Warningf("JoinEP: Failed to load program for %s", ep.InterfaceName())
}
return err
}
// Defer map removal until all interfaces' progs have been replaced.
defer finalize()
defer coll.Close()

// Attach cil_to_host to cilium_host ingress.
if err := attachSKBProgram(host, coll.Programs[symbolToHostEp], symbolToHostEp,
bpffsDeviceLinksDir(bpf.CiliumPath(), host), netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", ep.InterfaceName(), err)
}
// Attach cil_from_host to cilium_host egress.
if err := attachSKBProgram(host, coll.Programs[symbolFromHostEp], symbolFromHostEp,
bpffsDeviceLinksDir(bpf.CiliumPath(), host), netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", ep.InterfaceName(), err)
}

finalize()

// Replace program on cilium_net.
net, err := netlink.LinkByName(defaults.SecondHostDevice)
if err != nil {
log.WithError(err).WithField("device", defaults.SecondHostDevice).Error("Link does not exist")
return fmt.Errorf("device '%s' not found: %w", defaults.SecondHostDevice, err)
return fmt.Errorf("retrieving device %s: %w", defaults.SecondHostDevice, err)
}

secondDevObjPath := path.Join(ep.StateDir(), hostEndpointPrefix+"_"+defaults.SecondHostDevice+".o")
if err := l.patchHostNetdevDatapath(ep, objPath, secondDevObjPath, defaults.SecondHostDevice); err != nil {
return err
}

progs = []progDefinition{
{progName: symbolToHostEp, direction: dirIngress},
}

finalize, err = replaceDatapath(ctx,
replaceDatapathOptions{
device: defaults.SecondHostDevice,
elf: secondDevObjPath,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), net),
tcx: option.Config.EnableTCX,
},
)
coll, finalize, err = loadDatapath(ctx, secondDevObjPath)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
logfields.Veth: defaults.SecondHostDevice,
})
if ctx.Err() == nil {
scopedLog.WithError(err).Warningf("JoinEP: Failed to load program for %s", defaults.SecondHostDevice)
}
return err
}
defer finalize()
defer coll.Close()

// Attach cil_to_host to cilium_net.
if err := attachSKBProgram(net, coll.Programs[symbolToHostEp], symbolToHostEp,
bpffsDeviceLinksDir(bpf.CiliumPath(), net), netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", defaults.SecondHostDevice, err)
}

finalize()

// Replace programs on physical devices.
// Replace programs on physical devices, ignoring devices that don't exist.
for _, device := range devices {
iface, err := netlink.LinkByName(device)
if err != nil {
Expand All @@ -498,8 +471,17 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
return err
}

progs := []progDefinition{
{progName: symbolFromHostNetdevEp, direction: dirIngress},
// Load bpf_netdev_<dev>.o.
coll, finalize, err := loadDatapath(ctx, netdevObjPath)
if err != nil {
return err
}
defer coll.Close()

// Attach cil_from_netdev to ingress.
if err := attachSKBProgram(iface, coll.Programs[symbolFromHostNetdevEp], symbolFromHostNetdevEp,
linkDir, netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", device, err)
}

if option.Config.AreDevicesRequired() &&
Expand All @@ -508,7 +490,11 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
// the rev-NAT xlations.
device != wgTypes.IfaceName {

progs = append(progs, progDefinition{symbolToHostNetdevEp, dirEgress})
// Attach cil_to_netdev to egress.
if err := attachSKBProgram(iface, coll.Programs[symbolToHostNetdevEp], symbolToHostNetdevEp,
linkDir, netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", device, err)
}
} else {
// Remove any previously attached device from egress path if BPF
// NodePort and host firewall are disabled.
Expand All @@ -517,26 +503,7 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
}
}

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: device,
elf: netdevObjPath,
programs: progs,
linkDir: linkDir,
tcx: option.Config.EnableTCX,
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
logfields.Veth: device,
})
if ctx.Err() == nil {
scopedLog.WithError(err).Warningf("JoinEP: Failed to load program for physical device %s", device)
}
return err
}
defer finalize()
finalize()
}

// call at the end of the function so that we can easily detect if this removes necessary
Expand Down Expand Up @@ -572,44 +539,35 @@ func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, dirs
return err
}
} else {
progs := []progDefinition{{progName: symbolFromEndpoint, direction: dirIngress}}
coll, finalize, err := loadDatapath(ctx, objPath)
if err != nil {
return err
}
defer coll.Close()

iface, err := netlink.LinkByName(device)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", device, err)
}

linkDir := bpffsEndpointLinksDir(bpf.CiliumPath(), ep)
if err := attachSKBProgram(iface, coll.Programs[symbolFromEndpoint], symbolFromEndpoint,
linkDir, netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", device, err)
}

if ep.RequireEgressProg() {
progs = append(progs, progDefinition{progName: symbolToEndpoint, direction: dirEgress})
} else {
iface, err := netlink.LinkByName(device)
if err != nil {
log.WithError(err).WithField("device", device).Warn("Link does not exist")
if err := attachSKBProgram(iface, coll.Programs[symbolToEndpoint], symbolToEndpoint,
linkDir, netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", device, err)
}
} else {
if err := detachSKBProgram(iface, symbolToEndpoint, linkDir, netlink.HANDLE_MIN_EGRESS); err != nil {
log.WithField("device", device).Error(err)
}
}

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: device,
elf: objPath,
programs: progs,
linkDir: linkDir,
tcx: option.Config.EnableTCX,
},
)
if err != nil {
scopedLog := ep.Logger(subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
logfields.Veth: device,
})
// Don't log an error here if the context was canceled or timed out;
// this log message should only represent failures with respect to
// loading the program.
if ctx.Err() == nil {
scopedLog.WithError(err).Warn("JoinEP: Failed to attach program(s)")
}
return err
}
defer finalize()
finalize()
}

if ep.RequireEndpointRoute() {
Expand All @@ -633,31 +591,30 @@ func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, dirs

func (l *loader) replaceOverlayDatapath(ctx context.Context, cArgs []string, iface string) error {
if err := compileOverlay(ctx, cArgs); err != nil {
log.WithError(err).Fatal("failed to compile overlay programs")
return fmt.Errorf("compiling overlay program: %w", err)
}

device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
}

progs := []progDefinition{
{progName: symbolFromOverlay, direction: dirIngress},
{progName: symbolToOverlay, direction: dirEgress},
coll, finalize, err := loadDatapath(ctx, overlayObj)
if err != nil {
return err
}
defer coll.Close()

finalize, err := replaceDatapath(ctx,
replaceDatapathOptions{
device: iface,
elf: overlayObj,
programs: progs,
linkDir: bpffsDeviceLinksDir(bpf.CiliumPath(), device),
tcx: option.Config.EnableTCX,
},
)
if err != nil {
log.WithField(logfields.Interface, iface).WithError(err).Fatal("Load overlay network failed")
linkDir := bpffsDeviceLinksDir(bpf.CiliumPath(), device)
if err := attachSKBProgram(device, coll.Programs[symbolFromOverlay], symbolFromOverlay,
linkDir, netlink.HANDLE_MIN_INGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s ingress: %w", device, err)
}
if err := attachSKBProgram(device, coll.Programs[symbolToOverlay], symbolToOverlay,
linkDir, netlink.HANDLE_MIN_EGRESS, option.Config.EnableTCX); err != nil {
return fmt.Errorf("interface %s egress: %w", device, err)
}

finalize()

return nil
Expand Down
Loading

0 comments on commit b657680

Please sign in to comment.