From b1704b82f28033acdba17df1b29f43a985b53446 Mon Sep 17 00:00:00 2001 From: Brandon Duffany Date: Fri, 17 May 2024 09:21:33 -0400 Subject: [PATCH] Avoid sending task keepalive ping after stream is closed (#6514) --- .../scheduling/task_leaser/task_leaser.go | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/enterprise/server/scheduling/task_leaser/task_leaser.go b/enterprise/server/scheduling/task_leaser/task_leaser.go index ba26f0e1746..a4a827a3504 100644 --- a/enterprise/server/scheduling/task_leaser/task_leaser.go +++ b/enterprise/server/scheduling/task_leaser/task_leaser.go @@ -38,7 +38,6 @@ type TaskLeaser struct { mu sync.Mutex // protects stream stream scpb.Scheduler_LeaseTaskClient ttl time.Duration - closed bool cancelFunc context.CancelFunc } @@ -49,7 +48,6 @@ func NewTaskLeaser(env environment.Env, executorID string, taskID string) *TaskL taskID: taskID, quit: make(chan struct{}), ttl: 100 * time.Second, - closed: true, // set to false in Claim. } } @@ -63,6 +61,12 @@ func (t *TaskLeaser) sendRequest(req *scpb.LeaseTaskRequest) (*scpb.LeaseTaskRes func (t *TaskLeaser) pingServer(ctx context.Context) (b []byte, err error) { t.mu.Lock() defer t.mu.Unlock() + + if t.closed() { + // Don't try to send keepalive pings after Close() was called. + return nil, nil + } + req := &scpb.LeaseTaskRequest{ ExecutorId: t.executorID, TaskId: t.taskID, @@ -153,7 +157,6 @@ func (t *TaskLeaser) Claim(ctx context.Context) (context.Context, []byte, error) t.stream = stream serializedTask, err := t.pingServer(leaseTaskCtx) if err == nil { - t.closed = false defer t.keepLease(leaseTaskCtx) log.CtxInfof(ctx, "Worker leased task: %q", t.taskID) } @@ -162,12 +165,22 @@ func (t *TaskLeaser) Claim(ctx context.Context) (context.Context, []byte, error) return ctx, serializedTask, err } +func (t *TaskLeaser) closed() bool { + select { + case <-t.quit: + return true + default: + return false + } +} + func (t *TaskLeaser) Close(ctx context.Context, taskErr error, retry bool) { t.mu.Lock() defer t.mu.Unlock() log.CtxInfof(ctx, "TaskLeaser %q Close() called with err: %v", t.taskID, taskErr) - if t.closed { + if t.closed() { log.CtxInfof(ctx, "TaskLeaser %q was already closed. Short-circuiting.", t.taskID) + return } close(t.quit) // This cancels our lease-keep-alive background goroutine. @@ -217,8 +230,6 @@ func (t *TaskLeaser) Close(ctx context.Context, taskErr error, retry bool) { } else { log.CtxInfof(ctx, "TaskLeaser %q: closed cleanly :)", t.taskID) } - - t.closed = true } func APIKey() string {