Skip to content

Commit

Permalink
loader: remove ctx from uncancellable functions
Browse files Browse the repository at this point in the history
Loading bpf objects used to be done by iproute2, where propagating ctx to the
exec.Cmd invocations made sense, since realistically any shellout can hang for
arbitrary reasons.

Now the loader is fully hosted in the agent process, this no longer makes sense.
Once we're blocked in a bpf() syscall, e.g. for loading a program, the verifier
can be interrupted by sending a signal to the calling thread. Since the Go runtime
routinely sends these signals under normal operation, ebpf-go will retry a few
times when bpf() returns EINTR. The API currently doesn't expose a way to cancel
program loading/verification, and there's no clear benefit to doing so in the first
place.

Verification is relatively lightweight compared to datapath compilation, so
interrupting it during teardown is of questionable benefit. The agent doesn't expect
it to be interruptible, it's bound to leave endpoints in an undefined state.

This commit introduces the assumption that, once endpoint loading/attachment is
kicked off (after compilation), it cannot be cancelled. This is reflected in the
interface exposed to the rest of the system, by removing the ctx parameter on many
methods. Only compilation can be interrupted, since it can take a long time on some
systems, especially lower-spec.

Signed-off-by: Timo Beckers <timo@isovalent.com>
  • Loading branch information
ti-mo committed May 14, 2024
1 parent b657680 commit 38d44d4
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/datapath/fake/types/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (f *FakeLoader) CompileOrLoad(ctx context.Context, ep datapath.Endpoint, st
panic("implement me")
}

func (f *FakeLoader) ReloadDatapath(ctx context.Context, ep datapath.Endpoint, stats *metrics.SpanStat) error {
func (f *FakeLoader) ReloadDatapath(ep datapath.Endpoint, stats *metrics.SpanStat) error {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/datapath/loader/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func cleanIngressQdisc(devices []string) error {
}

// reinitializeIPSec is used to recompile and load encryption network programs.
func (l *loader) reinitializeIPSec(ctx context.Context) error {
func (l *loader) reinitializeIPSec() error {
// We need to take care not to load bpf_network and bpf_host onto the same
// device. If devices are required, we load bpf_host and hence don't need
// the code below, specific to EncryptInterface. Specifically, we will load
Expand Down Expand Up @@ -200,7 +200,7 @@ func (l *loader) reinitializeIPSec(ctx context.Context) error {
return nil
}

coll, finalize, err := loadDatapath(ctx, networkObj)
coll, finalize, err := loadDatapath(networkObj)
if err != nil {
return fmt.Errorf("loading %s: %w", networkObj, err)
}
Expand Down Expand Up @@ -449,7 +449,7 @@ func (l *loader) Reinitialize(ctx context.Context, tunnelConfig tunnel.Config, d
log.WithError(err).Fatal("failed to compile encryption programs")
}

if err := l.reinitializeIPSec(ctx); err != nil {
if err := l.reinitializeIPSec(); err != nil {
return err
}
}
Expand Down
32 changes: 15 additions & 17 deletions pkg/datapath/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,14 @@ func removeObsoleteNetdevPrograms(devices []string) error {
// - cilium_host: ingress and egress
// - cilium_net: ingress
// - native devices: ingress and (optionally) egress if certain features require it
func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, objPath string, devices []string) error {
// Warning: here be dragons. There used to be a single loop over
// interfaces+objs+progs here from the iproute2 days, but this was never
// correct to begin with. Tail call maps were always reused when possible,
// causing control flow to transition through invalid states as new tail calls
// were sequentially upserted into the array.

func (l *loader) reloadHostDatapath(ep datapath.Endpoint, objPath string, devices []string) error {
// Replace programs on cilium_host.
host, err := netlink.LinkByName(ep.InterfaceName())
if err != nil {
return fmt.Errorf("retrieving device %s: %w", ep.InterfaceName(), err)
}

coll, finalize, err := loadDatapath(ctx, objPath)
coll, finalize, err := loadDatapath(objPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -442,7 +436,7 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
return err
}

coll, finalize, err = loadDatapath(ctx, secondDevObjPath)
coll, finalize, err = loadDatapath(secondDevObjPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -472,7 +466,7 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
}

// Load bpf_netdev_<dev>.o.
coll, finalize, err := loadDatapath(ctx, netdevObjPath)
coll, finalize, err := loadDatapath(netdevObjPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -520,7 +514,7 @@ func (l *loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o
return nil
}

func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, dirs *directoryInfo) error {
func (l *loader) reloadDatapath(ep datapath.Endpoint, dirs *directoryInfo) error {
// Replace the current program
objPath := path.Join(dirs.Output, endpointObj)
device := ep.InterfaceName()
Expand All @@ -535,11 +529,11 @@ func (l *loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, dirs
}

objPath = path.Join(dirs.Output, hostEndpointObj)
if err := l.reloadHostDatapath(ctx, ep, objPath, devices); err != nil {
if err := l.reloadHostDatapath(ep, objPath, devices); err != nil {
return err
}
} else {
coll, finalize, err := loadDatapath(ctx, objPath)
coll, finalize, err := loadDatapath(objPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -594,12 +588,16 @@ func (l *loader) replaceOverlayDatapath(ctx context.Context, cArgs []string, ifa
return fmt.Errorf("compiling overlay program: %w", err)
}

if err := ctx.Err(); err != nil {
return err
}

device, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("retrieving device %s: %w", iface, err)
}

coll, finalize, err := loadDatapath(ctx, overlayObj)
coll, finalize, err := loadDatapath(overlayObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -693,19 +691,19 @@ func (l *loader) compileOrLoad(ctx context.Context, ep datapath.Endpoint, dirs *
}
stats.BpfWriteELF.End(err == nil)

return l.ReloadDatapath(ctx, ep, stats)
return l.ReloadDatapath(ep, stats)
}

// ReloadDatapath reloads the BPF datapath programs for the specified endpoint.
func (l *loader) ReloadDatapath(ctx context.Context, ep datapath.Endpoint, stats *metrics.SpanStat) (err error) {
func (l *loader) ReloadDatapath(ep datapath.Endpoint, stats *metrics.SpanStat) (err error) {
dirs := directoryInfo{
Library: option.Config.BpfDir,
Runtime: option.Config.StateDir,
State: ep.StateDir(),
Output: ep.StateDir(),
}
stats.BpfLoadProg.Start()
err = l.reloadDatapath(ctx, ep, &dirs)
err = l.reloadDatapath(ep, &dirs)
stats.BpfLoadProg.End(err == nil)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datapath/loader/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestReload(t *testing.T) {
linkDir := testutils.TempBPFFS(t)

for range 2 {
coll, finalize, err := loadDatapath(ctx, objPath)
coll, finalize, err := loadDatapath(objPath)
require.NoError(t, err)

require.NoError(t, attachSKBProgram(l, coll.Programs[symbolFromEndpoint],
Expand Down Expand Up @@ -279,7 +279,7 @@ func BenchmarkReplaceDatapath(b *testing.B) {
objPath := fmt.Sprintf("%s/%s", dirInfo.Output, endpointObj)
b.ResetTimer()
for i := 0; i < b.N; i++ {
coll, finalize, err := loadDatapath(ctx, objPath)
coll, finalize, err := loadDatapath(objPath)
if err != nil {
b.Fatal(err)
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/datapath/loader/netlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package loader

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -50,12 +49,7 @@ func directionToParent(dir string) uint32 {
// For example, this is the case with from-netdev and to-netdev. If eth0:to-netdev
// gets its program and maps replaced and unpinned, its eth0:from-netdev counterpart
// will miss tail calls (and drop packets) until it has been replaced as well.
func loadDatapath(ctx context.Context, obj string) (*ebpf.Collection, func(), error) {
// Avoid unnecessarily loading a prog.
if err := ctx.Err(); err != nil {
return nil, nil, err
}

func loadDatapath(obj string) (*ebpf.Collection, func(), error) {
l := log.WithField("objPath", obj)

// Load the ELF from disk.
Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/loader/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Loader interface {
HostDatapathInitialized() <-chan struct{}
Reinitialize(ctx context.Context, tunnelConfig tunnel.Config, deviceMTU int, iptMgr types.IptablesManager, p types.Proxy) error
ReinitializeXDP(ctx context.Context, extraCArgs []string) error
ReloadDatapath(ctx context.Context, ep types.Endpoint, stats *metrics.SpanStat) (err error)
ReloadDatapath(ep types.Endpoint, stats *metrics.SpanStat) (err error)
RestoreTemplates(stateDir string) error
Unload(ep types.Endpoint)
}
2 changes: 1 addition & 1 deletion pkg/datapath/loader/xdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func compileAndLoadXDPProg(ctx context.Context, xdpDev, xdpMode string, extraCAr
return fmt.Errorf("retrieving device %s: %w", xdpDev, err)
}

coll, finalize, err := loadDatapath(ctx, objPath)
coll, finalize, err := loadDatapath(objPath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/types/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Loader interface {
CallsMapPath(id uint16) string
CustomCallsMapPath(id uint16) string
CompileOrLoad(ctx context.Context, ep Endpoint, stats *metrics.SpanStat) error
ReloadDatapath(ctx context.Context, ep Endpoint, stats *metrics.SpanStat) error
ReloadDatapath(ep Endpoint, stats *metrics.SpanStat) error
ReinitializeXDP(ctx context.Context, extraCArgs []string) error
EndpointHash(cfg EndpointConfiguration) (string, error)
Unload(ep Endpoint)
Expand Down
2 changes: 1 addition & 1 deletion pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilati
}
compilationExecuted = true
} else { // RegenerateWithDatapathLoad
err = e.owner.Datapath().Loader().ReloadDatapath(datapathRegenCtxt.completionCtx, datapathRegenCtxt.epInfoCache, &stats.datapathRealization)
err = e.owner.Datapath().Loader().ReloadDatapath(datapathRegenCtxt.epInfoCache, &stats.datapathRealization)
if err == nil {
e.getLogger().Info("Reloaded endpoint BPF program")
} else {
Expand Down

0 comments on commit 38d44d4

Please sign in to comment.