Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause queues #15928

Merged
merged 27 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4501559
Start adding mechanism to return unhandled data
zeripath May 16, 2021
e329084
Create pushback interface
zeripath May 17, 2021
4fec9f3
Add Pausable interface to WorkerPool and Manager
zeripath May 17, 2021
22572ec
Implement Pausable and PushBack for the bytefifos
zeripath May 17, 2021
a73c034
Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQu…
zeripath May 19, 2021
8866235
Wire in UI for pausing
zeripath May 19, 2021
8d02208
add testcases and fix a few issues
zeripath May 20, 2021
e9cbc9e
Merge branch 'main' into pause-queues
lafriks May 20, 2021
7cc33d7
Merge branch 'main' into pause-queues
zeripath May 21, 2021
68525c5
Merge remote-tracking branch 'origin/main' into pause-queues
zeripath Jun 14, 2021
a9702a7
Merge branch 'main' into pause-queues
6543 Jun 17, 2021
faf5f75
Merge remote-tracking branch 'origin/main' into pause-queues
zeripath Jul 21, 2021
4730ec1
Merge branch 'main' into pause-queues
zeripath Jul 23, 2021
2f406c7
fix build
zeripath Jul 23, 2021
cd17d66
prevent "race" in the test
zeripath Jul 23, 2021
af1bacd
Merge branch 'main' into pause-queues
techknowlogick Jul 27, 2021
6ca3946
fix jsoniter mismerge
zeripath Jul 28, 2021
dfd16ce
Merge branch 'main' into pause-queues
zeripath Aug 7, 2021
09d7e15
Merge remote-tracking branch 'origin/main' into pause-queues
zeripath Jan 21, 2022
0f11dac
fix conflicts
zeripath Jan 21, 2022
899ed54
fix format
zeripath Jan 21, 2022
ebed513
Add warnings for no worker configurations and prevent data-loss with …
zeripath Jan 21, 2022
46bd99e
Merge branch 'main' into pause-queues
lafriks Jan 22, 2022
71f03fc
Merge remote-tracking branch 'origin/main' into pause-queues
zeripath Jan 22, 2022
cd6dde2
Use StopTimer
zeripath Jan 22, 2022
c8c2175
Merge branch 'main' into pause-queues
wxiaoguang Jan 22, 2022
061dd4a
Merge branch 'main' into pause-queues
zeripath Jan 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions modules/indexer/code/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ func Init() {
// Create the Queue
switch setting.Indexer.RepoType {
case "bleve", "elasticsearch":
handler := func(data ...queue.Data) {
handler := func(data ...queue.Data) []queue.Data {
idx, err := indexer.get()
if idx == nil || err != nil {
log.Error("Codes indexer handler: unable to get indexer!")
return
return data
}

for _, datum := range data {
Expand All @@ -153,6 +153,7 @@ func Init() {
continue
}
}
return nil
}

indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
Expand Down
5 changes: 3 additions & 2 deletions modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve", "elasticsearch":
handler := func(data ...queue.Data) {
handler := func(data ...queue.Data) []queue.Data {
indexer := holder.get()
if indexer == nil {
log.Error("Issue indexer handler: unable to get indexer!")
return
return data
}

iData := make([]*IndexerData, 0, len(data))
Expand All @@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) {
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
}
return nil
}

issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
Expand Down
3 changes: 2 additions & 1 deletion modules/indexer/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
var statsQueue queue.UniqueQueue

// handle passed PR IDs and test the PRs
func handle(data ...queue.Data) {
func handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
opts := datum.(int64)
if err := indexer.Index(opts); err != nil {
log.Error("stats queue indexer.Index(%d) failed: %v", opts, err)
}
}
return nil
}

func initStatsQueue() error {
Expand Down
3 changes: 2 additions & 1 deletion modules/notification/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ func NewNotifier() base.Notifier {
return ns
}

func (ns *notificationService) handle(data ...queue.Data) {
func (ns *notificationService) handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
opts := datum.(issueNotificationOpts)
if err := models.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil {
log.Error("Was unable to create issue notification: %v", err)
}
}
return nil
}

func (ns *notificationService) Run() {
Expand Down
7 changes: 7 additions & 0 deletions modules/queue/bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type ByteFIFO interface {
Pop(ctx context.Context) ([]byte, error)
// Close this fifo
Close() error
// PushBack pushes data back to the top of the fifo
PushBack(ctx context.Context, data []byte) error
}

// UniqueByteFIFO defines a FIFO that Uniques its contents
Expand Down Expand Up @@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 {
return 0
}

// PushBack pushes data back to the top of the fifo
func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error {
return nil
}

var _ UniqueByteFIFO = &DummyUniqueByteFIFO{}

// DummyUniqueByteFIFO represents a dummy unique fifo
Expand Down
56 changes: 55 additions & 1 deletion modules/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ type Flushable interface {
IsEmpty() bool
}

// Pausable represents a pool or queue that is Pausable
type Pausable interface {
// IsPaused will return if the pool or queue is paused
IsPaused() bool
// Pause will pause the pool or queue
Pause()
// Resume will resume the pool or queue
Resume()
// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
IsPausedIsResumed() (paused, resumed <-chan struct{})
}

// ManagedPool is a simple interface to get certain details from a worker pool
type ManagedPool interface {
// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
Expand Down Expand Up @@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
wg.Done()
continue
}
if pausable, ok := mq.Managed.(Pausable); ok {
// no point flushing paused queues
if pausable.IsPaused() {
wg.Done()
continue
}
}

allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
log.Debug("Flushing (flushable) queue: %s", mq.Name)
Expand All @@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
log.Debug("All queues are empty")
break
}
// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign
// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
// but don't delay cancellation here.
select {
case <-ctx.Done():
Expand Down Expand Up @@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can
return nil
}

// Flushable returns true if the queue is flushable
func (q *ManagedQueue) Flushable() bool {
_, ok := q.Managed.(Flushable)
return ok
}

// Flush flushes the queue with a timeout
func (q *ManagedQueue) Flush(timeout time.Duration) error {
if flushable, ok := q.Managed.(Flushable); ok {
Expand All @@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool {
return true
}

// Pausable returns whether the queue is Pausable
func (q *ManagedQueue) Pausable() bool {
_, ok := q.Managed.(Pausable)
return ok
}

// Pause pauses the queue
func (q *ManagedQueue) Pause() {
if pausable, ok := q.Managed.(Pausable); ok {
pausable.Pause()
}
}

// IsPaused reveals if the queue is paused
func (q *ManagedQueue) IsPaused() bool {
if pausable, ok := q.Managed.(Pausable); ok {
return pausable.IsPaused()
}
return false
}

// Resume resumes the queue
func (q *ManagedQueue) Resume() {
if pausable, ok := q.Managed.(Pausable); ok {
pausable.Resume()
}
}

// NumberOfWorkers returns the number of workers in the queue
func (q *ManagedQueue) NumberOfWorkers() int {
if pool, ok := q.Managed.(ManagedPool); ok {
Expand Down
8 changes: 7 additions & 1 deletion modules/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Type string
type Data interface{}

// HandlerFunc is a function that takes a variable amount of data and processes it
type HandlerFunc func(...Data)
type HandlerFunc func(...Data) (unhandled []Data)

// NewQueueFunc is a function that creates a queue
type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
Expand All @@ -61,6 +61,12 @@ type Queue interface {
Push(Data) error
}

// PushBackable queues can be pushed back to
type PushBackable interface {
// PushBack pushes data back to the top of the fifo
PushBack(Data) error
}

// DummyQueueType is the type for the dummy queue
const DummyQueueType Type = "dummy"

Expand Down
Loading