Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "fix scheduler queue deadlock" #3331

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ steps:
- name: test
image: golang:1.14.15
commands:
- go test -race ./...
- go test ./...
- go build -o /dev/null github.com/drone/drone/cmd/drone-server
- go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server

Expand Down
12 changes: 5 additions & 7 deletions scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
variant: params.Variant,
labels: params.Labels,
channel: make(chan *core.Stage),
done: ctx.Done(),
}
q.Lock()
q.workers[w] = struct{}{}
Expand All @@ -109,6 +108,9 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e

select {
case <-ctx.Done():
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, ctx.Err()
case b := <-w.channel:
return b, nil
Expand Down Expand Up @@ -209,12 +211,9 @@ func (q *queue) signal(ctx context.Context) error {
// }
select {
case w.channel <- item:
case <-w.done:
case <-time.After(q.interval):
delete(q.workers, w)
break loop
}

delete(q.workers, w)
break loop
}
}
return nil
Expand Down Expand Up @@ -242,7 +241,6 @@ type worker struct {
variant string
labels map[string]string
channel chan *core.Stage
done <-chan struct{}
}

type counter struct {
Expand Down
40 changes: 7 additions & 33 deletions scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ func TestQueueCancel(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

ctx, cancel := context.WithCancel(context.Background())
store := mock.NewMockStageStore(controller)
store.EXPECT().ListIncomplete(gomock.Any()).Return(nil, nil)
store.EXPECT().ListIncomplete(ctx).Return(nil, nil)

q := newQueue(store)
q.ctx = ctx

var wg sync.WaitGroup
wg.Add(1)

ctx, cancel := context.WithCancel(context.Background())
go func() {
build, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if err != context.Canceled {
Expand Down Expand Up @@ -101,7 +102,10 @@ func TestQueuePush(t *testing.T) {
ctx := context.Background()
store := mock.NewMockStageStore(controller)

q := newQueue(store)
q := &queue{
store: store,
ready: make(chan struct{}, 1),
}
q.Schedule(ctx, item1)
q.Schedule(ctx, item2)
select {
Expand Down Expand Up @@ -352,33 +356,3 @@ func TestWithinLimits_Old(t *testing.T) {
}
}
}

func TestQueueContextCanceling(t *testing.T) {
listIncompleteResponse := []*core.Stage{
{ID: 1, OS: "linux/amd64", Arch: "amd64", Status: drone.StatusPending},
}

controller := gomock.NewController(t)
defer controller.Finish()

globCtx := context.Background()

mockStageStore := mock.NewMockStageStore(controller)
mockStageStore.EXPECT().ListIncomplete(globCtx).Return(listIncompleteResponse, nil).AnyTimes()

q := newQueue(mockStageStore)

for k := 0; k < 1000; k++ {
reqCtx, reqCanc := context.WithCancel(context.Background())
go reqCanc() // asynchronously cancel the context

stage, err := q.Request(reqCtx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if stage == nil && err == context.Canceled {
continue // we got the ctx canceled error
}
if stage == listIncompleteResponse[0] && err == nil {
continue // we got a stage before the context got canceled
}
t.Errorf("got neither the context canceled error nor the data: stage=%v err=%v", stage, err)
}
}