From cd48cde78f6d2597308fac1c7edf1a44f2d95f8b Mon Sep 17 00:00:00 2001 From: fahed dorgaa Date: Fri, 29 Mar 2024 16:43:33 +0100 Subject: [PATCH] support attachable binary I/O Signed-off-by: fahed dorgaa --- cmd/containerd-shim-runc-v2/process/io.go | 204 +++++++++++++++++- .../process/io_test.go | 20 ++ 2 files changed, 220 insertions(+), 4 deletions(-) diff --git a/cmd/containerd-shim-runc-v2/process/io.go b/cmd/containerd-shim-runc-v2/process/io.go index 466b506b18c68..f162d0a0fb31c 100644 --- a/cmd/containerd-shim-runc-v2/process/io.go +++ b/cmd/containerd-shim-runc-v2/process/io.go @@ -32,6 +32,8 @@ import ( "syscall" "time" + "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/stdio" "github.com/containerd/fifo" @@ -107,6 +109,16 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdi pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) case "binary": pio.io, err = NewBinaryIO(ctx, id, u) + case "attachablebinary": + var fifoSet *cio.FIFOSet + fifoSet, err = NewAttachableBinary(ctx, id, u) + if err != nil { + return nil, err + } + pio.stdio.Stdout = fifoSet.Stdout + pio.stdio.Stderr = fifoSet.Stderr + pio.copy = true + pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) case "file": filePath := u.Path if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { @@ -243,8 +255,18 @@ func (c *countingWriteCloser) Close() error { return c.WriteCloser.Close() } +type pipesReader struct { + stdout io.ReadCloser + stderr io.ReadCloser +} + +type pipesWriter struct { + stdout io.WriteCloser + stderr io.WriteCloser +} + // NewBinaryIO runs a custom binary process for pluggable shim logging -func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) { +func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, retErr error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -252,14 +274,14 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err e var closers []func() error defer func() { - if err == nil { + if retErr == nil { return } - result := []error{err} + result := []error{retErr} for _, fn := range closers { result = append(result, fn()) } - err = errors.Join(result...) + retErr = errors.Join(result...) }() out, err := newPipe() @@ -307,6 +329,180 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err e }, nil } +// NewAttachableBinary runs a custom binary process and opens attachable fifos for pluggable shim logging +func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (*cio.FIFOSet, error) { + binaryFifos, err := cio.NewFIFOSetInDir(defaults.DefaultFIFODir, id, true) + if err != nil { + return nil, fmt.Errorf("failed to create birany fifos: %w", err) + } + + binaryFifosPipes, err := openBinaryFifos(ctx, binaryFifos) + if err != nil { + return nil, err + } + + attachableFifos, err := cio.NewFIFOSetInDir(filepath.Join(defaults.DefaultFIFODir, "attach"), id, true) + if err != nil { + return nil, fmt.Errorf("failed to create attachable fifos: %w", err) + } + type closer func() error + var closers []closer + + defer func() { + if err == nil { + return + } + result := []error{err} + for _, fn := range closers { + result = append(result, fn()) + } + err = errors.Join(result...) + }() + + pipesAttachableFifosPipes, err := openAttachableFifos(ctx, attachableFifos) + if err != nil { + return nil, err + } + closers = append(closers, []closer{pipesAttachableFifosPipes.stdout.Close, pipesAttachableFifosPipes.stderr.Close}...) + + binaryOut, err := newPipe() + if err != nil { + return nil, fmt.Errorf("failed to create binary stdout pipes: %w", err) + } + closers = append(closers, binaryOut.Close) + + binarySerr, err := newPipe() + if err != nil { + return nil, fmt.Errorf("failed to create binary stderr pipes: %w", err) + } + closers = append(closers, binarySerr.Close) + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + closers = append(closers, r.Close, w.Close) + + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + cmd := NewBinaryCmd(uri, id, ns) + cmd.ExtraFiles = append(cmd.ExtraFiles, binaryOut.r, binarySerr.r, w) + // don't need to register this with the reaper or wait when + // running inside a shim + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start binary process: %w", err) + } + closers = append(closers, func() error { return cmd.Process.Kill() }) + + // close our side of the pipe after start + if err := w.Close(); err != nil { + return nil, fmt.Errorf("failed to close write pipe after start: %w", err) + } + + // wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, fmt.Errorf("failed to read from logging binary: %w", err) + } + stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.stdout) + stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.stderr) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.Copy(stdoutWriters, binaryFifosPipes.stdout); err != nil { + log.G(ctx).Debug(err.Error()) + log.G(ctx).Warn("error copying stdout") + } + wg.Done() + binaryOut.w.Close() + pipesAttachableFifosPipes.stdout.Close() + }() + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.Copy(stderrWriters, binaryFifosPipes.stderr); err != nil { + log.G(ctx).Debug(err.Error()) + log.G(ctx).Warn("error copying stdout") + } + wg.Done() + binarySerr.w.Close() + pipesAttachableFifosPipes.stderr.Close() + }() + go func() { + wg.Wait() + // Send SIGTERM first, so logger process has a chance to flush and exit properly + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + log.L.WithError(err).Warn("failed to send SIGTERM") + if err := cmd.Process.Kill(); err != nil { + log.L.WithError(err).Warn("failed to kill process after faulty SIGTERM") + } + return + } + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + log.L.WithError(err).Warn("failed to kill shim logger process") + case <-time.After(binaryIOProcTermTimeout): + log.L.Warn("failed to wait for shim logger process to exit, killing") + err := cmd.Process.Kill() + if err != nil { + log.L.WithError(err).Warn("failed to kill shim logger process") + } + } + }() + return binaryFifos, nil +} + +func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesReader, retErr error) { + + if fifos.Stdout != "" { + if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stdout fifo: %w", retErr) + } + } + if fifos.Stderr != "" { + defer func() { + if retErr != nil { + f.stdout.Close() + } + }() + if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stderr fifo: %w", retErr) + } + } + return f, nil +} + +func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f pipesWriter, retErr error) { + if fifos.Stdout != "" { + if f.stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stdout fifo: %w", retErr) + } + } + if fifos.Stderr != "" { + defer func() { + if retErr != nil { + f.stdout.Close() + } + }() + if f.stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stderr fifo: %w", retErr) + } + } + return f, nil +} + type binaryIO struct { cmd *exec.Cmd out, err *pipe diff --git a/cmd/containerd-shim-runc-v2/process/io_test.go b/cmd/containerd-shim-runc-v2/process/io_test.go index 30dabce26887b..6961a74889722 100644 --- a/cmd/containerd-shim-runc-v2/process/io_test.go +++ b/cmd/containerd-shim-runc-v2/process/io_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/containerd/containerd/v2/pkg/namespaces" + "github.com/containerd/containerd/v2/pkg/testutil" ) func TestNewBinaryIO(t *testing.T) { @@ -49,6 +50,25 @@ func TestNewBinaryIO(t *testing.T) { } } +func TestNewAttachableBinaryIO(t *testing.T) { + testutil.RequiresRoot(t) + + ctx := namespaces.WithNamespace(context.Background(), "test") + uri, _ := url.Parse("attachablebinary:///bin/echo?test") + + before := descriptorCount(t) + + _, err := NewAttachableBinary(ctx, "1", uri) + if err != nil { + t.Fatal(err) + } + + after := descriptorCount(t) + if after != before+9 { + t.Fatalf("descriptors weren't created correctly (%d != %d -1)", before, after) + } +} + func TestNewBinaryIOCleanup(t *testing.T) { ctx := namespaces.WithNamespace(context.Background(), "test") uri, _ := url.Parse("binary:///not/existing")