Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enrich: fixes post control plane #3285

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 19 additions & 14 deletions pkg/containers/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func (c *Containers) GetCgroupVersion() cgroup.CgroupVersion {

// Populate fills the Containers struct by reading the mounted proc and
// cgroups filesystems. It holds c.mtx for the whole operation and delegates
// the actual work to populate.
func (c *Containers) Populate() error {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	err := c.populate()
	return err
}

Expand Down Expand Up @@ -147,18 +149,20 @@ func (c *Containers) populate() error {

inodeNumber := stat.Ino
statusChange := time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)
_, err = c.CgroupUpdate(inodeNumber, path, statusChange)
_, err = c.cgroupUpdate(inodeNumber, path, statusChange)

return errfmt.WrapError(err)
}

return filepath.WalkDir(c.cgroups.GetDefaultCgroup().GetMountPoint(), fn)
}

// CgroupUpdate checks if given path belongs to a known container runtime,
// cgroupUpdate checks if given path belongs to a known container runtime,
// saving container information in Containers CgroupInfo map.
// NOTE: ALL given cgroup dir paths are stored in CgroupInfo map.
func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time) (CgroupInfo, error) {
// NOTE: not thread-safe. The caller must hold the lock itself, for as long as its
// own transaction requires.
func (c *Containers) cgroupUpdate(cgroupId uint64, path string, ctime time.Time) (CgroupInfo, error) {
// Cgroup paths should be stored and evaluated relative to the mountpoint,
// trim it from the path.
path = strings.TrimPrefix(path, c.cgroups.GetDefaultCgroup().GetMountPoint())
Expand All @@ -175,9 +179,7 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time)
Ctime: ctime,
}

c.mtx.Lock()
c.cgroupsMap[uint32(cgroupId)] = info
c.mtx.Unlock()

return info, nil
}
Expand All @@ -187,11 +189,11 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time)
// It returns the retrieved metadata and a relevant error.
// This function shouldn't be called twice for the same cgroupId unless attempting a retry.
func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetadata, error) {
var metadata cruntime.ContainerMetadata
c.mtx.Lock()
defer c.mtx.Unlock()

c.mtx.RLock()
var metadata cruntime.ContainerMetadata
info, ok := c.cgroupsMap[uint32(cgroupId)]
c.mtx.RUnlock()

// if there is no cgroup anymore for some reason, return early
if !ok {
Expand Down Expand Up @@ -221,14 +223,12 @@ func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetada
}

info.Container = metadata
c.mtx.Lock()
// we read the dictionary again to make sure the cgroup still exists
// otherwise we risk reintroducing it despite not existing
_, ok = c.cgroupsMap[uint32(cgroupId)]
if ok {
c.cgroupsMap[uint32(cgroupId)] = info
}
c.mtx.Unlock()

return metadata, nil
}
Expand Down Expand Up @@ -338,20 +338,22 @@ func (c *Containers) CgroupMkdir(cgroupId uint64, subPath string, hierarchyID ui
}

// Find container cgroup dir path to get directory stats
c.mtx.Lock()
defer c.mtx.Unlock()
curTime := time.Now()
path, err := cgroup.GetCgroupPath(c.cgroups.GetDefaultCgroup().GetMountPoint(), cgroupId, subPath)
if err == nil {
var stat syscall.Stat_t
if err := syscall.Stat(path, &stat); err == nil {
// Add cgroupInfo to Containers struct w/ found path (and its last modification time)
return c.CgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
return c.cgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
}
}

// No entry found: container may have already exited.
// Add cgroupInfo to Containers struct with existing data.
// In this case, ctime is just an estimation (current time).
return c.CgroupUpdate(cgroupId, subPath, curTime)
return c.cgroupUpdate(cgroupId, subPath, curTime)
}

// FindContainerCgroupID32LSB returns the 32 LSB of the Cgroup ID for a given container ID
Expand All @@ -378,11 +380,14 @@ func (c *Containers) GetCgroupInfo(cgroupId uint64) CgroupInfo {
// cgroupInfo in the Containers struct. An empty subPath makes
// cgroup.GetCgroupPath() walk all cgroupfs directories until it finds the
// directory of the given cgroupId.
c.mtx.Lock()
defer c.mtx.Unlock()

path, err := cgroup.GetCgroupPath(c.cgroups.GetDefaultCgroup().GetMountPoint(), cgroupId, "")
if err == nil {
var stat syscall.Stat_t
if err = syscall.Stat(path, &stat); err == nil {
info, err := c.CgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
info, err := c.cgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec))
if err == nil {
return info
}
Expand All @@ -391,8 +396,8 @@ func (c *Containers) GetCgroupInfo(cgroupId uint64) CgroupInfo {
}

c.mtx.RLock()
defer c.mtx.RUnlock()
cgroupInfo := c.cgroupsMap[uint32(cgroupId)]
c.mtx.RUnlock()

return cgroupInfo
}
Expand Down
39 changes: 19 additions & 20 deletions pkg/ebpf/controlplane/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewController(bpfModule *libbpfgo.Module, cgroupManager *containers.Contain
lostSignalChan: make(chan uint64),
bpfModule: bpfModule,
cgroupManager: cgroupManager,
enrichEnabled: enrichEnabled,
}
p.signalBuffer, err = bpfModule.InitPerfBuf("signals", p.signalChan, p.lostSignalChan, 1024)
if err != nil {
Expand Down Expand Up @@ -83,27 +84,25 @@ func (p *Controller) Start() error {

func (p *Controller) Run(ctx context.Context) {
p.ctx = ctx
go func() {
for {
select {
case signalData := <-p.signalChan:
signal := signal{}
err := signal.Unmarshal(signalData)
if err != nil {
logger.Errorw("error unmarshaling signal ebpf buffer", "error", err)
continue
}
err = p.processSignal(signal)
if err != nil {
logger.Errorw("error processing control plane signal", "error", err)
}
case lost := <-p.lostSignalChan:
logger.Warnw(fmt.Sprintf("Lost %d control plane signals", lost))
case <-p.ctx.Done():
return
for {
select {
case signalData := <-p.signalChan:
signal := signal{}
err := signal.Unmarshal(signalData)
if err != nil {
logger.Errorw("error unmarshaling signal ebpf buffer", "error", err)
continue
}
err = p.processSignal(signal)
if err != nil {
logger.Errorw("error processing control plane signal", "error", err)
}
case lost := <-p.lostSignalChan:
logger.Warnw(fmt.Sprintf("Lost %d control plane signals", lost))
case <-p.ctx.Done():
return
}
}()
}
}

func (p *Controller) Stop() error {
Expand Down Expand Up @@ -160,7 +159,7 @@ func (p *Controller) processCgroupMkdir(args []trace.Argument) error {
if p.enrichEnabled {
// If cgroupId belongs to a container, enrich now (in a goroutine)
go func() {
_, err = p.cgroupManager.EnrichCgroupInfo(cgroupId)
_, err := p.cgroupManager.EnrichCgroupInfo(cgroupId)
if err != nil {
logger.Errorw("error triggering container enrich in control plane", "error", err)
}
Expand Down