From eea30d80e1bfa746eab70fc0908289f6ead3d466 Mon Sep 17 00:00:00 2001 From: Shang Wang Date: Fri, 15 Feb 2019 13:56:34 -0500 Subject: [PATCH] [proc] wip --- checks/net.go | 2 ++ checks/process.go | 34 ++++++++++++++++++++++++++++------ proto/agent.proto | 3 +++ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/checks/net.go b/checks/net.go index 561e67603..516858830 100644 --- a/checks/net.go +++ b/checks/net.go @@ -222,11 +222,13 @@ func batchConnections(cfg *config.AgentConfig, groupID int32, cxs []*model.Conne for len(cxs) > 0 { batchSize := min(cfg.MaxConnsPerMessage, len(cxs)) + ctrByPid := Process.ctrByPid(connectionPIDs(cxs[:batchSize])) batches = append(batches, &model.CollectorConnections{ HostName: cfg.HostName, Connections: cxs[:batchSize], GroupId: groupID, GroupSize: groupSize, + CidByPid: ctrByPid, }) cxs = cxs[batchSize:] } diff --git a/checks/process.go b/checks/process.go index 9b7d8f12b..4d138ffed 100644 --- a/checks/process.go +++ b/checks/process.go @@ -30,6 +30,7 @@ type ProcessCheck struct { lastCPUTime cpu.TimesStat lastProcs map[int32]*process.FilledProcess lastCtrRates map[string]util.ContainerRateMetrics + lastCidByPid map[int32]string lastRun time.Time } @@ -74,6 +75,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess p.lastProcs = procs p.lastCPUTime = cpuTimes[0] p.lastCtrRates = util.ExtractContainerRateMetric(ctrList) + p.lastCidByPid = cidByPid(ctrList) p.lastRun = time.Now() return nil, nil } @@ -89,6 +91,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess p.lastCtrRates = util.ExtractContainerRateMetric(ctrList) p.lastCPUTime = cpuTimes[0] p.lastRun = time.Now() + p.lastCidByPid = cidByPid(ctrList) statsd.Client.Gauge("datadog.process.containers.host_count", float64(totalContainers), []string{}, 1) statsd.Client.Gauge("datadog.process.processes.host_count", float64(totalProcs), []string{}, 1) @@ -168,6 +171,16 @@ func chunkProcesses(procs []*model.Process, size int) [][]*model.Process { return chunks } +func cidByPid(ctrList []*containers.Container) map[int32]string { + cidByPid := make(map[int32]string, len(ctrList)) + for _, c := range ctrList { + for _, p := range c.Pids { + cidByPid[p] = c.ID + } + } + return cidByPid +} + // fmtProcesses goes through each process, converts them to process object and group them by containers // non-container processes would be in a single group with key as empty string "" func fmtProcesses( @@ -177,12 +190,7 @@ func fmtProcesses( syst2, syst1 cpu.TimesStat, lastRun time.Time, ) map[string][]*model.Process { - cidByPid := make(map[int32]string, len(ctrList)) - for _, c := range ctrList { - for _, p := range c.Pids { - cidByPid[p] = c.ID - } - } + cidByPid := cidByPid(ctrList) procsByCtr := make(map[string][]*model.Process) @@ -308,6 +316,20 @@ func skipProcess( return false } +// ctrByPid uses lastCidByPid and filter down only the pids -> cid that we need +func (p *ProcessCheck) ctrByPid(pids []uint32) map[int32]string { + p.Lock() + defer p.Unlock() + + ctrByPid := make(map[string][]uint32) + for _, pid := range pids { + if cid, ok := p.lastCidByPid[int32(pid)]; ok { + ctrByPid[cid] = append(ctrByPid[cid], pid) + } + } + return ctrByPid +} + func (p *ProcessCheck) createTimesforPIDs(pids []uint32) map[uint32]int64 { p.Lock() defer p.Unlock() diff --git a/proto/agent.proto b/proto/agent.proto index ea676132d..4f30b10ea 100644 --- a/proto/agent.proto +++ b/proto/agent.proto @@ -56,6 +56,9 @@ message CollectorConnections { // post-resolution field: all of the containers referenced in `connections` keyed by // containerId map resolvedContainers = 8; + + // mapping of processes running in each container + map cidByPid = 10; } message CollectorRealTime {