diff --git a/cmd/containerd-shim-runc-v2/process/io.go b/cmd/containerd-shim-runc-v2/process/io.go index 466b506b18c68..35c99d5545678 100644 --- a/cmd/containerd-shim-runc-v2/process/io.go +++ b/cmd/containerd-shim-runc-v2/process/io.go @@ -27,16 +27,19 @@ import ( "os" "os/exec" "path/filepath" - "sync" - "sync/atomic" - "syscall" - "time" + "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" "github.com/containerd/log" + + "sync" + "sync/atomic" + "syscall" + "time" ) const binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup @@ -107,6 +110,16 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdi pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) case "binary": pio.io, err = NewBinaryIO(ctx, id, u) + case "attachablebinary": + var fifoSet *cio.FIFOSet + fifoSet, err = NewAttachableBinary(ctx, id, u) + if err != nil { + return nil, err + } + pio.stdio.Stdout = fifoSet.Stdout + pio.stdio.Stderr = fifoSet.Stderr + pio.copy = true + pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) case "file": filePath := u.Path if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { @@ -243,6 +256,16 @@ func (c *countingWriteCloser) Close() error { return c.WriteCloser.Close() } +type PipesReader struct { + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +type PipesWriter struct { + Stdout io.WriteCloser + Stderr io.WriteCloser +} + // NewBinaryIO runs a custom binary process for pluggable shim logging func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) { ns, err := namespaces.NamespaceRequired(ctx) @@ -307,6 +330,201 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err e }, nil } +// NewAttachableBinary runs a custom binary process and opens attachable fifos for pluggable shim logging +func NewAttachableBinary(ctx context.Context, id string, uri *url.URL) (*cio.FIFOSet, error) { + binaryFifos, err := cio.NewFIFOSetInDir(defaults.DefaultFIFODir, id, true) + if err != nil { + return nil, fmt.Errorf("failed to create birany fifos: %w", err) + } + + binaryFifosPipes, err := openBinaryFifos(ctx, binaryFifos) + if err != nil { + return nil, err + } + + attachableFifos, err := cio.NewFIFOSetInDir(filepath.Join(defaults.DefaultFIFODir, "attach"), id, true) + if err != nil { + return nil, fmt.Errorf("failed to create attachable fifos: %w", err) + } + type closer func() error + var closers []closer + + defer func() { + if err == nil { + return + } + result := []error{err} + for _, fn := range closers { + result = append(result, fn()) + } + err = errors.Join(result...) + }() + + pipesAttachableFifosPipes, err := openAttachableFifos(ctx, attachableFifos) + if err != nil { + return nil, err + } + closers = append(closers, []closer{pipesAttachableFifosPipes.Stdout.Close, pipesAttachableFifosPipes.Stderr.Close}...) + + binaryOut, err := newPipe() + if err != nil { + return nil, fmt.Errorf("failed to create binary stdout pipes: %w", err) + } + closers = append(closers, binaryOut.Close) + + binarySerr, err := newPipe() + if err != nil { + return nil, fmt.Errorf("failed to create binary stderr pipes: %w", err) + } + closers = append(closers, binarySerr.Close) + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + closers = append(closers, r.Close, w.Close) + + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + cmd := NewBinaryCmd(uri, id, ns) + cmd.ExtraFiles = append(cmd.ExtraFiles, binaryOut.r, binarySerr.r, w) + // don't need to register this with the reaper or wait when + // running inside a shim + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start binary process: %w", err) + } + closers = append(closers, func() error { return cmd.Process.Kill() }) + + // close our side of the pipe after start + if err := w.Close(); err != nil { + return nil, fmt.Errorf("failed to close write pipe after start: %w", err) + } + + // wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, fmt.Errorf("failed to read from logging binary: %w", err) + } + stdoutWriters := io.MultiWriter(binaryOut.w, pipesAttachableFifosPipes.Stdout) + stderrWriters := io.MultiWriter(binarySerr.w, pipesAttachableFifosPipes.Stderr) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.Copy(stdoutWriters, binaryFifosPipes.Stdout); err != nil { + log.G(ctx).Debug(err.Error()) + log.G(ctx).Warn("error copying stdout") + } + wg.Done() + binaryOut.w.Close() + pipesAttachableFifosPipes.Stdout.Close() + }() + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.Copy(stderrWriters, binaryFifosPipes.Stderr); err != nil { + log.G(ctx).Debug(err.Error()) + log.G(ctx).Warn("error copying stdout") + } + wg.Done() + binarySerr.w.Close() + pipesAttachableFifosPipes.Stderr.Close() + }() + go func() { + wg.Wait() + // Send SIGTERM first, so logger process has a chance to flush and exit properly + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + log.L.WithError(err).Warn("failed to send SIGTERM") + if err := cmd.Process.Kill(); err != nil { + log.L.WithError(err).Warn("failed to kill process after faulty SIGTERM") + } + return + } + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case err := <-done: + log.L.WithError(err).Warn("failed to kill shim logger process") + case <-time.After(binaryIOProcTermTimeout): + log.L.Warn("failed to wait for shim logger process to exit, killing") + err := cmd.Process.Kill() + if err != nil { + log.L.WithError(err).Warn("failed to kill shim logger process") + } + } + }() + return binaryFifos, nil +} + +func openBinaryFifos(ctx context.Context, fifos *cio.FIFOSet) (f PipesReader, retErr error) { + defer func() { + if retErr != nil { + fifos.Close() + } + }() + + if fifos.Stdout != "" { + if f.Stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stdout fifo: %w", retErr) + } + defer func() { + if retErr != nil && f.Stdout != nil { + f.Stdout.Close() + } + }() + } + if fifos.Stderr != "" { + if f.Stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stderr fifo: %w", retErr) + } + defer func() { + if retErr != nil && f.Stderr != nil { + f.Stderr.Close() + } + }() + } + return f, nil +} + +func openAttachableFifos(ctx context.Context, fifos *cio.FIFOSet) (f PipesWriter, retErr error) { + defer func() { + if retErr != nil { + fifos.Close() + } + }() + + if fifos.Stdout != "" { + if f.Stdout, retErr = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stdout fifo: %w", retErr) + } + defer func() { + if retErr != nil && f.Stdout != nil { + f.Stdout.Close() + } + }() + } + if fifos.Stderr != "" { + if f.Stderr, retErr = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); retErr != nil { + return f, fmt.Errorf("failed to open stderr fifo: %w", retErr) + } + defer func() { + if retErr != nil && f.Stderr != nil { + f.Stderr.Close() + } + }() + } + return f, nil +} + type binaryIO struct { cmd *exec.Cmd out, err *pipe diff --git a/cmd/containerd-shim-runc-v2/process/io_test.go b/cmd/containerd-shim-runc-v2/process/io_test.go index 30dabce26887b..6961a74889722 100644 --- a/cmd/containerd-shim-runc-v2/process/io_test.go +++ b/cmd/containerd-shim-runc-v2/process/io_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/containerd/containerd/v2/pkg/namespaces" + "github.com/containerd/containerd/v2/pkg/testutil" ) func TestNewBinaryIO(t *testing.T) { @@ -49,6 +50,25 @@ func TestNewBinaryIO(t *testing.T) { } } +func TestNewAttachableBinaryIO(t *testing.T) { + testutil.RequiresRoot(t) + + ctx := namespaces.WithNamespace(context.Background(), "test") + uri, _ := url.Parse("attachablebinary:///bin/echo?test") + + before := descriptorCount(t) + + _, err := NewAttachableBinary(ctx, "1", uri) + if err != nil { + t.Fatal(err) + } + + after := descriptorCount(t) + if after != before+9 { + t.Fatalf("descriptors weren't created correctly (%d != %d -1)", before, after) + } +} + func TestNewBinaryIOCleanup(t *testing.T) { ctx := namespaces.WithNamespace(context.Background(), "test") uri, _ := url.Parse("binary:///not/existing")