diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index cc673dc6f690..23d962c71f9b 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -96,8 +96,8 @@ type throttledDockerPuller struct { limiter util.RateLimiter } -// NewDockerPuller creates a new instance of the default implementation of DockerPuller. -func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { +// newDockerPuller creates a new instance of the default implementation of DockerPuller. +func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { dp := dockerPuller{ client: client, keyring: credentialprovider.NewDockerKeyring(), diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 25a0c9e5f67a..cfabba03f4f7 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -60,18 +60,25 @@ type DockerManager struct { // means that some entries may be recycled before a pod has been // deleted. reasonCache stringCache + // TODO(yifan): We export this for testability, so when we have a fake + // container manager, then we can unexport this. Also at that time, we + // use the concrete type so that we can record the pull failure and eliminate + // the image checking in GetPodStatus(). + Puller DockerPuller } // Ensures DockerManager implements ConatinerRunner. var _ kubecontainer.ContainerRunner = new(DockerManager) -func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string) *DockerManager { +func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string, qps float32, burst int) *DockerManager { reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)} return &DockerManager{ client: client, recorder: recorder, PodInfraContainerImage: podInfraContainerImage, - reasonCache: reasonCache} + reasonCache: reasonCache, + Puller: newDockerPuller(client, qps, burst), + } } // A cache which stores strings keyed by _. @@ -520,3 +527,11 @@ func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) } return addCaps, dropCaps } + +func (self *DockerManager) Pull(image string) error { + return self.Puller.Pull(image) +} + +func (self *DockerManager) IsImagePresent(image string) (bool, error) { + return self.Puller.IsImagePresent(image) +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b544402cc9a3..f7f2e5e7c27f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -199,7 +199,7 @@ func NewMainKubelet( return nil, fmt.Errorf("failed to initialize image manager: %v", err) } statusManager := newStatusManager(kubeClient) - containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage) + containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst) klet := &Kubelet{ hostname: hostname, @@ -211,8 +211,6 @@ func NewMainKubelet( readinessManager: kubecontainer.NewReadinessManager(), runner: dockertools.NewDockerContainerCommandRunner(dockerClient), httpClient: &http.Client{}, - pullQPS: pullQPS, - pullBurst: pullBurst, sourcesReady: sourcesReady, clusterDomain: clusterDomain, clusterDNS: clusterDNS, @@ -289,18 +287,12 @@ type Kubelet struct { // Tracks references for reporting events containerRefManager *kubecontainer.RefManager - // Optional, defaults to simple Docker implementation - dockerPuller dockertools.DockerPuller // Optional, defaults to /logs/ from /var/log logServer http.Handler // Optional, defaults to simple Docker implementation runner dockertools.ContainerCommandRunner // Optional, client for http requests, defaults to empty client httpClient httpGetter - // Optional, maximum pull QPS from the docker registry, 0.0 means unlimited. - pullQPS float32 - // Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0 - pullBurst int // cAdvisor used for container information. cadvisor cadvisor.Interface @@ -541,9 +533,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { if kl.logServer == nil { kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } - if kl.dockerPuller == nil { - kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) - } if kl.kubeClient == nil { glog.Warning("No api server defined - no node status update will be sent.") } @@ -877,7 +866,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID, glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } // TODO: make this a TTL based pull (if image older than X policy, pull) - ok, err := kl.dockerPuller.IsImagePresent(container.Image) + ok, err := kl.containerManager.IsImagePresent(container.Image) if err != nil { if ref != nil { kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err) @@ -919,7 +908,7 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error { metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start)) }() - if err := kl.dockerPuller.Pull(img); err != nil { + if err := kl.containerManager.Pull(img); err != nil { if ref != nil { kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", img, err) } @@ -1033,7 +1022,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } if container.ImagePullPolicy != api.PullNever { - present, err := kl.dockerPuller.IsImagePresent(container.Image) + present, err := kl.containerManager.IsImagePresent(container.Image) if err != nil { if ref != nil { kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index ce8219f5df1b..b48fb5152916 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -73,12 +73,11 @@ func newTestKubelet(t *testing.T) *TestKubelet { fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) fakeRecorder := &record.FakeRecorder{} fakeKubeClient := &testclient.Fake{} - kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker kubelet.dockerCache = fakeDockerCache kubelet.kubeClient = fakeKubeClient - kubelet.dockerPuller = &dockertools.FakeDockerPuller{} + kubelet.hostname = "testnode" kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { @@ -113,7 +112,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() - kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage) + kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0) + kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } @@ -593,7 +593,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup - puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) + puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{} kubelet.containerManager.PodInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} @@ -1249,7 +1249,6 @@ func TestGetRootInfo(t *testing.T) { kubelet := Kubelet{ dockerClient: &fakeDocker, - dockerPuller: &dockertools.FakeDockerPuller{}, cadvisor: mockCadvisor, } @@ -1652,7 +1651,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup - puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) + puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{"existing_one", "want:latest"} kubelet.containerManager.PodInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 4154b1c7efac..a4cb19478d54 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -53,9 +53,6 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { // runOnce runs a given set of pods and returns their status. func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) { - if kl.dockerPuller == nil { - kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) - } kl.handleNotFittingPods(pods) ch := make(chan RunPodResult) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index c7ad3dc86c8b..75ddab52b697 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -144,8 +144,9 @@ func TestRunOnce(t *testing.T) { }, t: t, } - kb.dockerPuller = &dockertools.FakeDockerPuller{} - kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage) + + kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage, 0, 0) + kb.containerManager.Puller = &dockertools.FakeDockerPuller{} pods := []api.Pod{ {