fix: enrich post control plane (#3285) (#3289)
Locks in the containers package were held for as short a time as possible.
This caused transactional consistency issues once several actors interacted
with the package concurrently.

Change the locking scheme to be more "transactional": take one large lock
per operation instead of combining separate read locks and write locks in
the same transaction (a sketch contrasting the two schemes follows the
chores list below).

Additional chores within the scope of this commit:
1. Initialize the enrichEnabled field in the control plane.
2. The control plane's Run started a goroutine internally, which was
   redundant; the loop now runs directly in Run.
3. Rename the mutex in the cgroups code to cgroupsMutex.
4. Capitalize BPF in RemoveFromBpfMap (now RemoveFromBPFMap).
5. Delete the unused processing code for cgroup events.
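
Below is a minimal sketch contrasting the two schemes (a hypothetical store
type, not code from this patch). With short, fine-grained locks, a read lock
and a later write lock leave a window in which another actor can mutate the
map; one "transactional" lock covers the whole read-modify-write.

```go
package main

import (
    "fmt"
    "sync"
)

// store is a hypothetical stand-in for the Containers struct:
// a map guarded by an RWMutex.
type store struct {
    mu   sync.RWMutex
    data map[uint32]string
}

// Before: short locks. The read lock and the write lock are taken
// separately, so another goroutine can delete or replace data[id]
// in the window between them, and the write may act on stale data.
func (s *store) enrichShortLocks(id uint32, f func(string) string) {
    s.mu.RLock()
    v, ok := s.data[id]
    s.mu.RUnlock()
    if !ok {
        return
    }
    // <- window: the entry may change or vanish here
    s.mu.Lock()
    s.data[id] = f(v)
    s.mu.Unlock()
}

// After: one "transactional" lock held across the whole
// read-modify-write, so the entry cannot change underneath it.
func (s *store) enrichTransactional(id uint32, f func(string) string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if v, ok := s.data[id]; ok {
        s.data[id] = f(v)
    }
}

func main() {
    s := &store{data: map[uint32]string{1: "cgroup-1"}}
    s.enrichShortLocks(1, func(v string) string { return v + "+meta" })
    s.enrichTransactional(1, func(v string) string { return v + "+extra" })
    fmt.Println(s.data[1])
}
```

EnrichCgroupInfo in the diff below shows the pattern at full scale: the lock
is now taken once at the top and held for the entire enrich transaction.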

Cherry-pick of commit f638b4e (main).
NDStrahilevitz committed Jun 29, 2023
1 parent 2b7382e commit 3944fe7
Showing 4 changed files with 63 additions and 107 deletions.
77 changes: 41 additions & 36 deletions pkg/containers/containers.go
@@ -24,12 +24,12 @@ import (

// Containers contains information about running containers in the host.
type Containers struct {
-    cgroups    *cgroup.Cgroups
-    cgroupsMap map[uint32]CgroupInfo
-    deleted    []uint64
-    mtx        sync.RWMutex // protecting both cgroups and deleted fields
-    enricher   runtimeInfoService
-    bpfMapName string
+    cgroups      *cgroup.Cgroups
+    cgroupsMap   map[uint32]CgroupInfo
+    deleted      []uint64
+    cgroupsMutex sync.RWMutex // protecting both cgroups and deleted fields
+    enricher     runtimeInfoService
+    bpfMapName   string
}

// CgroupInfo represents a cgroup dir (might describe a container cgroup dir).
@@ -53,10 +53,10 @@ func New(
error,
) {
containers := &Containers{
-        cgroups:    cgroups,
-        cgroupsMap: make(map[uint32]CgroupInfo),
-        mtx:        sync.RWMutex{},
-        bpfMapName: mapName,
+        cgroups:      cgroups,
+        cgroupsMap:   make(map[uint32]CgroupInfo),
+        cgroupsMutex: sync.RWMutex{},
+        bpfMapName:   mapName,
}

runtimeService := RuntimeInfoService(sockets)
@@ -101,6 +101,8 @@ func (c *Containers) GetCgroupVersion() cgroup.CgroupVersion {

// Populate populates Containers struct by reading mounted proc and cgroups fs.
func (c *Containers) Populate() error {
+    c.cgroupsMutex.Lock()
+    defer c.cgroupsMutex.Unlock()
return c.populate()
}

@@ -147,18 +149,20 @@ func (c *Containers) populate() error {

inodeNumber := stat.Ino
statusChange := time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)
-    _, err = c.CgroupUpdate(inodeNumber, path, statusChange)
+    _, err = c.cgroupUpdate(inodeNumber, path, statusChange)

return errfmt.WrapError(err)
}

return filepath.WalkDir(c.cgroups.GetDefaultCgroup().GetMountPoint(), fn)
}

-// CgroupUpdate checks if given path belongs to a known container runtime,
+// cgroupUpdate checks if given path belongs to a known container runtime,
// saving container information in Containers CgroupInfo map.
// NOTE: ALL given cgroup dir paths are stored in CgroupInfo map.
-func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time) (CgroupInfo, error) {
+// NOTE: not thread-safe, lock should be placed in the external calling function, depending
+// on the transaction length.
+func (c *Containers) cgroupUpdate(cgroupId uint64, path string, ctime time.Time) (CgroupInfo, error) {
// Cgroup paths should be stored and evaluated relative to the mountpoint,
// trim it from the path.
path = strings.TrimPrefix(path, c.cgroups.GetDefaultCgroup().GetMountPoint())
@@ -175,9 +179,7 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time)
Ctime: ctime,
}

-    c.mtx.Lock()
    c.cgroupsMap[uint32(cgroupId)] = info
-    c.mtx.Unlock()

return info, nil
}
@@ -187,11 +189,11 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time)
// it returns the retrieved metadata and a relevant error
// this function shouldn't be called twice for the same cgroupId unless attempting a retry
func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetadata, error) {
-    var metadata cruntime.ContainerMetadata
+    c.cgroupsMutex.Lock()
+    defer c.cgroupsMutex.Unlock()

-    c.mtx.RLock()
+    var metadata cruntime.ContainerMetadata
    info, ok := c.cgroupsMap[uint32(cgroupId)]
-    c.mtx.RUnlock()

// if there is no cgroup anymore for some reason, return early
if !ok {
@@ -221,14 +223,12 @@ func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetada
}

info.Container = metadata
-    c.mtx.Lock()
// we read the dictionary again to make sure the cgroup still exists
// otherwise we risk reintroducing it despite not existing
_, ok = c.cgroupsMap[uint32(cgroupId)]
if ok {
c.cgroupsMap[uint32(cgroupId)] = info
}
-    c.mtx.Unlock()

return metadata, nil
}
@@ -306,8 +306,8 @@ func (c *Containers) CgroupRemove(cgroupId uint64, hierarchyID uint32) {

now := time.Now()
var deleted []uint64
-    c.mtx.Lock()
-    defer c.mtx.Unlock()
+    c.cgroupsMutex.Lock()
+    defer c.cgroupsMutex.Unlock()

// process previously deleted cgroupInfo data (deleted cgroup dirs)
for _, id := range c.deleted {
@@ -338,27 +338,29 @@ func (c *Containers) CgroupMkdir(cgroupId uint64, subPath string, hierarchyID ui
}

// Find container cgroup dir path to get directory stats
+    c.cgroupsMutex.Lock()
+    defer c.cgroupsMutex.Unlock()
curTime := time.Now()
path, err := cgroup.GetCgroupPath(c.cgroups.GetDefaultCgroup().GetMountPoint(), cgroupId, subPath)
if err == nil {
var stat syscall.Stat_t
if err := syscall.Stat(path, &stat); err == nil {
// Add cgroupInfo to Containers struct w/ found path (and its last modification time)
-            return c.CgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
+            return c.cgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
}
}

// No entry found: container may have already exited.
// Add cgroupInfo to Containers struct with existing data.
// In this case, ctime is just an estimation (current time).
-    return c.CgroupUpdate(cgroupId, subPath, curTime)
+    return c.cgroupUpdate(cgroupId, subPath, curTime)
}

// FindContainerCgroupID32LSB returns the 32 LSB of the Cgroup ID for a given container ID
func (c *Containers) FindContainerCgroupID32LSB(containerID string) []uint32 {
var cgroupIDs []uint32
-    c.mtx.RLock()
-    defer c.mtx.RUnlock()
+    c.cgroupsMutex.RLock()
+    defer c.cgroupsMutex.RUnlock()
for k, v := range c.cgroupsMap {
if strings.HasPrefix(v.Container.ContainerId, containerID) {
cgroupIDs = append(cgroupIDs, k)
@@ -378,30 +380,33 @@ func (c *Containers) GetCgroupInfo(cgroupId uint64) CgroupInfo {
// cgroupInfo in the Containers struct. An empty subPath will make
// getCgroupPath() to walk all cgroupfs directories until it finds the
// directory of given cgroupId.
+    c.cgroupsMutex.Lock()
+    defer c.cgroupsMutex.Unlock()

path, err := cgroup.GetCgroupPath(c.cgroups.GetDefaultCgroup().GetMountPoint(), cgroupId, "")
if err == nil {
var stat syscall.Stat_t
if err = syscall.Stat(path, &stat); err == nil {
-            info, err := c.CgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
+            info, err := c.cgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
if err == nil {
return info
}
}
}
}

-    c.mtx.RLock()
+    c.cgroupsMutex.RLock()
+    defer c.cgroupsMutex.RUnlock()
    cgroupInfo := c.cgroupsMap[uint32(cgroupId)]
-    c.mtx.RUnlock()

return cgroupInfo
}

// GetContainers provides a list of all existing containers.
func (c *Containers) GetContainers() map[uint32]CgroupInfo {
conts := map[uint32]CgroupInfo{}
-    c.mtx.RLock()
-    defer c.mtx.RUnlock()
+    c.cgroupsMutex.RLock()
+    defer c.cgroupsMutex.RUnlock()
for id, v := range c.cgroupsMap {
if v.ContainerRoot && v.expiresAt.IsZero() {
conts[id] = v
@@ -412,9 +417,9 @@ func (c *Containers) GetContainers() map[uint32]CgroupInfo {

// CgroupExists checks if there is a cgroupInfo data of a given cgroupId.
func (c *Containers) CgroupExists(cgroupId uint64) bool {
-    c.mtx.RLock()
+    c.cgroupsMutex.RLock()
    _, ok := c.cgroupsMap[uint32(cgroupId)]
-    c.mtx.RUnlock()
+    c.cgroupsMutex.RUnlock()
return ok
}

@@ -430,19 +435,19 @@ func (c *Containers) PopulateBpfMap(bpfModule *libbpfgo.Module) error {
return errfmt.WrapError(err)
}

-    c.mtx.RLock()
+    c.cgroupsMutex.RLock()
    for cgroupIdLsb, info := range c.cgroupsMap {
        if info.ContainerRoot {
            state := containerExisted
            err = containersMap.Update(unsafe.Pointer(&cgroupIdLsb), unsafe.Pointer(&state))
        }
    }
-    c.mtx.RUnlock()
+    c.cgroupsMutex.RUnlock()

return errfmt.WrapError(err)
}

-func (c *Containers) RemoveFromBpfMap(bpfModule *libbpfgo.Module, cgroupId uint64, hierarchyID uint32) error {
+func (c *Containers) RemoveFromBPFMap(bpfModule *libbpfgo.Module, cgroupId uint64, hierarchyID uint32) error {
// cgroupv1: no need to check other controllers than the default
switch c.cgroups.GetDefaultCgroup().(type) {
case *cgroup.CgroupV1:
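The rename from CgroupUpdate to the unexported cgroupUpdate above encodes the
convention stated in its new NOTE: the inner helper is not thread-safe, and
the exported entry points (Populate, CgroupMkdir, GetCgroupInfo) take the
lock once for the whole transaction before calling it. A minimal sketch of
that convention, with simplified stand-in types rather than the real ones:

```go
package main

import (
    "fmt"
    "sync"
)

// Containers is a simplified stand-in for the real struct.
type Containers struct {
    cgroupsMutex sync.RWMutex
    cgroupsMap   map[uint32]string
}

// Populate is an exported entry point: it owns the transaction and
// takes the lock once around every update, mirroring how Populate
// in the patch locks before calling populate/cgroupUpdate.
func (c *Containers) Populate(paths map[uint32]string) {
    c.cgroupsMutex.Lock()
    defer c.cgroupsMutex.Unlock()
    for id, p := range paths {
        c.cgroupUpdate(id, p) // safe: the lock is already held
    }
}

// cgroupUpdate is unexported and not thread-safe; as in the patch,
// locking is the calling function's responsibility.
func (c *Containers) cgroupUpdate(id uint32, path string) {
    c.cgroupsMap[id] = path
}

func main() {
    c := &Containers{cgroupsMap: make(map[uint32]string)}
    c.Populate(map[uint32]string{1: "/docker/abc"})
    fmt.Println(c.cgroupsMap[1])
}
```

Keeping the lock at the outermost caller also lets the length of the
transaction vary per call site, which appears to be what the NOTE's
"depending on the transaction length" refers to.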
4 changes: 2 additions & 2 deletions pkg/containers/datasource.go
@@ -21,8 +21,8 @@ func (ctx SignaturesDataSource) Get(key interface{}) (map[string]interface{}, er
if !ok {
return nil, detect.ErrKeyNotSupported
}
-    ctx.containers.mtx.RLock()
-    defer ctx.containers.mtx.RUnlock()
+    ctx.containers.cgroupsMutex.RLock()
+    defer ctx.containers.cgroupsMutex.RUnlock()
for _, cgroup := range ctx.containers.cgroupsMap {
if cgroup.Container.ContainerId == containerId {
containerData := cgroup.Container
41 changes: 20 additions & 21 deletions pkg/ebpf/controlplane/controller.go
@@ -42,6 +42,7 @@ func NewController(bpfModule *libbpfgo.Module, cgroupManager *containers.Contain
lostSignalChan: make(chan uint64),
bpfModule: bpfModule,
cgroupManager: cgroupManager,
+        enrichEnabled:  enrichEnabled,
}
p.signalBuffer, err = bpfModule.InitPerfBuf("signals", p.signalChan, p.lostSignalChan, 1024)
if err != nil {
@@ -83,27 +84,25 @@ func (p *Controller) Start() error {

func (p *Controller) Run(ctx context.Context) {
p.ctx = ctx
-    go func() {
-        for {
-            select {
-            case signalData := <-p.signalChan:
-                signal := signal{}
-                err := signal.Unmarshal(signalData)
-                if err != nil {
-                    logger.Errorw("error unmarshaling signal ebpf buffer", "error", err)
-                    continue
-                }
-                err = p.processSignal(signal)
-                if err != nil {
-                    logger.Errorw("error processing control plane signal", "error", err)
-                }
-            case lost := <-p.lostSignalChan:
-                logger.Warnw(fmt.Sprintf("Lost %d control plane signals", lost))
-            case <-p.ctx.Done():
-                return
-            }
-        }
-    }()
+    for {
+        select {
+        case signalData := <-p.signalChan:
+            signal := signal{}
+            err := signal.Unmarshal(signalData)
+            if err != nil {
+                logger.Errorw("error unmarshaling signal ebpf buffer", "error", err)
+                continue
+            }
+            err = p.processSignal(signal)
+            if err != nil {
+                logger.Errorw("error processing control plane signal", "error", err)
+            }
+        case lost := <-p.lostSignalChan:
+            logger.Warnw(fmt.Sprintf("Lost %d control plane signals", lost))
+        case <-p.ctx.Done():
+            return
+        }
+    }
}

func (p *Controller) Stop() error {
@@ -144,7 +143,7 @@ func (p *Controller) processCgroupMkdir(args []trace.Argument) error {
// removed from the containers bpf map.
err := capabilities.GetInstance().EBPF(
func() error {
-            return p.cgroupManager.RemoveFromBpfMap(p.bpfModule, cgroupId, hId)
+            return p.cgroupManager.RemoveFromBPFMap(p.bpfModule, cgroupId, hId)
},
)
if err != nil {
@@ -160,7 +159,7 @@ func (p *Controller) processCgroupMkdir(args []trace.Argument) error {
if p.enrichEnabled {
// If cgroupId belongs to a container, enrich now (in a goroutine)
go func() {
-            _, err = p.cgroupManager.EnrichCgroupInfo(cgroupId)
+            _, err := p.cgroupManager.EnrichCgroupInfo(cgroupId)
if err != nil {
logger.Errorw("error triggering container enrich in control plane", "error", err)
}
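Chore 2 lands here: Run no longer spawns an internal goroutine, so it blocks
until the context is cancelled and the caller chooses the concurrency. A
sketch of the resulting calling pattern; the Controller below is a
hypothetical stand-in (the real one reads signals from a perf buffer):

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// Controller mimics the shape of the control plane: Run blocks,
// looping over its channels until the context is done.
type Controller struct {
    signals chan string
}

func (p *Controller) Run(ctx context.Context) {
    for {
        select {
        case s := <-p.signals:
            fmt.Println("signal:", s)
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    c := &Controller{signals: make(chan string)}

    // The caller owns the goroutine now; Run itself no longer starts one.
    go c.Run(ctx)

    c.signals <- "cgroup_mkdir"
    time.Sleep(10 * time.Millisecond) // let the signal print before exit
    cancel()
}
```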
48 changes: 0 additions & 48 deletions pkg/ebpf/events_processor.go
@@ -309,54 +309,6 @@ func (t *Tracee) processSchedProcessFork(event *trace.Event) error {
return t.convertArgMonotonicToEpochTime(event, "start_time")
}

-func (t *Tracee) processCgroupMkdir(event *trace.Event) error {
-    cgroupId, err := parse.ArgVal[uint64](event.Args, "cgroup_id")
-    if err != nil {
-        return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err)
-    }
-    path, err := parse.ArgVal[string](event.Args, "cgroup_path")
-    if err != nil {
-        return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err)
-    }
-    hId, err := parse.ArgVal[uint32](event.Args, "hierarchy_id")
-    if err != nil {
-        return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err)
-    }
-    info, err := t.containers.CgroupMkdir(cgroupId, path, hId)
-    if err == nil && info.Container.ContainerId == "" {
-        // If cgroupId is from a regular cgroup directory, and not the
-        // container base directory (from known runtimes), it should be
-        // removed from the containers bpf map.
-        err := capabilities.GetInstance().EBPF(
-            func() error {
-                return t.containers.RemoveFromBpfMap(t.bpfModule, cgroupId, hId)
-            },
-        )
-        if err != nil {
-            // If the cgroupId was not found in bpf map, this could mean that
-            // it is not a container cgroup and, as a systemd cgroup, could have been
-            // created and removed very quickly.
-            // In this case, we don't want to return an error.
-            logger.Debugw("Failed to remove entry from containers bpf map", "error", err)
-        }
-    }
-    return errfmt.WrapError(err)
-}
-
-func (t *Tracee) processCgroupRmdir(event *trace.Event) error {
-    cgroupId, err := parse.ArgVal[uint64](event.Args, "cgroup_id")
-    if err != nil {
-        return errfmt.Errorf("error parsing cgroup_rmdir args: %v", err)
-    }
-
-    hId, err := parse.ArgVal[uint32](event.Args, "hierarchy_id")
-    if err != nil {
-        return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err)
-    }
-    t.containers.CgroupRemove(cgroupId, hId)
-    return nil
-}

// In case FinitModule and InitModule occurs, it means that a kernel module
// was loaded and tracee needs to check if it hooked the syscall table and
// seq_ops
