diff --git a/Dockerfile b/Dockerfile
index 0edfced5c..46a1ce05c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -12,6 +12,7 @@ COPY go.mod go.mod
 COPY go.sum go.sum
 # cache deps before building and copying source so that we don't need to re-download as much
 # and so that source changes don't invalidate our downloaded layer
+RUN go env -w GOPROXY=https://goproxy.io,direct
 RUN go mod download
 
 # Copy the go source
diff --git a/docs/images/advanced_cpuset_manager.png b/docs/images/advanced_cpuset_manager.png
new file mode 100644
index 000000000..60be3b964
Binary files /dev/null and b/docs/images/advanced_cpuset_manager.png differ
diff --git a/docs/proposals/20220228-advanced-cpuset-manger.md b/docs/proposals/20220228-advanced-cpuset-manger.md
new file mode 100644
index 000000000..a0578fb33
--- /dev/null
+++ b/docs/proposals/20220228-advanced-cpuset-manger.md
@@ -0,0 +1,97 @@
+---
+title: Advanced CPUSet Manager
+authors:
+  - "@szy441687879"
+reviewers:
+  - "@yan234280533"
+  - "@mfanjie"
+creation-date: 2022-02-28
+last-updated: 2022-03-16
+status: provisional
+---
+
+# Advanced CPUSet Manager
+Kubelet supports a static CPU manager: when a guaranteed pod runs on a node, kubelet allocates specific CPU cores exclusively to its processes, which generally keeps the node's CPU utilization low.
+This proposal provides a new mechanism for managing cpusets that allows other processes to share CPU cores even while a pod is bound to them. It also allows revising a pod's cpuset while the pod is running, and relaxes kubelet's restrictions on binding CPUs.
+
+## Table of Contents
+
+- [Advanced CPUSet Manager](#advanced-cpuset-manager)
+  - [Table of Contents](#table-of-contents)
+  - [Motivation](#motivation)
+    - [Goals](#goals)
+    - [Non-Goals/Future Work](#non-goalsfuture-work)
+  - [Proposal](#proposal)
+    - [Relax restrictions of cpuset allocation](#relax-restrictions-of-cpuset-allocation)
+    - [Add new annotation to describe the requirement of cpuset control manager](#add-new-annotation-to-describe-the-requirement-of-cpuset-control-manager)
+    - [Advanced CPU Manager component](#advanced-cpu-manager-component)
+    - [User Stories](#user-stories)
+      - [Story 1](#story-1)
+      - [Story 2](#story-2)
+    - [Risks and Mitigations](#risks-and-mitigations)
+
+## Motivation
+Some latency-sensitive applications have lower latency and CPU usage when running on dedicated cores, since this results in fewer context switches and higher cache affinity.
+However, kubelet always excludes the assigned cores from the shared pool, which can waste resources: offline workloads and other online pods could actually run on those cores. In our experiments, letting them do so had barely any noticeable effect on service performance in most cases.
+
+### Goals
+
+- Provide a new mechanism to manage cpusets that bypasses kubelet
+- Provide a new cpuset manager policy, "share"
+- Allow revising a pod's cpuset while the pod is running
+- Relax the restrictions on binding CPUs
+
+
+### Non-Goals/Future Work
+
+- Solving conflicts with the kubelet static CPU manager; you need to set the kubelet CPU manager policy to "none"
+- NUMA management will be supported in the future; CCX/CCD management will also be considered
+
+## Proposal
+### Relax restrictions of cpuset allocation
+For kubelet to allocate dedicated CPUs to a container, the following conditions must be met:
+
+1. Requests and limits are specified for all resources in all containers, and they are equal (the pod is in the Guaranteed QoS class).
+
+2. The container's CPU limit is an integer greater than or equal to one and equal to its CPU request.
+
+In Crane, only condition No. 2 needs to be met.
+### Add new annotation to describe the requirement of cpuset control manager
+```yaml
+apiVersion: v1
+kind: Pod
+metadata:
+  annotations:
+    qos.gocrane.io/cpu-manager: none/exclusive/share
+```
+Three policies are provided for the cpuset manager:
+- none: the containers of this pod share a set of CPUs that are not allocated to exclusive containers
+- exclusive: the containers of this pod monopolize the allocated CPUs; other containers are not allowed to use them
+- share: the containers of this pod run on the allocated CPUs, but other containers may also use them
+
+### Advanced CPU Manager component
+![advanced_cpuset_manager](../images/advanced_cpuset_manager.png)
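+
+The component only takes effect when the `CpusetManager` feature gate introduced by this change is enabled on crane-agent (it defaults to false), and kubelet's own CPU manager must stay disabled so the two do not fight over the cpuset cgroups. Below is a minimal sketch of the kubelet side, assuming kubelet is configured through a KubeletConfiguration file (the equivalent `--cpu-manager-policy=none` flag works as well):
+
+```yaml
+# Keep kubelet's static CPU manager off so that crane-agent can own the cpuset cgroups.
+apiVersion: kubelet.config.k8s.io/v1beta1
+kind: KubeletConfiguration
+cpuManagerPolicy: none
+```
+
+The workflow of the component is described below.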
+
+- Crane-agent uses a podLister informer to detect pod creation.
+- Crane-agent allocates CPUs when a pod is bound to the node, and retries AddContainer (changing the cpuset) in a loop until the containers are created.
+- Pod updates and deletions are handled in the reconcile loop.
+- state.State is referenced from kubelet, and the topology and cpu_assignment code is copied from kubelet.
+
+
+### User Stories
+
+- Users can update the pod annotation to control the cpuset policy flexibly.
+
+#### Story 1
+ Switch a pod from none to share without recreating the pod.
+#### Story 2
+ Switch a pod from exclusive to share, so that offline processes can use these CPUs.
+
+### Risks and Mitigations
+
+- The kubelet CPU manager policy needs to be set to none; otherwise it will conflict with crane-agent.
+- If crane-agent cannot allocate CPUs for a pod, it will not refuse to start the pod, unlike kubelet.
+
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 9dd56fed3..8c7f0c1a3 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -21,6 +21,7 @@ import (
 	"github.com/gocrane/api/pkg/generated/informers/externalversions/ensurance/v1alpha1"
 	predictionv1 "github.com/gocrane/api/pkg/generated/informers/externalversions/prediction/v1alpha1"
 	"github.com/gocrane/crane/pkg/ensurance/analyzer"
+	"github.com/gocrane/crane/pkg/ensurance/cm"
 	"github.com/gocrane/crane/pkg/ensurance/collector"
 	"github.com/gocrane/crane/pkg/ensurance/executor"
 	"github.com/gocrane/crane/pkg/ensurance/manager"
@@ -56,20 +57,24 @@ func NewAgent(ctx context.Context,
 	utilruntime.Must(ensuranceapi.AddToScheme(scheme.Scheme))
 
 	stateCollector := collector.NewStateCollector(nodeName, nepInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, CollectInterval)
-	managers = append(managers, stateCollector)
+	managers = appendManagerIfNotNil(managers, stateCollector)
 	analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
-	managers = append(managers, analyzerManager)
+	managers = appendManagerIfNotNil(managers, analyzerManager)
 	avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
-	managers = append(managers, avoidanceManager)
+	managers = appendManagerIfNotNil(managers, avoidanceManager)
+	if craneCpuSetManager := utilfeature.DefaultFeatureGate.Enabled(features.CraneCpuSetManager); craneCpuSetManager {
+		cpuManager := cm.NewAdvancedCpuManager(podInformer, runtimeEndpoint, stateCollector.GetCollectors())
+		managers = appendManagerIfNotNil(managers, cpuManager)
+	}
 	if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
 		nodeResourceManager := resource.NewNodeResourceManager(kubeClient, nodeName, podInformer, nodeInformer, tspInformer, runtimeEndpoint, stateCollector.NodeResourceChann)
-		managers = append(managers, nodeResourceManager)
+		managers = appendManagerIfNotNil(managers, nodeResourceManager)
 	}
 	if podResource := utilfeature.DefaultFeatureGate.Enabled(features.CranePodResource); podResource {
-		podResourceManager := resource.NewPodResourceManager(kubeClient, nodeName, podInformer, runtimeEndpoint, stateCollector.PodResourceChann, stateCollector.GetCollectors())
-		managers = append(managers, podResourceManager)
+		podResourceManager := resource.NewPodResourceManager(kubeClient, nodeName, podInformer, runtimeEndpoint, stateCollector.PodResourceChann, stateCollector.GetCollectors())
+		managers = appendManagerIfNotNil(managers, podResourceManager)
 	}
 
 	return &Agent{
@@ -111,3
+116,10 @@ func (a *Agent) Run(healthCheck *metrics.HealthCheck, enableProfiling bool, bind func getAgentName(nodeName string) string { return nodeName + "_" + string(uuid.NewUUID()) } + +func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []manager.Manager { + if m != nil { + return append(managers, m) + } + return managers +} diff --git a/pkg/ensurance/cm/advanced_cpu_manager.go b/pkg/ensurance/cm/advanced_cpu_manager.go new file mode 100644 index 000000000..18cada4bd --- /dev/null +++ b/pkg/ensurance/cm/advanced_cpu_manager.go @@ -0,0 +1,354 @@ +package cm + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "strings" + "sync" + "time" + + "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/containermap" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + + "github.com/gocrane/crane/pkg/ensurance/collector/cadvisor" + "github.com/gocrane/crane/pkg/ensurance/collector/types" + cruntime "github.com/gocrane/crane/pkg/ensurance/runtime" +) + +const ( + // cpuManagerStateFileName is the file name where cpu manager stores its state + cpuManagerStateFileName = "cpu_manager_state" + craneCpusetPolicyName = "crane" + // stateFilePath holds the directory where the state file for checkpoints is held. + stateFilePath = "/rootvar/run/crane" + kubeletStateFilePath = "/kubelet/" + // cpusetReconcilePeriod is the duration between calls to reconcileState. + cpusetReconcilePeriod = 5 * time.Second + // intervalRetryAddContainer is the interval between try add container + intervalRetryAddContainer = 200 * time.Millisecond + // timeoutRetryAddContainer is the timeout for adding container + timeoutRetryAddContainer = 2 * time.Second +) + +type AdvancedCpuManager struct { + isStarted bool + + sync.RWMutex + policy Policy + + podLister corelisters.PodLister + podSynced cache.InformerSynced + + runtimeClient pb.RuntimeServiceClient + runtimeConn *grpc.ClientConn + + // reconcilePeriod is the duration between calls to reconcileState. + reconcilePeriod time.Duration + + // state allows pluggable CPU assignment policies while sharing a common + // representation of state for the system to inspect and reconcile. + state state.State + + // stateFileDirectory holds the directory where the state file for checkpoints is held. 
+ stateFileDirectory string + + collectors *sync.Map +} + +func NewAdvancedCpuManager(podInformer coreinformers.PodInformer, runtimeEndpoint string, collectors *sync.Map) *AdvancedCpuManager { + runtimeClient, runtimeConn, err := cruntime.GetRuntimeClient(runtimeEndpoint, true) + if err != nil { + klog.Errorf("GetRuntimeClient failed %s", err.Error()) + return nil + } + + m := &AdvancedCpuManager{ + podLister: podInformer.Lister(), + podSynced: podInformer.Informer().HasSynced, + runtimeClient: runtimeClient, + runtimeConn: runtimeConn, + reconcilePeriod: cpusetReconcilePeriod, + stateFileDirectory: stateFilePath, + collectors: collectors, + } + //pod add actions need to handle quickly, delete/update can handle in loop laterly + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(new interface{}) { + pod := new.(*v1.Pod) + for _, container := range pod.Spec.Containers { + err := m.Allocate(pod, &container) + if err == nil { + _ = wait.PollImmediate(intervalRetryAddContainer, timeoutRetryAddContainer, + func() (bool, error) { + return m.AddContainer(pod, &container) == nil, nil + }) + } + } + }, + }) + return m +} + +func (m *AdvancedCpuManager) Name() string { + return "AdvancedCpuManager" +} + +func (m *AdvancedCpuManager) Run(stop <-chan struct{}) { + klog.Infof("Starting advanced cpu manager") + value, exists := m.collectors.Load(types.CadvisorCollectorType) + if !exists { + klog.Errorf("GetCadvisorCollector failed, %+v", m.collectors) + return + } + c := value.(*cadvisor.CadvisorCollector) + machineInfo, err := c.Manager.GetMachineInfo() + if err != nil { + klog.Errorf("GetMachineInfo failed %s", err.Error()) + return + } + topo, err := topology.Discover(machineInfo) + if err != nil { + klog.Errorf("Topology Discover failed %s", err.Error()) + return + } + klog.Infof("Node topology: %+v", topo) + m.policy, err = NewAdvancedStaticPolicy(topo) + if err != nil { + klog.Errorf("New static policy error: %v", err) + return + } + // Wait for the caches to be synced before starting workers + if !cache.WaitForNamedCacheSync("advanced-cpuset-manager", + stop, + m.podSynced, + ) { + return + } + + containerMap, err := buildContainerMapFromRuntime(m.runtimeClient) + if err != nil { + klog.Errorf("Failed to build ContainerMapFromRuntime: %v", err) + return + } + if p := m.loadKubeletPolicy(kubeletStateFilePath + cpuManagerStateFileName); p != "none" { + klog.Errorf("Can not read kubelet policy or is not none, %s", p) + return + } + stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, craneCpusetPolicyName, containerMap) + if err != nil { + klog.Errorf("Could not initialize checkpoint manager: %v, please remove policy state file", err) + return + } + m.state = stateImpl + + err = m.policy.Start(m.state) + if err != nil { + klog.Errorf("[Advancedcpumanager] policy start error: %v", err) + return + } + + go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, stop) + m.isStarted = true +} + +func (m *AdvancedCpuManager) Allocate(p *v1.Pod, c *v1.Container) error { + // wait cpu manger start + _ = wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { return m.isStarted, nil }, wait.NeverStop) + // Garbage collect any stranded resources before allocating CPUs, do not need to allocate + m.syncState(false) + + m.Lock() + defer m.Unlock() + + // Call down into the policy to assign this container CPUs if required. 
+ err := m.policy.Allocate(m.state, p, c) + if err != nil { + klog.Errorf("[Advancedcpumanager] Allocate error: %v", err) + return err + } + return nil +} + +func (m *AdvancedCpuManager) AddContainer(p *v1.Pod, c *v1.Container) error { + containerID := GetContainerIdFromPod(p, c.Name) + cset, ok := m.state.GetCPUSet(string(p.UID), c.Name) + if !ok { + cset = m.getSharedCpu().Union(m.state.GetDefaultCPUSet()) + } + err := m.updateContainerCPUSet(containerID, cset) + if err != nil { + klog.Errorf("[Advancedcpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err) + return err + } + klog.V(5).Infof("[Advancedcpumanager] update container resources is skipped due to cpu set is empty") + return nil +} + +func (m *AdvancedCpuManager) reconcileState() { + m.syncState(true) + sharedCPUSet := m.getSharedCpu().Union(m.state.GetDefaultCPUSet()) + for _, pod := range m.activepods() { + for _, container := range pod.Spec.Containers { + containerID := GetContainerIdFromPod(pod, container.Name) + cset, ok := m.state.GetCPUSet(string(pod.UID), container.Name) + if !ok { + cset = sharedCPUSet + } + klog.Infof("[Advancedcpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", + pod.Name, container.Name, containerID, cset) + err := m.updateContainerCPUSet(containerID, cset) + if err != nil { + klog.Errorf("[Advancedcpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", + pod.Name, container.Name, containerID, cset, err) + continue + } + } + } +} + +func (m *AdvancedCpuManager) syncState(doAllocate bool) { + // We grab the lock to ensure that no new containers will grab CPUs while + // executing the code below. Without this lock, its possible that we end up + // removing state that is newly added by an asynchronous call to + // AddContainer() during the execution of this code. + m.Lock() + defer m.Unlock() + assignments := m.state.GetCPUAssignments() + + // Build a list of (podUID, containerName) pairs for all need to be assigned containers in all active Pods. + toBeAssignedContainers := make(map[string]map[string]struct{}) + for _, pod := range m.activepods() { + toBeAssignedContainers[string(pod.UID)] = make(map[string]struct{}) + for _, container := range pod.Spec.Containers { + if m.policy.NeedAllocated(pod, &container) { + toBeAssignedContainers[string(pod.UID)][container.Name] = struct{}{} + if _, ok := assignments[string(pod.UID)][container.Name]; !ok && doAllocate { + err := m.policy.Allocate(m.state, pod, &container) + if err != nil { + klog.Errorf("[Advancedcpumanager] Allocate error: %v", err) + continue + } + } + } + } + } + + // Loop through the CPUManager state. Remove any state for containers not + // in the `toBeAssignedContainers` list built above. 
+ for podUID := range assignments { + for containerName := range assignments[podUID] { + if _, ok := toBeAssignedContainers[podUID][containerName]; !ok { + klog.Errorf("[Advancedcpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName) + err := m.policyRemoveContainerByRef(podUID, containerName) + if err != nil { + klog.Errorf("[Advancedcpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err) + } + } + } + } +} + +func (m *AdvancedCpuManager) policyRemoveContainerByRef(podUID string, containerName string) error { + err := m.policy.RemoveContainer(m.state, podUID, containerName) + return err +} + +func (m *AdvancedCpuManager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error { + return cruntime.UpdateContainerResources( + m.runtimeClient, + containerID, + cruntime.UpdateOptions{CpusetCpus: cpus.String()}, + ) +} + +func (m *AdvancedCpuManager) loadKubeletPolicy(fileName string) string { + data, err := ioutil.ReadFile(fileName) + if err != nil { + klog.Errorf("[Advancedcpumanager] loadKubeletPolicy: %v", err) + return "" + } + var cpumangerCheckpoint state.CPUManagerCheckpoint + err = json.Unmarshal(data, &cpumangerCheckpoint) + if err != nil { + klog.Errorf("[Advancedcpumanager] unmarshal KubeletPolicy: %v", err) + return "" + } + return cpumangerCheckpoint.PolicyName +} + +func (m *AdvancedCpuManager) getSharedCpu() cpuset.CPUSet { + sharedCPUSet := cpuset.NewCPUSet() + for _, pod := range m.activepods() { + for _, container := range pod.Spec.Containers { + if cset, ok := m.state.GetCPUSet(string(pod.UID), container.Name); ok { + if csp := GetPodCPUSetType(pod, &container); csp == CPUSetShare { + sharedCPUSet = sharedCPUSet.Union(cset) + } + } + } + } + return sharedCPUSet +} + +func (m *AdvancedCpuManager) activepods() []*v1.Pod { + allPods, _ := m.podLister.List(labels.Everything()) + activePods := make([]*v1.Pod, 0, len(allPods)) + for _, pod := range allPods { + //todo judge terminating status + if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded || + (pod.DeletionTimestamp != nil && IsPodNotRunning(pod.Status.ContainerStatuses)) { + continue + } + activePods = append(activePods, pod) + } + return activePods +} + +func buildContainerMapFromRuntime(runtimeClient pb.RuntimeServiceClient) (containermap.ContainerMap, error) { + podSandboxMap := make(map[string]string) + podSandboxList, _ := cruntime.ListPodSandboxes(runtimeClient, cruntime.ListOptions{}) + + for _, p := range podSandboxList { + podSandboxMap[p.Id] = p.Metadata.Uid + } + + containerMap := containermap.NewContainerMap() + containerList, _ := cruntime.ListContainers(runtimeClient, cruntime.ListOptions{}) + for _, c := range containerList { + if _, exists := podSandboxMap[c.PodSandboxId]; !exists { + return nil, fmt.Errorf("no PodsandBox found with Id '%s'", c.PodSandboxId) + } + containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id) + } + + return containerMap, nil +} + +func GetContainerIdFromPod(pod *v1.Pod, name string) string { + if name == "" { + return "" + } + + for _, v := range pod.Status.ContainerStatuses { + if v.Name == name { + strList := strings.Split(v.ContainerID, "//") + if len(strList) > 0 { + return strList[len(strList)-1] + } + } + } + + return "" +} diff --git a/pkg/ensurance/cm/advanced_cpu_manager_test.go b/pkg/ensurance/cm/advanced_cpu_manager_test.go new file mode 100644 index 000000000..37d49ba4f --- /dev/null +++ b/pkg/ensurance/cm/advanced_cpu_manager_test.go 
@@ -0,0 +1,32 @@ +package cm + +import ( + "testing" +) + +func TestAdvancedCpuManager_loadKubeletPolicy(t *testing.T) { + tests := []struct { + name string + fileName string + want string + }{ + { + name: "file not exist", + fileName: "/not-exist", + want: "", + }, + { + name: "file get none", + fileName: "./test/" + cpuManagerStateFileName, + want: "none", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &AdvancedCpuManager{} + if got := m.loadKubeletPolicy(tt.fileName); got != tt.want { + t.Errorf("AdvancedCpuManager.loadKubeletPolicy() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/ensurance/cm/advanced_policy_static.go b/pkg/ensurance/cm/advanced_policy_static.go new file mode 100644 index 000000000..dfb9b8d4b --- /dev/null +++ b/pkg/ensurance/cm/advanced_policy_static.go @@ -0,0 +1,171 @@ +package cm + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/util/format" +) + +// AdvancedPolicyStatic is the name of the advanced static policy +const AdvancedPolicyStatic policyName = "advanced-static" + +type advancedStaticPolicy struct { + // cpu socket topology + topology *topology.CPUTopology + // set of CPUs that is not available for exclusive assignment + reserved cpuset.CPUSet +} + +func NewAdvancedStaticPolicy(topology *topology.CPUTopology) (Policy, error) { + return &advancedStaticPolicy{ + topology: topology, + reserved: cpuset.MustParse("0"), + }, nil +} + +func (p *advancedStaticPolicy) Name() string { + return string(AdvancedPolicyStatic) +} + +func (p *advancedStaticPolicy) Start(s state.State) error { + if err := p.validateState(s); err != nil { + klog.Errorf("[Advancedcpumanager] advanced static policy invalid state: %v, please drain node and remove policy state file", err) + return err + } + return nil +} + +func (p *advancedStaticPolicy) validateState(s state.State) error { + tmpAssignments := s.GetCPUAssignments() + tmpDefaultCPUset := s.GetDefaultCPUSet() + + // Default cpuset cannot be empty when assignments exist + if tmpDefaultCPUset.IsEmpty() { + if len(tmpAssignments) != 0 { + return fmt.Errorf("default cpuset cannot be empty") + } + // state is empty initialize + allCPUs := p.topology.CPUDetails.CPUs() + s.SetDefaultCPUSet(allCPUs) + return nil + } + + // State has already been initialized from file (is not empty) + // 1. Check if the reserved cpuset is not part of default cpuset because: + // - kube/system reserved have changed (increased) - may lead to some containers not being able to start + // - user tampered with file + if !p.reserved.Intersection(tmpDefaultCPUset).Equals(p.reserved) { + return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"", + p.reserved.String(), tmpDefaultCPUset.String()) + } + + // 2. Check if state for static policy is consistent + for pod := range tmpAssignments { + for container, cset := range tmpAssignments[pod] { + // None of the cpu in DEFAULT cset should be in s.assignments + if !tmpDefaultCPUset.Intersection(cset).IsEmpty() { + return fmt.Errorf("pod: %s, container: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"", + pod, container, cset.String(), tmpDefaultCPUset.String()) + } + } + } + + // 3. It's possible that the set of available CPUs has changed since + // the state was written. 
This can be due to for example + // offlining a CPU when kubelet is not running. If this happens, + // CPU manager will run into trouble when later it tries to + // assign non-existent CPUs to containers. Validate that the + // topology that was received during CPU manager startup matches with + // the set of CPUs stored in the state. + totalKnownCPUs := tmpDefaultCPUset.Clone() + tmpCPUSets := []cpuset.CPUSet{} + for pod := range tmpAssignments { + for _, cset := range tmpAssignments[pod] { + tmpCPUSets = append(tmpCPUSets, cset) + } + } + totalKnownCPUs = totalKnownCPUs.UnionAll(tmpCPUSets) + if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) { + return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"", + p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String()) + } + + return nil +} + +func (p *advancedStaticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error { + if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 { + klog.Infof("[Advancedcpumanager] advanced static policy: Allocate (pod: %s, container: %s)", format.Pod(pod), container.Name) + if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { + klog.Infof("[Advancedcpumanager] advanced static policy: container already present in state, skipping (pod: %s, container: %s)", format.Pod(pod), container.Name) + return nil + } + + // Allocate CPUs according to the NUMA affinity contained in the hint. + cpuset, err := p.allocateCPUs(s, numCPUs) + if err != nil { + klog.Errorf("[Advancedcpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, format.Pod(pod), container.Name, err) + return err + } + s.SetCPUSet(string(pod.UID), container.Name, cpuset) + } + // container belongs in the shared pool (nothing to do; use default cpuset) + return nil +} + +func (p *advancedStaticPolicy) allocateCPUs(s state.State, numCPUs int) (cpuset.CPUSet, error) { + klog.Infof("[Advancedcpumanager] allocateCpus: (numCPUs: %d)", numCPUs) + + result := cpuset.NewCPUSet() + klog.Infof("[Advancedcpumanager] allocateCpu %+v", p.assignableCPUs(s)) + // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. + remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s), numCPUs) + if err != nil { + return cpuset.NewCPUSet(), err + } + result = result.Union(remainingCPUs) + + // Remove allocated CPUs from the shared CPUSet. + s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result)) + + klog.Infof("[Advancedcpumanager] allocateCPUs: returning \"%v\"", result) + return result, nil +} + +// assignableCPUs returns the set of unassigned CPUs minus the reserved set. +func (p *advancedStaticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { + return s.GetDefaultCPUSet().Difference(p.reserved) +} + +func (p *advancedStaticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error { + klog.Infof("[Advancedcpumanager] static policy: RemoveContainer (pod: %s, container: %s)", podUID, containerName) + if toRelease, ok := s.GetCPUSet(podUID, containerName); ok { + s.Delete(podUID, containerName) + // Mutate the shared pool, adding released cpus. 
+ s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease)) + } + return nil +} + +func (p *advancedStaticPolicy) NeedAllocated(pod *v1.Pod, container *v1.Container) bool { + return p.guaranteedCPUs(pod, container) > 0 +} + +func (p *advancedStaticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { + cpuQuantity := container.Resources.Requests[v1.ResourceCPU] + if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() { + return 0 + } + if container.Resources.Requests[v1.ResourceCPU] != container.Resources.Limits[v1.ResourceCPU] { + return 0 + } + if csp := GetPodCPUSetType(pod, container); csp == CPUSetNone { + return 0 + } + return int(cpuQuantity.Value()) +} diff --git a/pkg/ensurance/cm/advanced_policy_static_test.go b/pkg/ensurance/cm/advanced_policy_static_test.go new file mode 100644 index 000000000..d6ab54514 --- /dev/null +++ b/pkg/ensurance/cm/advanced_policy_static_test.go @@ -0,0 +1,74 @@ +package cm + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func Test_advancedStaticPolicy_guaranteedCPUs(t *testing.T) { + type args struct { + pod *v1.Pod + container *v1.Container + } + tests := []struct { + name string + args args + want int + }{ + { + name: "pod guaranteedCPUs", + args: args{ + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + CPUSetAnnotation: string(CPUSetShare), + }, + }, + }, + &v1.Container{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + v1.ResourceMemory: resource.MustParse("1G"), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + v1.ResourceMemory: resource.MustParse("1G"), + }, + }, + }, + }, + want: 3, + }, + { + name: "pod guaranteedCPUs", + args: args{ + &v1.Pod{}, + &v1.Container{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + v1.ResourceMemory: resource.MustParse("1G"), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + v1.ResourceMemory: resource.MustParse("1G"), + }, + }, + }, + }, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &advancedStaticPolicy{} + if got := p.guaranteedCPUs(tt.args.pod, tt.args.container); got != tt.want { + t.Errorf("advancedStaticPolicy.guaranteedCPUs() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/ensurance/cm/cpu_assignment.go b/pkg/ensurance/cm/cpu_assignment.go new file mode 100644 index 000000000..da0216fb0 --- /dev/null +++ b/pkg/ensurance/cm/cpu_assignment.go @@ -0,0 +1,203 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +// copied from kubernetes/pkg/kubelet/cm/cpumanager/cpu_assignment.go +package cm + +import ( + "fmt" + "sort" + + "k8s.io/klog/v2" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +type cpuAccumulator struct { + topo *topology.CPUTopology + details topology.CPUDetails + numCPUsNeeded int + result cpuset.CPUSet +} + +func newCPUAccumulator(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) *cpuAccumulator { + return &cpuAccumulator{ + topo: topo, + details: topo.CPUDetails.KeepOnly(availableCPUs), + numCPUsNeeded: numCPUs, + result: cpuset.NewCPUSet(), + } +} + +func (a *cpuAccumulator) take(cpus cpuset.CPUSet) { + a.result = a.result.Union(cpus) + a.details = a.details.KeepOnly(a.details.CPUs().Difference(a.result)) + a.numCPUsNeeded -= cpus.Size() +} + +// Returns true if the supplied socket is fully available in `topoDetails`. +func (a *cpuAccumulator) isSocketFree(socketID int) bool { + return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket() +} + +// Returns true if the supplied core is fully available in `topoDetails`. +func (a *cpuAccumulator) isCoreFree(coreID int) bool { + return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore() +} + +// Returns free socket IDs as a slice sorted by: +// - socket ID, ascending. +func (a *cpuAccumulator) freeSockets() []int { + return a.details.Sockets().Filter(a.isSocketFree).ToSlice() +} + +// Returns core IDs as a slice sorted by: +// - the number of whole available cores on the socket, ascending +// - socket ID, ascending +// - core ID, ascending +func (a *cpuAccumulator) freeCores() []int { + socketIDs := a.details.Sockets().ToSliceNoSort() + sort.Slice(socketIDs, + func(i, j int) bool { + iCores := a.details.CoresInSockets(socketIDs[i]).Filter(a.isCoreFree) + jCores := a.details.CoresInSockets(socketIDs[j]).Filter(a.isCoreFree) + return iCores.Size() < jCores.Size() || socketIDs[i] < socketIDs[j] + }) + + coreIDs := []int{} + for _, s := range socketIDs { + coreIDs = append(coreIDs, a.details.CoresInSockets(s).Filter(a.isCoreFree).ToSlice()...) + } + return coreIDs +} + +// Returns CPU IDs as a slice sorted by: +// - socket affinity with result +// - number of CPUs available on the same socket +// - number of CPUs available on the same core +// - socket ID. +// - core ID. +func (a *cpuAccumulator) freeCPUs() []int { + result := []int{} + cores := a.details.Cores().ToSlice() + + sort.Slice( + cores, + func(i, j int) bool { + iCore := cores[i] + jCore := cores[j] + + iCPUs := a.topo.CPUDetails.CPUsInCores(iCore).ToSlice() + jCPUs := a.topo.CPUDetails.CPUsInCores(jCore).ToSlice() + + iSocket := a.topo.CPUDetails[iCPUs[0]].SocketID + jSocket := a.topo.CPUDetails[jCPUs[0]].SocketID + + // Compute the number of CPUs in the result reside on the same socket + // as each core. + iSocketColoScore := a.topo.CPUDetails.CPUsInSockets(iSocket).Intersection(a.result).Size() + jSocketColoScore := a.topo.CPUDetails.CPUsInSockets(jSocket).Intersection(a.result).Size() + + // Compute the number of available CPUs available on the same socket + // as each core. + iSocketFreeScore := a.details.CPUsInSockets(iSocket).Size() + jSocketFreeScore := a.details.CPUsInSockets(jSocket).Size() + + // Compute the number of available CPUs on each core. 
+ iCoreFreeScore := a.details.CPUsInCores(iCore).Size() + jCoreFreeScore := a.details.CPUsInCores(jCore).Size() + + return iSocketColoScore > jSocketColoScore || + iSocketFreeScore < jSocketFreeScore || + iCoreFreeScore < jCoreFreeScore || + iSocket < jSocket || + iCore < jCore + }) + + // For each core, append sorted CPU IDs to result. + for _, core := range cores { + result = append(result, a.details.CPUsInCores(core).ToSlice()...) + } + return result +} + +func (a *cpuAccumulator) needs(n int) bool { + return a.numCPUsNeeded >= n +} + +func (a *cpuAccumulator) isSatisfied() bool { + return a.numCPUsNeeded < 1 +} + +func (a *cpuAccumulator) isFailed() bool { + return a.numCPUsNeeded > a.details.CPUs().Size() +} + +func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) { + acc := newCPUAccumulator(topo, availableCPUs, numCPUs) + if acc.isSatisfied() { + return acc.result, nil + } + if acc.isFailed() { + return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request") + } + + // Algorithm: topology-aware best-fit + // 1. Acquire whole sockets, if available and the container requires at + // least a socket's-worth of CPUs. + if acc.needs(acc.topo.CPUsPerSocket()) { + for _, s := range acc.freeSockets() { + klog.V(4).Infof("[Advancedcpumanager] takeByTopology: claiming socket [%d]", s) + acc.take(acc.details.CPUsInSockets(s)) + if acc.isSatisfied() { + return acc.result, nil + } + if !acc.needs(acc.topo.CPUsPerSocket()) { + break + } + } + } + + // 2. Acquire whole cores, if available and the container requires at least + // a core's-worth of CPUs. + if acc.needs(acc.topo.CPUsPerCore()) { + for _, c := range acc.freeCores() { + klog.V(4).Infof("[Advancedcpumanager] takeByTopology: claiming core [%d]", c) + acc.take(acc.details.CPUsInCores(c)) + if acc.isSatisfied() { + return acc.result, nil + } + if !acc.needs(acc.topo.CPUsPerCore()) { + break + } + } + } + + // 3. Acquire single threads, preferring to fill partially-allocated cores + // on the same sockets as the whole cores we have already taken in this + // allocation. + for _, c := range acc.freeCPUs() { + klog.V(4).Infof("[Advancedcpumanager] takeByTopology: claiming CPU [%d]", c) + if acc.needs(1) { + acc.take(cpuset.NewCPUSet(c)) + } + if acc.isSatisfied() { + return acc.result, nil + } + } + + return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus") +} diff --git a/pkg/ensurance/cm/cpu_assignment_test.go b/pkg/ensurance/cm/cpu_assignment_test.go new file mode 100644 index 000000000..9965c46f5 --- /dev/null +++ b/pkg/ensurance/cm/cpu_assignment_test.go @@ -0,0 +1,424 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +// copied from kubernetes/pkg/kubelet/cm/cpumanager/cpu_assignment_test.go + +package cm + +import ( + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +var ( + topoSingleSocketHT = &topology.CPUTopology{ + NumCPUs: 8, + NumSockets: 1, + NumCores: 4, + CPUDetails: map[int]topology.CPUInfo{ + 0: {CoreID: 0, SocketID: 0, NUMANodeID: 0}, + 1: {CoreID: 1, SocketID: 0, NUMANodeID: 0}, + 2: {CoreID: 2, SocketID: 0, NUMANodeID: 0}, + 3: {CoreID: 3, SocketID: 0, NUMANodeID: 0}, + 4: {CoreID: 0, SocketID: 0, NUMANodeID: 0}, + 5: {CoreID: 1, SocketID: 0, NUMANodeID: 0}, + 6: {CoreID: 2, SocketID: 0, NUMANodeID: 0}, + 7: {CoreID: 3, SocketID: 0, NUMANodeID: 0}, + }, + } + + topoDualSocketHT = &topology.CPUTopology{ + NumCPUs: 12, + NumSockets: 2, + NumCores: 6, + CPUDetails: map[int]topology.CPUInfo{ + 0: {CoreID: 0, SocketID: 0, NUMANodeID: 0}, + 1: {CoreID: 1, SocketID: 1, NUMANodeID: 1}, + 2: {CoreID: 2, SocketID: 0, NUMANodeID: 0}, + 3: {CoreID: 3, SocketID: 1, NUMANodeID: 1}, + 4: {CoreID: 4, SocketID: 0, NUMANodeID: 0}, + 5: {CoreID: 5, SocketID: 1, NUMANodeID: 1}, + 6: {CoreID: 0, SocketID: 0, NUMANodeID: 0}, + 7: {CoreID: 1, SocketID: 1, NUMANodeID: 1}, + 8: {CoreID: 2, SocketID: 0, NUMANodeID: 0}, + 9: {CoreID: 3, SocketID: 1, NUMANodeID: 1}, + 10: {CoreID: 4, SocketID: 0, NUMANodeID: 0}, + 11: {CoreID: 5, SocketID: 1, NUMANodeID: 1}, + }, + } +) + +func TestCPUAccumulatorFreeSockets(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + expect []int + }{ + { + "single socket HT, 1 socket free", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []int{0}, + }, + { + "single socket HT, 0 sockets free", + topoSingleSocketHT, + cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), + []int{}, + }, + { + "dual socket HT, 2 sockets free", + topoDualSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + []int{0, 1}, + }, + { + "dual socket HT, 1 socket free", + topoDualSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11), + []int{1}, + }, + { + "dual socket HT, 0 sockets free", + topoDualSocketHT, + cpuset.NewCPUSet(0, 2, 3, 4, 5, 6, 7, 8, 9, 11), + []int{}, + }, + } + + for _, tc := range testCases { + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + result := acc.freeSockets() + if !reflect.DeepEqual(result, tc.expect) { + t.Errorf("[%s] expected %v to equal %v", tc.description, result, tc.expect) + } + } +} + +func TestCPUAccumulatorFreeCores(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + expect []int + }{ + { + "single socket HT, 4 cores free", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []int{0, 1, 2, 3}, + }, + { + "single socket HT, 3 cores free", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 4, 5, 6), + []int{0, 1, 2}, + }, + { + "single socket HT, 3 cores free (1 partially consumed)", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6), + []int{0, 1, 2}, + }, + { + "single socket HT, 0 cores free", + topoSingleSocketHT, + cpuset.NewCPUSet(), + []int{}, + }, + { + "single socket HT, 0 cores free (4 partially consumed)", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3), + []int{}, + }, + { + "dual socket HT, 6 cores free", + topoDualSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + []int{0, 2, 4, 1, 3, 5}, + }, + { + "dual socket HT, 5 cores free (1 consumed from 
socket 0)", + topoDualSocketHT, + cpuset.NewCPUSet(2, 1, 3, 4, 5, 7, 8, 9, 10, 11), + []int{2, 4, 1, 3, 5}, + }, + { + "dual socket HT, 4 cores free (1 consumed from each socket)", + topoDualSocketHT, + cpuset.NewCPUSet(2, 3, 4, 5, 8, 9, 10, 11), + []int{2, 4, 3, 5}, + }, + } + + for _, tc := range testCases { + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + result := acc.freeCores() + if !reflect.DeepEqual(result, tc.expect) { + t.Errorf("[%s] expected %v to equal %v", tc.description, result, tc.expect) + } + } +} + +func TestCPUAccumulatorFreeCPUs(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + expect []int + }{ + { + "single socket HT, 8 cpus free", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []int{0, 4, 1, 5, 2, 6, 3, 7}, + }, + { + "single socket HT, 5 cpus free", + topoSingleSocketHT, + cpuset.NewCPUSet(3, 4, 5, 6, 7), + []int{4, 5, 6, 3, 7}, + }, + { + "dual socket HT, 12 cpus free", + topoDualSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + []int{0, 6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11}, + }, + { + "dual socket HT, 11 cpus free", + topoDualSocketHT, + cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + []int{6, 2, 8, 4, 10, 1, 7, 3, 9, 5, 11}, + }, + { + "dual socket HT, 10 cpus free", + topoDualSocketHT, + cpuset.NewCPUSet(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + []int{2, 8, 4, 10, 1, 7, 3, 9, 5, 11}, + }, + } + + for _, tc := range testCases { + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, 0) + result := acc.freeCPUs() + if !reflect.DeepEqual(result, tc.expect) { + t.Errorf("[%s] expected %v to equal %v", tc.description, result, tc.expect) + } + } +} + +func TestCPUAccumulatorTake(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + takeCPUs []cpuset.CPUSet + numCPUs int + expectSatisfied bool + expectFailed bool + }{ + { + "take 0 cpus from a single socket HT, require 1", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []cpuset.CPUSet{cpuset.NewCPUSet()}, + 1, + false, + false, + }, + { + "take 0 cpus from a single socket HT, require 1, none available", + topoSingleSocketHT, + cpuset.NewCPUSet(), + []cpuset.CPUSet{cpuset.NewCPUSet()}, + 1, + false, + true, + }, + { + "take 1 cpu from a single socket HT, require 1", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []cpuset.CPUSet{cpuset.NewCPUSet(0)}, + 1, + true, + false, + }, + { + "take 1 cpu from a single socket HT, require 2", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []cpuset.CPUSet{cpuset.NewCPUSet(0)}, + 2, + false, + false, + }, + { + "take 2 cpu from a single socket HT, require 4, expect failed", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2), + []cpuset.CPUSet{cpuset.NewCPUSet(0), cpuset.NewCPUSet(1)}, + 4, + false, + true, + }, + { + "take all cpus one at a time from a single socket HT, require 8", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + []cpuset.CPUSet{ + cpuset.NewCPUSet(0), + cpuset.NewCPUSet(1), + cpuset.NewCPUSet(2), + cpuset.NewCPUSet(3), + cpuset.NewCPUSet(4), + cpuset.NewCPUSet(5), + cpuset.NewCPUSet(6), + cpuset.NewCPUSet(7), + }, + 8, + true, + false, + }, + } + + for _, tc := range testCases { + acc := newCPUAccumulator(tc.topo, tc.availableCPUs, tc.numCPUs) + totalTaken := 0 + for _, cpus := range tc.takeCPUs { + acc.take(cpus) + totalTaken += cpus.Size() + } + if tc.expectSatisfied != acc.isSatisfied() { + 
t.Errorf("[%s] expected acc.isSatisfied() to be %t", tc.description, tc.expectSatisfied) + } + if tc.expectFailed != acc.isFailed() { + t.Errorf("[%s] expected acc.isFailed() to be %t", tc.description, tc.expectFailed) + } + for _, cpus := range tc.takeCPUs { + availableCPUs := acc.details.CPUs() + if cpus.Intersection(availableCPUs).Size() > 0 { + t.Errorf("[%s] expected intersection of taken cpus [%s] and acc.details.CPUs() [%s] to be empty", tc.description, cpus, availableCPUs) + } + if !cpus.IsSubsetOf(acc.result) { + t.Errorf("[%s] expected [%s] to be a subset of acc.result [%s]", tc.description, cpus, acc.result) + } + } + expNumCPUsNeeded := tc.numCPUs - totalTaken + if acc.numCPUsNeeded != expNumCPUsNeeded { + t.Errorf("[%s] expected acc.numCPUsNeeded to be %d (got %d)", tc.description, expNumCPUsNeeded, acc.numCPUsNeeded) + } + } +} + +func TestTakeByTopology(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + availableCPUs cpuset.CPUSet + numCPUs int + expErr string + expResult cpuset.CPUSet + }{ + { + "take more cpus than are available from single socket with HT", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 2, 4, 6), + 5, + "not enough cpus available to satisfy request", + cpuset.NewCPUSet(), + }, + { + "take zero cpus from single socket with HT", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + 0, + "", + cpuset.NewCPUSet(), + }, + { + "take one cpu from single socket with HT", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + 1, + "", + cpuset.NewCPUSet(0), + }, + { + "take one cpu from single socket with HT, some cpus are taken", + topoSingleSocketHT, + cpuset.NewCPUSet(1, 3, 5, 6, 7), + 1, + "", + cpuset.NewCPUSet(6), + }, + { + "take two cpus from single socket with HT", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + 2, + "", + cpuset.NewCPUSet(0, 4), + }, + { + "take all cpus from single socket with HT", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + 8, + "", + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + }, + { + "take two cpus from single socket with HT, only one core totally free", + topoSingleSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 6), + 2, + "", + cpuset.NewCPUSet(2, 6), + }, + { + "take one cpu from dual socket with HT - core from Socket 0", + topoDualSocketHT, + cpuset.NewCPUSet(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + 1, + "", + cpuset.NewCPUSet(2), + }, + { + "take a socket of cpus from dual socket with HT", + topoDualSocketHT, + cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + 6, + "", + cpuset.NewCPUSet(0, 2, 4, 6, 8, 10), + }, + } + + for _, tc := range testCases { + result, err := takeByTopology(tc.topo, tc.availableCPUs, tc.numCPUs) + if tc.expErr != "" && err.Error() != tc.expErr { + t.Errorf("expected error to be [%v] but it was [%v] in test \"%s\"", tc.expErr, err, tc.description) + } + if !result.Equals(tc.expResult) { + t.Errorf("expected result [%s] to equal [%s] in test \"%s\"", result, tc.expResult, tc.description) + } + } +} diff --git a/pkg/ensurance/cm/policy.go b/pkg/ensurance/cm/policy.go new file mode 100644 index 000000000..639f07534 --- /dev/null +++ b/pkg/ensurance/cm/policy.go @@ -0,0 +1,21 @@ +package cm + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" +) + +type policyName string + +// Policy implements logic for pod container to CPU assignment. 
+type Policy interface { + Name() string + // Start is only called once to start policy + Start(s state.State) error + // Allocate call is idempotent to allocate cpus for containers + Allocate(s state.State, pod *v1.Pod, container *v1.Container) error + // RemoveContainer call is idempotent to reclaim cpus + RemoveContainer(s state.State, podUID string, containerName string) error + // NeedAllocated is called to judge if container needs to allocate cpu + NeedAllocated(pod *v1.Pod, container *v1.Container) bool +} diff --git a/pkg/ensurance/cm/test/cpu_manager_state b/pkg/ensurance/cm/test/cpu_manager_state new file mode 100644 index 000000000..bb91085a9 --- /dev/null +++ b/pkg/ensurance/cm/test/cpu_manager_state @@ -0,0 +1 @@ +{"policyName":"none","defaultCpuSet":"","checksum":3242152201} \ No newline at end of file diff --git a/pkg/ensurance/cm/util.go b/pkg/ensurance/cm/util.go new file mode 100644 index 000000000..02abcea08 --- /dev/null +++ b/pkg/ensurance/cm/util.go @@ -0,0 +1,31 @@ +package cm + +import v1 "k8s.io/api/core/v1" + +const CPUSetAnnotation string = "qos.gocrane.io/cpu-manager" + +// CPUSetPolicy the type for cpuset +type CPUSetPolicy string + +const ( + CPUSetNone CPUSetPolicy = "none" + CPUSetExclusive CPUSetPolicy = "exclusive" + CPUSetShare CPUSetPolicy = "share" +) + +func GetPodCPUSetType(pod *v1.Pod, _ *v1.Container) CPUSetPolicy { + csp := CPUSetPolicy(pod.GetAnnotations()[CPUSetAnnotation]) + if csp == "" { + return CPUSetNone + } + return csp +} + +func IsPodNotRunning(statuses []v1.ContainerStatus) bool { + for _, status := range statuses { + if status.State.Terminated == nil && status.State.Waiting == nil { + return false + } + } + return true +} diff --git a/pkg/ensurance/collector/collector.go b/pkg/ensurance/collector/collector.go index a97a4652d..7a12482db 100644 --- a/pkg/ensurance/collector/collector.go +++ b/pkg/ensurance/collector/collector.go @@ -59,6 +59,7 @@ func (s *StateCollector) Name() string { } func (s *StateCollector) Run(stop <-chan struct{}) { + s.UpdateCollectors() go func() { updateTicker := time.NewTicker(s.collectInterval) defer updateTicker.Stop() diff --git a/pkg/ensurance/runtime/container.go b/pkg/ensurance/runtime/container.go index 2e826995a..c4e183227 100644 --- a/pkg/ensurance/runtime/container.go +++ b/pkg/ensurance/runtime/container.go @@ -41,6 +41,8 @@ type ListOptions struct { podID string // Regular expression pattern to match pod or container nameRegexp string + // Regular expression pattern to match the pod namespace + podNamespaceRegexp string // state of the sandbox state string // labels are selectors for the sandbox @@ -184,10 +186,8 @@ func filterContainersList(containersList []*pb.Container, opts ListOptions) []*p var filtered = []*pb.Container{} for _, c := range containersList { - if matched, err := regexp.MatchString(opts.nameRegexp, c.Metadata.Name); err == nil { - if matched { - filtered = append(filtered, c) - } + if matchRegex(opts.nameRegexp, c.Metadata.Name) { + filtered = append(filtered, c) } } @@ -210,3 +210,15 @@ func filterContainersList(containersList []*pb.Container, opts ListOptions) []*p return filtered[:n] } + +func matchRegex(pattern, target string) bool { + if pattern == "" { + return true + } + matched, err := regexp.MatchString(pattern, target) + if err != nil { + // Assume it's not a match if an error occurs. 
+ return false + } + return matched +} diff --git a/pkg/ensurance/runtime/sandbox.go b/pkg/ensurance/runtime/sandbox.go new file mode 100644 index 000000000..64e919b41 --- /dev/null +++ b/pkg/ensurance/runtime/sandbox.go @@ -0,0 +1,87 @@ +package runtime + +import ( + "context" + "fmt" + "sort" + "strings" + + pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" +) + +type sandboxByCreated []*pb.PodSandbox + +func (a sandboxByCreated) Len() int { return len(a) } +func (a sandboxByCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a sandboxByCreated) Less(i, j int) bool { + return a[i].CreatedAt > a[j].CreatedAt +} + +// ListPodSandboxes sends a ListPodSandboxRequest to the server, and parses +// the returned ListPodSandboxResponse. +// copied from kubernetes-sigs/cri-tools/cmd/crictl/sandbox.go +func ListPodSandboxes(client pb.RuntimeServiceClient, opts ListOptions) ([]*pb.PodSandbox, error) { + filter := &pb.PodSandboxFilter{} + if opts.id != "" { + filter.Id = opts.id + } + if opts.state != "" { + st := &pb.PodSandboxStateValue{} + st.State = pb.PodSandboxState_SANDBOX_NOTREADY + switch strings.ToLower(opts.state) { + case "ready": + st.State = pb.PodSandboxState_SANDBOX_READY + filter.State = st + case "notready": + st.State = pb.PodSandboxState_SANDBOX_NOTREADY + filter.State = st + default: + klog.Errorf("state should be ready or notready") + return []*pb.PodSandbox{}, fmt.Errorf("state should be ready or notready") + } + } + if opts.labels != nil { + filter.LabelSelector = opts.labels + } + request := &pb.ListPodSandboxRequest{ + Filter: filter, + } + klog.V(6).Info("ListPodSandboxRequest: %v", request) + r, err := client.ListPodSandbox(context.Background(), request) + if err != nil { + return []*pb.PodSandbox{}, err + } + klog.V(6).Info("ListPodSandboxResponse: %v", r) + + r.Items = filterSandboxesList(r.GetItems(), opts) + return r.Items, nil +} + +func filterSandboxesList(sandboxesList []*pb.PodSandbox, opts ListOptions) []*pb.PodSandbox { + filtered := []*pb.PodSandbox{} + for _, p := range sandboxesList { + // Filter by pod name/namespace regular expressions. + if matchRegex(opts.nameRegexp, p.Metadata.Name) && + matchRegex(opts.podNamespaceRegexp, p.Metadata.Namespace) { + filtered = append(filtered, p) + } + } + + sort.Sort(sandboxByCreated(filtered)) + n := len(filtered) + if opts.latest { + n = 1 + } + if opts.last > 0 { + n = opts.last + } + n = func(a, b int) int { + if a < b { + return a + } + return b + }(n, len(filtered)) + + return filtered[:n] +} diff --git a/pkg/features/features.go b/pkg/features/features.go index 9f44fc1cd..77a40c5c6 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -24,6 +24,9 @@ const ( // CraneTimeSeriesPrediction enables the time series prediction features. CraneTimeSeriesPrediction featuregate.Feature = "TimeSeriesPrediction" + + // CraneCpuSetManager enables the cpuset manger features. + CraneCpuSetManager featuregate.Feature = "CpusetManager" ) var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ @@ -33,6 +36,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ CranePodResource: {Default: true, PreRelease: featuregate.Alpha}, CraneClusterNodePrediction: {Default: false, PreRelease: featuregate.Alpha}, CraneTimeSeriesPrediction: {Default: true, PreRelease: featuregate.Alpha}, + CraneCpuSetManager: {Default: false, PreRelease: featuregate.Alpha}, } func init() {