diff --git a/internal/cri/config/config.go b/internal/cri/config/config.go index 1479a684e4f3..4609c02705b0 100644 --- a/internal/cri/config/config.go +++ b/internal/cri/config/config.go @@ -409,6 +409,9 @@ type RuntimeConfig struct { // IgnoreDeprecationWarnings is the list of the deprecation IDs (such as "io.containerd.deprecation/pull-schema-1-image") // that should be ignored for checking "ContainerdHasNoDeprecationWarnings" condition. IgnoreDeprecationWarnings []string `toml:"ignore_deprecation_warnings" json:"ignoreDeprecationWarnings"` + + // StatsMetricsPeriod is the period (in seconds) of containersm metrics stats collection. + StatsMetricsPeriod int `toml:"stats_metrics_period" json:"statsMetricsPeriod"` } // X509KeyPairStreaming contains the x509 configuration for streaming diff --git a/internal/cri/server/container_stats_list.go b/internal/cri/server/container_stats_list.go index 476d4b999e05..5ca9249cf462 100644 --- a/internal/cri/server/container_stats_list.go +++ b/internal/cri/server/container_stats_list.go @@ -160,13 +160,20 @@ func (c *criService) toContainerStats( return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err) } - if cs.stats.Cpu != nil && cs.stats.Cpu.UsageCoreNanoSeconds != nil { - // this is a calculated value and should be computed for all OSes - nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp)) - if err != nil { - return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", cntr.Metadata.ID, err) + if cs.stats.Cpu != nil { + // check if stats metrics has scheduled collection enabled + // otherwise request at runtime. + if c.config.StatsMetricsPeriod == 0 && cs.stats.Cpu.UsageCoreNanoSeconds != nil { + nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp)) + if err != nil { + return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", cntr.Metadata.ID, err) + } + cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage} + } else if cntr.Stats != nil { + // this is a calculated value and should be computed for all OSes + // this is a calculated value every 10s + cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: cntr.Stats.UsageNanoCores} } - cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage} } css = append(css, cs) } @@ -326,14 +333,15 @@ func (c *criService) windowsContainerMetrics( } if stats != nil { - s, err := typeurl.UnmarshalAny(stats.Data) + data, err := convertMetric(stats) if err != nil { return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err) } - wstats := s.(*wstats.Statistics).GetWindows() + wstats := data.(*wstats.Statistics).GetWindows() if wstats == nil { return containerStats{}, errors.New("windows stats is empty") } + if wstats.Processor != nil { cs.Cpu = &runtime.CpuUsage{ Timestamp: (protobuf.FromTimestamp(wstats.Timestamp)).UnixNano(), @@ -382,38 +390,27 @@ func (c *criService) linuxContainerMetrics( } if stats != nil { - var data interface{} + data, err := convertMetric(stats) + if err != nil { + return containerStats{}, err + } + switch { - case typeurl.Is(stats.Data, (*cg1.Metrics)(nil)): - data = &cg1.Metrics{} - if err := typeurl.UnmarshalTo(stats.Data, data); err != nil { - return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err) - } - pids = data.(*cg1.Metrics).GetPids().GetCurrent() case typeurl.Is(stats.Data, (*cg2.Metrics)(nil)): - data = &cg2.Metrics{} - if err := typeurl.UnmarshalTo(stats.Data, data); err != nil { - return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err) - } pids = data.(*cg2.Metrics).GetPids().GetCurrent() - default: - return containerStats{}, errors.New("cannot convert metric data to cgroups.Metrics") } - cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, data, protobuf.FromTimestamp(stats.Timestamp)) + cpuStats, err := c.cpuContainerStats(data, protobuf.FromTimestamp(stats.Timestamp)) if err != nil { return containerStats{}, fmt.Errorf("failed to obtain cpu stats: %w", err) } cs.Cpu = cpuStats - memoryStats, err := c.memoryContainerStats(meta.ID, data, protobuf.FromTimestamp(stats.Timestamp)) + memoryStats, err := c.memoryContainerStats(data, protobuf.FromTimestamp(stats.Timestamp)) if err != nil { return containerStats{}, fmt.Errorf("failed to obtain memory stats: %w", err) } cs.Memory = memoryStats - if err != nil { - return containerStats{}, fmt.Errorf("failed to obtain pid count: %w", err) - } } return containerStats{&cs, pids}, nil @@ -470,7 +467,7 @@ func getAvailableBytesV2(memory *cg2.MemoryStat, workingSetBytes uint64) uint64 return 0 } -func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) { +func (c *criService) cpuContainerStats(stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) { switch metrics := stats.(type) { case *cg1.Metrics: metrics.GetCPU().GetUsage() @@ -496,7 +493,7 @@ func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interfac return nil, nil } -func (c *criService) memoryContainerStats(ID string, stats interface{}, timestamp time.Time) (*runtime.MemoryUsage, error) { +func (c *criService) memoryContainerStats(stats interface{}, timestamp time.Time) (*runtime.MemoryUsage, error) { switch metrics := stats.(type) { case *cg1.Metrics: if metrics.Memory != nil && metrics.Memory.Usage != nil { diff --git a/internal/cri/server/container_stats_list_test.go b/internal/cri/server/container_stats_list_test.go index 5154d1183d3a..bbeba9f6a73c 100644 --- a/internal/cri/server/container_stats_list_test.go +++ b/internal/cri/server/container_stats_list_test.go @@ -329,7 +329,7 @@ func TestContainerMetricsMemory(t *testing.T) { } { test := test t.Run(test.desc, func(t *testing.T) { - got, err := c.memoryContainerStats("ID", test.metrics, timestamp) + got, err := c.memoryContainerStats(test.metrics, timestamp) assert.NoError(t, err) assert.Equal(t, test.expected, got) }) diff --git a/internal/cri/server/sandbox_stats_linux.go b/internal/cri/server/sandbox_stats_linux.go index e5ab75f0b733..72f395acc496 100644 --- a/internal/cri/server/sandbox_stats_linux.go +++ b/internal/cri/server/sandbox_stats_linux.go @@ -59,13 +59,17 @@ func (c *criService) podSandboxStats( if stats != nil { timestamp := time.Now() - cpuStats, err := c.cpuContainerStats(meta.ID, true /* isSandbox */, stats, timestamp) + cpuStats, err := c.cpuContainerStats(stats, timestamp) if err != nil { return nil, fmt.Errorf("failed to obtain cpu stats: %w", err) } + if cpuStats != nil && sandbox.Stats != nil { + // this is a calculated value every metrics synchronization period + cpuStats.UsageNanoCores = &runtime.UInt64Value{Value: sandbox.Stats.UsageNanoCores} + } podSandboxStats.Linux.Cpu = cpuStats - memoryStats, err := c.memoryContainerStats(meta.ID, stats, timestamp) + memoryStats, err := c.memoryContainerStats(stats, timestamp) if err != nil { return nil, fmt.Errorf("failed to obtain memory stats: %w", err) } diff --git a/internal/cri/server/sandbox_stats_windows.go b/internal/cri/server/sandbox_stats_windows.go index 83a9bcea040c..cd2797937860 100644 --- a/internal/cri/server/sandbox_stats_windows.go +++ b/internal/cri/server/sandbox_stats_windows.go @@ -86,7 +86,9 @@ func (c *criService) podSandboxStats( ProcessCount: &runtime.UInt64Value{Value: pidCount}, } - c.saveSandBoxMetrics(podSandboxStats.Attributes.Id, podSandboxStats) + if c.config.StatsMetricsPeriod == 0 { + c.saveSandBoxMetrics(podSandboxStats.Attributes.Id, podSandboxStats) + } return podSandboxStats, nil } @@ -132,6 +134,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma return nil, nil, fmt.Errorf("failed to covert container metrics for sandbox with id %s: %w", sandbox.ID, err) } + containerNanoCoreTotal := uint64(0) windowsContainerStats := make([]*runtime.WindowsContainerStats, 0, len(statsMap)) for _, cntr := range containers { containerMetric := statsMap[cntr.ID] @@ -153,9 +156,19 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma } // Calculate NanoCores for container - if containerStats.Cpu != nil && containerStats.Cpu.UsageCoreNanoSeconds != nil { - nanoCoreUsage := getUsageNanoCores(containerStats.Cpu.UsageCoreNanoSeconds.Value, cntr.Stats, containerStats.Cpu.Timestamp) - containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage} + + if containerStats.Cpu != nil { + if c.config.StatsMetricsPeriod == 0 && containerStats.Cpu.UsageCoreNanoSeconds != nil { + nanoCoreUsage := getUsageNanoCores(containerStats.Cpu.UsageCoreNanoSeconds.Value, cntr.Stats, containerStats.Cpu.Timestamp) + containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage} + } else { + containerNanoCores := uint64(0) + if cntr.Stats != nil { + containerNanoCores = cntr.Stats.UsageNanoCores + } + containerNanoCoreTotal += containerNanoCores + containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: containerNanoCores} + } } // On Windows we need to add up all the podStatsData to get the Total for the Pod as there isn't something @@ -189,9 +202,21 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma } // Calculate NanoCores for pod after adding containers cpu including the pods cpu - if podRuntimeStats.Cpu != nil && podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil { - nanoCoreUsage := getUsageNanoCores(podRuntimeStats.Cpu.UsageCoreNanoSeconds.Value, sandbox.Stats, podRuntimeStats.Cpu.Timestamp) - podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage} + if podRuntimeStats.Cpu != nil { + if c.config.StatsMetricsPeriod == 0 && podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil { + nanoCoreUsage := getUsageNanoCores(podRuntimeStats.Cpu.UsageCoreNanoSeconds.Value, sandbox.Stats, podRuntimeStats.Cpu.Timestamp) + podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage} + } else { + sandboxNanoCores := uint64(0) + if sandbox.Stats != nil { + sandboxNanoCores = sandbox.Stats.UsageNanoCores + } + // There is not a cgroup equivalent on windows where we can get the total cpu usage for the pod + // The sandbox container stats are for the "sandbox" container only + // To get the total for the pod we need to add the sandbox container stats to the total of the containers + // This could possibly change when internal/cri/server/podsandbox/sandbox_stats.go is implemented + podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: sandboxNanoCores + containerNanoCoreTotal} + } } return podRuntimeStats, windowsContainerStats, nil diff --git a/internal/cri/server/sandbox_stats_windows_test.go b/internal/cri/server/sandbox_stats_windows_test.go index f0afe6e03435..da499648b7f7 100644 --- a/internal/cri/server/sandbox_stats_windows_test.go +++ b/internal/cri/server/sandbox_stats_windows_test.go @@ -29,55 +29,6 @@ import ( runtime "k8s.io/cri-api/pkg/apis/runtime/v1" ) -func TestGetUsageNanoCores(t *testing.T) { - timestamp := time.Now() - secondAfterTimeStamp := timestamp.Add(time.Second) - ID := "ID" - - for _, test := range []struct { - desc string - firstCPUValue uint64 - secondCPUValue uint64 - expectedNanoCoreUsageFirst uint64 - expectedNanoCoreUsageSecond uint64 - }{ - { - desc: "metrics", - firstCPUValue: 50, - secondCPUValue: 500, - expectedNanoCoreUsageFirst: 0, - expectedNanoCoreUsageSecond: 450, - }, - } { - test := test - t.Run(test.desc, func(t *testing.T) { - container, err := containerstore.NewContainer( - containerstore.Metadata{ID: ID}, - ) - assert.NoError(t, err) - - // calculate for first iteration - // first run so container stats will be nil - assert.Nil(t, container.Stats) - cpuUsage := getUsageNanoCores(test.firstCPUValue, container.Stats, timestamp.UnixNano()) - assert.NoError(t, err) - assert.Equal(t, test.expectedNanoCoreUsageFirst, cpuUsage) - - // fill in the stats as if they now exist - container.Stats = &stats.ContainerStats{} - container.Stats.UsageCoreNanoSeconds = test.firstCPUValue - container.Stats.Timestamp = timestamp - assert.NotNil(t, container.Stats) - - // calculate for second iteration - cpuUsage = getUsageNanoCores(test.secondCPUValue, container.Stats, secondAfterTimeStamp.UnixNano()) - assert.NoError(t, err) - assert.Equal(t, test.expectedNanoCoreUsageSecond, cpuUsage) - }) - } - -} - func Test_criService_podSandboxStats(t *testing.T) { initialStatsTimestamp := time.Now() currentStatsTimestamp := initialStatsTimestamp.Add(time.Second) @@ -250,11 +201,12 @@ func Test_criService_podSandboxStats(t *testing.T) { Container: windowsStat(currentStatsTimestamp, 400, 20, 20), }, }, - sandbox: sandboxPod("s1", initialStatsTimestamp, 400), + sandbox: sandboxPod("s1", initialStatsTimestamp, 400, 200), containers: []containerstore.Container{ newContainer("c1", running, &stats.ContainerStats{ Timestamp: initialStatsTimestamp, UsageCoreNanoSeconds: 200, + UsageNanoCores: 200, }), }, expectedPodStats: &expectedStats{ @@ -281,11 +233,12 @@ func Test_criService_podSandboxStats(t *testing.T) { }, "s1": nil, }, - sandbox: sandboxPod("s1", initialStatsTimestamp, 200), + sandbox: sandboxPod("s1", initialStatsTimestamp, 200, 0), containers: []containerstore.Container{ newContainer("c1", running, &stats.ContainerStats{ Timestamp: initialStatsTimestamp, UsageCoreNanoSeconds: 200, + UsageNanoCores: 200, }), }, expectedPodStats: &expectedStats{ @@ -312,11 +265,12 @@ func Test_criService_podSandboxStats(t *testing.T) { }, "s1": {}, }, - sandbox: sandboxPod("s1", initialStatsTimestamp, 200), + sandbox: sandboxPod("s1", initialStatsTimestamp, 200, 0), containers: []containerstore.Container{ newContainer("c1", running, &stats.ContainerStats{ Timestamp: initialStatsTimestamp, UsageCoreNanoSeconds: 200, + UsageNanoCores: 200, }), }, expectedPodStats: &expectedStats{ @@ -341,7 +295,7 @@ func Test_criService_podSandboxStats(t *testing.T) { "c1": {}, "s1": {}, }, - sandbox: sandboxPod("s1", initialStatsTimestamp, 200), + sandbox: sandboxPod("s1", initialStatsTimestamp, 200, 0), containers: []containerstore.Container{ newContainer("c1", running, &stats.ContainerStats{ Timestamp: initialStatsTimestamp, @@ -355,7 +309,7 @@ func Test_criService_podSandboxStats(t *testing.T) { { desc: "pod sandbox with no stats in metric mapp will fail", metrics: map[string]*wstats.Statistics{}, - sandbox: sandboxPod("s1", initialStatsTimestamp, 200), + sandbox: sandboxPod("s1", initialStatsTimestamp, 200, 0), containers: []containerstore.Container{}, expectedPodStats: nil, expectedContainerStats: []expectedStats{}, @@ -378,7 +332,7 @@ func Test_criService_podSandboxStats(t *testing.T) { } assert.Equal(t, test.expectedPodStats.UsageCoreNanoSeconds, actualPodStats.Cpu.UsageCoreNanoSeconds.Value) - assert.Equal(t, test.expectedPodStats.UsageNanoCores, actualPodStats.Cpu.UsageNanoCores.Value) + assert.Equal(t, int(test.expectedPodStats.UsageNanoCores), int(actualPodStats.Cpu.UsageNanoCores.Value)) for i, expectedStat := range test.expectedContainerStats { actutalStat := actualContainerStats[i] @@ -390,12 +344,13 @@ func Test_criService_podSandboxStats(t *testing.T) { } } -func sandboxPod(id string, timestamp time.Time, cachedCPU uint64) sandboxstore.Sandbox { +func sandboxPod(id string, timestamp time.Time, cachedCPU uint64, cachedNanoCores uint64) sandboxstore.Sandbox { return sandboxstore.Sandbox{ Metadata: sandboxstore.Metadata{ID: id, RuntimeHandler: "runc"}, Stats: &stats.ContainerStats{ Timestamp: timestamp, UsageCoreNanoSeconds: cachedCPU, + UsageNanoCores: cachedNanoCores, }} } diff --git a/internal/cri/server/service.go b/internal/cri/server/service.go index 4c215c2a65eb..38d601671901 100644 --- a/internal/cri/server/service.go +++ b/internal/cri/server/service.go @@ -157,6 +157,8 @@ type criService struct { sandboxService sandboxService // runtimeHandlers contains runtime handler info runtimeHandlers []*runtime.RuntimeHandler + // metric Monitor is used to calculate and cache stat values + metricMonitor *metricMonitor } type CRIServiceOptions struct { @@ -220,6 +222,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi } c.eventMonitor = events.NewEventMonitor(&criEventHandler{c: c}) + c.metricMonitor = newMetricMonitor(c) c.cniNetConfMonitor = make(map[string]*cniNetConfSyncer) for name, i := range c.netPlugin { @@ -287,6 +290,11 @@ func (c *criService) Run(ready func()) error { }() } + if c.config.StatsMetricsPeriod > 0 { + log.L.Info("Starting metric cache provider") + c.metricMonitor.Start() + } + // Start streaming server. log.L.Info("Start streaming server") streamServerErrCh := make(chan error) diff --git a/internal/cri/server/stats.go b/internal/cri/server/stats.go new file mode 100644 index 000000000000..3c0940c4e10f --- /dev/null +++ b/internal/cri/server/stats.go @@ -0,0 +1,200 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "context" + "fmt" + "time" + + wstats "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats" + cg1 "github.com/containerd/cgroups/v3/cgroup1/stats" + cg2 "github.com/containerd/cgroups/v3/cgroup2/stats" + "github.com/containerd/containerd/api/services/tasks/v1" + ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" + "github.com/containerd/containerd/v2/pkg/protobuf" + + "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/v2/internal/cri/store/stats" + "github.com/containerd/log" +) + +// MetricMonitor is used to monitor stat. +type metricMonitor struct { + c *criService + collectionPeriod time.Duration +} + +// newMetricMonitor creates a new monitor which will collect stats for containers and sandboxes +// every 10 seconds and calculate the NanoCores Usage +func newMetricMonitor(c *criService) *metricMonitor { + return &metricMonitor{ + c: c, + collectionPeriod: time.Duration(c.config.StatsMetricsPeriod) * time.Second, + } +} + +func (m *metricMonitor) Start() { + go func() { + log.L.Info("Starting stat sync") + for { + err := m.collect() + if err != nil { + log.L.Warnf("Failed to collect stat: %v", err) + } + time.Sleep(m.collectionPeriod) + } + }() +} + +func (m *metricMonitor) collect() error { + // split building the request into two parts + // get all the containers and sandboxes + req, sandboxes, containers := m.buildMetricsRequest() + if len(sandboxes) == 0 && len(containers) == 0 { + log.L.Debugf("No containers or sandboxes to collect stats for") + return nil + } + + resp, err := m.c.client.TaskService().Metrics(ctrdutil.WithNamespace(context.Background()), req) + if err != nil { + return fmt.Errorf("failed to get stat: %v", err) + } + for _, stat := range resp.Metrics { + err := m.processMetric(stat, sandboxes) + if err != nil { + log.L.Debugf("Failed to process metric: %v", err) + continue + } + } + + return nil +} + +func (m *metricMonitor) processMetric(stat *types.Metric, sandboxes map[string]struct{}) error { + if stat == nil { + return fmt.Errorf("empty metric") + } + + oldStats, saveStatsFunc, err := m.getExistingStats(stat, sandboxes) + if err != nil { + return fmt.Errorf("failed to find container or sandbox: %v", err) + } + + data, err := m.extractStats(sandboxes, stat) + if err != nil { + return fmt.Errorf("failed to extract stats: %v", err) + } + + var currentCPU uint64 + var currentTimestamp = protobuf.FromTimestamp(stat.Timestamp) + switch m := data.(type) { + case *cg1.Metrics: + if m.CPU == nil || m.CPU.Usage == nil { + return fmt.Errorf("missing CPU data") + } + currentCPU = m.CPU.Usage.Total + case *cg2.Metrics: + if m.CPU == nil { + return fmt.Errorf("missing CPU data") + } + // convert to nano seconds + currentCPU = m.CPU.UsageUsec * 1000 + case *wstats.Statistics: + wstats := data.(*wstats.Statistics).GetWindows() + if _, ok := sandboxes[stat.ID]; ok && wstats == nil { + // hostprocess containers sandboxes don't have stats + currentTimestamp = protobuf.FromTimestamp(stat.Timestamp) + currentCPU = 0 + } else if wstats != nil && wstats.Processor != nil { + // this is slightly more accurate as it came from the windows shim + currentTimestamp = protobuf.FromTimestamp(wstats.Timestamp) + currentCPU = wstats.Processor.TotalRuntimeNS + } else { + return fmt.Errorf("invalid Windows data") + } + default: + return fmt.Errorf("unknown metric type") + } + + nanoCores := calculateNanoCores(currentCPU, oldStats, currentTimestamp) + newStats := &stats.ContainerStats{ + UsageCoreNanoSeconds: currentCPU, + UsageNanoCores: nanoCores, + Timestamp: currentTimestamp, + } + err = saveStatsFunc(stat.ID, newStats) + if err != nil { + return fmt.Errorf("failed to save container stats: %v", err) + } + + return nil +} + +func (m *metricMonitor) buildMetricsRequest() (*tasks.MetricsRequest, map[string]struct{}, map[string]struct{}) { + req := &tasks.MetricsRequest{} + sandboxes := map[string]struct{}{} + containers := map[string]struct{}{} + for _, sandbox := range m.c.sandboxStore.List() { + sandboxes[sandbox.ID] = struct{}{} + req.Filters = append(req.Filters, "id=="+sandbox.ID) + } + + for _, cntr := range m.c.containerStore.List() { + containers[cntr.ID] = struct{}{} + req.Filters = append(req.Filters, "id=="+cntr.ID) + } + return req, sandboxes, containers +} + +type saveContainerStats func(id string, containerStats *stats.ContainerStats) error + +func (m *metricMonitor) getExistingStats(stat *types.Metric, sandboxes map[string]struct{}) (*stats.ContainerStats, saveContainerStats, error) { + // it was seen as a sandbox + if _, ok := sandboxes[stat.ID]; ok { + sandbox, err := m.c.sandboxStore.Get(stat.ID) + if err == nil { + saveStatsFunc := m.c.sandboxStore.UpdateContainerStats + return sandbox.Stats, saveStatsFunc, err + } + } + + cntr, err := m.c.containerStore.Get(stat.ID) + if err == nil { + saveStatsFunc := m.c.containerStore.UpdateContainerStats + return cntr.Stats, saveStatsFunc, err + } + + // means we can't find it in either store + return nil, nil, fmt.Errorf("container not found: %s", stat.ID) +} + +func calculateNanoCores(usageCoreNanoSeconds uint64, oldStats *stats.ContainerStats, newtimestamp time.Time) uint64 { + if oldStats == nil { + return 0 + } + + nanoSeconds := newtimestamp.UnixNano() - oldStats.Timestamp.UnixNano() + + // zero or negative interval + if nanoSeconds <= 0 { + return 0 + } + + return uint64(float64(usageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) / + float64(nanoSeconds) * float64(time.Second/time.Nanosecond)) +} diff --git a/internal/cri/server/stats_linux.go b/internal/cri/server/stats_linux.go new file mode 100644 index 000000000000..30dedfda72a4 --- /dev/null +++ b/internal/cri/server/stats_linux.go @@ -0,0 +1,64 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "errors" + "fmt" + + cg1 "github.com/containerd/cgroups/v3/cgroup1/stats" + cg2 "github.com/containerd/cgroups/v3/cgroup2/stats" + "github.com/containerd/typeurl/v2" + + "github.com/containerd/containerd/api/types" +) + +func (m *metricMonitor) extractStats(sandboxes map[string]struct{}, stat *types.Metric) (any, error) { + // in the case of Linux we override the result from the stat call with a call to cgroup directly + // This might not be needed once internal/cri/server/podsandbox/sandbox_stats.go is implemented + if _, ok := sandboxes[stat.ID]; ok { + sandbox, err := m.c.sandboxStore.Get(stat.ID) + if err != nil { + return nil, err + } + sbmetric, err := metricsForSandbox(sandbox) + if err != nil { + return nil, err + } + return sbmetric, nil + } + + return convertMetric(stat) +} + +func convertMetric(stats *types.Metric) (any, error) { + var data interface{} + switch { + case typeurl.Is(stats.Data, (*cg1.Metrics)(nil)): + data = &cg1.Metrics{} + case typeurl.Is(stats.Data, (*cg2.Metrics)(nil)): + data = &cg2.Metrics{} + default: + return nil, errors.New("cannot convert metric data to cgroups.Metrics") + } + + if err := typeurl.UnmarshalTo(stats.Data, data); err != nil { + return nil, fmt.Errorf("failed to extract container stat: %w", err) + } + + return data, nil +} diff --git a/internal/cri/server/stats_linux_test.go b/internal/cri/server/stats_linux_test.go new file mode 100644 index 000000000000..50cacdd59096 --- /dev/null +++ b/internal/cri/server/stats_linux_test.go @@ -0,0 +1,128 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "testing" + + "github.com/containerd/containerd/api/types" + + cg1 "github.com/containerd/cgroups/v3/cgroup1/stats" + cg2 "github.com/containerd/cgroups/v3/cgroup2/stats" +) + +func TestProcessMetricLinux(t *testing.T) { + testCases := []struct { + desc string + stat *types.Metric + sandboxes map[string]struct{} + containers map[string]struct{} + expectErr bool + expectedUsageCoreNanoSeconds uint64 + }{ + { + desc: "cg1.Metrics with valid metric for container", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&cg1.Metrics{ + CPU: &cg1.CPUStat{ + Usage: &cg1.CPUUsage{ + Total: 1000, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: false, + expectedUsageCoreNanoSeconds: 1000, + }, + { + desc: "cg1.Metrics with invalid metric should fail", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&cg1.Metrics{ + CPU: &cg1.CPUStat{ + Usage: nil, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: true, + }, + { + desc: "cg2.Metrics with valid metric for container", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&cg2.Metrics{ + CPU: &cg2.CPUStat{ + UsageUsec: 1000, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: false, + expectedUsageCoreNanoSeconds: 1000000, + }, + { + desc: "cg2.Metrics with invalid metric should fail", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&cg2.Metrics{ + CPU: nil, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: true, + }, + { + desc: "can't find container in store", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&cg1.Metrics{ + CPU: &cg1.CPUStat{ + Usage: &cg1.CPUUsage{ + Total: 1000, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{}, + expectErr: true, + }, + { + desc: "can't find container in store or sandbox", + stat: &types.Metric{ + ID: "sandbox2", + Data: toProto(&cg1.Metrics{ + CPU: &cg1.CPUStat{ + Usage: &cg1.CPUUsage{ + Total: 1000, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + runProcessMetricTest(tc, t) + }) + } +} diff --git a/internal/cri/server/stats_other.go b/internal/cri/server/stats_other.go new file mode 100644 index 000000000000..e7b7669f5253 --- /dev/null +++ b/internal/cri/server/stats_other.go @@ -0,0 +1,33 @@ +//go:build !windows && !linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "fmt" + + "github.com/containerd/containerd/api/types" +) + +func (m *metricMonitor) extractStats(sandboxes map[string]struct{}, stat *types.Metric) (any, error) { + return nil, fmt.Errorf("stat is not support on this platform") +} + +func convertMetric(stats *types.Metric) (any, error) { + return nil, fmt.Errorf("stat is not support on this platform") +} diff --git a/internal/cri/server/stats_test.go b/internal/cri/server/stats_test.go new file mode 100644 index 000000000000..5f89eceb8ba5 --- /dev/null +++ b/internal/cri/server/stats_test.go @@ -0,0 +1,259 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "strings" + "testing" + "time" + + "google.golang.org/protobuf/types/known/anypb" + + "github.com/containerd/containerd/api/types" + containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" + sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" + "github.com/containerd/containerd/v2/internal/cri/store/stats" + "github.com/containerd/containerd/v2/pkg/protobuf" + "github.com/stretchr/testify/assert" +) + +func TestGetUsageNanoCores(t *testing.T) { + timestamp := time.Now() + secondAfterTimeStamp := timestamp.Add(time.Second) + ID := "ID" + + for _, test := range []struct { + desc string + firstCPUValue uint64 + secondCPUValue uint64 + expectedNanoCoreUsageFirst uint64 + expectedNanoCoreUsageSecond uint64 + }{ + { + desc: "stat", + firstCPUValue: 50, + secondCPUValue: 500, + expectedNanoCoreUsageFirst: 0, + expectedNanoCoreUsageSecond: 450, + }, + } { + test := test + t.Run(test.desc, func(t *testing.T) { + container, err := containerstore.NewContainer( + containerstore.Metadata{ID: ID}, + ) + assert.NoError(t, err) + + // calculate for first iteration + // first run so container stats will be nil + assert.Nil(t, container.Stats) + nanoCoreUsage := calculateNanoCores(test.firstCPUValue, container.Stats, timestamp) + assert.NoError(t, err) + assert.Equal(t, test.expectedNanoCoreUsageFirst, nanoCoreUsage) + + // fill in the stats as if they now exist + container.Stats = &stats.ContainerStats{} + container.Stats.UsageCoreNanoSeconds = test.firstCPUValue + container.Stats.Timestamp = timestamp + assert.NotNil(t, container.Stats) + + // calculate for second iteration + nanoCoreUsage = calculateNanoCores(test.secondCPUValue, container.Stats, secondAfterTimeStamp) + assert.NoError(t, err) + assert.Equal(t, test.expectedNanoCoreUsageSecond, nanoCoreUsage) + }) + } + +} + +func TestBuildMetricsRequest(t *testing.T) { + testCases := []struct { + desc string + sandboxIDs []string + containerIDs []string + expectedFilters []string + expectedSandboxes map[string]struct{} + expectedContainers map[string]struct{} + }{ + { + desc: "single sandbox and container", + sandboxIDs: []string{"sandbox1"}, + containerIDs: []string{"container1"}, + expectedFilters: []string{"id==sandbox1", "id==container1"}, + expectedSandboxes: map[string]struct{}{"sandbox1": {}}, + expectedContainers: map[string]struct{}{"container1": {}}, + }, + { + desc: "multiple sandboxes and containers", + sandboxIDs: []string{"sandbox1", "sandbox2"}, + containerIDs: []string{"container1", "container2"}, + expectedFilters: []string{"id==sandbox1", "id==sandbox2", "id==container1", "id==container2"}, + expectedSandboxes: map[string]struct{}{"sandbox1": {}, "sandbox2": {}}, + expectedContainers: map[string]struct{}{"container1": {}, "container2": {}}, + }, + { + desc: "no sandboxes or containers", + sandboxIDs: []string{}, + containerIDs: []string{}, + expectedFilters: []string{}, + expectedSandboxes: map[string]struct{}{}, + expectedContainers: map[string]struct{}{}, + }, + { + desc: "single sandbox, no containers", + sandboxIDs: []string{"sandbox1"}, + containerIDs: []string{}, + expectedFilters: []string{"id==sandbox1"}, + expectedSandboxes: map[string]struct{}{"sandbox1": {}}, + expectedContainers: map[string]struct{}{}, + }, + { + desc: "no sandboxes, single container", + sandboxIDs: []string{}, + containerIDs: []string{"container1"}, + expectedFilters: []string{"id==container1"}, + expectedSandboxes: map[string]struct{}{}, + expectedContainers: map[string]struct{}{"container1": {}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + c := newTestCRIService() + mm := newMetricMonitor(c) + + // Add sandboxes and containers to the stores + for _, id := range tc.sandboxIDs { + c.sandboxStore.Add(sandboxstore.Sandbox{ + Metadata: sandboxstore.Metadata{ID: id}, + }) + } + for _, id := range tc.containerIDs { + c.containerStore.Add(containerstore.Container{ + Metadata: containerstore.Metadata{ID: id}, + }) + } + + req, sandboxes, containers := mm.buildMetricsRequest() + + assert.NotNil(t, req) + assert.NotNil(t, sandboxes) + assert.NotNil(t, containers) + for _, filter := range tc.expectedFilters { + assert.Contains(t, req.Filters, filter) + } + for sandboxID := range tc.expectedSandboxes { + assert.Contains(t, sandboxes, sandboxID) + } + for containerID := range tc.expectedContainers { + assert.Contains(t, containers, containerID) + } + }) + } +} + +func TestCollectNoSandboxesOrContainers(t *testing.T) { + c := newTestCRIService() + mm := newMetricMonitor(c) + + // No sandboxes or containers are added to the stores + err := mm.collect() + assert.Nil(t, err) +} + +func TestProcessMetric(t *testing.T) { + testCases := []struct { + desc string + stat *types.Metric + sandboxes map[string]struct{} + containers map[string]struct{} + expectErr bool + expectedUsageCoreNanoSeconds uint64 + }{ + { + desc: "nil stat", + stat: nil, + sandboxes: map[string]struct{}{}, + containers: map[string]struct{}{}, + expectErr: true, + }, + { + desc: "Unknown metric type", + stat: &types.Metric{ + Data: toProto(&types.Sandbox{})}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + runProcessMetricTest(tc, t) + }) + } +} + +func runProcessMetricTest(tc struct { + desc string + stat *types.Metric + sandboxes map[string]struct{} + containers map[string]struct{} + expectErr bool + expectedUsageCoreNanoSeconds uint64 +}, t *testing.T) { + c := newTestCRIService() + mm := newMetricMonitor(c) + + for sandboxID := range tc.sandboxes { + c.sandboxStore.Add(sandboxstore.Sandbox{ + Metadata: sandboxstore.Metadata{ID: sandboxID}, + }) + } + for containerID := range tc.containers { + c.containerStore.Add(containerstore.Container{ + Metadata: containerstore.Metadata{ID: containerID}, + }) + } + + err := mm.processMetric(tc.stat, tc.sandboxes) + if tc.expectErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + var stat *stats.ContainerStats + if strings.HasPrefix(tc.stat.ID, "sandbox") { + cntr, err := c.sandboxStore.Get(tc.stat.ID) + assert.NoError(t, err) + stat = cntr.Stats + } else { + cntr, err := c.containerStore.Get(tc.stat.ID) + assert.NoError(t, err) + stat = cntr.Stats + } + assert.NotNil(t, stat) + assert.Equal(t, tc.expectedUsageCoreNanoSeconds, stat.UsageCoreNanoSeconds) +} + +func toProto(metric any) *anypb.Any { + data, err := protobuf.MarshalAnyToProto(metric) + if err != nil { + panic("failed to marshal proto: " + err.Error()) + } + return data +} diff --git a/internal/cri/server/stats_windows.go b/internal/cri/server/stats_windows.go new file mode 100644 index 000000000000..f1df8830bcda --- /dev/null +++ b/internal/cri/server/stats_windows.go @@ -0,0 +1,38 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "fmt" + + "github.com/containerd/typeurl/v2" + + "github.com/containerd/containerd/api/types" +) + +func (m *metricMonitor) extractStats(sandboxes map[string]struct{}, stat *types.Metric) (any, error) { + return convertMetric(stat) +} + +func convertMetric(stats *types.Metric) (any, error) { + containerStatsData, err := typeurl.UnmarshalAny(stats.Data) + if err != nil { + return nil, fmt.Errorf("failed to extract stat for container with id %s: %w", stats.ID, err) + } + + return containerStatsData, nil +} diff --git a/internal/cri/server/stats_windows_test.go b/internal/cri/server/stats_windows_test.go new file mode 100644 index 000000000000..bc2625058476 --- /dev/null +++ b/internal/cri/server/stats_windows_test.go @@ -0,0 +1,144 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "testing" + "time" + + wstats "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats" + "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/v2/pkg/protobuf" +) + +func TestProcessMetricWindows(t *testing.T) { + testCases := []struct { + desc string + stat *types.Metric + sandboxes map[string]struct{} + containers map[string]struct{} + expectErr bool + expectedUsageCoreNanoSeconds uint64 + }{ + { + desc: "wstats.Statistics with valid metric for container", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&wstats.Statistics{ + Container: &wstats.Statistics_Windows{ + Windows: &wstats.WindowsContainerStatistics{ + Timestamp: protobuf.ToTimestamp(time.Now()), + Processor: &wstats.WindowsContainerProcessorStatistics{ + TotalRuntimeNS: 100, + }, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: false, + expectedUsageCoreNanoSeconds: 100, + }, + { + desc: "wstats.Statistics with valid metric for sandbox", + stat: &types.Metric{ + ID: "sandbox1", + Data: toProto(&wstats.Statistics{ + Container: &wstats.Statistics_Windows{ + Windows: &wstats.WindowsContainerStatistics{ + Timestamp: protobuf.ToTimestamp(time.Now()), + Processor: &wstats.WindowsContainerProcessorStatistics{ + TotalRuntimeNS: 100, + }, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: false, + expectedUsageCoreNanoSeconds: 100, + }, + { + desc: "wstats.Statistics with nil should save as zero (hostprocess container scenario)", + stat: &types.Metric{ + ID: "sandbox1", + Data: toProto(&wstats.Statistics{ + Container: nil, + }), + }, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: false, + expectedUsageCoreNanoSeconds: 0, + }, + { + desc: "wstats.Statistics with invalid metric should fail", + stat: &types.Metric{ + ID: "sandbox1", + Data: toProto(&wstats.Statistics{ + Container: &wstats.Statistics_Windows{ + Windows: nil, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: true, + }, + { + desc: "can't find container in store", + stat: &types.Metric{ + ID: "container1", + Data: toProto(&wstats.Statistics{ + Container: &wstats.Statistics_Windows{ + Windows: &wstats.WindowsContainerStatistics{ + Timestamp: protobuf.ToTimestamp(time.Now()), + Processor: &wstats.WindowsContainerProcessorStatistics{ + TotalRuntimeNS: 100, + }, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{}, + expectErr: true, + }, + { + desc: "can't find container in store or sandbox", + stat: &types.Metric{ + ID: "sandbox2", + Data: toProto(&wstats.Statistics{ + Container: &wstats.Statistics_Windows{ + Windows: &wstats.WindowsContainerStatistics{ + Timestamp: protobuf.ToTimestamp(time.Now()), + Processor: &wstats.WindowsContainerProcessorStatistics{ + TotalRuntimeNS: 100, + }, + }, + }, + })}, + sandboxes: map[string]struct{}{"sandbox1": {}}, + containers: map[string]struct{}{"container1": {}}, + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + runProcessMetricTest(tc, t) + }) + } +} diff --git a/internal/cri/store/stats/stats.go b/internal/cri/store/stats/stats.go index b2dd581ab51e..9b75262823da 100644 --- a/internal/cri/store/stats/stats.go +++ b/internal/cri/store/stats/stats.go @@ -24,4 +24,6 @@ type ContainerStats struct { Timestamp time.Time // Cumulative CPU usage (sum across all cores) since object creation. UsageCoreNanoSeconds uint64 + // Usage NanoCores + UsageNanoCores uint64 }