Merge pull request #7480 from vmarmol/runtime-syncpod
Move ComputePodChanges to the Docker runtime
yujuhong committed Apr 29, 2015
2 parents 9fd8cba + fe4600b commit ba1140a
Showing 10 changed files with 245 additions and 195 deletions.
9 changes: 0 additions & 9 deletions pkg/kubelet/dockertools/docker.go
@@ -19,15 +19,13 @@ package dockertools
import (
"fmt"
"hash/adler32"
"io"
"math/rand"
"os"
"strconv"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -275,13 +273,6 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
return client
}

// TODO(yifan): Move this to container.Runtime.
type ContainerCommandRunner interface {
RunInContainer(containerID string, cmd []string) ([]byte, error)
ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error
}

func milliCPUToShares(milliCPU int64) int64 {
if milliCPU == 0 {
// zero milliCPU means unset. Use kernel default.
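Note: the interface is only removed from dockertools here; callers now use prober.ContainerCommandRunner (see the pkg/kubelet/handlers.go hunk below). A minimal sketch of the relocated definition, assuming it keeps the same shape in pkg/kubelet/prober:

package prober

import (
	"io"

	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
)

// ContainerCommandRunner runs commands in, execs into, and forwards ports
// from running containers; the method set matches the interface removed above.
type ContainerCommandRunner interface {
	RunInContainer(containerID string, cmd []string) ([]byte, error)
	ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
	PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error
}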
5 changes: 3 additions & 2 deletions pkg/kubelet/dockertools/docker_test.go
@@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
docker "github.com/fsouza/go-dockerclient"
@@ -394,7 +395,7 @@ func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
@@ -660,7 +661,7 @@ func TestFindContainersByPod(t *testing.T) {
}
fakeClient := &FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
for i, test := range tests {
fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList
176 changes: 173 additions & 3 deletions pkg/kubelet/dockertools/manager.go
@@ -35,7 +35,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
docker "github.com/fsouza/go-dockerclient"
@@ -86,6 +88,11 @@ type DockerManager struct {

// Network plugin.
networkPlugin network.NetworkPlugin

// TODO(vmarmol): Make this non-public when we remove the circular dependency
// with prober.
// Health check prober.
Prober prober.Prober
}

func NewDockerManager(
@@ -98,7 +105,8 @@ func NewDockerManager(
burst int,
containerLogsDir string,
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin) *DockerManager {
networkPlugin network.NetworkPlugin,
prober prober.Prober) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
@@ -142,6 +150,7 @@ func NewDockerManager(
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
Prober: prober,
}
}
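With the signature change above, constructing a DockerManager now requires a prober.Prober as the trailing argument. A hypothetical call mirroring the updated tests in docker_test.go (the two nil arguments are whatever the tests pass; their roles are not shown in this diff):

containerManager := NewDockerManager(
	fakeDocker,   // DockerInterface test double
	fakeRecorder, // event recorder test double
	nil, nil,     // passed as nil by the tests above; roles not shown in this diff
	PodInfraContainerImage,
	0, 0, // rate-limit settings; the second is burst, per the signature hunk
	"",   // containerLogsDir
	kubecontainer.FakeOS{},
	np,                          // network plugin
	&kubeletProber.FakeProber{}, // the newly added prober argument
)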

@@ -692,8 +701,8 @@ func (dm *DockerManager) IsImagePresent(image string) (bool, error) {
return dm.Puller.IsImagePresent(image)
}

// PodInfraContainer returns true if the pod infra container has changed.
func (dm *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) {
// podInfraContainerChanged returns true if the pod infra container has changed.
func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) {
networkMode := ""
var ports []api.ContainerPort

@@ -1112,3 +1121,164 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon
}
return id, util.ApplyOomScoreAdj(containerInfo.State.Pid, podOomScoreAdj)
}

// TODO(vmarmol): This will soon be made non-public when its only use is internal.
// Structure keeping information on changes that need to happen for a pod. The semantics are as follows:
// - startInfraContainer is true if a new Infra Container has to be started and the old one (if running) killed.
// Additionally, if it is true then containersToKeep has to be empty.
// - infraContainerId has to be set iff startInfraContainer is false. It stores the dockerID of the running Infra Container.
// - containersToStart keeps the indices of the Specs of containers that have to be started.
// - containersToKeep stores a mapping from the dockerIDs of running containers to the indices of their Specs for containers that
// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
// It shouldn't be the case that containersToStart is empty and containersToKeep contains only infraContainerId; in that case
// the Infra Container should be killed, hence it is removed from this map.
// - all running containers which are NOT contained in containersToKeep should be killed.
type empty struct{}
type PodContainerChangesSpec struct {
StartInfraContainer bool
InfraContainerId kubeletTypes.DockerID
ContainersToStart map[int]empty
ContainersToKeep map[kubeletTypes.DockerID]int
}
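To make the semantics concrete: a pod whose infra container is healthy, whose container at Spec index 0 should keep running, and whose container at Spec index 1 must be restarted would yield a spec along these lines (all dockerIDs are placeholders):

changes := PodContainerChangesSpec{
	StartInfraContainer: false,
	InfraContainerId:    kubeletTypes.DockerID("infra-id"),
	ContainersToStart:   map[int]empty{1: {}}, // restart the container at Spec index 1
	ContainersToKeep: map[kubeletTypes.DockerID]int{
		kubeletTypes.DockerID("infra-id"): -1, // the Infra Container maps to -1
		kubeletTypes.DockerID("app-id"):   0,  // keep the container at Spec index 0
	},
}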

// TODO(vmarmol): This will soon be made non-public when its only use is internal.
func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)

containersToStart := make(map[int]empty)
containersToKeep := make(map[kubeletTypes.DockerID]int)
createPodInfraContainer := false

var err error
var podInfraContainerID kubeletTypes.DockerID
var changed bool
podInfraContainer := runningPod.FindContainerByName(PodInfraContainerName)
if podInfraContainer != nil {
glog.V(4).Infof("Found pod infra container for %q", podFullName)
changed, err = dm.podInfraContainerChanged(pod, podInfraContainer)
if err != nil {
return PodContainerChangesSpec{}, err
}
}

createPodInfraContainer = true
if podInfraContainer == nil {
glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName)
} else if changed {
glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName)
} else {
glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName)
createPodInfraContainer = false
podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1
}

for index, container := range pod.Spec.Containers {
expectedHash := HashContainer(&container)

c := runningPod.FindContainerByName(container.Name)
if c == nil {
if shouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and has
// RestartPolicyAlways, but that is not a big deal.
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
containersToStart[index] = empty{}
}
continue
}

containerID := kubeletTypes.DockerID(c.ID)
hash := c.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)

if createPodInfraContainer {
// createPodInfraContainer == true and the container exists.
// If we're creating the infra container, everything will be killed anyway.
// If RestartPolicy is Always or OnFailure we restart containers that were running before we
// killed them when restarting the Infra Container.
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
containersToStart[index] = empty{}
}
continue
}

// At this point, the container is running and the pod infra container is good.
// We will look for changes and check the container's health.
containerChanged := hash != 0 && hash != expectedHash
if containerChanged {
glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
containersToStart[index] = empty{}
continue
}

result, err := dm.Prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.V(4).Infof("probe success: %q", container.Name)
containersToKeep[containerID] = index
continue
}
glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
containersToStart[index] = empty{}
}

// After the loop, one of the following should be true:
// - createPodInfraContainer is true and containersToKeep is empty
// (in fact, when createPodInfraContainer is true, containersToKeep is never written to).
// - createPodInfraContainer is false and containersToKeep contains at least the ID of the Infra Container.

// If the Infra Container is the only container that would be kept and nothing needs to be started, don't keep it.
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
containersToKeep = make(map[kubeletTypes.DockerID]int)
}

return PodContainerChangesSpec{
StartInfraContainer: createPodInfraContainer,
InfraContainerId: podInfraContainerID,
ContainersToStart: containersToStart,
ContainersToKeep: containersToKeep,
}, nil
}
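A caller acts on the returned spec roughly as follows. This is a sketch only; killPod, killContainer, createPodInfraContainer, and startContainer stand in for kubelet helpers that are not part of this diff:

changes, err := dm.ComputePodContainerChanges(pod, runningPod, podStatus)
if err != nil {
	return err
}
if changes.StartInfraContainer {
	// Per the struct's contract, ContainersToKeep is empty here:
	// kill everything, then recreate the Infra Container first.
	killPod(runningPod)
	changes.InfraContainerId, err = createPodInfraContainer(pod)
	if err != nil {
		return err
	}
} else {
	// Kill only the running containers that are not marked to be kept.
	for _, c := range runningPod.Containers {
		if _, keep := changes.ContainersToKeep[kubeletTypes.DockerID(c.ID)]; !keep {
			killContainer(c)
		}
	}
}
for index := range changes.ContainersToStart {
	startContainer(pod, &pod.Spec.Containers[index])
}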

func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool {
podFullName := kubecontainer.GetPodFullName(pod)

// Get all dead container status.
var resultStatus []*api.ContainerStatus
for i, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name == container.Name && containerStatus.State.Termination != nil {
resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i])
}
}

// Set dead containers to unready state.
for _, c := range resultStatus {
readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID))
}

// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {
glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure {
// Check the exit code of last run. Note: This assumes the result is sorted
// by the created time in reverse order.
if resultStatus[0].State.Termination.ExitCode == 0 {
glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
}
}
return true
}
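In effect: a container with no terminated runs on record is always (re)started; with RestartPolicyNever a dead container is never restarted; with RestartPolicyOnFailure it is restarted only if the most recent termination exited nonzero; with RestartPolicyAlways (the remaining case) it is always restarted.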
5 changes: 3 additions & 2 deletions pkg/kubelet/handlers.go
@@ -24,18 +24,19 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)

type handlerRunner struct {
httpGetter httpGetter
commandRunner dockertools.ContainerCommandRunner
commandRunner prober.ContainerCommandRunner
containerManager *dockertools.DockerManager
}

// TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface.
func newHandlerRunner(httpGetter httpGetter, commandRunner dockertools.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner {
func newHandlerRunner(httpGetter httpGetter, commandRunner prober.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner {
return &handlerRunner{
httpGetter: httpGetter,
commandRunner: commandRunner,
