Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #933 from Random-Liu/cherrypick-#926-release-1.2
Browse files Browse the repository at this point in the history
Cherrypick #926 release/1.2
  • Loading branch information
Random-Liu committed Sep 28, 2018
2 parents e5b175d + 84a720e commit 33624c1
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 39 deletions.
148 changes: 148 additions & 0 deletions integration/containerd_image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2018 The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"golang.org/x/net/context"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

// Test to test the CRI plugin should see image pulled into containerd directly.
func TestContainerdImage(t *testing.T) {
testImage := "docker.io/library/busybox:latest"
ctx := context.Background()

t.Logf("make sure the test image doesn't exist in the cri plugin")
i, err := imageService.ImageStatus(&runtime.ImageSpec{Image: testImage})
require.NoError(t, err)
if i != nil {
require.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: testImage}))
}

t.Logf("pull the image into containerd")
_, err = containerdClient.Pull(ctx, testImage, containerd.WithPullUnpack)
assert.NoError(t, err)
defer func() {
// Make sure the image is cleaned up in any case.
if err := containerdClient.ImageService().Delete(ctx, testImage); err != nil {
assert.True(t, errdefs.IsNotFound(err), err)
}
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: testImage}))
}()

t.Logf("the image should be seen by the cri plugin")
var id string
checkImage := func() (bool, error) {
img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: testImage})
if err != nil {
return false, err
}
if img == nil {
t.Logf("Image %q not show up in the cri plugin yet", testImage)
return false, nil
}
id = img.Id
img, err = imageService.ImageStatus(&runtime.ImageSpec{Image: id})
if err != nil {
return false, err
}
if img == nil {
// We always generate image id as a reference first, it must
// be ready here.
return false, errors.New("can't reference image by id")
}
if len(img.RepoTags) != 1 {
// RepoTags must have been populated correctly.
return false, errors.Errorf("unexpected repotags: %+v", img.RepoTags)
}
if img.RepoTags[0] != testImage {
return false, errors.Errorf("unexpected repotag %q", img.RepoTags[0])
}
return true, nil
}
require.NoError(t, Eventually(checkImage, 100*time.Millisecond, 10*time.Second))
require.NoError(t, Consistently(checkImage, 100*time.Millisecond, time.Second))
defer func() {
t.Logf("image should still be seen by id if only tag get deleted")
if err := containerdClient.ImageService().Delete(ctx, testImage); err != nil {
assert.True(t, errdefs.IsNotFound(err), err)
}
assert.NoError(t, Consistently(func() (bool, error) {
img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: id})
if err != nil {
return false, err
}
return img != nil, nil
}, 100*time.Millisecond, time.Second))
t.Logf("image should be removed from the cri plugin if all references get deleted")
if err := containerdClient.ImageService().Delete(ctx, id); err != nil {
assert.True(t, errdefs.IsNotFound(err), err)
}
assert.NoError(t, Eventually(func() (bool, error) {
img, err := imageService.ImageStatus(&runtime.ImageSpec{Image: id})
if err != nil {
return false, err
}
return img == nil, nil
}, 100*time.Millisecond, 10*time.Second))
}()

t.Logf("the image should be marked as managed")
imgByRef, err := containerdClient.GetImage(ctx, testImage)
assert.NoError(t, err)
assert.Equal(t, imgByRef.Labels()["io.cri-containerd.image"], "managed")

t.Logf("the image id should be created and managed")
imgByID, err := containerdClient.GetImage(ctx, id)
assert.NoError(t, err)
assert.Equal(t, imgByID.Labels()["io.cri-containerd.image"], "managed")

t.Logf("should be able to start container with the image")
sbConfig := PodSandboxConfig("sandbox", "containerd-image")
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()

cnConfig := ContainerConfig(
"test-container",
id,
WithCommand("top"),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)
require.NoError(t, runtimeService.StartContainer(cn))
checkContainer := func() (bool, error) {
s, err := runtimeService.ContainerStatus(cn)
if err != nil {
return false, err
}
return s.GetState() == runtime.ContainerState_CONTAINER_RUNNING, nil
}
require.NoError(t, Eventually(checkContainer, 100*time.Millisecond, 10*time.Second))
require.NoError(t, Consistently(checkContainer, 100*time.Millisecond, time.Second))
}
32 changes: 32 additions & 0 deletions integration/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package integration

import (
"sort"
"testing"
"time"

Expand Down Expand Up @@ -128,6 +129,17 @@ func TestContainerdRestart(t *testing.T) {
}
}

t.Logf("Pull test images")
for _, image := range []string{"busybox", "alpine"} {
img, err := imageService.PullImage(&runtime.ImageSpec{image}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
}()
}
imagesBeforeRestart, err := imageService.ListImages(nil)
assert.NoError(t, err)

t.Logf("Kill containerd")
require.NoError(t, KillProcess("containerd"))
defer func() {
Expand Down Expand Up @@ -179,4 +191,24 @@ func TestContainerdRestart(t *testing.T) {
assert.NoError(t, runtimeService.StopPodSandbox(s.id))
assert.NoError(t, runtimeService.RemovePodSandbox(s.id))
}

t.Logf("Should recover all images")
imagesAfterRestart, err := imageService.ListImages(nil)
assert.NoError(t, err)
assert.Equal(t, len(imagesBeforeRestart), len(imagesAfterRestart))
for _, i1 := range imagesBeforeRestart {
found := false
for _, i2 := range imagesAfterRestart {
if i1.Id == i2.Id {
sort.Strings(i1.RepoTags)
sort.Strings(i1.RepoDigests)
sort.Strings(i2.RepoTags)
sort.Strings(i2.RepoDigests)
assert.Equal(t, i1, i2)
found = true
break
}
}
assert.True(t, found, "should find image %+v", i1)
}
}
20 changes: 20 additions & 0 deletions integration/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,26 @@ func Eventually(f CheckFunc, period, timeout time.Duration) error {
}
}

// Consistently makes sure that f consistently returns true without
// error before timeout exceeds. If f returns error, Consistently
// will return the same error immediately.
func Consistently(f CheckFunc, period, timeout time.Duration) error {
start := time.Now()
for {
ok, err := f()
if !ok {
return errors.New("get false")
}
if err != nil {
return err
}
if time.Since(start) >= timeout {
return nil
}
time.Sleep(period)
}
}

// Randomize adds uuid after a string.
func Randomize(str string) string {
return str + "-" + util.GenerateID()
Expand Down
41 changes: 18 additions & 23 deletions pkg/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)

Expand All @@ -54,14 +53,12 @@ const (
// eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): Handle event for each container in a separate goroutine.
type eventMonitor struct {
containerStore *containerstore.Store
sandboxStore *sandboxstore.Store
imageStore *imagestore.Store
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
c *criService
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
}

type backOff struct {
Expand All @@ -84,16 +81,14 @@ type backOffQueue struct {

// Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored.
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store, i *imagestore.Store) *eventMonitor {
func newEventMonitor(c *criService) *eventMonitor {
// event subscribe doesn't need namespace.
ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{
containerStore: c,
sandboxStore: s,
imageStore: i,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(),
c: c,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(),
}
}

Expand Down Expand Up @@ -206,7 +201,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
cntr, err := em.c.containerStore.Get(e.ContainerID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr); err != nil {
return errors.Wrap(err, "failed to handle container TaskExit event")
Expand All @@ -216,7 +211,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
return errors.Wrap(err, "can't find container for TaskExit event")
}
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
sb, err := em.c.sandboxStore.GetAll(e.ContainerID)
if err == nil {
if err := handleSandboxExit(ctx, e, sb); err != nil {
return errors.Wrap(err, "failed to handle sandbox TaskExit event")
Expand All @@ -229,12 +224,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
cntr, err := em.c.containerStore.Get(e.ContainerID)
if err != nil {
if err != store.ErrNotExist {
return errors.Wrap(err, "can't find container for TaskOOM event")
}
if _, err = em.sandboxStore.Get(e.ContainerID); err != nil {
if _, err = em.c.sandboxStore.Get(e.ContainerID); err != nil {
if err != store.ErrNotExist {
return errors.Wrap(err, "can't find sandbox for TaskOOM event")
}
Expand All @@ -252,15 +247,15 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
case *eventtypes.ImageCreate:
e := any.(*eventtypes.ImageCreate)
logrus.Infof("ImageCreate event %+v", e)
return em.imageStore.Update(ctx, e.Name)
return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageUpdate:
e := any.(*eventtypes.ImageUpdate)
logrus.Infof("ImageUpdate event %+v", e)
return em.imageStore.Update(ctx, e.Name)
return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageDelete:
e := any.(*eventtypes.ImageDelete)
logrus.Infof("ImageDelete event %+v", e)
return em.imageStore.Update(ctx, e.Name)
return em.c.updateImage(ctx, e.Name)
}

return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ const (
containerKindSandbox = "sandbox"
// containerKindContainer is a label value indicating container is application container
containerKindContainer = "container"
// imageLabelKey is the label key indicating the image is managed by cri plugin.
imageLabelKey = criContainerdPrefix + ".image"
// imageLabelValue is the label value indicating the image is managed by cri plugin.
imageLabelValue = "managed"
// sandboxMetadataExtension is an extension name that identify metadata of sandbox in CreateContainerRequest
sandboxMetadataExtension = criContainerdPrefix + ".sandbox.metadata"
// containerMetadataExtension is an extension name that identify metadata of container in CreateContainerRequest
Expand Down
7 changes: 5 additions & 2 deletions pkg/server/image_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ func (c *criService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (*a
}
for _, repoTag := range repoTags {
// Update image store to reflect the newest state in containerd.
if err := c.imageStore.Update(ctx, repoTag); err != nil {
return nil, errors.Wrapf(err, "failed to update image store %q", repoTag)
// Image imported by importer.Import is not treated as managed
// by the cri plugin, call `updateImage` to make it managed.
// TODO(random-liu): Replace this with the containerd library (issue #909).
if err := c.updateImage(ctx, repoTag); err != nil {
return nil, errors.Wrapf(err, "update image store %q", repoTag)
}
logrus.Debugf("Imported image %q", repoTag)
}
Expand Down
Loading

0 comments on commit 33624c1

Please sign in to comment.