Skip to content

Commit

Permalink
kubelet/dockertools: Add puller interfaces in the containerManager.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yifan Gu committed Apr 13, 2015
1 parent 1be6847 commit 908cc02
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 30 deletions.
4 changes: 2 additions & 2 deletions pkg/kubelet/dockertools/docker.go
Expand Up @@ -96,8 +96,8 @@ type throttledDockerPuller struct {
limiter util.RateLimiter
}

// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
// newDockerPuller creates a new instance of the default implementation of DockerPuller.
func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
dp := dockerPuller{
client: client,
keyring: credentialprovider.NewDockerKeyring(),
Expand Down
19 changes: 17 additions & 2 deletions pkg/kubelet/dockertools/manager.go
Expand Up @@ -60,18 +60,25 @@ type DockerManager struct {
// means that some entries may be recycled before a pod has been
// deleted.
reasonCache stringCache
// TODO(yifan): We export this for testability; once we have a fake
// container manager, we can unexport it. At that point we can also
// use the concrete type so that we can record pull failures and eliminate
// the image checking in GetPodStatus().
Puller DockerPuller
}

// Ensures DockerManager implements ContainerRunner.
var _ kubecontainer.ContainerRunner = new(DockerManager)

func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string) *DockerManager {
func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string, qps float32, burst int) *DockerManager {
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
return &DockerManager{
client: client,
recorder: recorder,
PodInfraContainerImage: podInfraContainerImage,
reasonCache: reasonCache}
reasonCache: reasonCache,
Puller: newDockerPuller(client, qps, burst),
}
}

// A cache which stores strings keyed by <pod_UID>_<container_name>.
Expand Down Expand Up @@ -520,3 +527,11 @@ func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType)
}
return addCaps, dropCaps
}

// Pull pulls the given image by delegating to the manager's Puller.
// It returns any error reported by the underlying DockerPuller.
func (dm *DockerManager) Pull(image string) error {
	return dm.Puller.Pull(image)
}

// IsImagePresent reports whether the given image is already present
// locally, delegating the check to the manager's Puller.
func (dm *DockerManager) IsImagePresent(image string) (bool, error) {
	return dm.Puller.IsImagePresent(image)
}
19 changes: 4 additions & 15 deletions pkg/kubelet/kubelet.go
Expand Up @@ -199,7 +199,7 @@ func NewMainKubelet(
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
statusManager := newStatusManager(kubeClient)
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage)
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)

klet := &Kubelet{
hostname: hostname,
Expand All @@ -211,8 +211,6 @@ func NewMainKubelet(
readinessManager: kubecontainer.NewReadinessManager(),
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
Expand Down Expand Up @@ -289,18 +287,12 @@ type Kubelet struct {
// Tracks references for reporting events
containerRefManager *kubecontainer.RefManager

// Optional, defaults to simple Docker implementation
dockerPuller dockertools.DockerPuller
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
// Optional, defaults to simple Docker implementation
runner dockertools.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
httpClient httpGetter
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
pullQPS float32
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
pullBurst int

// cAdvisor used for container information.
cadvisor cadvisor.Interface
Expand Down Expand Up @@ -541,9 +533,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
Expand Down Expand Up @@ -877,7 +866,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID,
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
// TODO: make this a TTL based pull (if image older than X policy, pull)
ok, err := kl.dockerPuller.IsImagePresent(container.Image)
ok, err := kl.containerManager.IsImagePresent(container.Image)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
Expand Down Expand Up @@ -919,7 +908,7 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
}()

if err := kl.dockerPuller.Pull(img); err != nil {
if err := kl.containerManager.Pull(img); err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", img, err)
}
Expand Down Expand Up @@ -1033,7 +1022,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
if container.ImagePullPolicy != api.PullNever {
present, err := kl.dockerPuller.IsImagePresent(container.Image)
present, err := kl.containerManager.IsImagePresent(container.Image)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
Expand Down
11 changes: 5 additions & 6 deletions pkg/kubelet/kubelet_test.go
Expand Up @@ -73,12 +73,11 @@ func newTestKubelet(t *testing.T) *TestKubelet {
fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker)
fakeRecorder := &record.FakeRecorder{}
fakeKubeClient := &testclient.Fake{}

kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker
kubelet.dockerCache = fakeDockerCache
kubelet.kubeClient = fakeKubeClient
kubelet.dockerPuller = &dockertools.FakeDockerPuller{}

kubelet.hostname = "testnode"
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
Expand Down Expand Up @@ -113,7 +112,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
podManager, fakeMirrorClient := newFakePodManager()
kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage)
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
}

Expand Down Expand Up @@ -593,7 +593,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{}
kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
Expand Down Expand Up @@ -1249,7 +1249,6 @@ func TestGetRootInfo(t *testing.T) {

kubelet := Kubelet{
dockerClient: &fakeDocker,
dockerPuller: &dockertools.FakeDockerPuller{},
cadvisor: mockCadvisor,
}

Expand Down Expand Up @@ -1652,7 +1651,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{"existing_one", "want:latest"}
kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
Expand Down
3 changes: 0 additions & 3 deletions pkg/kubelet/runonce.go
Expand Up @@ -53,9 +53,6 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {

// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
kl.handleNotFittingPods(pods)

ch := make(chan RunPodResult)
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubelet/runonce_test.go
Expand Up @@ -144,8 +144,9 @@ func TestRunOnce(t *testing.T) {
},
t: t,
}
kb.dockerPuller = &dockertools.FakeDockerPuller{}
kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage)

kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage, 0, 0)
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}

pods := []api.Pod{
{
Expand Down

0 comments on commit 908cc02

Please sign in to comment.