Skip to content

Commit

Permalink
Add create/start to exec processes in shim
Browse files Browse the repository at this point in the history
This splits up the create and start of an exec process in the shim to
have two separate steps like the initial process.  This will allow
better state reporting for individual process along with a more robust
wait for execs.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
  • Loading branch information
crosbymichael committed Aug 2, 2017
1 parent 2533bfe commit 63878d1
Show file tree
Hide file tree
Showing 11 changed files with 746 additions and 246 deletions.
1 change: 1 addition & 0 deletions api/services/events/v1/container.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

280 changes: 235 additions & 45 deletions api/services/events/v1/task.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions api/services/events/v1/task.proto
Expand Up @@ -53,6 +53,11 @@ message TaskOOM {
message TaskExecAdded {
string container_id = 1;
string exec_id = 2;
}

message TaskExecStarted {
string container_id = 1;
string exec_id = 2;
uint32 pid = 3;
}

Expand Down
11 changes: 9 additions & 2 deletions cmd/ctr/shim.go
Expand Up @@ -149,7 +149,9 @@ var shimStartCommand = cli.Command{
if err != nil {
return err
}
_, err = service.Start(gocontext.Background(), empty)
_, err = service.Start(gocontext.Background(), &shim.StartRequest{
ID: context.Args().First(),
})
return err
},
}
Expand Down Expand Up @@ -261,7 +263,12 @@ var shimExecCommand = cli.Command{
Stderr: context.String("stderr"),
Terminal: tty,
}
r, err := service.Exec(ctx, rq)
if _, err := service.Exec(ctx, rq); err != nil {
return err
}
r, err := service.Start(ctx, &shim.StartRequest{
ID: id,
})
if err != nil {
return err
}
Expand Down
136 changes: 70 additions & 66 deletions linux/shim/exec.go
Expand Up @@ -36,6 +36,8 @@ type execProcess struct {
closers []io.Closer
stdin io.Closer
stdio stdio
path string
spec specs.Process

parent *initProcess
}
Expand All @@ -44,83 +46,25 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecProcess
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id")
}
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
return nil, err
}
spec.Terminal = r.Terminal

e := &execProcess{
id: id,
path: path,
parent: parent,
spec: spec,
stdio: stdio{
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
},
}
var (
err error
socket *runc.Socket
io runc.IO
pidfile = filepath.Join(path, fmt.Sprintf("%s.pid", id))
)
if r.Terminal {
if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil {
return nil, errors.Wrap(err, "failed to create runc console socket")
}
defer os.Remove(socket.Path())
} else {
// TODO: get uid/gid
if io, err = runc.NewPipeIO(0, 0); err != nil {
return nil, errors.Wrap(err, "failed to create runc io pipes")
}
e.io = io
}
opts := &runc.ExecOpts{
PidFile: pidfile,
IO: io,
Detach: true,
}
if socket != nil {
opts.ConsoleSocket = socket
}
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
return nil, err
}
spec.Terminal = r.Terminal

if err := parent.runtime.Exec(context, parent.id, spec, opts); err != nil {
return nil, parent.runtimeError(err, "OCI runtime exec failed")
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
}
e.closers = append(e.closers, sc)
e.stdin = sc
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master")
}
console, err = e.parent.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup)
if err != nil {
return nil, errors.Wrap(err, "failed to start console copy")
}
e.console = console
} else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return nil, errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
}
e.pid = pid
return e, nil
}

Expand Down Expand Up @@ -178,3 +122,63 @@ func (e *execProcess) Stdin() io.Closer {
func (e *execProcess) Stdio() stdio {
return e.stdio
}

func (e *execProcess) Start(ctx context.Context) (err error) {
var (
socket *runc.Socket
io runc.IO
pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
)
if e.stdio.terminal {
if socket, err = runc.NewConsoleSocket(filepath.Join(e.path, "pty.sock")); err != nil {
return errors.Wrap(err, "failed to create runc console socket")
}
defer os.Remove(socket.Path())
} else {
if io, err = runc.NewPipeIO(0, 0); err != nil {
return errors.Wrap(err, "failed to create runc io pipes")
}
e.io = io
}
opts := &runc.ExecOpts{
PidFile: pidfile,
IO: io,
Detach: true,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil {
return e.parent.runtimeError(err, "OCI runtime exec failed")
}
if e.stdio.stdin != "" {
sc, err := fifo.OpenFifo(ctx, e.stdio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.stdin)
}
e.closers = append(e.closers, sc)
e.stdin = sc
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
e.console = console
if err := e.parent.platform.copyConsole(ctx, console, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start console copy")
}
} else {
if err := copyPipes(ctx, io, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
}
e.pid = pid
return nil
}
4 changes: 2 additions & 2 deletions linux/shim/local.go
Expand Up @@ -27,7 +27,7 @@ func (c *local) Create(ctx context.Context, in *shimapi.CreateTaskRequest, opts
return c.s.Create(ctx, in)
}

func (c *local) Start(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
func (c *local) Start(ctx context.Context, in *shimapi.StartRequest, opts ...grpc.CallOption) (*shimapi.StartResponse, error) {
return c.s.Start(ctx, in)
}

Expand All @@ -43,7 +43,7 @@ func (c *local) DeleteProcess(ctx context.Context, in *shimapi.DeleteProcessRequ
return c.s.DeleteProcess(ctx, in)
}

func (c *local) Exec(ctx context.Context, in *shimapi.ExecProcessRequest, opts ...grpc.CallOption) (*shimapi.ExecProcessResponse, error) {
func (c *local) Exec(ctx context.Context, in *shimapi.ExecProcessRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
return c.s.Exec(ctx, in)
}

Expand Down
2 changes: 2 additions & 0 deletions linux/shim/process.go
Expand Up @@ -38,4 +38,6 @@ type process interface {
Kill(context.Context, uint32, bool) error
// Stdio returns io information for the container
Stdio() stdio
// Start execution of the process
Start(context.Context) error
}
47 changes: 28 additions & 19 deletions linux/shim/service.go
Expand Up @@ -108,18 +108,36 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
}, nil
}

func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) {
if s.initProcess == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
p, ok := s.processes[r.ID]
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID)
}
if err := s.initProcess.Start(ctx); err != nil {
if err := p.Start(ctx); err != nil {
return nil, err
}
s.events <- &eventsapi.TaskStart{
ContainerID: s.id,
Pid: uint32(s.initProcess.Pid()),
if r.ID == s.id {
s.events <- &eventsapi.TaskStart{
ContainerID: s.id,
Pid: uint32(s.initProcess.Pid()),
}
} else {
pid := p.Pid()
cmd := &reaper.Cmd{
ExitCh: make(chan int, 1),
}
reaper.Default.Register(pid, cmd)
go s.waitExit(p, pid, cmd)
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
Pid: uint32(pid),
}
}
return empty, nil
return &shimapi.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}

func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimapi.DeleteResponse, error) {
Expand Down Expand Up @@ -170,7 +188,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
}, nil
}

func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shimapi.ExecProcessResponse, error) {
func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*google_protobuf.Empty, error) {
if s.initProcess == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
Expand All @@ -181,22 +199,13 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi
if err != nil {
return nil, errdefs.ToGRPC(err)
}
pid := process.Pid()
cmd := &reaper.Cmd{
ExitCh: make(chan int, 1),
}
reaper.Default.Register(pid, cmd)
s.processes[r.ID] = process

s.events <- &eventsapi.TaskExecAdded{
ContainerID: s.id,
ExecID: r.ID,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.ExecProcessResponse{
Pid: uint32(pid),
}, nil
return empty, nil
}

func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*google_protobuf.Empty, error) {
Expand Down

0 comments on commit 63878d1

Please sign in to comment.