solve exec leak after containerd restart
mark-cjh committed Oct 23, 2023
1 parent 423c7ad commit de92f58
Showing 1 changed file with 96 additions and 0 deletions.
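
The change below creates an inotify instance in the shim, registers each exec's stdout FIFO with it, and reaps an exec once its FIFO is closed, which is how it detects execs orphaned by a containerd restart. A rough, self-contained sketch of that watch-and-read pattern follows; the FIFO path is made up, little-endian decoding is assumed, and error handling is reduced to panics for brevity:

package main

import (
	"encoding/binary"
	"fmt"
	"os"
	"syscall"
)

func main() {
	// Create an inotify instance and wrap it in an *os.File for reading.
	fd, err := syscall.InotifyInit()
	if err != nil {
		panic(err)
	}
	notifier := os.NewFile(uintptr(fd), "inotify")
	defer notifier.Close()

	// Watch a FIFO (standing in for an exec's stdout) for close events.
	wd, err := syscall.InotifyAddWatch(fd, "/tmp/example-exec-stdout", syscall.IN_CLOSE)
	if err != nil {
		panic(err)
	}
	defer syscall.InotifyRmWatch(fd, uint32(wd))

	// Block until the kernel reports an event, then decode the fixed-size
	// event header.
	var evt syscall.InotifyEvent
	if err := binary.Read(notifier, binary.LittleEndian, &evt); err != nil {
		panic(err)
	}
	if evt.Mask&syscall.IN_CLOSE != 0 {
		fmt.Printf("watch %d closed: time to clean up the exec\n", evt.Wd)
	}
}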
runtime/v2/runc/task/service.go
@@ -20,9 +20,11 @@ package task

import (
"context"
"encoding/binary"
"fmt"
"os"
"sync"
"syscall"
"unsafe"

"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
@@ -61,6 +63,7 @@ var (
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
var (
ep oom.Watcher
nd int
err error
)
if cgroups.Mode() == cgroups.Unified {
@@ -71,18 +74,25 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
if err != nil {
return nil, err
}
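// Create an inotify instance; Exec registers each exec's stdout FIFO with it
// and processBreakUp consumes the resulting events.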
nd, err = syscall.InotifyInit()
if err != nil {
return nil, err
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
notifier: os.NewFile(uintptr(nd), ""),
ids: make(map[int]*ids),
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
}
go s.processExits()
go s.processBreakUp(ctx)
runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
return nil, fmt.Errorf("failed to initialize platform behavior: %w", err)
@@ -101,6 +111,11 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
return s, nil
}

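// ids ties an inotify watch descriptor to the container and exec it was
// registered for.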
type ids struct {
cid string
eid string
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
mu sync.Mutex
@@ -111,6 +126,8 @@ type service struct {
ec chan runcC.Exit
ep oom.Watcher

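// notifier is the inotify instance watching exec stdout FIFOs; ids maps each
// watch descriptor back to the owning container and exec.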
notifier *os.File
ids map[int]*ids
containers map[string]*runc.Container

lifecycleMu sync.Mutex
@@ -356,6 +373,15 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}

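// Watch the exec's stdout FIFO (when one was requested): an IN_CLOSE event on
// it is used by processBreakUp to detect execs whose FIFO was abandoned, for
// example after a containerd restart, and reap them instead of leaking them.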
if r.Stdout != "" {
wd, err := syscall.InotifyAddWatch(int(s.notifier.Fd()), r.Stdout, syscall.IN_CLOSE)
if err != nil {
return nil, err
}
s.mu.Lock()
s.ids[wd] = &ids{cid: r.ID, eid: r.ExecID}
s.mu.Unlock()
}

process, err := container.Exec(ctx, r)
if err != nil {
cancel()
@@ -617,6 +643,76 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
}, nil
}

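// processBreakUp consumes inotify events from s.notifier; whenever a watched
// exec stdout FIFO reports IN_CLOSE it calls cleanup for the associated exec,
// which prevents execs from leaking after a containerd restart.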
func (s *service) processBreakUp(ctx context.Context) {
// Inotify events are written by the kernel in the host's native byte order,
// so determine the byte order by checking how a known integer is laid out in
// memory.
i := uint32(0x01020304)
le := *(*byte)(unsafe.Pointer(&i)) == 0x04
for {
select {
case <-ctx.Done():
return
default:
var (
evt syscall.InotifyEvent
err error
)
if le {
err = binary.Read(s.notifier, binary.LittleEndian, &evt)
} else {
err = binary.Read(s.notifier, binary.BigEndian, &evt)
}

if err != nil {
log.G(s.context).WithError(err).Error("watch event failed")
continue
}
if evt.Mask&syscall.IN_CLOSE == 0 {
continue
}
s.cleanup(int(evt.Wd))
}
}
}

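// cleanup resolves the container and exec registered for the inotify watch
// descriptor wd, removes the watch, and SIGKILLs the exec if it has not yet
// exited.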
func (s *service) cleanup(wd int) {
s.mu.Lock()
defer s.mu.Unlock()

ids, ok := s.ids[wd]
if !ok {
return
}
defer func() {
_, err := syscall.InotifyRmWatch(int(s.notifier.Fd()), uint32(wd))
if err != nil {
log.G(s.context).WithError(err).WithField("exec_id", ids.eid).Error("remove inotify failed")
}
delete(s.ids, wd)
}()

var p process.Process
c, ok := s.containers[ids.cid]
if !ok {
return
}
for _, pi := range c.ExecdProcesses() {
if pi.ID() != ids.eid {
continue
}
p = pi
break
}
if p == nil || !p.ExitedAt().IsZero() {
return
}

err := p.Kill(s.context, uint32(syscall.SIGKILL), true)
if err != nil {
log.G(s.context).WithError(err).WithField("exec_id", ids.eid).Error("kill exec failed")
}
}

func (s *service) processExits() {
for e := range s.ec {
// While unlikely, it is not impossible for a container process to exit

0 comments on commit de92f58
