Skip to content

Commit

Permalink
Avoid sending task keepalive ping after stream is closed (#6514)
Browse files Browse the repository at this point in the history
  • Loading branch information
bduffany committed May 17, 2024
1 parent 480b733 commit b1704b8
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions enterprise/server/scheduling/task_leaser/task_leaser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type TaskLeaser struct {
mu sync.Mutex // protects stream
stream scpb.Scheduler_LeaseTaskClient
ttl time.Duration
closed bool
cancelFunc context.CancelFunc
}

Expand All @@ -49,7 +48,6 @@ func NewTaskLeaser(env environment.Env, executorID string, taskID string) *TaskL
taskID: taskID,
quit: make(chan struct{}),
ttl: 100 * time.Second,
closed: true, // set to false in Claim.
}
}

Expand All @@ -63,6 +61,12 @@ func (t *TaskLeaser) sendRequest(req *scpb.LeaseTaskRequest) (*scpb.LeaseTaskRes
func (t *TaskLeaser) pingServer(ctx context.Context) (b []byte, err error) {
t.mu.Lock()
defer t.mu.Unlock()

if t.closed() {
// Don't try to send keepalive pings after Close() was called.
return nil, nil
}

req := &scpb.LeaseTaskRequest{
ExecutorId: t.executorID,
TaskId: t.taskID,
Expand Down Expand Up @@ -153,7 +157,6 @@ func (t *TaskLeaser) Claim(ctx context.Context) (context.Context, []byte, error)
t.stream = stream
serializedTask, err := t.pingServer(leaseTaskCtx)
if err == nil {
t.closed = false
defer t.keepLease(leaseTaskCtx)
log.CtxInfof(ctx, "Worker leased task: %q", t.taskID)
}
Expand All @@ -162,12 +165,22 @@ func (t *TaskLeaser) Claim(ctx context.Context) (context.Context, []byte, error)
return ctx, serializedTask, err
}

// closed reports whether a receive on t.quit can proceed without blocking,
// which is the case once Close has closed the channel. The check itself
// never blocks.
func (t *TaskLeaser) closed() bool {
	done := false
	select {
	case <-t.quit:
		// The quit channel is readable, so the leaser has been shut down.
		done = true
	default:
		// Nothing to receive yet: the leaser is still open.
	}
	return done
}

func (t *TaskLeaser) Close(ctx context.Context, taskErr error, retry bool) {
t.mu.Lock()
defer t.mu.Unlock()
log.CtxInfof(ctx, "TaskLeaser %q Close() called with err: %v", t.taskID, taskErr)
if t.closed {
if t.closed() {
log.CtxInfof(ctx, "TaskLeaser %q was already closed. Short-circuiting.", t.taskID)
return
}
close(t.quit) // This cancels our lease-keep-alive background goroutine.

Expand Down Expand Up @@ -217,8 +230,6 @@ func (t *TaskLeaser) Close(ctx context.Context, taskErr error, retry bool) {
} else {
log.CtxInfof(ctx, "TaskLeaser %q: closed cleanly :)", t.taskID)
}

t.closed = true
}

func APIKey() string {
Expand Down

0 comments on commit b1704b8

Please sign in to comment.