Skip to content

Commit

Permalink
Merge pull request #3331 from harness/revert-3330-queue-deadlock-fix
Browse files Browse the repository at this point in the history
Revert "fix scheduler queue deadlock"
  • Loading branch information
TP Honey committed Jul 5, 2023
2 parents 9b7f1d5 + ba806ec commit 5a80929
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ steps:
- name: test
image: golang:1.14.15
commands:
- go test -race ./...
- go test ./...
- go build -o /dev/null github.com/drone/drone/cmd/drone-server
- go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server

Expand Down
12 changes: 5 additions & 7 deletions scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
variant: params.Variant,
labels: params.Labels,
channel: make(chan *core.Stage),
done: ctx.Done(),
}
q.Lock()
q.workers[w] = struct{}{}
Expand All @@ -109,6 +108,9 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e

select {
case <-ctx.Done():
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, ctx.Err()
case b := <-w.channel:
return b, nil
Expand Down Expand Up @@ -209,12 +211,9 @@ func (q *queue) signal(ctx context.Context) error {
// }
select {
case w.channel <- item:
case <-w.done:
case <-time.After(q.interval):
delete(q.workers, w)
break loop
}

delete(q.workers, w)
break loop
}
}
return nil
Expand Down Expand Up @@ -242,7 +241,6 @@ type worker struct {
variant string
labels map[string]string
channel chan *core.Stage
done <-chan struct{}
}

type counter struct {
Expand Down
40 changes: 7 additions & 33 deletions scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ func TestQueueCancel(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

ctx, cancel := context.WithCancel(context.Background())
store := mock.NewMockStageStore(controller)
store.EXPECT().ListIncomplete(gomock.Any()).Return(nil, nil)
store.EXPECT().ListIncomplete(ctx).Return(nil, nil)

q := newQueue(store)
q.ctx = ctx

var wg sync.WaitGroup
wg.Add(1)

ctx, cancel := context.WithCancel(context.Background())
go func() {
build, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if err != context.Canceled {
Expand Down Expand Up @@ -101,7 +102,10 @@ func TestQueuePush(t *testing.T) {
ctx := context.Background()
store := mock.NewMockStageStore(controller)

q := newQueue(store)
q := &queue{
store: store,
ready: make(chan struct{}, 1),
}
q.Schedule(ctx, item1)
q.Schedule(ctx, item2)
select {
Expand Down Expand Up @@ -352,33 +356,3 @@ func TestWithinLimits_Old(t *testing.T) {
}
}
}

func TestQueueContextCanceling(t *testing.T) {
listIncompleteResponse := []*core.Stage{
{ID: 1, OS: "linux/amd64", Arch: "amd64", Status: drone.StatusPending},
}

controller := gomock.NewController(t)
defer controller.Finish()

globCtx := context.Background()

mockStageStore := mock.NewMockStageStore(controller)
mockStageStore.EXPECT().ListIncomplete(globCtx).Return(listIncompleteResponse, nil).AnyTimes()

q := newQueue(mockStageStore)

for k := 0; k < 1000; k++ {
reqCtx, reqCanc := context.WithCancel(context.Background())
go reqCanc() // asynchronously cancel the context

stage, err := q.Request(reqCtx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if stage == nil && err == context.Canceled {
continue // we got the ctx canceled error
}
if stage == listIncompleteResponse[0] && err == nil {
continue // we got a stage before the context got canceled
}
t.Errorf("got neither the context canceled error nor the data: stage=%v err=%v", stage, err)
}
}

0 comments on commit 5a80929

Please sign in to comment.