Skip to content

Commit

Permalink
*: add DrainExecSyncIOTimeout config and disable as by default
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Fu <fuweid89@gmail.com>
(cherry picked from commit 3c18dec)
Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Feb 16, 2024
1 parent fb26231 commit 0438e47
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 25 deletions.
14 changes: 14 additions & 0 deletions pkg/cri/config/config.go
Expand Up @@ -306,6 +306,13 @@ type PluginConfig struct {
// IgnoreDeprecationWarnings is the list of the deprecation IDs (such as "io.containerd.deprecation/pull-schema-1-image")
// that should be ignored for checking "ContainerdHasNoDeprecationWarnings" condition.
IgnoreDeprecationWarnings []string `toml:"ignore_deprecation_warnings" json:"ignoreDeprecationWarnings"`
// DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync
// API' IO EOF event after exec init process exits. A zero value means
// there is no timeout.
//
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
}

// X509KeyPairStreaming contains the x509 configuration for streaming
Expand Down Expand Up @@ -467,5 +474,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W
return warnings, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}

// Validation for drain_exec_sync_io_timeout
if c.DrainExecSyncIOTimeout != "" {
if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
return warnings, fmt.Errorf("invalid drain exec sync io timeout: %w", err)
}
}
return warnings, nil
}
1 change: 1 addition & 0 deletions pkg/cri/config/config_unix.go
Expand Up @@ -104,5 +104,6 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
DrainExecSyncIOTimeout: "0",
}
}
1 change: 1 addition & 0 deletions pkg/cri/config/config_windows.go
Expand Up @@ -62,5 +62,6 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
DrainExecSyncIOTimeout: "0",
}
}
52 changes: 29 additions & 23 deletions pkg/cri/server/container_execsync.go
Expand Up @@ -38,23 +38,6 @@ import (
cioutil "github.com/containerd/containerd/pkg/ioutil"
)

// defaultDrainExecIOTimeout is used to drain exec io after exec process exits.
//
// By default, the child processes spawned by exec process will inherit standard
// io file descriptors. The shim server creates a pipe as data channel. Both
// exec process and its children write data into the write end of the pipe.
// And the shim server will read data from the pipe. If the write end is still
// open, the shim server will continue to wait for data from pipe.
//
// If the exec command is like `bash -c "sleep 365d &"`, the exec process
// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
// the write end of the pipe for a year! It doesn't make senses that CRI plugin
// should wait for it.
//
// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec
// process to stop copy io in shim side.
const defaultDrainExecIOTimeout = 15 * time.Second

type cappedWriter struct {
w io.WriteCloser
remain int
Expand Down Expand Up @@ -135,6 +118,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
ctx, cancel := context.WithCancel(ctx)
defer cancel()

drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err)
}

spec, err := container.Spec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get container spec: %w", err)
Expand Down Expand Up @@ -227,7 +215,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
execID, exitRes.ExitCode(), exitRes.Error())

if err := drainExecIO(ctx, process, attachDone); err != nil {
if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
}

Expand All @@ -239,7 +227,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
}

if err := drainExecIO(ctx, process, attachDone); err != nil {
if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
}
return &code, nil
Expand Down Expand Up @@ -275,12 +263,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
return c.execInternal(ctx, cntr.Container, id, opts)
}

func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error {
timer := time.NewTimer(defaultDrainExecIOTimeout)
defer timer.Stop()
// drainExecSyncIO drains process IO with timeout after exec init process exits.
//
// By default, the child processes spawned by exec process will inherit standard
// io file descriptors. The shim server creates a pipe as data channel. Both
// exec process and its children write data into the write end of the pipe.
// And the shim server will read data from the pipe. If the write end is still
// open, the shim server will continue to wait for data from pipe.
//
// If the exec command is like `bash -c "sleep 365d &"`, the exec process
// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
// the write end of the pipe for a year! It doesn't make senses that CRI plugin
// should wait for it.
func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
var timerCh <-chan time.Time

if drainExecIOTimeout != 0 {
timer := time.NewTimer(drainExecIOTimeout)
defer timer.Stop()

timerCh = timer.C
}

select {
case <-timer.C:
case <-timerCh:

case <-attachDone:
log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
Expand Down
98 changes: 98 additions & 0 deletions pkg/cri/server/container_execsync_test.go
Expand Up @@ -18,8 +18,14 @@ package server

import (
"bytes"
"context"
"os"
"syscall"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
cioutil "github.com/containerd/containerd/pkg/ioutil"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
err := cw.Close()
assert.NoError(t, err)
}

func TestDrainExecSyncIO(t *testing.T) {
ctx := context.TODO()

t.Run("NoTimeout", func(t *testing.T) {
ep := &fakeExecProcess{
id: t.Name(),
pid: uint32(os.Getpid()),
}

attachDoneCh := make(chan struct{})
time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
assert.Equal(t, 0, len(ep.actionEvents))
})

t.Run("With3Seconds", func(t *testing.T) {
ep := &fakeExecProcess{
id: t.Name(),
pid: uint32(os.Getpid()),
}

attachDoneCh := make(chan struct{})
time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
assert.Equal(t, []string{"Delete"}, ep.actionEvents)
})
}

type fakeExecProcess struct {
id string
pid uint32
actionEvents []string
}

// ID of the process
func (p *fakeExecProcess) ID() string {
return p.id
}

// Pid is the system specific process id
func (p *fakeExecProcess) Pid() uint32 {
return p.pid
}

// Start starts the process executing the user's defined binary
func (p *fakeExecProcess) Start(context.Context) error {
p.actionEvents = append(p.actionEvents, "Start")
return nil
}

// Delete removes the process and any resources allocated returning the exit status
func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
p.actionEvents = append(p.actionEvents, "Delete")
return nil, nil
}

// Kill sends the provided signal to the process
func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
p.actionEvents = append(p.actionEvents, "Kill")
return nil
}

// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
p.actionEvents = append(p.actionEvents, "Wait")
return nil, nil
}

// CloseIO allows various pipes to be closed on the process
func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
p.actionEvents = append(p.actionEvents, "CloseIO")
return nil
}

// Resize changes the width and height of the process's terminal
func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
p.actionEvents = append(p.actionEvents, "Resize")
return nil
}

// IO returns the io set for the process
func (p *fakeExecProcess) IO() cio.IO {
p.actionEvents = append(p.actionEvents, "IO")
return nil
}

// Status returns the executing status of the process
func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
p.actionEvents = append(p.actionEvents, "Status")
return containerd.Status{}, nil
}
16 changes: 14 additions & 2 deletions script/test/utils.sh
Expand Up @@ -35,11 +35,23 @@ CONTAINERD_RUNTIME=${CONTAINERD_RUNTIME:-""}
if [ -z "${CONTAINERD_CONFIG_FILE}" ]; then
config_file="${CONTAINERD_CONFIG_DIR}/containerd-config-cri.toml"
truncate --size 0 "${config_file}"
echo "version=2" >> ${config_file}
# TODO(fuweid): if the config.Imports supports patch update, it will be easy
# to write the integration test case with different configuration, like:
#
# 1. write configuration into importable containerd config path.
# 2. restart containerd
# 3. verify the behaviour
# 4. delete the configuration
# 5. restart containerd
cat >>${config_file} <<EOF
version=2
[plugins."io.containerd.grpc.v1.cri"]
drain_exec_sync_io_timeout = "10s"
EOF

if command -v sestatus >/dev/null 2>&1; then
cat >>${config_file} <<EOF
[plugins."io.containerd.grpc.v1.cri"]
enable_selinux = true
EOF
fi
Expand Down

0 comments on commit 0438e47

Please sign in to comment.