Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: shutdown logger after container process exits #2337

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 50 additions & 0 deletions cmd/nerdctl/container_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"fmt"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -143,3 +144,52 @@ func TestLogsWithFailingContainer(t *testing.T) {
base.Cmd("logs", "-f", containerName).AssertNoOut("baz")
base.Cmd("rm", "-f", containerName).AssertOK()
}

func TestLogsWithRunningContainer(t *testing.T) {
Copy link
Member

@fahedouch fahedouch Jul 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what this test covers comparing to this one ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other test runs the log command after the container has stopped running, but this one tests logs during the task execution and as logs are being sent to the logger with the --follow option.

t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", "-f", containerName).Run()
expected := make([]string, 10)
for i := 0; i < 10; i++ {
expected[i] = fmt.Sprint(i + 1)
}

base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK()
"sh", "-euc", "for i in `seq 1 10`; do echo $i; done").AssertOK()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added sleep to have both the container task and the logs command running concurrently

base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
}

func TestLogsWithoutNewlineOrEOF(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows")
}
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", "-f", containerName).Run()
expected := []string{"Hello World!", "There is no newline"}
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"printf", "'Hello World!\nThere is no newline'").AssertOK()
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
}

func TestLogsAfterRestartingContainer(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem <id>: The requested operation for attach namespace failed.: unknown")
}
fahedouch marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", "-f", containerName).Run()
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"printf", "'Hello World!\nThere is no newline'").AssertOK()
expected := []string{"Hello World!", "There is no newline"}
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
// restart and check logs again
base.Cmd("start", containerName)
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
}
5 changes: 3 additions & 2 deletions pkg/cmd/container/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa
// 1, nerdctl run --name demo -it imagename
// 2, ctrl + c to stop demo container
// 3, nerdctl start/restart demo
logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace)
logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace, options.GOptions.Address)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -655,8 +655,9 @@ func writeCIDFile(path, id string) error {
}

// generateLogConfig creates a LogConfig for the current container store
func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns string) (logConfig logging.LogConfig, err error) {
func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns, hostAddress string) (logConfig logging.LogConfig, err error) {
var u *url.URL
logConfig.HostAddress = hostAddress
if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" {
logConfig.LogURI = logDriver
} else {
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/container/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti
// Setup goroutine to send stop event if container task finishes:
go func() {
<-waitCh
// Wait for logger to process remaining logs after container exit
if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil {
logrus.WithError(err).Error("failed to wait for logger shutdown")
}
logrus.Debugf("container task has finished, sending kill signal to log viewer")
stopChannel <- os.Interrupt
}()
Expand Down
113 changes: 104 additions & 9 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (
"path/filepath"
"sort"
"sync"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/containerd/nerdctl/pkg/lockutil"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -113,9 +116,10 @@ func Main(argv2 string) error {

// LogConfig is marshalled as "log-config.json"
type LogConfig struct {
Driver string `json:"driver"`
Opts map[string]string `json:"opts,omitempty"`
LogURI string `json:"-"`
Driver string `json:"driver"`
Opts map[string]string `json:"opts,omitempty"`
HostAddress string `json:"host"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
HostAddress string `json:"host"`
ContainerdAddress string `json:"containerdAddress"`

Ideally log-config.json should not contain the address, but it can be refactored later

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a default value for HostAddress ?

LogURI string `json:"-"`
}

// LogConfigFilePath returns the path of log-config.json
Expand All @@ -140,10 +144,76 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) {
return logConfig, nil
}

func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error {
func getLockPath(dataStore, ns, id string) string {
return filepath.Join(dataStore, "containers", ns, id, "logger-lock")
}

// WaitForLogger waits until the logger has finished executing and processing container logs
func WaitForLogger(dataStore, ns, id string) error {
return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error {
return nil
})
}

// getContainerWait loads the container from ID and returns its wait channel
func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace))
if err != nil {
return nil, err
}
con, err := client.LoadContainer(ctx, config.ID)
if err != nil {
return nil, err
}

task, err := con.Task(ctx, nil)
if err == nil {
return task.Wait(ctx)
}
if !errdefs.IsNotFound(err) {
return nil, err
}

// If task was not found, it's possible that the container runtime is still being created.
// Retry every 100ms.
ticker := time.NewTicker(100 * time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ticker := time.NewTicker(100 * time.Millisecond)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timed out waiting for container task to start")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil, fmt.Errorf("timed out waiting for container task to start")
return nil, errors.New("timed out waiting for container task to start")

case <-ticker.C:
task, err = con.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return nil, err
}
return task.Wait(ctx)
}
}
}

func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error {
if err := driver.PreProcess(dataStore, config); err != nil {
return err
}

// initialize goroutines to copy stdout and stderr streams to a closable pipe
stdoutR, stdoutW := io.Pipe()
stderrR, stderrW := io.Pipe()
copyStream := func(reader io.Reader, writer *io.PipeWriter) {
// copy using a buffer of size 32K
buf := make([]byte, 32<<10)
_, err := io.CopyBuffer(writer, reader, buf)
if err != nil {
logrus.Errorf("failed to copy stream: %s", err)
}
}
go copyStream(config.Stdout, stdoutW)
go copyStream(config.Stderr, stderrW)

// scan and process logs from pipes
var wg sync.WaitGroup
wg.Add(3)
stdout := make(chan string, 10000)
Expand All @@ -161,12 +231,24 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf
}
}

go processLogFunc(config.Stdout, stdout)
go processLogFunc(config.Stderr, stderr)
go processLogFunc(stdoutR, stdout)
go processLogFunc(stderrR, stderr)
go func() {
defer wg.Done()
fahedouch marked this conversation as resolved.
Show resolved Hide resolved
driver.Process(stdout, stderr)
}()
go func() {
// close stdout and stderr upon container exit
defer stdoutW.Close()
defer stderrW.Close()

exitCh, err := getContainerWait(ctx, hostAddress, config)
if err != nil {
logrus.Errorf("failed to get container task wait channel: %v", err)
return
}
<-exitCh
}()
wg.Wait()
return driver.PostProcess()
}
Expand All @@ -175,7 +257,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
if dataStore == "" {
return nil, errors.New("got empty data store")
}
return func(_ context.Context, config *logging.Config, ready func() error) error {
return func(ctx context.Context, config *logging.Config, ready func() error) error {
if config.Namespace == "" || config.ID == "" {
return errors.New("got invalid config")
}
Expand All @@ -189,11 +271,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
if err != nil {
return err
}
if err := ready(); err != nil {

loggerLock := getLockPath(dataStore, config.Namespace, config.ID)
f, err := os.Create(loggerLock)
if err != nil {
return err
}
defer f.Close()

// the logger will obtain an exclusive lock on a file until the container is
// stopped and the driver has finished processing all output,
// so that waiting log viewers can be signalled when the process is complete.
return lockutil.WithDirLock(loggerLock, func() error {
if err := ready(); err != nil {
return err
}

return loggingProcessAdapter(driver, dataStore, config)
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config)
})
} else if !errors.Is(err, os.ErrNotExist) {
// the file does not exist if the container was created with nerdctl < 0.20
return err
Expand Down