kubelet: devices: skip allocation for running pods
When the kubelet initializes, it runs admission for pods and possibly
allocates the requested resources. We need to distinguish between a
node reboot (no containers running) and a kubelet restart (containers
potentially running).

Running pods should always survive a kubelet restart.
This means that device allocation should not be attempted on admission:
if a container requires devices and is still running when the kubelet
restarts, that container already has its devices allocated and working.

Thus, we need to properly detect this scenario in the allocation step
and handle it explicitly. We need to inform the devicemanager about
which pods are already running.

Note that if the container runtime is down when the kubelet restarts,
the approach implemented here won't work: on kubelet restart, containers
will again fail admission, hitting kubernetes#118559 again.
This scenario should however be pretty rare.

Signed-off-by: Francesco Romani <fromani@redhat.com>
ffromani committed Aug 8, 2023
1 parent 6af0a09 commit 0f53d95
Showing 7 changed files with 108 additions and 30 deletions.
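
In rough terms, the change works like the sketch below: take a snapshot of the containers the runtime reports at kubelet startup, and during admission skip device allocation for any container that snapshot reports as already running. This is a simplified, self-contained Go illustration, not the committed code; runtimeContainer, buildRunningState and the map-based containerMap/runningSet are hypothetical stand-ins for the CRI runtime service, containermap.ContainerMap and sets.String used in the actual diff below.

// Sketch only: hypothetical stand-ins for the kubelet's containermap.ContainerMap,
// sets.String and the CRI runtime service.
package main

import "fmt"

// runtimeContainer stands in for a CRI container record returned by ListContainers.
type runtimeContainer struct {
    podUID, name, id string
    running          bool
}

// buildRunningState mirrors the idea of buildContainerMapAndRunningSetFromRuntime:
// a (podUID, container name) -> containerID map, plus the set of container IDs the
// runtime reported as running when the kubelet (re)started.
func buildRunningState(containers []runtimeContainer) (map[[2]string]string, map[string]bool) {
    containerMap := make(map[[2]string]string)
    runningSet := make(map[string]bool)
    for _, c := range containers {
        containerMap[[2]string{c.podUID, c.name}] = c.id
        if c.running {
            runningSet[c.id] = true
        }
    }
    return containerMap, runningSet
}

// isContainerAlreadyRunning mirrors the devicemanager check: a container found in
// the startup snapshot and reported running already has its devices allocated.
func isContainerAlreadyRunning(containerMap map[[2]string]string, runningSet map[string]bool, podUID, name string) bool {
    id, ok := containerMap[[2]string{podUID, name}]
    return ok && runningSet[id]
}

func main() {
    containerMap, runningSet := buildRunningState([]runtimeContainer{
        {podUID: "pod-1", name: "app", id: "c1", running: true},
    })
    sourcesReady := false // the kubelet is still (re)initializing, sources not all ready yet
    if !sourcesReady && isContainerAlreadyRunning(containerMap, runningSet, "pod-1", "app") {
        fmt.Println("kubelet restart: container already running, skip device allocation")
    }
}

In the actual commit, this state is built by buildContainerMapAndRunningSetFromRuntime (pkg/kubelet/cm/helpers.go), passed through deviceManager.Start, and consulted in devicesToAllocate, as shown in the diff below.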
29 changes: 5 additions & 24 deletions pkg/kubelet/cm/container_manager_linux.go
@@ -50,7 +50,6 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
@@ -557,9 +556,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
runtimeService internalapi.RuntimeService,
localStorageCapacityIsolation bool) error {

containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(runtimeService)

// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
containerMap := buildContainerMapFromRuntime(runtimeService)
containerMap, _ := buildContainerMapAndRunningSetFromRuntime(runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
@@ -568,7 +569,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,

// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap := buildContainerMapFromRuntime(runtimeService)
containerMap, _ := buildContainerMapAndRunningSetFromRuntime(runtimeService)
err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
@@ -632,7 +633,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}

// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

@@ -726,26 +727,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
}
}

func buildContainerMapFromRuntime(runtimeService internalapi.RuntimeService) containermap.ContainerMap {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}

containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("no PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
}

return containerMap
}

func isProcessRunningInHost(pid int) (bool, error) {
// Get init pid namespace.
initPidNs, err := os.Readlink("/proc/1/ns/pid")
4 changes: 3 additions & 1 deletion pkg/kubelet/cm/container_manager_windows.go
@@ -84,8 +84,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
}

containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(runtimeService)

// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

60 changes: 58 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
@@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -99,6 +100,15 @@ type ManagerImpl struct {

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod

// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod. Used to detect pods running across a restart
containerMap containermap.ContainerMap

// containerRunningSet identifies which containers among those present in `containerMap`
// were reported running by the container runtime when `containerMap` was computed.
// Used to detect pods running across a restart
containerRunningSet sets.String
}

type endpointInfo struct {
@@ -269,11 +279,13 @@ func (m *ManagerImpl) checkpointFile() string {
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
klog.V(2).InfoS("Starting Device Plugin manager")

m.activePods = activePods
m.sourcesReady = sourcesReady
m.containerMap = initialContainers
m.containerRunningSet = initialContainerRunningSet

// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
@@ -537,10 +549,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
}
}

// We have 3 major flows to handle:
// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
// note: if we get this far the runtime is surely running. This is usually enforced at OS level by startup dependencies between system services.

// First we take care of the exceptional flows (scenarios 2 and 3). In both, the kubelet is reinitializing, and while the kubelet is initializing, sources are NOT all ready.
// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for containers reported
// running, then it can only be a kubelet restart: on node reboot the runtime and the containers were also shut down. So, if the container was running, it can only be
// because it already has access to all the required devices, meaning there is nothing to do and we can bail out.
if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
return nil, nil
}

// We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
healthyDevices, hasRegistered := m.healthyDevices[resource]

// Check if resource registered with devicemanager
// The following checks are expected to fail only on scenario 3 (node reboot).
// The kubelet is reinitializing and got a container from sources. But there's no ordering guarantee, so an app container may attempt allocation _before_ the device plugin has been created,
// has registered and has reported its devices back to the kubelet.
// This can only happen on scenario 3 because at steady state (scenario 1) the scheduler prevents pods from being sent to nodes which don't report enough devices.
// Note: we need to check the device health and registration status *before* we check how many devices are needed, doing otherwise caused issue #109595
// Note: if the scheduler is bypassed, we fall back into scenario 1, so we still need these checks.
if !hasRegistered {
return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
}
@@ -555,7 +588,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
}

// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back into a common path.
// We cover container restart on kubelet steady state with the same flow.
if needed == 0 {
klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
// No change, no work.
return nil, nil
}
@@ -1035,3 +1071,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {

m.pendingAdmissionPod = pod
}

func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
if err != nil {
klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
return false
}

// note that if the container runtime is down when the kubelet restarts, this set will be empty,
// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
// This scenario should however be rare enough.
if !m.containerRunningSet.Has(cntID) {
klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return false
}

// Once we make it here we know we have a running container.
klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return true
}
4 changes: 3 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_stub.go
@@ -18,6 +18,8 @@ package devicemanager

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -34,7 +36,7 @@ func NewManagerStub() (*ManagerStub, error) {
}

// Start simply returns nil.
func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
return nil
}

7 changes: 6 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_test.go
@@ -39,6 +39,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -273,7 +274,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
return []*v1.Pod{}
}

err = w.Start(activePods, &sourcesReadyStub{})
// test the steady state; initialization, where sourcesReady, containerMap and containerRunningSet
// are relevant, will be tested with a different flow
err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString())
require.NoError(t, err)

return w, updateChan
@@ -1001,6 +1004,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) {
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
activePods: func() []*v1.Pod { return []*v1.Pod{} },
sourcesReady: &sourcesReadyStub{},
}

testManager.podDevices.insert("pod1", "con1", resourceName1,
4 changes: 3 additions & 1 deletion pkg/kubelet/cm/devicemanager/types.go
@@ -20,6 +20,8 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -31,7 +33,7 @@ import (
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error

// Allocate configures and assigns devices to a container in a pod. From
// the requested device resources, Allocate will communicate with the
30 changes: 30 additions & 0 deletions pkg/kubelet/cm/helpers.go
@@ -18,6 +18,11 @@ package cm

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)

@@ -44,3 +49,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
}
return ret
}

func buildContainerMapAndRunningSetFromRuntime(runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}

runningSet := sets.NewString()
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
podUID := podSandboxMap[c.PodSandboxId]
containerMap.Add(podUID, c.Metadata.Name, c.Id)
if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
runningSet.Insert(c.Id)
}
}
return containerMap, runningSet
}
