Skip to content

Commit

Permalink
feature: add exec timeout support
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyue <zy675793960@yeah.net>
  • Loading branch information
zhangyue committed Jul 11, 2019
1 parent e254431 commit 98da71a
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 59 deletions.
2 changes: 1 addition & 1 deletion apis/server/exec_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *Server) startContainerExec(ctx context.Context, rw http.ResponseWriter,
}
}

if err := s.ContainerMgr.StartExec(ctx, name, attach); err != nil {
if err := s.ContainerMgr.StartExec(ctx, name, attach, 0); err != nil {
if config.Detach {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cri/stream/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []stri
Terminal: createConfig.Tty,
}

if err := s.containerMgr.StartExec(ctx, execid, attachCfg); err != nil {
if err := s.containerMgr.StartExec(ctx, execid, attachCfg, 0); err != nil {
return 0, fmt.Errorf("failed to exec for container %q: %v", containerID, err)
}

Expand Down
12 changes: 2 additions & 10 deletions cri/v1alpha2/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,15 +1146,6 @@ func (c *CriManager) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
id := r.GetContainerId()

timeout := time.Duration(r.GetTimeout()) * time.Second
var cancel context.CancelFunc
if timeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
defer cancel()

createConfig := &apitypes.ExecCreateConfig{
Cmd: r.GetCmd(),
}
Expand All @@ -1170,7 +1161,8 @@ func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (
UseStderr: true,
Stderr: stderrBuf,
}
if err := c.ContainerMgr.StartExec(ctx, execid, attachCfg); err != nil {

if err := c.ContainerMgr.StartExec(ctx, execid, attachCfg, int(r.GetTimeout())); err != nil {
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
}

Expand Down
97 changes: 55 additions & 42 deletions ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,22 @@ func (c *Client) containerStats(ctx context.Context, id string) (*containerdtype
}

// ExecContainer executes a process in container.
func (c *Client) ExecContainer(ctx context.Context, process *Process) error {
if err := c.execContainer(ctx, process); err != nil {
func (c *Client) ExecContainer(ctx context.Context, process *Process, timeout int) error {
if err := c.execContainer(ctx, process, timeout); err != nil {
return convertCtrdErr(err)
}
return nil
}

// execContainer executes a process in container.
func (c *Client) execContainer(ctx context.Context, process *Process) error {
func (c *Client) execContainer(ctx context.Context, process *Process, timeout int) error {
pack, err := c.watch.get(process.ContainerID)
if err != nil {
return err
}

closeStdinCh := make(chan struct{})

// make sure the closeStdinCh has been closed.
defer func() {
close(closeStdinCh)
}()

var (
cntrID, execID = pack.container.ID(), process.ExecID
withStdin, withTerminal = process.IO.Stream().Stdin() != nil, process.P.Terminal
Expand Down Expand Up @@ -137,50 +132,68 @@ func (c *Client) execContainer(ctx context.Context, process *Process) error {
return errors.Wrap(err, "failed to exec process")
}

errCh := make(chan error, 1)
defer close(errCh)

go func() {
var msg *Message
var msg *Message
defer func() {
if msg != nil {
// XXX: if exec process get run, io should be closed in this function,
for _, hook := range c.hooks {
if err := hook(process.ExecID, msg); err != nil {
logrus.Errorf("failed to execute the exec exit hooks: %v", err)
break
}
}

if startErr := <-errCh; startErr != nil {
msg = &Message{
err: startErr,
exitCode: 126,
exitTime: time.Now().UTC(),
// delete the finished exec process in containerd
if _, err := execProcess.Delete(context.TODO()); err != nil {
logrus.Warnf("failed to delete exec process %s: %s", process.ExecID, err)
}
}
}()

// success to start which means the cmd is valid and wait
// for process.
if msg == nil {
status := <-exitStatus
msg = &Message{
err: status.Error(),
exitCode: status.ExitCode(),
exitTime: status.ExitTime(),
}
// start the exec process
if err := execProcess.Start(ctx); err != nil {
msg = &Message{
err: err,
exitCode: 126,
exitTime: time.Now().UTC(),
}
return errors.Wrapf(err, "failed to start exec, exec id %s", execID)
}
// make sure the closeStdinCh has been closed.
close(closeStdinCh)

t := time.Duration(timeout) * time.Second
var timeCh <-chan time.Time
if t == 0 {
timeCh = make(chan time.Time)
} else {
timeCh = time.After(t)
}

// XXX: if exec process get run, io should be closed in this function,
for _, hook := range c.hooks {
if err := hook(process.ExecID, msg); err != nil {
logrus.Errorf("failed to execute the exec exit hooks: %v", err)
break
select {
case status := <-exitStatus:
msg = &Message{
err: status.Error(),
exitCode: status.ExitCode(),
exitTime: status.ExitTime(),
}
case <-timeCh:
// Ignore the not found error because the process may exit itself before kill
if err := execProcess.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
//try to force kill the exec process
if err := execProcess.Kill(ctx, syscall.SIGTERM); err != nil && !errdefs.IsNotFound(err) {
logrus.Errorf("failed to kill the exec process, %v", err)
}
}

// delete the finished exec process in containerd
if _, err := execProcess.Delete(context.TODO()); err != nil {
logrus.Warnf("failed to delete exec process %s: %s", process.ExecID, err)
// Wait for process to be killed
status := <-exitStatus
msg = &Message{
err: errors.Wrapf(status.Error(), "failed to exec process %s, timeout", execID),
exitCode: status.ExitCode(),
exitTime: status.ExitTime(),
}
}()

// start the exec process
if err := execProcess.Start(ctx); err != nil {
errCh <- err
return err
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type ContainerAPIClient interface {
// ContainerStats returns stats of the container.
ContainerStats(ctx context.Context, id string) (*containerdtypes.Metric, error)
// ExecContainer executes a process in container.
ExecContainer(ctx context.Context, process *Process) error
ExecContainer(ctx context.Context, process *Process, timeout int) error
// ResizeContainer changes the size of the TTY of the exec process running
// in the container to the given height and width.
ResizeExec(ctx context.Context, id string, execid string, opts types.ResizeOptions) error
Expand Down
2 changes: 1 addition & 1 deletion daemon/mgr/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type ContainerMgr interface {
CreateExec(ctx context.Context, name string, config *types.ExecCreateConfig) (string, error)

// StartExec executes a new process in container.
StartExec(ctx context.Context, execid string, cfg *streams.AttachConfig) error
StartExec(ctx context.Context, execid string, cfg *streams.AttachConfig, timeout int) error

// InspectExec returns low-level information about exec command.
InspectExec(ctx context.Context, execid string) (*types.ContainerExecInspect, error)
Expand Down
5 changes: 3 additions & 2 deletions daemon/mgr/container_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (mgr *ContainerManager) ResizeExec(ctx context.Context, execid string, opts
}

// StartExec executes a new process in container.
func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, cfg *streams.AttachConfig) (err0 error) {
// timeout = 0 means no timeout
func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, cfg *streams.AttachConfig, timeout int) (err0 error) {
// GetExecConfig should not error, since we have done this before call StartExec
execConfig, err := mgr.GetExecConfig(ctx, execid)
if err != nil {
Expand Down Expand Up @@ -161,7 +162,7 @@ func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, cfg *
ExecID: execid,
IO: eio,
P: process,
}); err != nil {
}, timeout); err != nil {
return err
}
return <-attachErrCh
Expand Down
1 change: 0 additions & 1 deletion hack/testing/run_daemon_cri_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ export PATH="${GOPATH}/bin:${PATH}"

# CRI_SKIP skips the test to skip.
DEFAULT_CRI_SKIP="should error on create with wrong options"
DEFAULT_CRI_SKIP+="|runtime should support execSync with timeout"
CRI_SKIP="${CRI_SKIP:-"${DEFAULT_CRI_SKIP}"}"

# CRI_FOCUS focuses the test to run.
Expand Down

0 comments on commit 98da71a

Please sign in to comment.