Skip to content

Commit

Permalink
Cleanup shim loading
Browse files Browse the repository at this point in the history
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
  • Loading branch information
mxpv committed Nov 1, 2021
1 parent b554b57 commit df8c206
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 157 deletions.
133 changes: 6 additions & 127 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/platforms"
Expand Down Expand Up @@ -94,6 +92,8 @@ func init() {
ID: "task",
Requires: []plugin.Type{
plugin.RuntimeShimPlugin,
plugin.EventPlugin,
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
Expand All @@ -120,6 +120,10 @@ func init() {
return nil, err
}

if err := shimManager.loadExistingTasks(ic.Context); err != nil {
return nil, err
}

// Internally task manager relies on shim manager to launch task shims.
// It's also possible to use shim manager independently and launch other types of shims.
//
Expand Down Expand Up @@ -293,131 +297,6 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error {
return err
}

func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
nsDirs, err := os.ReadDir(m.state)
if err != nil {
return err
}
for _, nsd := range nsDirs {
if !nsd.IsDir() {
continue
}
ns := nsd.Name()
// skip hidden directories
if len(ns) > 0 && ns[0] == '.' {
continue
}
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue
}
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue
}
}
return nil
}

func (m *ShimManager) loadShims(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil {
return err
}
for _, sd := range shimDirs {
if !sd.IsDir() {
continue
}
id := sd.Name()
// skip hidden directories
if len(id) > 0 && id[0] == '.' {
continue
}
bundle, err := LoadBundle(ctx, m.state, id)
if err != nil {
// fine to return error here, it is a programmer error if the context
// does not have a namespace
return err
}
// fast path
bf, err := os.ReadDir(bundle.Path)
if err != nil {
bundle.Delete()
log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path)
continue
}
if len(bf) == 0 {
bundle.Delete()
continue
}
container, err := m.container(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
}
bundle.Delete()
continue
}
binaryCall := shimBinary(bundle,
shimBinaryConfig{
runtime: container.Runtime.Name,
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")

cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall)
// Remove self from the runtime task list.
m.list.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall)
continue
}
m.list.Add(ctx, shim)
}
return nil
}

func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) {
container, err := m.containers.Get(ctx, id)
if err != nil {
return nil, err
}
return &container, nil
}

func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
dirs, err := os.ReadDir(filepath.Join(m.root, ns))
if err != nil {
return err
}
for _, d := range dirs {
// if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left
if _, err := m.list.Get(ctx, d.Name()); err != nil {
path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
}
}
}
return nil
}

func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) {
p := make([]ocispec.Platform, len(platformStr))
for i, v := range platformStr {
Expand Down
53 changes: 23 additions & 30 deletions runtime/v2/shim_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,13 @@ import (
"os"
"path/filepath"

"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
)

func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store) error {
var (
ctx = ic.Context
state = ic.State
root = ic.Root
containerdAddress = ic.Address
containerdTTRPCAddress = ic.TTRPCAddress
)

nsDirs, err := os.ReadDir(state)
func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
nsDirs, err := os.ReadDir(m.state)
if err != nil {
return err
}
Expand All @@ -53,25 +41,24 @@ func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *e
continue
}
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := loadShims(namespaces.WithNamespace(ctx, ns), state, list, events, containers, containerdAddress, containerdTTRPCAddress); err != nil {
if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue
}
if err := cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), root, list); err != nil {
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue
}
}

return nil
}

func loadShims(ctx context.Context, state string, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store, containerdAddress string, containerdTTRPCAddress string) error {
func (m *ShimManager) loadShims(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
shimDirs, err := os.ReadDir(filepath.Join(state, ns))
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil {
return err
}
Expand All @@ -84,7 +71,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events
if len(id) > 0 && id[0] == '.' {
continue
}
bundle, err := LoadBundle(ctx, state, id)
bundle, err := LoadBundle(ctx, m.state, id)
if err != nil {
// fine to return error here, it is a programmer error if the context
// does not have a namespace
Expand All @@ -101,7 +88,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events
bundle.Delete()
continue
}
container, err := containers.Get(ctx, id)
container, err := m.containers.Get(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
Expand All @@ -110,38 +97,44 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events
bundle.Delete()
continue
}
binaryCall := shimBinary(bundle, container.Runtime.Name, containerdAddress, containerdTTRPCAddress)
binaryCall := shimBinary(bundle,
shimBinaryConfig{
runtime: container.Runtime.Name,
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")

cleanupAfterDeadShim(context.Background(), id, ns, list, events, binaryCall)
cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall)
// Remove self from the runtime task list.
list.Delete(ctx, id)
m.list.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, ns, list, events, binaryCall)
cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall)
continue
}
list.Add(ctx, shim)
m.list.Add(ctx, shim)
}
return nil
}

func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) error {
func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
dirs, err := os.ReadDir(filepath.Join(root, ns))
dirs, err := os.ReadDir(filepath.Join(m.root, ns))
if err != nil {
return err
}
for _, d := range dirs {
// if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left
if _, err := list.Get(ctx, d.Name()); err != nil {
path := filepath.Join(root, ns, d.Name())
if _, err := m.list.Get(ctx, d.Name()); err != nil {
path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
}
Expand Down

0 comments on commit df8c206

Please sign in to comment.