Skip to content

Commit

Permalink
fix deadlock waiting for attached-dependencies
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
  • Loading branch information
ndeloof committed Nov 30, 2022
1 parent 06e7137 commit d4b37dc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
9 changes: 9 additions & 0 deletions pkg/compose/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ type logPrinter interface {
HandleEvent(event api.ContainerEvent)
Run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
Cancel()
Stop()
}

type printer struct {
queue chan api.ContainerEvent
consumer api.LogConsumer
stopC chan struct{}
}

// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
Expand All @@ -41,6 +43,7 @@ func newLogPrinter(consumer api.LogConsumer) logPrinter {
printer := printer{
consumer: consumer,
queue: queue,
stopC: make(chan struct{}),
}
return &printer
}
Expand All @@ -51,6 +54,10 @@ func (p *printer) Cancel() {
}
}

func (p *printer) Stop() {
p.stopC <- struct{}{}
}

func (p *printer) HandleEvent(event api.ContainerEvent) {
p.queue <- event
}
Expand All @@ -66,6 +73,8 @@ func (p *printer) Run(ctx context.Context, cascadeStop bool, exitCodeFrom string
select {
case <-ctx.Done():
return exitCode, ctx.Err()
case <-p.stopC:
return exitCode, nil
case event := <-p.queue:
container := event.Container
switch event.Type {
Expand Down
18 changes: 13 additions & 5 deletions pkg/compose/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (

func (s *composeService) Start(ctx context.Context, projectName string, options api.StartOptions) error {
return progress.Run(ctx, func(ctx context.Context) error {
return s.start(ctx, strings.ToLower(projectName), options, nil)
return s.start(ctx, strings.ToLower(projectName), options, nil, nil)
})
}

func (s *composeService) start(ctx context.Context, projectName string, options api.StartOptions, listener api.ContainerEventListener) error {
func (s *composeService) start(ctx context.Context, projectName string, options api.StartOptions, listener api.ContainerEventListener, stopC chan struct{}) error {
project := options.Project
if project == nil {
var containers Containers
Expand All @@ -57,15 +57,17 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}
}

eg, ctx := errgroup.WithContext(ctx)
watchCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
eg, gctx := errgroup.WithContext(ctx)
if listener != nil {
attached, err := s.attach(ctx, project, listener, options.AttachTo)
if err != nil {
return err
}

eg.Go(func() error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.watchContainers(watchCtx, project.Name, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.attachContainer(ctx, container, listener)
})
})
Expand Down Expand Up @@ -96,7 +98,13 @@ func (s *composeService) start(ctx context.Context, projectName string, options
}
}

return eg.Wait()
select {
case <-stopC:
cancelFunc()
return nil
case <-gctx.Done():
return eg.Wait()
}
}

// getDependencyCondition checks if service is depended on by other services
Expand Down
8 changes: 6 additions & 2 deletions pkg/compose/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err
}
if options.Start.Attach == nil {
return s.start(ctx, project.Name, options.Start, nil)
return s.start(ctx, project.Name, options.Start, nil, nil)
}
return nil
})
Expand All @@ -51,6 +51,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}

printer := newLogPrinter(options.Start.Attach)
stopC := make(chan struct{})

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -74,18 +75,21 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
<-signalChan
printer.Cancel()
fmt.Println("Gracefully stopping... (press Ctrl+C again to force)")
stopC <- struct{}{}
stopFunc() //nolint:errcheck
printer.Stop()
}()

var exitCode int
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
code, err := printer.Run(context.Background(), options.Start.CascadeStop, options.Start.ExitCodeFrom, stopFunc)
fmt.Println("printer is done")
exitCode = code
return err
})

err = s.start(ctx, project.Name, options.Start, printer.HandleEvent)
err = s.start(ctx, project.Name, options.Start, printer.HandleEvent, stopC)
if err != nil {
return err
}
Expand Down

0 comments on commit d4b37dc

Please sign in to comment.