Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move io.go into io package #1782

Merged
merged 1 commit into from Nov 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 16 additions & 16 deletions io.go → cio/io.go
@@ -1,4 +1,4 @@
package containerd
package cio

import (
"context"
Expand All @@ -8,8 +8,8 @@ import (
"sync"
)

// IOConfig holds the io configurations.
type IOConfig struct {
// Config holds the io configurations.
type Config struct {
// Terminal is true if one has been allocated
Terminal bool
// Stdin path
Expand All @@ -23,7 +23,7 @@ type IOConfig struct {
// IO holds the io information for a task or process
type IO interface {
// Config returns the IO configuration.
Config() IOConfig
Config() Config
// Cancel aborts all current io operations
Cancel()
// Wait blocks until all io copy operations have completed
Expand All @@ -34,12 +34,12 @@ type IO interface {

// cio is a basic container IO implementation.
type cio struct {
config IOConfig
config Config

closer *wgCloser
}

func (c *cio) Config() IOConfig {
func (c *cio) Config() Config {
return c.config
}

Expand All @@ -64,23 +64,23 @@ func (c *cio) Close() error {
return c.closer.Close()
}

// IOCreation creates new IO sets for a task
type IOCreation func(id string) (IO, error)
// Creation creates new IO sets for a task
type Creation func(id string) (IO, error)

// IOAttach allows callers to reattach to running tasks
// Attach allows callers to reattach to running tasks
//
// There should only be one reader for a task's IO set
// because fifo's can only be read from one reader or the output
// will be sent only to the first reads
type IOAttach func(*FIFOSet) (IO, error)
type Attach func(*FIFOSet) (IO, error)

// NewIO returns an IOCreation that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation {
// NewIO returns an Creation that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
return NewIOWithTerminal(stdin, stdout, stderr, false)
}

// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) IOCreation {
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
return func(id string) (_ IO, err error) {
paths, err := NewFifos(id)
if err != nil {
Expand All @@ -91,7 +91,7 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool)
os.RemoveAll(paths.Dir)
}
}()
cfg := IOConfig{
cfg := Config{
Terminal: terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Expand All @@ -113,12 +113,12 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool)
}

// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) IOAttach {
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
return func(paths *FIFOSet) (IO, error) {
if paths == nil {
return nil, fmt.Errorf("cannot attach to existing fifos")
}
cfg := IOConfig{
cfg := Config{
Terminal: paths.Terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Expand Down
8 changes: 4 additions & 4 deletions io_unix.go → cio/io_unix.go
@@ -1,6 +1,6 @@
// +build !windows

package containerd
package cio

import (
"context"
Expand Down Expand Up @@ -139,9 +139,9 @@ func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}

// Config returns the IOConfig
func (f *DirectIO) Config() IOConfig {
return IOConfig{
// Config returns the Config
func (f *DirectIO) Config() Config {
return Config{
Terminal: f.terminal,
Stdin: f.set.In,
Stdout: f.set.Out,
Expand Down
2 changes: 1 addition & 1 deletion io_windows.go → cio/io_windows.go
@@ -1,4 +1,4 @@
package containerd
package cio

import (
"fmt"
Expand Down
3 changes: 2 additions & 1 deletion cmd/containerd-stress/main.go
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/namespaces"
specs "github.com/opencontainers/runtime-spec/specs-go"
Expand Down Expand Up @@ -206,7 +207,7 @@ func (w *worker) runContainer(ctx context.Context, id string) error {
}
defer c.Delete(ctx, containerd.WithSnapshotCleanup)

task, err := c.NewTask(ctx, containerd.NullIO)
task, err := c.NewTask(ctx, cio.NullIO)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/ctr/commands/tasks/attach.go
Expand Up @@ -4,7 +4,7 @@ import (
"os"

"github.com/containerd/console"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand Down Expand Up @@ -39,7 +39,7 @@ var attachCommand = cli.Command{
return err
}
}
task, err := container.Task(ctx, containerd.WithAttach(os.Stdin, os.Stdout, os.Stderr))
task, err := container.Task(ctx, cio.WithAttach(os.Stdin, os.Stdout, os.Stderr))
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/ctr/commands/tasks/exec.go
Expand Up @@ -4,7 +4,7 @@ import (
"errors"

"github.com/containerd/console"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand Down Expand Up @@ -60,11 +60,11 @@ var execCommand = cli.Command{
pspec.Terminal = tty
pspec.Args = args

io := containerd.Stdio
ioCreator := cio.Stdio
if tty {
io = containerd.StdioTerminal
ioCreator = cio.StdioTerminal
}
process, err := task.Exec(ctx, context.String("exec-id"), pspec, io)
process, err := task.Exec(ctx, context.String("exec-id"), pspec, ioCreator)
if err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/ctr/commands/tasks/tasks_unix.go
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/containerd/console"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -44,21 +45,21 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
// NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, tty, nullIO bool) (containerd.Task, error) {
if checkpoint == "" {
io := containerd.Stdio
ioCreator := cio.Stdio
if tty {
io = containerd.StdioTerminal
ioCreator = cio.StdioTerminal
}
if nullIO {
if tty {
return nil, errors.New("tty and null-io cannot be used together")
}
io = containerd.NullIO
ioCreator = cio.NullIO
}
return container.NewTask(ctx, io)
return container.NewTask(ctx, ioCreator)
}
im, err := client.GetImage(ctx, checkpoint)
if err != nil {
return nil, err
}
return container.NewTask(ctx, containerd.Stdio, containerd.WithTaskCheckpoint(im))
return container.NewTask(ctx, cio.Stdio, containerd.WithTaskCheckpoint(im))
}
9 changes: 5 additions & 4 deletions cmd/ctr/commands/tasks/tasks_windows.go
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/containerd/console"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -41,15 +42,15 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol

// NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, tty, nullIO bool) (containerd.Task, error) {
io := containerd.Stdio
ioCreator := cio.Stdio
if tty {
io = containerd.StdioTerminal
ioCreator = cio.StdioTerminal
}
if nullIO {
if tty {
return nil, errors.New("tty and null-io cannot be used together")
}
io = containerd.NullIO
ioCreator = cio.NullIO
}
return container.NewTask(ctx, io)
return container.NewTask(ctx, ioCreator)
}
19 changes: 10 additions & 9 deletions container.go
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/typeurl"
Expand All @@ -25,17 +26,17 @@ type Container interface {
// Delete removes the container
Delete(context.Context, ...DeleteOpts) error
// NewTask creates a new task based on the container metadata
NewTask(context.Context, IOCreation, ...NewTaskOpts) (Task, error)
NewTask(context.Context, cio.Creation, ...NewTaskOpts) (Task, error)
// Spec returns the OCI runtime specification
Spec(context.Context) (*specs.Spec, error)
// Task returns the current task for the container
//
// If IOAttach options are passed the client will reattach to the IO for the running
// If cio.Attach options are passed the client will reattach to the IO for the running
// task. If no task exists for the container a NotFound error is returned
//
// Clients must make sure that only one reader is attached to the task and consuming
// the output from the task's fifos
Task(context.Context, IOAttach) (Task, error)
Task(context.Context, cio.Attach) (Task, error)
// Image returns the image that the container is based on
Image(context.Context) (Image, error)
// Labels returns the labels set on the container
Expand Down Expand Up @@ -138,7 +139,7 @@ func (c *container) Delete(ctx context.Context, opts ...DeleteOpts) error {
return c.client.ContainerService().Delete(ctx, c.id)
}

func (c *container) Task(ctx context.Context, attach IOAttach) (Task, error) {
func (c *container) Task(ctx context.Context, attach cio.Attach) (Task, error) {
return c.loadTask(ctx, attach)
}

Expand All @@ -161,7 +162,7 @@ func (c *container) Image(ctx context.Context) (Image, error) {
}, nil
}

func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...NewTaskOpts) (Task, error) {
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (Task, error) {
i, err := ioCreate(c.id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -251,7 +252,7 @@ func (c *container) Update(ctx context.Context, opts ...UpdateContainerOpts) err
return nil
}

func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, error) {
func (c *container) loadTask(ctx context.Context, ioAttach cio.Attach) (Task, error) {
response, err := c.client.TaskService().Get(ctx, &tasks.GetRequest{
ContainerID: c.id,
})
Expand All @@ -262,7 +263,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
}
return nil, err
}
var i IO
var i cio.IO
if ioAttach != nil {
if i, err = attachExistingIO(response, ioAttach); err != nil {
return nil, err
Expand All @@ -281,9 +282,9 @@ func (c *container) get(ctx context.Context) (containers.Container, error) {
return c.client.ContainerService().Get(ctx, c.id)
}

func attachExistingIO(response *tasks.GetResponse, ioAttach IOAttach) (IO, error) {
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {
// get the existing fifo paths from the task information stored by the daemon
paths := &FIFOSet{
paths := &cio.FIFOSet{
Dir: getFifoDir([]string{
response.Process.Stdin,
response.Process.Stdout,
Expand Down
15 changes: 8 additions & 7 deletions container_linux_test.go
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/containerd/cgroups"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/linux/runctypes"
Expand Down Expand Up @@ -294,7 +295,7 @@ func TestContainerAttach(t *testing.T) {

expected := "hello" + newLine

direct, err := NewDirectIO(ctx, false)
direct, err := cio.NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -389,7 +390,7 @@ func TestContainerUsername(t *testing.T) {
t.Error(err)
return
}
direct, err := NewDirectIO(ctx, false)
direct, err := cio.NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -482,7 +483,7 @@ func TestContainerAttachProcess(t *testing.T) {
expected := "hello" + newLine

// creating IO early for easy resource cleanup
direct, err := NewDirectIO(ctx, false)
direct, err := cio.NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -598,7 +599,7 @@ func TestContainerUserID(t *testing.T) {
t.Error(err)
return
}
direct, err := NewDirectIO(ctx, false)
direct, err := cio.NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -688,7 +689,7 @@ func TestContainerKillAll(t *testing.T) {
defer container.Delete(ctx, WithSnapshotCleanup)

stdout := bytes.NewBuffer(nil)
task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -940,7 +941,7 @@ func TestContainerKillInitPidHost(t *testing.T) {
defer container.Delete(ctx, WithSnapshotCleanup)

stdout := bytes.NewBuffer(nil)
task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -1039,7 +1040,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) {
}
defer container.Delete(ctx, WithSnapshotCleanup)

task, err := container.NewTask(ctx, Stdio, func(_ context.Context, client *Client, r *TaskInfo) error {
task, err := container.NewTask(ctx, cio.Stdio, func(_ context.Context, client *Client, r *TaskInfo) error {
r.Options = &runctypes.CreateOptions{
IoUid: 1000,
IoGid: 1000,
Expand Down