Skip to content

Commit

Permalink
add retry attempts in logger to wait for task start
Browse files Browse the repository at this point in the history
Signed-off-by: Mrudul Harwani <mharwani@amazon.com>
  • Loading branch information
mharwani committed Aug 1, 2023
1 parent 23b24a6 commit 3ce8c17
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"sort"
"sync"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
Expand Down Expand Up @@ -156,29 +157,45 @@ func WaitForLogger(dataStore, ns, id string) error {

// getContainerWait loads the container from ID and returns its wait channel
func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
// Try to retrieve container task for at most 10 seconds.
// FIXME: It's possible that container runtime takes longer than 10 seconds to be created.
// If taskutil can signal to the logger that the task has been created, this method would be better synchronized
taskTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace))
if err != nil {
return nil, err
}
con, err := client.LoadContainer(ctx, config.ID)
con, err := client.LoadContainer(taskTimeoutCtx, config.ID)
if err != nil {
return nil, err
}
task, err := con.Task(ctx, nil)
if err != nil {
return nil, err

for {
select {
case <-taskTimeoutCtx.Done():
return nil, fmt.Errorf("timed out waiting for container task to start")
default:
task, err := con.Task(taskTimeoutCtx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
// If task was not found, it's possible that the container runtime is still being created.
// Try again in 10ms
time.Sleep(10 * time.Millisecond)
break
}
return nil, err
}
return task.Wait(ctx)
}
}
return task.Wait(ctx)
}

func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error {
if err := driver.PreProcess(dataStore, config); err != nil {
return err
}
exitCh, err := getContainerWait(ctx, hostAddress, config)
if err != nil {
return err
}

// initialize goroutines to copy stdout and stderr streams to a closable pipe
stdoutR, stdoutW := io.Pipe()
Expand Down Expand Up @@ -220,9 +237,14 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd
}()
go func() {
// close stdout and stderr upon container exit
defer stdoutW.Close()
defer stderrW.Close()

exitCh, err := getContainerWait(ctx, hostAddress, config)
if err != nil {
return
}
<-exitCh
stdoutW.Close()
stderrW.Close()
}()
wg.Wait()
return driver.PostProcess()
Expand Down

0 comments on commit 3ce8c17

Please sign in to comment.