Skip to content

Commit

Permalink
Use /proc fs to access mount namespace of files for hash calculation
Browse files Browse the repository at this point in the history
The `/proc` fs is now used to acess the filesystem mount of a container,
when enriching exec events with files hashes. This allows to drop the
required config option to the containerd path.
  • Loading branch information
patrickpichler committed Mar 13, 2024
1 parent 15bdb2b commit 88fac20
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 49 deletions.
7 changes: 5 additions & 2 deletions cmd/agent/daemon/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"net/http"
"net/http/pprof"
"os"
Expand Down Expand Up @@ -45,7 +46,6 @@ type Config struct {
PyroscopeAddr string
IngestorAddr string
ContainerdSockPath string
ContainersBasePath string
HostCgroupsDir string
HostProcDir string
TCPSampleOutputMinDurationSeconds int
Expand Down Expand Up @@ -217,7 +217,10 @@ func (a *App) Run(ctx context.Context) error {
enrichmentService := enrichment.NewService(log, enrichment.Config{
WorkerCount: runtime.NumCPU(),
EventEnrichers: []enrichment.EventEnricher{
enrichment.EnrichWithFileHash(log, os.DirFS(a.cfg.ContainersBasePath)),
enrichment.EnrichWithFileHash(log,
containersClient.LookupContainerForCgroupInCache,
// DirFS guarantees to return a fs.StatFS implementation, hence we can simply cast it here
os.DirFS(a.cfg.HostProcDir).(fs.StatFS)),
},
EnrichableEvents: []castpb.EventType{
castpb.EventType_EVENT_EXEC,
Expand Down
2 changes: 0 additions & 2 deletions cmd/agent/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
logRateBurst = pflag.Int("log-rate-burst", 100, "Log rate burst")

sendLogLevel = pflag.String("send-logs-level", "", "send logs level")
containerdK8sPath = pflag.String("containerd-k8s-path", "/run/containerd/io.containerd.runtime.v2.task/k8s.io", "Path to containerd k8s containers")
containerdSockPath = pflag.String("containerd-sock", "/run/containerd/containerd.sock", "Path to containerd socket file")
ingestorAddr = pflag.String("ingestor-server-addr", "kvisord-server.kvisord.svc.cluster.local.:6061", "Ingestor server grpc API address")
eventsQueueSize = pflag.Int("events-queue-size", 65536, "Events batch size")
Expand Down Expand Up @@ -91,7 +90,6 @@ func NewCommand(version string) *cobra.Command {
PyroscopeAddr: *pyroscopeAddr,
IngestorAddr: *ingestorAddr,
ContainerdSockPath: *containerdSockPath,
ContainersBasePath: *containerdK8sPath,
HostCgroupsDir: *hostCgroupsDir,
HostProcDir: *hostProcDir,
TCPSampleOutputMinDurationSeconds: *bpfTCPSampleSeconds,
Expand Down
151 changes: 129 additions & 22 deletions cmd/agent/daemon/enrichment/enrichers.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,149 @@
package enrichment

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"path/filepath"
"strconv"
"syscall"

castpb "github.com/castai/kvisor/api/v1/runtime"
"github.com/castai/kvisor/pkg/containers"
"github.com/castai/kvisor/pkg/logging"
"github.com/minio/sha256-simd"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/minio/sha256-simd"
)

func EnrichWithFileHash(log *logging.Logger, fsys fs.FS) func(*castpb.Event) {
return func(e *castpb.Event) {
exec := e.GetExec()
if exec == nil || exec.Path == "" {
return
}
type fileHashCacheKey string
type ContainerForCgroupGetter func(cgroup uint64) (*containers.Container, bool, error)

func EnrichWithFileHash(log *logging.Logger, containerForCgroupGetter ContainerForCgroupGetter, procFS fs.StatFS) EventEnricher {
cache, err := lru.New[fileHashCacheKey, []byte](1024)
if err != nil {
// This case can never happen, as err is only thrown if cache size is <0, which it isn't.
panic(err)
}

return &fileHashEnricher{
log: log,
containerForCgroupGetter: containerForCgroupGetter,
procFS: procFS,
cache: cache,
}
}

type fileHashEnricher struct {
log *logging.Logger
containerForCgroupGetter ContainerForCgroupGetter
procFS fs.StatFS
cache *lru.Cache[fileHashCacheKey, []byte]
}

func (enricher *fileHashEnricher) Enrich(ctx context.Context, e *castpb.Event) {
exec := e.GetExec()
if exec == nil || exec.Path == "" {
return
}

cont, found, err := enricher.containerForCgroupGetter(e.CgroupId)
// if we do not have the container cached, we cannot easily accesss the FS and hence simply skip
if err != nil || !found {
return
}

path := filepath.Join(e.ContainerId, "rootfs", exec.Path)
f, err := fsys.Open(path)
for _, pid := range cont.PIDs {
sha, err := enricher.calcFileHashForPID(cont, pid, exec.Path)
// We search for the first PID we can successfully calculate a filehash for.
if err != nil {
return
}
defer f.Close()
if errors.Is(err, ErrFileDoesNotExist) {
// If the wanted file does not exist in the PID mount namespace, it will also not exist in the mounts of the other.
// We can hence simply return, as we will not find the wanted file.
return
}

h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
log.Debugf("error while processing `%s`: %v", path, err)
return
continue
}

sum := h.Sum(nil)
if exec.Meta == nil {
exec.Meta = &castpb.ExecMetadata{
HashSha256: sum,
}
} else {
exec.Meta.HashSha256 = sum
exec.Meta = &castpb.ExecMetadata{}
}

exec.Meta.HashSha256 = sha
return
}
}

func (enricher *fileHashEnricher) calcFileHashForPID(cont *containers.Container, pid uint32, execPath string) ([]byte, error) {
pidString := strconv.FormatInt(int64(pid), 10)

_, err := enricher.procFS.Stat(pidString)
if err != nil {
// If the /proc/<pid> folder doesn't exist, there is nothing we can do.
return nil, ErrProcFolderDoesNotExist
}

path := filepath.Join(pidString, "root", execPath)
info, err := enricher.procFS.Stat(path)
if err != nil {
// If the wanted file does not exist inside the mount namespace, there is also nothing we can do.
return nil, ErrFileDoesNotExist
}

key := enricher.buildCacheKey(cont, info)
hash, found := enricher.checkCache(key)
if found {
return hash, nil
}

f, err := enricher.procFS.Open(path)
if err != nil {
return nil, ErrFileDoesNotExist
}
defer f.Close()

h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return nil, err
}

hash = h.Sum(nil)
enricher.cacheHash(key, hash)

return hash, nil
}

var (
ErrCannotGetInode = errors.New("cannot get inode for path")
ErrProcFolderDoesNotExist = errors.New("/proc/<pid> folder does not exist")
ErrFileDoesNotExist = errors.New("wanted file does not exist")
)

func (enricher *fileHashEnricher) buildCacheKey(cont *containers.Container, info fs.FileInfo) fileHashCacheKey {
stat, ok := info.Sys().(*syscall.Stat_t)
if !ok {
return ""
}

return fileHashCacheKey(fmt.Sprintf("%s:%d", cont.Cgroup.ContainerID, stat.Ino))
}

func (enricher *fileHashEnricher) checkCache(key fileHashCacheKey) ([]byte, bool) {
if key == "" {
// An empty key indicates an error when calculating the hash key, hence we treat it as not cached.
return nil, false
}

return enricher.cache.Get(key)
}

func (enricher *fileHashEnricher) cacheHash(key fileHashCacheKey, hash []byte) {
if key == "" {
// An empty key indicates an error when calculating the hash key, hence nothing will be cached
return
}

enricher.cache.Add(key, hash)
}
Loading

0 comments on commit 88fac20

Please sign in to comment.