Change static pod uid/hash generation/checking
Fixes an issue where the kubelet would not update static pods when the apiserver is unavailable.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
brandond committed Oct 4, 2022
1 parent 008b065 commit 53d8510
Showing 2 changed files with 85 additions and 32 deletions.
98 changes: 71 additions & 27 deletions pkg/rke2/spw.go
@@ -2,7 +2,7 @@ package rke2

import (
"context"
"fmt"
"encoding/json"
"os"
"path/filepath"
"sync"
@@ -18,6 +18,10 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

+type containerInfo struct {
+Config *runtimeapi.ContainerConfig `json:"config,omitempty"`
+}

// checkStaticManifests validates that the pods started with rke2 match the static manifests
// provided in /var/lib/rancher/rke2/agent/pod-manifests. When restarting rke2, it takes time
// for any changes to static manifests to be pulled by kubelet. Additionally this prevents errors
@@ -26,7 +30,7 @@ func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.Startup
return func(ctx context.Context, wg *sync.WaitGroup, args cmds.StartupHookArgs) error {
go func() {
defer wg.Done()
-if err := wait.PollImmediate(20*time.Second, 30*time.Minute, func() (bool, error) {
+if err := wait.PollImmediateWithContext(ctx, 20*time.Second, 30*time.Minute, func(ctx context.Context) (bool, error) {
if containerRuntimeEndpoint == "" {
containerRuntimeEndpoint = containerdSock
}
@@ -42,38 +46,78 @@

for _, pod := range []string{"etcd", "kube-apiserver"} {
manifestFile := filepath.Join(manifestDir, pod+".yaml")
-if f, err := os.Open(manifestFile); err == nil {
-defer f.Close()
-podManifest := v1.Pod{}
-decoder := yaml.NewYAMLToJSONDecoder(f)
-err = decoder.Decode(&podManifest)
-if err != nil {
-logrus.Fatalf("Failed to decode %s manifest: %v", pod, err)
-}
-podFilter := &runtimeapi.ContainerFilter{
-LabelSelector: map[string]string{
-"io.kubernetes.pod.uid": string(podManifest.UID),
-},
-}
-resp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: podFilter})
-if err != nil {
-return false, err
-}
-if len(resp.Containers) < 1 {
-logrus.Infof("%s pod not found, retrying", pod)
-return false, nil
+if err := checkManifestDeployed(ctx, cRuntime, manifestFile); err != nil {
+if errors.Is(err, os.ErrNotExist) {
+// Since split-role servers exist, we don't care if no manifest is found
+continue
}
-logrus.Infof("Latest %s manifest deployed", pod)
-} else if !errors.Is(err, os.ErrNotExist) {
-// Since split-role servers exist, we don't care if no manifest is found
-return false, fmt.Errorf("failed to open %s manifest: %v", pod, err)
+logrus.Infof("Container for %s not found (%v), retrying", pod, err)
+return false, nil
}
+logrus.Infof("Container for %s is running", pod)
}
return true, nil
}); err != nil {
logrus.Fatalf("Failed waiting for manifests to deploy: %v", err)
logrus.Fatalf("Failed waiting for static pods to deploy: %v", err)
}
}()
return nil
}
}

+// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and
+// verified as present and running with the current pod hash in the container runtime.
+func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
+f, err := os.Open(manifestFile)
+if err != nil {
+return errors.Wrap(err, "failed to open manifest")
+}
+defer f.Close()
+
+podManifest := v1.Pod{}
+decoder := yaml.NewYAMLToJSONDecoder(f)
+err = decoder.Decode(&podManifest)
+if err != nil {
+return errors.Wrap(err, "failed to decode manifest")
+}
+
+var podHash string
+for _, env := range podManifest.Spec.Containers[0].Env {
+if env.Name == "POD_HASH" {
+podHash = env.Value
+break
+}
+}
+
+filter := &runtimeapi.ContainerFilter{
+State: &runtimeapi.ContainerStateValue{
+State: runtimeapi.ContainerState_CONTAINER_RUNNING,
+},
+LabelSelector: map[string]string{
+"io.kubernetes.pod.uid": string(podManifest.UID),
+},
+}
+
+resp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter})
+if err != nil {
+return errors.Wrap(err, "failed to list containers")
+}
+
+for _, container := range resp.Containers {
+resp, err := cRuntime.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ContainerId: container.Id, Verbose: true})
+if err != nil {
+return errors.Wrap(err, "failed to get container status")
+}
+info := &containerInfo{}
+err = json.Unmarshal([]byte(resp.Info["info"]), &info)
+if err != nil || info.Config == nil {
+return errors.Wrap(err, "failed to unmarshal container config")
+}
+for _, env := range info.Config.Envs {
+if env.Key == "POD_HASH" && env.Value == podHash {
+return nil
+}
+}
+}
+return errors.New("no matching container found")
+}
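
For reference, a minimal sketch of the container lookup that checkManifestDeployed performs above: with Verbose set, the response's "info" entry carries a JSON payload that nests the container's original ContainerConfig (including its environment) under the "config" key, which is what the new containerInfo type decodes. The podHashFromContainer helper and its packaging are illustrative only, not part of the commit, and it assumes an already-connected RuntimeServiceClient.

package sketch

import (
	"context"
	"encoding/json"
	"errors"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

// podHashFromContainer pulls the verbose status for a single container and reads the
// POD_HASH environment variable out of the embedded container config.
func podHashFromContainer(ctx context.Context, rt runtimeapi.RuntimeServiceClient, containerID string) (string, error) {
	resp, err := rt.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ContainerId: containerID, Verbose: true})
	if err != nil {
		return "", err
	}
	// The verbose payload is a JSON string; only the nested container config is needed here.
	info := struct {
		Config *runtimeapi.ContainerConfig `json:"config,omitempty"`
	}{}
	if err := json.Unmarshal([]byte(resp.Info["info"]), &info); err != nil {
		return "", err
	}
	if info.Config == nil {
		return "", errors.New("verbose container status did not include the container config")
	}
	for _, env := range info.Config.Envs {
		if env.Key == "POD_HASH" {
			return env.Value, nil
		}
	}
	return "", errors.New("POD_HASH not set on container")
}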
19 changes: 14 additions & 5 deletions pkg/staticpod/staticpod.go
@@ -93,14 +93,23 @@ func Run(dir string, args Args) error {

manifestPath := filepath.Join(dir, args.Command+".yaml")

-// We hash the completed pod manifest use that as the UID; this mimics what upstream does:
-// https://github.com/kubernetes/kubernetes/blob/v1.24.0/pkg/kubelet/config/common.go#L58-68
+// Generate a stable UID based on the manifest path. This allows the kubelet to reconcile the pod's
+// containers even when the apiserver is unavailable. If the UID is not stable, the kubelet
+// will consider the manifest change as two separate add/remove operations, and may start the new pod
+// before terminating the old one. Cleanup of removed pods is disabled until all sources have synced,
+// so if the apiserver is down, the newly added pod may get stuck in a crash loop due to the old pod
+// still using its ports. See https://github.com/rancher/rke2/issues/3387
hasher := md5.New()
-hash.DeepHashObject(hasher, pod)
-fmt.Fprintf(hasher, "host:%s", os.Getenv("NODE_NAME"))
-fmt.Fprintf(hasher, "file:%s", manifestPath)
+fmt.Fprint(hasher, manifestPath)
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))

+// Append a hash of the completed pod manifest to the container environment for later use when checking
+// to see if the pod has been updated. It's fine that setting this changes the actual hash; we
+// just need a stable value that we can compare between the file on disk and the running
+// container to see if the kubelet has reconciled yet.
+hash.DeepHashObject(hasher, pod)
+pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{Name: "POD_HASH", Value: hex.EncodeToString(hasher.Sum(nil)[0:])})

b, err := yaml.Marshal(pod)
if err != nil {
return err
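
For context on the two hashes produced above: the pod UID now covers only the manifest path, while POD_HASH covers the rendered pod itself. A minimal sketch of the split, assuming the hash import here is k8s.io/kubernetes/pkg/util/hash (whose DeepHashObject resets the hasher before writing, as in the upstream kubelet code the old comment linked); staticPodIdentifiers is a hypothetical helper name, not part of the commit.

package sketch

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/util/hash"
)

// staticPodIdentifiers mirrors the split introduced in the commit: the UID covers only
// the manifest path, so editing the manifest leaves it unchanged and the kubelet treats
// the edit as an update to the same pod; the pod hash covers the whole rendered pod, so
// it changes with every edit and can later be compared against the running container's
// environment to tell whether the kubelet has reconciled yet.
func staticPodIdentifiers(pod *v1.Pod, manifestPath string) (uid, podHash string) {
	hasher := md5.New()

	fmt.Fprint(hasher, manifestPath)
	uid = hex.EncodeToString(hasher.Sum(nil))

	// DeepHashObject resets the hasher, so the UID bytes written above do not
	// leak into the pod hash.
	hash.DeepHashObject(hasher, pod)
	podHash = hex.EncodeToString(hasher.Sum(nil))
	return uid, podHash
}

The commit then sets the hex-encoded pod hash as a POD_HASH environment variable on the pod's first container, which is the value checkManifestDeployed in spw.go compares against the running container.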
