diff --git a/cmd/nerdctl/container_logs_test.go b/cmd/nerdctl/container_logs_test.go index 3f78ba8e2c..d7334dc929 100644 --- a/cmd/nerdctl/container_logs_test.go +++ b/cmd/nerdctl/container_logs_test.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "runtime" "strings" "testing" "time" @@ -143,3 +144,52 @@ func TestLogsWithFailingContainer(t *testing.T) { base.Cmd("logs", "-f", containerName).AssertNoOut("baz") base.Cmd("rm", "-f", containerName).AssertOK() } + +func TestLogsWithRunningContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + expected := make([]string, 10) + for i := 0; i < 10; i++ { + expected[i] = fmt.Sprint(i + 1) + } + + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + +func TestLogsWithoutNewlineOrEOF(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + expected := []string{"Hello World!", "There is no newline"} + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} + +func TestLogsAfterRestartingContainer(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem : The requested operation for attach namespace failed.: unknown") + } + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", "-f", containerName).Run() + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + expected := []string{"Hello World!", "There is no newline"} + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + // restart and check logs again + base.Cmd("start", containerName) + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) +} diff --git a/pkg/cmd/container/create.go b/pkg/cmd/container/create.go index 88246e3d7b..a595b9cea9 100644 --- a/pkg/cmd/container/create.go +++ b/pkg/cmd/container/create.go @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa // 1, nerdctl run --name demo -it imagename // 2, ctrl + c to stop demo container // 3, nerdctl start/restart demo - logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace) + logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace, options.GOptions.Address) if err != nil { return nil, nil, err } @@ -655,8 +655,9 @@ func writeCIDFile(path, id string) error { } // generateLogConfig creates a LogConfig for the current container store -func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns string) (logConfig logging.LogConfig, err error) { +func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns, hostAddress string) (logConfig logging.LogConfig, err error) { var u *url.URL + logConfig.HostAddress = hostAddress if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" { logConfig.LogURI = logDriver } else { diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index c971a15d48..b41e1d701c 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -90,6 +90,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + logrus.WithError(err).Error("failed to wait for logger shutdown") + } logrus.Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index f81180503f..1122be01b4 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -27,9 +27,12 @@ import ( "path/filepath" "sort" "sync" + "time" + "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" + "github.com/containerd/nerdctl/pkg/lockutil" "github.com/sirupsen/logrus" ) @@ -113,9 +116,10 @@ func Main(argv2 string) error { // LogConfig is marshalled as "log-config.json" type LogConfig struct { - Driver string `json:"driver"` - Opts map[string]string `json:"opts,omitempty"` - LogURI string `json:"-"` + Driver string `json:"driver"` + Opts map[string]string `json:"opts,omitempty"` + HostAddress string `json:"host"` + LogURI string `json:"-"` } // LogConfigFilePath returns the path of log-config.json @@ -140,10 +144,76 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } -func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error { +func getLockPath(dataStore, ns, id string) string { + return filepath.Join(dataStore, "containers", ns, id, "logger-lock") +} + +// WaitForLogger waits until the logger has finished executing and processing container logs +func WaitForLogger(dataStore, ns, id string) error { + return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error { + return nil + }) +} + +// getContainerWait loads the container from ID and returns its wait channel +func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + + task, err := con.Task(ctx, nil) + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { + return nil, err + } + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } +} + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { if err := driver.PreProcess(dataStore, config); err != nil { return err } + + // initialize goroutines to copy stdout and stderr streams to a closable pipe + stdoutR, stdoutW := io.Pipe() + stderrR, stderrW := io.Pipe() + copyStream := func(reader io.Reader, writer *io.PipeWriter) { + // copy using a buffer of size 32K + buf := make([]byte, 32<<10) + _, err := io.CopyBuffer(writer, reader, buf) + if err != nil { + logrus.Errorf("failed to copy stream: %s", err) + } + } + go copyStream(config.Stdout, stdoutW) + go copyStream(config.Stderr, stderrW) + + // scan and process logs from pipes var wg sync.WaitGroup wg.Add(3) stdout := make(chan string, 10000) @@ -161,12 +231,24 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf } } - go processLogFunc(config.Stdout, stdout) - go processLogFunc(config.Stderr, stderr) + go processLogFunc(stdoutR, stdout) + go processLogFunc(stderrR, stderr) go func() { defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close stdout and stderr upon container exit + defer stdoutW.Close() + defer stderrW.Close() + + exitCh, err := getContainerWait(ctx, hostAddress, config) + if err != nil { + logrus.Errorf("failed to get container task wait channel: %v", err) + return + } + <-exitCh + }() wg.Wait() return driver.PostProcess() } @@ -175,7 +257,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if dataStore == "" { return nil, errors.New("got empty data store") } - return func(_ context.Context, config *logging.Config, ready func() error) error { + return func(ctx context.Context, config *logging.Config, ready func() error) error { if config.Namespace == "" || config.ID == "" { return errors.New("got invalid config") } @@ -189,11 +271,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err != nil { return err } - if err := ready(); err != nil { + + loggerLock := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(loggerLock) + if err != nil { return err } + defer f.Close() + + // the logger will obtain an exclusive lock on a file until the container is + // stopped and the driver has finished processing all output, + // so that waiting log viewers can be signalled when the process is complete. + return lockutil.WithDirLock(loggerLock, func() error { + if err := ready(); err != nil { + return err + } - return loggingProcessAdapter(driver, dataStore, config) + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) + }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err