Skip to content
This repository has been archived by the owner on Jul 27, 2023. It is now read-only.

Commit

Permalink
[proc] wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shang-wang committed Feb 15, 2019
1 parent 987df41 commit eea30d8
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
2 changes: 2 additions & 0 deletions checks/net.go
Expand Up @@ -222,11 +222,13 @@ func batchConnections(cfg *config.AgentConfig, groupID int32, cxs []*model.Conne

for len(cxs) > 0 {
batchSize := min(cfg.MaxConnsPerMessage, len(cxs))
ctrByPid := Process.ctrByPid(connectionPIDs(cxs[:batchSize]))
batches = append(batches, &model.CollectorConnections{
HostName: cfg.HostName,
Connections: cxs[:batchSize],
GroupId: groupID,
GroupSize: groupSize,
CidByPid: ctrByPid,
})
cxs = cxs[batchSize:]
}
Expand Down
34 changes: 28 additions & 6 deletions checks/process.go
Expand Up @@ -30,6 +30,7 @@ type ProcessCheck struct {
lastCPUTime cpu.TimesStat
lastProcs map[int32]*process.FilledProcess
lastCtrRates map[string]util.ContainerRateMetrics
lastCidByPid map[int32]string
lastRun time.Time
}

Expand Down Expand Up @@ -74,6 +75,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
p.lastProcs = procs
p.lastCPUTime = cpuTimes[0]
p.lastCtrRates = util.ExtractContainerRateMetric(ctrList)
p.lastCidByPid = cidByPid(ctrList)
p.lastRun = time.Now()
return nil, nil
}
Expand All @@ -89,6 +91,7 @@ func (p *ProcessCheck) Run(cfg *config.AgentConfig, groupID int32) ([]model.Mess
p.lastCtrRates = util.ExtractContainerRateMetric(ctrList)
p.lastCPUTime = cpuTimes[0]
p.lastRun = time.Now()
p.lastCidByPid = cidByPid(ctrList)

statsd.Client.Gauge("datadog.process.containers.host_count", float64(totalContainers), []string{}, 1)
statsd.Client.Gauge("datadog.process.processes.host_count", float64(totalProcs), []string{}, 1)
Expand Down Expand Up @@ -168,6 +171,16 @@ func chunkProcesses(procs []*model.Process, size int) [][]*model.Process {
return chunks
}

func cidByPid(ctrList []*containers.Container) map[int32]string {
cidByPid := make(map[int32]string, len(ctrList))
for _, c := range ctrList {
for _, p := range c.Pids {
cidByPid[p] = c.ID
}
}
return cidByPid
}

// fmtProcesses goes through each process, converts them to process object and group them by containers
// non-container processes would be in a single group with key as empty string ""
func fmtProcesses(
Expand All @@ -177,12 +190,7 @@ func fmtProcesses(
syst2, syst1 cpu.TimesStat,
lastRun time.Time,
) map[string][]*model.Process {
cidByPid := make(map[int32]string, len(ctrList))
for _, c := range ctrList {
for _, p := range c.Pids {
cidByPid[p] = c.ID
}
}
cidByPid := cidByPid(ctrList)

procsByCtr := make(map[string][]*model.Process)

Expand Down Expand Up @@ -308,6 +316,20 @@ func skipProcess(
return false
}

// ctrByPid uses lastCidByPid and filter down only the pids -> cid that we need
func (p *ProcessCheck) ctrByPid(pids []uint32) map[int32]string {
p.Lock()
defer p.Unlock()

ctrByPid := make(map[string][]uint32)
for _, pid := range pids {
if cid, ok := p.lastCidByPid[int32(pid)]; ok {
ctrByPid[cid] = append(ctrByPid[cid], pid)
}
}
return ctrByPid
}

func (p *ProcessCheck) createTimesforPIDs(pids []uint32) map[uint32]int64 {
p.Lock()
defer p.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions proto/agent.proto
Expand Up @@ -56,6 +56,9 @@ message CollectorConnections {
// post-resolution field: all of the containers referenced in `connections` keyed by
// containerId
map<string, ContainerMetadata> resolvedContainers = 8;

// mapping of processes running in each container
map<int32, string> cidByPid = 10;
}

message CollectorRealTime {
Expand Down

0 comments on commit eea30d8

Please sign in to comment.