From b8dfb4d8f54bad021fbf1bf1cc239077bfb5c7c7 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Wed, 13 Mar 2024 09:37:19 +0800 Subject: [PATCH] cri: support io by streaming api Signed-off-by: Abel Feng --- core/transfer/streaming/reader.go | 121 ++++++++++++++++ internal/cri/config/config.go | 16 +++ internal/cri/io/container_io.go | 23 ++- internal/cri/io/exec_io.go | 37 +++-- internal/cri/io/helpers.go | 163 +++++++++++++++++++--- internal/cri/server/container_create.go | 11 +- internal/cri/server/container_execsync.go | 27 +++- pkg/cio/io.go | 4 +- 8 files changed, 364 insertions(+), 38 deletions(-) create mode 100644 core/transfer/streaming/reader.go diff --git a/core/transfer/streaming/reader.go b/core/transfer/streaming/reader.go new file mode 100644 index 000000000000..e989b0d7326a --- /dev/null +++ b/core/transfer/streaming/reader.go @@ -0,0 +1,121 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package streaming + +import ( + "context" + "errors" + "fmt" + "io" + + transferapi "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/v2/core/streaming" + "github.com/containerd/typeurl/v2" +) + +type readByteStream struct { + ctx context.Context + stream streaming.Stream + window int32 + updated chan struct{} + errCh chan error + remaining []byte +} + +func ReadByteStream(ctx context.Context, stream streaming.Stream) io.ReadCloser { + rbs := &readByteStream{ + ctx: ctx, + stream: stream, + window: 0, + errCh: make(chan error), + updated: make(chan struct{}, 1), + } + go func() { + for { + if rbs.window >= windowSize { + select { + case <-ctx.Done(): + return + case <-rbs.updated: + continue + } + } + update := &transferapi.WindowUpdate{ + Update: windowSize, + } + anyType, err := typeurl.MarshalAny(update) + if err != nil { + rbs.errCh <- err + return + } + if err := stream.Send(anyType); err == nil { + rbs.window += windowSize + } else if !errors.Is(err, io.EOF) { + rbs.errCh <- err + } + } + + }() + return rbs +} + +func (r *readByteStream) Read(p []byte) (n int, err error) { + plen := len(p) + if len(r.remaining) > 0 { + copied := copy(p, r.remaining) + if len(r.remaining) > plen { + r.remaining = r.remaining[plen:] + } else { + r.remaining = nil + } + return copied, nil + } + select { + case <-r.ctx.Done(): + return 0, r.ctx.Err() + case err := <-r.errCh: + return 0, err + default: + } + anyType, err := r.stream.Recv() + if err != nil { + return 0, err + } + i, err := typeurl.UnmarshalAny(anyType) + if err != nil { + return 0, err + } + switch v := i.(type) { + case *transferapi.Data: + n := copy(p, v.Data) + if len(v.Data) > plen { + r.remaining = v.Data[plen:] + } + r.window = r.window - int32(n) + if r.window < windowSize { + r.updated <- struct{}{} + } + return n, nil + default: + return 0, fmt.Errorf("stream received error type %v", v) + } + +} + +func (r *readByteStream) Close() error { + return 
r.stream.Close() +} diff --git a/internal/cri/config/config.go b/internal/cri/config/config.go index d0b18ee9bd99..1d1c7fc63ebb 100644 --- a/internal/cri/config/config.go +++ b/internal/cri/config/config.go @@ -71,6 +71,10 @@ const ( // DefaultSandboxImage is the default image to use for sandboxes when empty or // for default configurations. DefaultSandboxImage = "registry.k8s.io/pause:3.9" + // IOTypeFifo is container io implemented by creating named pipe + IOTypeFifo = "fifo" + // IOTypeStreaming is container io implemented by connecting the streaming api to sandbox endpoint + IOTypeStreaming = "streaming" ) // Runtime struct to contain the type(ID), engine, and root variables for a default runtime @@ -116,6 +120,11 @@ type Runtime struct { // shim - means use whatever Controller implementation provided by shim (e.g. use RemoteController). // podsandbox - means use Controller implementation from sbserver podsandbox package. Sandboxer string `toml:"sandboxer" json:"sandboxer"` + // IOType defines how containerd transfers the io streams of the container. + // If it is not set, named pipes will be created for the container. + // It can also be set to "streaming" to create streams through the streaming api + // and use them as channels to transfer the io streams. + IOType string `toml:"io_type" json:"io_type"` } // ContainerdConfig contains toml config related to containerd @@ -527,6 +536,13 @@ func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation r.Sandboxer = string(ModePodSandbox) c.ContainerdConfig.Runtimes[k] = r } + + if len(r.IOType) == 0 { + r.IOType = IOTypeFifo + } + if r.IOType != IOTypeStreaming && r.IOType != IOTypeFifo { + return warnings, errors.New("`io_type` can only be `streaming` or `named_pipe`") + } } // Validation for drain_exec_sync_io_timeout diff --git a/internal/cri/io/container_io.go b/internal/cri/io/container_io.go index c916df55c341..c0d60e48624f 100644 --- a/internal/cri/io/container_io.go +++ 
b/internal/cri/io/container_io.go @@ -18,14 +18,15 @@ package io import ( "errors" + "fmt" "io" "strings" "sync" - "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/log" "github.com/containerd/containerd/v2/internal/cri/util" + "github.com/containerd/containerd/v2/pkg/cio" cioutil "github.com/containerd/containerd/v2/pkg/ioutil" ) @@ -39,7 +40,7 @@ type ContainerIO struct { id string fifos *cio.FIFOSet - *stdioPipes + *stdioStream stdoutGroup *cioutil.WriterGroup stderrGroup *cioutil.WriterGroup @@ -71,6 +72,20 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts { } } +// WithStreams creates new streams for the container io. +func WithStreams(address string, tty, stdin bool) ContainerIOOpts { + return func(c *ContainerIO) error { + if address == "" { + return fmt.Errorf("address can not be empty for io stream") + } + fifos, err := newStreams(address, c.id, tty, stdin) + if err != nil { + return err + } + return WithFIFOs(fifos)(c) + } +} + // NewContainerIO creates container io. func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) { c := &ContainerIO{ @@ -87,11 +102,11 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err return nil, errors.New("fifos are not set") } // Create actual fifos. - stdio, closer, err := newStdioPipes(c.fifos) + stdio, closer, err := newStdioStream(c.fifos) if err != nil { return nil, err } - c.stdioPipes = stdio + c.stdioStream = stdio c.closer = closer return c, nil } diff --git a/internal/cri/io/exec_io.go b/internal/cri/io/exec_io.go index e2d5608f2e2f..495a40abca1d 100644 --- a/internal/cri/io/exec_io.go +++ b/internal/cri/io/exec_io.go @@ -20,36 +20,55 @@ import ( "io" "sync" + "github.com/containerd/log" + "github.com/containerd/containerd/v2/pkg/cio" cioutil "github.com/containerd/containerd/v2/pkg/ioutil" - "github.com/containerd/log" ) // ExecIO holds the exec io. 
type ExecIO struct { id string fifos *cio.FIFOSet - *stdioPipes + *stdioStream closer *wgCloser } var _ cio.IO = &ExecIO{} -// NewExecIO creates exec io. -func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) { +// NewFifoExecIO creates exec io by named pipes. +func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) { fifos, err := newFifos(root, id, tty, stdin) if err != nil { return nil, err } - stdio, closer, err := newStdioPipes(fifos) + stdio, closer, err := newStdioStream(fifos) + if err != nil { + return nil, err + } + return &ExecIO{ + id: id, + fifos: fifos, + stdioStream: stdio, + closer: closer, + }, nil +} + +// NewStreamExecIO creates exec io with streaming. +func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) { + fifos, err := newStreams(address, id, tty, stdin) + if err != nil { + return nil, err + } + stdio, closer, err := newStdioStream(fifos) if err != nil { return nil, err } return &ExecIO{ - id: id, - fifos: fifos, - stdioPipes: stdio, - closer: closer, + id: id, + fifos: fifos, + stdioStream: stdio, + closer: closer, }, nil } diff --git a/internal/cri/io/helpers.go b/internal/cri/io/helpers.go index 74d345fb6208..64344dd1f53a 100644 --- a/internal/cri/io/helpers.go +++ b/internal/cri/io/helpers.go @@ -18,14 +18,26 @@ package io import ( "context" + "fmt" "io" + "net/url" "os" "path/filepath" + "strings" "sync" "syscall" + "time" - "github.com/containerd/containerd/v2/pkg/cio" + "github.com/containerd/ttrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + + streamingapi "github.com/containerd/containerd/v2/core/streaming" + "github.com/containerd/containerd/v2/core/streaming/proxy" + "github.com/containerd/containerd/v2/core/transfer/streaming" + "github.com/containerd/containerd/v2/pkg/cio" + "github.com/containerd/containerd/v2/pkg/shim" ) // AttachOptions specifies how to attach to a container. 
@@ -88,19 +100,35 @@ func newFifos(root, id string, tty, stdin bool) (*cio.FIFOSet, error) { return fifos, nil } -type stdioPipes struct { +// newStreams builds the set of stream URLs used for the io of a container. +func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) { + fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil }) + if stdin { + streamID := id + "-stdin" + fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID) + } + stdoutStreamID := id + "-stdout" + fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID) + if !tty { + stderrStreamID := id + "-stderr" + fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID) + } + fifos.Terminal = tty + return fifos, nil +} + +type stdioStream struct { stdin io.WriteCloser stdout io.ReadCloser stderr io.ReadCloser } -// newStdioPipes creates actual fifos for stdio. -func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) { +// newStdioStream creates actual streams or fifos for stdio. 
+func newStdioStream(fifos *cio.FIFOSet) (_ *stdioStream, _ *wgCloser, err error) { var ( - f io.ReadWriteCloser set []io.Closer ctx, cancel = context.WithCancel(context.Background()) - p = &stdioPipes{} + p = &stdioStream{} ) defer func() { if err != nil { @@ -112,27 +140,30 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) { }() if fifos.Stdin != "" { - if f, err = openPipe(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, nil, err + in, err := openStdin(ctx, fifos.Stdin) + if err != nil { + return nil, nil, fmt.Errorf("failed to open stdin, %w", err) } - p.stdin = f - set = append(set, f) + p.stdin = in + set = append(set, in) } if fifos.Stdout != "" { - if f, err = openPipe(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, nil, err + out, err := openOutput(ctx, fifos.Stdout) + if err != nil { + return nil, nil, fmt.Errorf("failed to open stdout, %w", err) } - p.stdout = f - set = append(set, f) + p.stdout = out + set = append(set, out) } if fifos.Stderr != "" { - if f, err = openPipe(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, nil, err + out, err := openOutput(ctx, fifos.Stderr) + if err != nil { + return nil, nil, fmt.Errorf("failed to open stderr, %w", err) } - p.stderr = f - set = append(set, f) + p.stderr = out + set = append(set, out) } return p, &wgCloser{ @@ -142,3 +173,99 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) { cancel: cancel, }, nil } + +func openStdin(ctx context.Context, url string) (io.WriteCloser, error) { + ok := strings.Contains(url, "://") + if !ok { + return openPipe(ctx, url, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + } + + return openStdinStream(ctx, url) +} + +func openStdinStream(ctx context.Context, url string) (io.WriteCloser, error) { + stream, err := openStream(ctx, url) + 
if err != nil { + return nil, err + } + return streaming.WriteByteStream(ctx, stream), nil +} + +func openOutput(ctx context.Context, url string) (io.ReadCloser, error) { + ok := strings.Contains(url, "://") + if !ok { + return openPipe(ctx, url, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + } + + return openOutputStream(ctx, url) +} + +func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) { + stream, err := openStream(ctx, url) + if err != nil { + return nil, err + } + return streaming.ReadByteStream(ctx, stream), nil +} + +func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) { + u, err := url.Parse(urlStr) + if err != nil { + return nil, fmt.Errorf("address url parse error: %v", err) + } + // The address returned from sandbox controller should be in the form like ttrpc+unix://<uds-path> + // or grpc+vsock://<cid>:<port>, we should get the protocol from the url first. + protocol, scheme, ok := strings.Cut(u.Scheme, "+") + if !ok { + return nil, fmt.Errorf("the scheme of sandbox address should be in " + + " the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock") + } + + if u.Path != "streaming" { + // TODO, support connect stream other than streaming api + return nil, fmt.Errorf("only <address>
/streaming?id=xxx supported") + } + + id := u.Query().Get("id") + if id == "" { + return nil, fmt.Errorf("no stream id in url queries") + } + realAddress := fmt.Sprintf("%s://%s/%s", scheme, u.Host, u.Path) + conn, err := shim.AnonReconnectDialer(realAddress, 100*time.Second) + if err != nil { + return nil, fmt.Errorf("failed to connect the stream %v", err) + } + var stream streamingapi.Stream + + switch protocol { + case "ttrpc": + c := ttrpc.NewClient(conn) + streamCreator := proxy.NewStreamCreator(c) + stream, err = streamCreator.Create(ctx, id) + if err != nil { + return nil, err + } + return stream, nil + + case "grpc": + ctx, cancel := context.WithTimeout(ctx, time.Second*100) + defer cancel() + + gopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } + conn, err := grpc.DialContext(ctx, realAddress, gopts...) + if err != nil { + return nil, err + } + streamCreator := proxy.NewStreamCreator(conn) + stream, err = streamCreator.Create(ctx, id) + if err != nil { + return nil, err + } + return stream, nil + default: + return nil, fmt.Errorf("protocol not supported") + } +} diff --git a/internal/cri/server/container_create.go b/internal/cri/server/container_create.go index 3318d382b50e..ca170c4738a9 100644 --- a/internal/cri/server/container_create.go +++ b/internal/cri/server/container_create.go @@ -247,8 +247,15 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta sandboxConfig.GetLogDirectory(), config.GetLogPath()) } - containerIO, err := cio.NewContainerIO(id, - cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin())) + var containerIO *cio.ContainerIO + switch ociRuntime.IOType { + case criconfig.IOTypeStreaming: + containerIO, err = cio.NewContainerIO(id, + cio.WithStreams(sandbox.Endpoint.Address, config.GetTty(), config.GetStdin())) + default: + containerIO, err = cio.NewContainerIO(id, + cio.WithNewFIFOs(volatileContainerRootDir, 
config.GetTty(), config.GetStdin())) + } if err != nil { return nil, fmt.Errorf("failed to create container io: %w", err) } diff --git a/internal/cri/server/container_execsync.go b/internal/cri/server/container_execsync.go index 44197d4b2ba0..2555bfe71638 100644 --- a/internal/cri/server/container_execsync.go +++ b/internal/cri/server/container_execsync.go @@ -24,16 +24,17 @@ import ( "syscall" "time" - containerd "github.com/containerd/containerd/v2/client" - containerdio "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/errdefs" "github.com/containerd/log" "k8s.io/client-go/tools/remotecommand" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/internal/cri/config" cio "github.com/containerd/containerd/v2/internal/cri/io" "github.com/containerd/containerd/v2/internal/cri/util" + containerdio "github.com/containerd/containerd/v2/pkg/cio" cioutil "github.com/containerd/containerd/v2/pkg/ioutil" ) @@ -159,10 +160,28 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont log.G(ctx).Debugf("Generated exec id %q for container %q", execID, id) volatileRootDir := c.getVolatileContainerRootDir(id) var execIO *cio.ExecIO + process, err := task.Exec(ctx, execID, pspec, func(id string) (containerdio.IO, error) { - var err error - execIO, err = cio.NewExecIO(id, volatileRootDir, opts.tty, opts.stdin != nil) + cntr, err := c.containerStore.Get(container.ID()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %w", container.ID(), err) + } + sb, err := c.sandboxStore.Get(cntr.SandboxID) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w", cntr.SandboxID, err) + } + ociRuntime, err := c.config.GetSandboxRuntime(sb.Config, sb.Metadata.RuntimeHandler) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox 
runtime: %w", err) + } + switch ociRuntime.IOType { + case config.IOTypeStreaming: + execIO, err = cio.NewStreamExecIO(id, sb.Endpoint.Address, opts.tty, opts.stdin != nil) + default: + execIO, err = cio.NewFifoExecIO(id, volatileRootDir, opts.tty, opts.stdin != nil) + } + return execIO, err }, ) diff --git a/pkg/cio/io.go b/pkg/cio/io.go index 64267d0e9c8a..cc1bfbd32823 100644 --- a/pkg/cio/io.go +++ b/pkg/cio/io.go @@ -71,7 +71,9 @@ type Creator func(id string) (IO, error) // will be sent only to the first reads type Attach func(*FIFOSet) (IO, error) -// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams +// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams. +// Although it also supports streaming IO other than FIFOs, +// the name "FIFOSet" is kept because it is referenced in too many places. type FIFOSet struct { Config close func() error