Skip to content

Commit

Permalink
Merge pull request #1259 from dqminh/epoll-io
Browse files Browse the repository at this point in the history
Use Epoll to perform I/O in linux
  • Loading branch information
stevvooe committed Jul 31, 2017
2 parents 834f55d + 8e53465 commit 92d737f
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 22 deletions.
6 changes: 4 additions & 2 deletions linux/shim/exec.go
Expand Up @@ -105,10 +105,11 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecProcess
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master")
}
e.console = console
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
console, err = e.parent.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup)
if err != nil {
return nil, errors.Wrap(err, "failed to start console copy")
}
e.console = console
} else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return nil, errors.Wrap(err, "failed to start io pipe copy")
Expand Down Expand Up @@ -142,6 +143,7 @@ func (e *execProcess) ExitedAt() time.Time {
func (e *execProcess) Exited(status int) {
e.status = status
e.exited = time.Now()
e.parent.platform.shutdownConsole(context.Background(), e.console)
e.Wait()
if e.io != nil {
for _, c := range e.closers {
Expand Down
42 changes: 24 additions & 18 deletions linux/shim/init.go
Expand Up @@ -39,21 +39,22 @@ type initProcess struct {
// the reaper interface.
mu sync.Mutex

id string
bundle string
console console.Console
io runc.IO
runtime *runc.Runc
status int
exited time.Time
pid int
closers []io.Closer
stdin io.Closer
stdio stdio
rootfs string
id string
bundle string
console console.Console
platform platform
io runc.IO
runtime *runc.Runc
status int
exited time.Time
pid int
closers []io.Closer
stdin io.Closer
stdio stdio
rootfs string
}

func newInitProcess(context context.Context, path, namespace string, r *shimapi.CreateTaskRequest) (*initProcess, error) {
func newInitProcess(context context.Context, plat platform, path, namespace string, r *shimapi.CreateTaskRequest) (*initProcess, error) {
var success bool

if err := identifiers.Validate(r.ID); err != nil {
Expand Down Expand Up @@ -98,9 +99,10 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
Root: filepath.Join(RuncRoot, namespace),
}
p := &initProcess{
id: r.ID,
bundle: r.Bundle,
runtime: runtime,
id: r.ID,
bundle: r.Bundle,
runtime: runtime,
platform: plat,
stdio: stdio{
stdin: r.Stdin,
stdout: r.Stdout,
Expand Down Expand Up @@ -170,10 +172,11 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master")
}
p.console = console
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
console, err = plat.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup)
if err != nil {
return nil, errors.Wrap(err, "failed to start console copy")
}
p.console = console
} else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
return nil, errors.Wrap(err, "failed to start io pipe copy")
Expand Down Expand Up @@ -238,6 +241,9 @@ func (p *initProcess) Delete(context context.Context) error {
return fmt.Errorf("cannot delete a running container")
}
p.killAll(context)
if err := p.platform.shutdownConsole(context, p.console); err != nil {
log.G(context).WithError(err).Warn("Failed to shutdown container console")
}
p.Wait()
err = p.runtime.Delete(context, p.id, nil)
if p.io != nil {
Expand Down
14 changes: 13 additions & 1 deletion linux/shim/service.go
Expand Up @@ -42,10 +42,20 @@ func NewService(path, namespace string, client publisher) (*Service, error) {
namespace: namespace,
context: context,
}
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(client)
return s, nil
}

// platform handles platform-specific behavior that may differs across
// platform implementations
type platform interface {
copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error)
shutdownConsole(ctx context.Context, console console.Console) error
}

type Service struct {
initProcess *initProcess
path string
Expand All @@ -58,10 +68,12 @@ type Service struct {
deferredEvent interface{}
namespace string
context context.Context

platform platform
}

func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
process, err := newInitProcess(ctx, s.path, s.namespace, r)
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
Expand Down
87 changes: 87 additions & 0 deletions linux/shim/service_linux.go
@@ -0,0 +1,87 @@
package shim

import (
"io"
"sync"
"syscall"

"github.com/containerd/console"
"github.com/containerd/fifo"
"github.com/pkg/errors"
"golang.org/x/net/context"
)

type linuxPlatform struct {
epoller *console.Epoller
}

func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, errors.New("uninitialized epoller")
}

epollConsole, err := p.epoller.Add(console)
if err != nil {
return nil, err
}

if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(epollConsole, in)
}()
}

outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outw, epollConsole)
epollConsole.Close()
outr.Close()
outw.Close()
wg.Done()
}()
return epollConsole, nil
}

func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Console) error {
if p.epoller == nil {
return errors.New("uninitialized epoller")
}
epollConsole, ok := cons.(*console.EpollConsole)
if !ok {
return errors.Errorf("expected EpollConsole, got %#v", cons)
}
return epollConsole.Shutdown(p.epoller.CloseConsole)
}

// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *Service) initPlatform() error {
if s.platform != nil {
return nil
}
epoller, err := console.NewEpoller()
if err != nil {
return errors.Wrap(err, "failed to initialize epoller")
}
s.platform = &linuxPlatform{
epoller: epoller,
}
go epoller.Wait()
return nil
}
58 changes: 58 additions & 0 deletions linux/shim/service_unix.go
@@ -0,0 +1,58 @@
// +build !windows,!linux

package shim

import (
"io"
"sync"
"syscall"

"github.com/containerd/console"
"github.com/containerd/fifo"
"golang.org/x/net/context"
)

type unixPlatform struct {
}

func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(console, in)
}()
}
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outw, console)
console.Close()
outr.Close()
outw.Close()
wg.Done()
}()
return console, nil
}

func (p *unixPlatform) shutdownConsole(ctx context.Context, cons console.Console) error {
return nil
}

func (s *Service) initPlatform() error {
s.platform = &unixPlatform{}
return nil
}
2 changes: 1 addition & 1 deletion vendor.conf
@@ -1,6 +1,6 @@
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/containerd/go-runc 2774a2ea124a5c2d0aba13b5c2dd8a5a9a48775d
github.com/containerd/console 7fed77e673ca4abcd0cbd6d4d0e0e22137cbd778
github.com/containerd/console 2ce1c681f3c3c0dfa7d0af289428d36567c9a6bc
github.com/containerd/cgroups 4fd64a776f25b5540cddcb72eea6e35e58baca6e
github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
Expand Down
2 changes: 2 additions & 0 deletions vendor/github.com/containerd/console/console.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/containerd/console/console_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/github.com/containerd/console/console_unix.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions vendor/github.com/containerd/console/console_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 92d737f

Please sign in to comment.