Skip to content

Commit

Permalink
Implement CRI container and pods stats
Browse files Browse the repository at this point in the history
See https://kep.k8s.io/2371

* Implement new CRI RPCs - `ListPodSandboxStats` and `PodSandboxStats`
  * `ListPodSandboxStats` and `PodSandboxStats` return stats about the
    pod sandbox. To obtain pod sandbox stats, the underlying metrics are
    read from the pod sandbox cgroup parent.
  * Process info is obtained by calling into the underlying task
  * Network stats are taken by looking up network metrics based on the
    pod sandbox network namespace path
* Return more detailed stats for cpu and memory for existing container
  stats. These metrics use the underlying task's metrics to obtain
  stats.

Signed-off-by: David Porter <porterdavid@google.com>
  • Loading branch information
bobbypage committed Nov 4, 2021
1 parent b69bbe2 commit 2e6d570
Show file tree
Hide file tree
Showing 14 changed files with 1,089 additions and 32 deletions.
217 changes: 185 additions & 32 deletions pkg/cri/server/container_stats_list_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package server

import (
"time"

"github.com/containerd/containerd/api/types"
v1 "github.com/containerd/containerd/metrics/types/v1"
v2 "github.com/containerd/containerd/metrics/types/v2"
Expand All @@ -25,6 +27,7 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"

containerstore "github.com/containerd/containerd/pkg/cri/store/container"
stats "github.com/containerd/containerd/pkg/cri/store/stats"
)

func (c *criService) containerMetrics(
Expand Down Expand Up @@ -60,43 +63,87 @@ func (c *criService) containerMetrics(
if err != nil {
return nil, errors.Wrap(err, "failed to extract container metrics")
}
switch metrics := s.(type) {
case *v1.Metrics:
if metrics.CPU != nil && metrics.CPU.Usage != nil {
cs.Cpu = &runtime.CpuUsage{
Timestamp: stats.Timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.Usage.Total},
}
}
if metrics.Memory != nil && metrics.Memory.Usage != nil {
cs.Memory = &runtime.MemoryUsage{
Timestamp: stats.Timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{
Value: getWorkingSet(metrics.Memory),
},
}
}
case *v2.Metrics:
if metrics.CPU != nil {
cs.Cpu = &runtime.CpuUsage{
Timestamp: stats.Timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.UsageUsec * 1000},
}

cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, s, stats.Timestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain cpu stats")
}
cs.Cpu = cpuStats

memoryStats, err := c.memoryContainerStats(meta.ID, s, stats.Timestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain memory stats")
}
cs.Memory = memoryStats
}

return &cs, nil
}

// getUsageNanoCores derives a CPU usage rate (in nano cores) for the given
// container or sandbox. It compares the cumulative usage passed in
// (currentUsageCoreNanoSeconds at currentTimestamp) against the sample
// previously stored for containerID, then stores the current sample so the
// next call can use it as the baseline.
//
// isSandbox selects whether the sample is read from / written to the sandbox
// store or the container store.
//
// It returns 0 with a nil error when there is no previous sample (the current
// one is stored as the first baseline) and when the elapsed interval is zero
// or negative (in that case the stored sample is left untouched).
//
// NOTE(review): the usage delta is an unsigned subtraction; if the stored
// UsageCoreNanoSeconds can ever exceed the current value it would wrap around
// — confirm whether a guard is needed.
// NOTE(review): the lines below that reference `metrics` and `cs`, the
// `default:` label, and `return &cs, nil` look like removed-side residue of
// this diff view rather than part of this function — confirm against the
// actual file.
func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
var oldStats *stats.ContainerStats

// Load the previously recorded sample from the matching store.
if isSandbox {
sandbox, err := c.sandboxStore.Get(containerID)
if err != nil {
return 0, errors.Wrapf(err, "failed to get sandbox container: %s", containerID)
}
oldStats = sandbox.Stats
} else {
container, err := c.containerStore.Get(containerID)
if err != nil {
return 0, errors.Wrapf(err, "failed to get container ID: %s", containerID)
}
oldStats = container.Stats
}

// No previous sample: record the current one and report a rate of 0.
if oldStats == nil {
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update sandbox stats container ID: %s", containerID)
}
if metrics.Memory != nil {
cs.Memory = &runtime.MemoryUsage{
Timestamp: stats.Timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{
Value: getWorkingSetV2(metrics.Memory),
},
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update container stats ID: %s", containerID)
}
default:
return &cs, errors.Errorf("unexpected metrics type: %v", metrics)
}
return 0, nil
}

return &cs, nil
// Elapsed wall-clock time between the stored sample and the current one.
nanoSeconds := currentTimestamp.UnixNano() - oldStats.Timestamp.UnixNano()

// zero or negative interval
if nanoSeconds <= 0 {
return 0, nil
}

// Usage delta divided by elapsed nanoseconds, scaled by the number of
// nanoseconds in one second.
newUsageNanoCores := uint64(float64(currentUsageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) /
float64(nanoSeconds) * float64(time.Second/time.Nanosecond))

// Persist the current sample as the baseline for the next call.
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update sandbox container stats: %s", containerID)
}

} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update container stats ID: %s", containerID)
}
}

return newUsageNanoCores, nil
}

// getWorkingSet calculates workingset memory from cgroup memory stats.
Expand All @@ -123,3 +170,109 @@ func getWorkingSetV2(memory *v2.MemoryStat) uint64 {
}
return workingSet
}

// isMemoryUnlimited reports whether the memory size v is large enough to be
// treated as "unlimited".
func isMemoryUnlimited(v uint64) bool {
	// Threshold above which a memory size counts as "unlimited". It is
	// deliberately not MaxInt64, because the kernel rounds the value.
	// TODO: k8s or cadvisor should export this https://github.com/google/cadvisor/blob/2b6fbacac7598e0140b5bc8428e3bdd7d86cf5b9/metrics/prometheus.go#L1969-L1971
	const unlimitedThreshold uint64 = 1 << 62

	exceeds := v > unlimitedThreshold
	return exceeds
}

// getAvailableBytes returns the memory still available to a cgroup v1
// container: its memory limit minus workingSetBytes.
//
// It returns 0 when memory (or its Usage entry) is nil, when the limit is
// considered unlimited, or when the working set already meets or exceeds the
// limit.
//
// See https://github.com/kubernetes/kubernetes/blob/b47f8263e18c7b13dba33fba23187e5e0477cdbd/pkg/kubelet/stats/helper.go#L68-L71
func getAvailableBytes(memory *v1.MemoryStat, workingSetBytes uint64) uint64 {
	if memory == nil || memory.Usage == nil {
		return 0
	}

	limit := memory.Usage.Limit
	if isMemoryUnlimited(limit) {
		return 0
	}

	// Both operands are unsigned: subtracting a working set larger than the
	// limit would wrap around to a huge bogus value, so clamp to zero.
	if workingSetBytes >= limit {
		return 0
	}
	return limit - workingSetBytes
}

// getAvailableBytesV2 returns the memory still available to a cgroup v2
// container: its memory limit (memory.max) minus workingSetBytes.
//
// It returns 0 when memory is nil, when the limit is considered unlimited, or
// when the working set already meets or exceeds the limit.
func getAvailableBytesV2(memory *v2.MemoryStat, workingSetBytes uint64) uint64 {
	if memory == nil {
		return 0
	}

	limit := memory.UsageLimit
	if isMemoryUnlimited(limit) {
		return 0
	}

	// Both operands are unsigned: subtracting a working set larger than the
	// limit would wrap around to a huge bogus value, so clamp to zero.
	if workingSetBytes >= limit {
		return 0
	}
	return limit - workingSetBytes
}

// cpuContainerStats converts task metrics into CRI CPU usage.
//
// stats must be a *v1.Metrics (cgroup v1) or *v2.Metrics (cgroup v2); any
// other type yields an error naming the unexpected type. ID and isSandbox
// identify which stored sample getUsageNanoCores should compare against, and
// timestamp is the time the metrics were collected.
//
// It returns (nil, nil) when the metrics carry no CPU section.
func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) {
	var usageCoreNanoSeconds uint64

	switch metrics := stats.(type) {
	case *v1.Metrics:
		if metrics.CPU == nil || metrics.CPU.Usage == nil {
			return nil, nil
		}
		usageCoreNanoSeconds = metrics.CPU.Usage.Total
	case *v2.Metrics:
		if metrics.CPU == nil {
			return nil, nil
		}
		// cgroup v2 reports usage in microseconds; convert to nano seconds
		usageCoreNanoSeconds = metrics.CPU.UsageUsec * 1000
	default:
		// %T: the message is about the type, so print the type rather
		// than dumping the value.
		return nil, errors.Errorf("unexpected metrics type: %T", metrics)
	}

	usageNanoCores, err := c.getUsageNanoCores(ID, isSandbox, usageCoreNanoSeconds, timestamp)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get usage nano cores, containerID: %s", ID)
	}

	return &runtime.CpuUsage{
		Timestamp:            timestamp.UnixNano(),
		UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
		UsageNanoCores:       &runtime.UInt64Value{Value: usageNanoCores},
	}, nil
}

// memoryContainerStats converts task metrics into CRI memory usage.
//
// stats must be a *v1.Metrics (cgroup v1) or *v2.Metrics (cgroup v2); any
// other type yields an error naming the unexpected type. timestamp is the
// time the metrics were collected. ID is currently unused by this function.
//
// It returns (nil, nil) when the metrics carry no memory section.
func (c *criService) memoryContainerStats(ID string, stats interface{}, timestamp time.Time) (*runtime.MemoryUsage, error) {
	switch metrics := stats.(type) {
	case *v1.Metrics:
		mem := metrics.Memory
		if mem == nil || mem.Usage == nil {
			return nil, nil
		}
		workingSetBytes := getWorkingSet(mem)

		return &runtime.MemoryUsage{
			Timestamp:       timestamp.UnixNano(),
			WorkingSetBytes: &runtime.UInt64Value{Value: workingSetBytes},
			AvailableBytes:  &runtime.UInt64Value{Value: getAvailableBytes(mem, workingSetBytes)},
			UsageBytes:      &runtime.UInt64Value{Value: mem.Usage.Usage},
			RssBytes:        &runtime.UInt64Value{Value: mem.TotalRSS},
			PageFaults:      &runtime.UInt64Value{Value: mem.TotalPgFault},
			MajorPageFaults: &runtime.UInt64Value{Value: mem.TotalPgMajFault},
		}, nil
	case *v2.Metrics:
		mem := metrics.Memory
		if mem == nil {
			return nil, nil
		}
		workingSetBytes := getWorkingSetV2(mem)

		return &runtime.MemoryUsage{
			Timestamp:       timestamp.UnixNano(),
			WorkingSetBytes: &runtime.UInt64Value{Value: workingSetBytes},
			AvailableBytes:  &runtime.UInt64Value{Value: getAvailableBytesV2(mem, workingSetBytes)},
			UsageBytes:      &runtime.UInt64Value{Value: mem.Usage},
			// Use Anon memory for RSS as cAdvisor on cgroupv2
			// see https://github.com/google/cadvisor/blob/a9858972e75642c2b1914c8d5428e33e6392c08a/container/libcontainer/handler.go#L799
			RssBytes:        &runtime.UInt64Value{Value: mem.Anon},
			PageFaults:      &runtime.UInt64Value{Value: mem.Pgfault},
			MajorPageFaults: &runtime.UInt64Value{Value: mem.Pgmajfault},
		}, nil
	default:
		// %T: the message is about the type, so print the type rather
		// than dumping the value.
		return nil, errors.Errorf("unexpected metrics type: %T", metrics)
	}
}

0 comments on commit 2e6d570

Please sign in to comment.