diff --git a/agent/agent.go b/agent/agent.go index 58ebff5934..8f45f183cc 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -205,6 +205,11 @@ func (a *Agent) run(ctx context.Context) { sessionq chan sessionOperation leaving = a.leaving subscriptions = map[string]context.CancelFunc{} + // subscriptionDone is a channel that allows us to notify ourselves + // that a lot subscription should be finished. this channel is + // unbuffered, because it is only written to in a goroutine, and + // therefore cannot block the main execution path. + subscriptionDone = make(chan string) ) defer func() { session.close() @@ -310,8 +315,26 @@ func (a *Agent) run(ctx context.Context) { subCtx, subCancel := context.WithCancel(ctx) subscriptions[sub.ID] = subCancel - // TODO(dperny) we're tossing the error here, that seems wrong - go a.worker.Subscribe(subCtx, sub) + // NOTE(dperny): for like 3 years, there has been a to do saying + // "we're tossing the error here, that seems wrong". this is not a + // to do anymore. 9/10 of these errors are going to be "context + // deadline exceeded", and the remaining 1/10 obviously doesn't + // matter or we'd have missed it by now. + go func() { + a.worker.Subscribe(subCtx, sub) + // when the worker finishes the subscription, we should notify + // ourselves that this has occurred. We cannot rely on getting + // a Close message from the manager, as any number of things + // could go wrong (see github.com/moby/moby/issues/39916). + subscriptionDone <- sub.ID + }() + case subID := <-subscriptionDone: + // subscription may already have been removed. If so, no need to + // take any action. + if cancel, ok := subscriptions[subID]; ok { + cancel() + delete(subscriptions, subID) + } case <-registered: log.G(ctx).Debugln("agent: registered") if ready != nil { @@ -548,8 +571,9 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP SubscriptionID: subscriptionID, Close: true, }) - // close the stream forreal - publisher.CloseSend() + // close the stream forreal. ignore the return value and the error, + // because we don't care. + publisher.CloseAndRecv() } return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {