From 4501559d811dd9e6f485ce254cf3cb54f1fd7bb5 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 16 May 2021 21:26:06 +0100 Subject: [PATCH 01/14] Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton --- modules/indexer/code/indexer.go | 5 +++-- modules/indexer/issues/indexer.go | 5 +++-- modules/indexer/stats/queue.go | 3 ++- modules/notification/ui/ui.go | 3 ++- modules/queue/queue.go | 2 +- modules/queue/queue_channel_test.go | 6 ++++-- modules/queue/queue_disk_channel_test.go | 3 ++- modules/queue/queue_disk_test.go | 3 ++- modules/queue/unique_queue_channel.go | 7 +++++-- modules/queue/unique_queue_disk_channel.go | 6 ++++-- modules/queue/unique_queue_wrapped.go | 7 +++++-- modules/task/task.go | 3 ++- services/mailer/mailer.go | 3 ++- services/pull/check.go | 3 ++- services/pull/check_test.go | 3 ++- services/repository/push.go | 3 ++- 16 files changed, 43 insertions(+), 22 deletions(-) diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 67fa43eda89dc..6a88ab40b433c 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -132,11 +132,11 @@ func Init() { // Create the Queue switch setting.Indexer.RepoType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { idx, err := indexer.get() if idx == nil || err != nil { log.Error("Codes indexer handler: unable to get indexer!") - return + return data } for _, datum := range data { @@ -158,6 +158,7 @@ func Init() { } } } + return nil } indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 676b6686ea5b2..f82203e4c3484 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -101,11 +101,11 @@ func InitIssueIndexer(syncReindex bool) { // Create the Queue switch setting.Indexer.IssueType { case "bleve", "elasticsearch": - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { indexer := holder.get() if indexer == nil { log.Error("Issue indexer handler: unable to get indexer!") - return + return data } iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber) @@ -125,6 +125,7 @@ func InitIssueIndexer(syncReindex bool) { if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } + return nil } issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go index 8309cfcd3bc4e..c9af68fad3103 100644 --- a/modules/indexer/stats/queue.go +++ b/modules/indexer/stats/queue.go @@ -17,13 +17,14 @@ import ( var statsQueue queue.UniqueQueue // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.(int64) if err := indexer.Index(opts); err != nil { log.Error("stats queue indexer.Index(%d) failed: %v", opts, err) } } + return nil } func initStatsQueue() error { diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go index b1374f5608fd4..024d83907b1d4 100644 --- a/modules/notification/ui/ui.go +++ b/modules/notification/ui/ui.go @@ -37,13 +37,14 @@ func NewNotifier() base.Notifier { return ns } -func (ns *notificationService) handle(data ...queue.Data) { +func (ns *notificationService) handle(data ...queue.Data) []queue.Data { for _, datum 
:= range data { opts := datum.(issueNotificationOpts) if err := models.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil { log.Error("Was unable to create issue notification: %v", err) } } + return nil } func (ns *notificationService) Run() { diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 7159048c11689..54d85999d6d0a 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -36,7 +36,7 @@ type Type string type Data interface{} // HandlerFunc is a function that takes a variable amount of data and processes it -type HandlerFunc func(...Data) +type HandlerFunc func(...Data) (unhandled []Data) // NewQueueFunc is a function that creates a queue type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error) diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index e7abe5b50b764..01220ca6167df 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -13,11 +13,12 @@ import ( func TestChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} @@ -52,12 +53,13 @@ func TestChannelQueue(t *testing.T) { func TestChannelQueue_Batch(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } nilFn := func(_ func()) {} diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 561f98ca907b6..db8d866f257a3 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -14,12 +14,13 @@ import ( func TestPersistableChannelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } queueShutdown := []func(){} diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index 1f884d4f8d76d..3c3e85be7497f 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -16,12 +16,13 @@ import ( func TestLevelQueue(t *testing.T) { handleChan := make(chan *testData) - handle := func(data ...Data) { + handle := func(data ...Data) []Data { assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum } + return nil } var lock sync.Mutex diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d355c..ca9e9b9150827 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -63,13 +63,16 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue workers: config.Workers, name: config.Name, } - queue.WorkerPool = NewWorkerPool(func(data ...Data) { + queue.WorkerPool = NewWorkerPool(func(data ...Data) (unhandled []Data) { for _, datum := range data { queue.lock.Lock() delete(queue.table, datum) queue.lock.Unlock() - handle(datum) + if u := handle(datum); u != nil { + unhandled = append(unhandled, u...) 
+ } } + return unhandled }, config.WorkerPoolConfiguration) queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 65a3941519954..1243800e28d81 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -89,13 +89,14 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac closed: make(chan struct{}), } - levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { + levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data { for _, datum := range data { err := queue.Push(datum) if err != nil && err != ErrAlreadyInQueue { log.Error("Unable push to channelled queue: %v", err) } } + return nil }, levelCfg, exemplar) if err == nil { queue.delayedStarter = delayedStarter{ @@ -163,13 +164,14 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) q.lock.Lock() if q.internal == nil { - err := q.setInternal(atShutdown, func(data ...Data) { + err := q.setInternal(atShutdown, func(data ...Data) []Data { for _, datum := range data { err := q.Push(datum) if err != nil && err != ErrAlreadyInQueue { log.Error("Unable push to channelled queue: %v", err) } } + return nil }, q.channelQueue.exemplar) q.lock.Unlock() if err != nil { diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go index 8c815218ddd21..32fa9ed970dbc 100644 --- a/modules/queue/unique_queue_wrapped.go +++ b/modules/queue/unique_queue_wrapped.go @@ -73,7 +73,7 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue // wrapped.handle is passed to the delayedStarting internal queue and is run to handle // data passed to - wrapped.handle = func(data ...Data) { + wrapped.handle = func(data ...Data) (unhandled []Data) { for _, datum := range data { wrapped.tlock.Lock() if !wrapped.ready { @@ -87,8 +87,11 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } } wrapped.tlock.Unlock() - handle(datum) + if u := handle(datum); u != nil { + unhandled = append(unhandled, u...) 
+ } } + return unhandled } _ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar) return wrapped, nil diff --git a/modules/task/task.go b/modules/task/task.go index 0443517c01694..12aece85bc3bc 100644 --- a/modules/task/task.go +++ b/modules/task/task.go @@ -44,13 +44,14 @@ func Init() error { return nil } -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { task := datum.(*models.Task) if err := Run(task); err != nil { log.Error("Run task failed: %v", err) } } + return nil } // MigrateRepository add migration repository to task diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go index 6b86734bf845c..ecb331113f79d 100644 --- a/services/mailer/mailer.go +++ b/services/mailer/mailer.go @@ -319,7 +319,7 @@ func NewContext() { Sender = &dummySender{} } - mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) { + mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) []queue.Data { for _, datum := range data { msg := datum.(*Message) gomailMsg := msg.ToMessage() @@ -330,6 +330,7 @@ func NewContext() { log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info) } } + return nil }, &Message{}) go graceful.GetManager().RunWithShutdownFns(mailQueue.Run) diff --git a/services/pull/check.go b/services/pull/check.go index 3ec76de5e8748..3ec83f37070d4 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -214,7 +214,7 @@ func InitializePullRequests(ctx context.Context) { } // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { id, _ := strconv.ParseInt(datum.(string), 10, 64) @@ -238,6 +238,7 @@ func handle(data ...queue.Data) { } checkAndUpdateStatus(pr) } + return nil } // CheckPrsForBaseBranch check all pulls with bseBrannch diff --git a/services/pull/check_test.go b/services/pull/check_test.go index f6614ea0ad27f..89e07240a9c8f 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -21,11 +21,12 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { idChan := make(chan int64, 10) - q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) { + q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) []queue.Data { for _, datum := range data { id, _ := strconv.ParseInt(datum.(string), 10, 64) idChan <- id } + return nil }, queue.ChannelUniqueQueueConfiguration{ WorkerPoolConfiguration: queue.WorkerPoolConfiguration{ QueueLength: 10, diff --git a/services/repository/push.go b/services/repository/push.go index bed5c575fe81d..c17ca8fa98f52 100644 --- a/services/repository/push.go +++ b/services/repository/push.go @@ -26,13 +26,14 @@ import ( var pushQueue queue.Queue // handle passed PR IDs and test the PRs -func handle(data ...queue.Data) { +func handle(data ...queue.Data) []queue.Data { for _, datum := range data { opts := datum.([]*repo_module.PushUpdateOptions) if err := pushUpdates(opts); err != nil { log.Error("pushUpdate failed: %v", err) } } + return nil } func initPushQueue() error { From e329084507abbfc2bb6f5efa3b6e2f877af86f63 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 17 May 2021 20:44:27 +0100 Subject: [PATCH 02/14] Create pushback interface Signed-off-by: Andrew Thornton --- modules/queue/queue.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 54d85999d6d0a..1c6adfaa40d38 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -61,6 +61,12 @@ 
type Queue interface { Push(Data) error } +// PushBackable queues can be pushed back to +type PushBackable interface { + // PushBack pushes data back to the top of the fifo + PushBack(Data) error +} + // DummyQueueType is the type for the dummy queue const DummyQueueType Type = "dummy" From 4fec9f3b0c12492e445d0595630df24ec22fe8ca Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 17 May 2021 18:11:38 +0100 Subject: [PATCH 03/14] Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton --- modules/queue/manager.go | 22 ++++- modules/queue/workerpool.go | 155 +++++++++++++++++++++++++----------- 2 files changed, 131 insertions(+), 46 deletions(-) diff --git a/modules/queue/manager.go b/modules/queue/manager.go index a6d48575ab674..f16633b416cc6 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -53,6 +53,18 @@ type Flushable interface { IsEmpty() bool } +// Pausable represents a pool or queue that is Pausable +type Pausable interface { + // IsPaused will return if the pool or queue is paused + IsPaused() bool + // Pause will pause the pool or queue + Pause() + // Resume will resume the pool or queue + Resume() + // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed + IsPausedIsResumed() (bool, <-chan struct{}) +} + // ManagedPool is a simple interface to get certain details from a worker pool type ManagedPool interface { // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group @@ -183,6 +195,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() continue } + if pausable, ok := mq.Managed.(Pausable); ok { + // no point flushing paused queues + if pausable.IsPaused() { + wg.Done() + continue + } + } + allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) @@ -206,7 +226,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error log.Debug("All queues are empty") break } - // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign + // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing // but don't delay cancellation here. 
select { case <-ctx.Done(): diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0176e2e0b2d20..0641cb812289c 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -22,6 +22,8 @@ type WorkerPool struct { lock sync.Mutex baseCtx context.Context baseCtxCancel context.CancelFunc + paused bool + resumed chan struct{} cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -35,6 +37,9 @@ type WorkerPool struct { numInQueue int64 } +var _ Flushable = &WorkerPool{} +var _ ManagedPool = &WorkerPool{} + // WorkerPoolConfiguration is the basic configuration for a WorkerPool type WorkerPoolConfiguration struct { QueueLength int @@ -50,11 +55,14 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo ctx, cancel := context.WithCancel(context.Background()) dataChan := make(chan Data, config.QueueLength) + resumed := make(chan struct{}) + close(resumed) pool := &WorkerPool{ baseCtx: ctx, baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, + resumed: resumed, handle: handle, blockTimeout: config.BlockTimeout, boostTimeout: config.BoostTimeout, @@ -69,7 +77,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() - if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { + if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) && !p.paused { if p.numberOfWorkers == 0 { p.zeroBoost() } else { @@ -290,13 +298,52 @@ func (p *WorkerPool) Wait() { p.cond.Wait() } +// IsPaused returns if the pool is paused +func (p *WorkerPool) IsPaused() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.paused +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (p *WorkerPool) IsPausedIsResumed() (bool, <-chan struct{}) { + p.lock.Lock() + defer p.lock.Unlock() + return p.paused, p.resumed +} + +// Pause pauses the WorkerPool +func (p *WorkerPool) Pause() { + p.lock.Lock() + defer p.lock.Unlock() + if !p.paused { + p.resumed = make(chan struct{}) + p.paused = true + } +} + +// Resume resumes the WorkerPool +func (p *WorkerPool) Resume() { + p.lock.Lock() + defer p.lock.Unlock() + if p.paused { + close(p.resumed) + p.paused = false + } +} + // CleanUp will drain the remaining contents of the channel // This should be called after AddWorkers context is closed func (p *WorkerPool) CleanUp(ctx context.Context) { log.Trace("WorkerPool: %d CleanUp", p.qid) close(p.dataChan) for data := range p.dataChan { - p.handle(data) + if unhandled := p.handle(data); unhandled != nil { + if unhandled != nil { + log.Error("Unhandled Data in clean-up of queue %d", p.qid) + } + } + atomic.AddInt64(&p.numInQueue, -1) select { case <-ctx.Done(): @@ -327,7 +374,9 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { for { select { case data := <-p.dataChan: - p.handle(data) + if unhandled := p.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1) case <-p.baseCtx.Done(): return p.baseCtx.Err() @@ -341,13 +390,48 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { func (p *WorkerPool) doWork(ctx context.Context) { delay := time.Millisecond * 300 + + timer := time.NewTimer(0) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + 
} + var data = make([]Data, 0, p.batchLength) for { + paused, resumed := p.IsPausedIsResumed() + if paused { + log.Trace("Worker for Queue %d Pausing", p.qid) + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + } + select { + case <-resumed: + log.Trace("Worker for Queue %d Resuming", p.qid) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + case <-ctx.Done(): + log.Trace("Worker shutting down") + return + } + } select { case <-ctx.Done(): if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") @@ -357,59 +441,40 @@ func (p *WorkerPool) doWork(ctx context.Context) { // the dataChan has been closed - we should finish up: if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } log.Trace("Worker shutting down") return } data = append(data, datum) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } if len(data) >= p.batchLength { log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) data = make([]Data, 0, p.batchLength) + } else { + timer.Reset(delay) } - default: - timer := time.NewTimer(delay) - select { - case <-ctx.Done(): - util.StopTimer(timer) - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - } - log.Trace("Worker shutting down") - return - case datum, ok := <-p.dataChan: - util.StopTimer(timer) - if !ok { - // the dataChan has been closed - we should finish up: - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - } - log.Trace("Worker shutting down") - return - } - data = append(data, datum) - if len(data) >= p.batchLength { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) - atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - data = make([]Data, 0, p.batchLength) - } - case <-timer.C: - delay = time.Millisecond * 100 - if len(data) > 0 { - log.Trace("Handling: %d data, %v", len(data), data) - p.handle(data...) 
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) - data = make([]Data, 0, p.batchLength) + case <-timer.C: + delay = time.Millisecond * 100 + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) } - + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + data = make([]Data, 0, p.batchLength) } } } From 22572ecf8b6034b0b34495f55ce4717d9451ac79 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 17 May 2021 17:12:34 +0100 Subject: [PATCH 04/14] Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton --- modules/queue/bytefifo.go | 7 ++ modules/queue/queue_bytefifo.go | 143 ++++++++++++++++++++++------ modules/queue/queue_disk.go | 5 + modules/queue/queue_redis.go | 6 ++ modules/queue/unique_queue_disk.go | 5 + modules/queue/unique_queue_redis.go | 12 +++ 6 files changed, 150 insertions(+), 28 deletions(-) diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go index 3a10c8e1259c6..bb98d468fb09b 100644 --- a/modules/queue/bytefifo.go +++ b/modules/queue/bytefifo.go @@ -16,6 +16,8 @@ type ByteFIFO interface { Pop(ctx context.Context) ([]byte, error) // Close this fifo Close() error + // PushBack pushes data back to the top of the fifo + PushBack(ctx context.Context, data []byte) error } // UniqueByteFIFO defines a FIFO that Uniques its contents @@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 { return 0 } +// PushBack pushes data back to the top of the fifo +func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error { + return nil +} + var _ UniqueByteFIFO = &DummyUniqueByteFIFO{} // DummyUniqueByteFIFO represents a dummy unique fifo diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 3ea61aad0e4c5..6d2b262b9ff45 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "code.gitea.io/gitea/modules/log" @@ -52,8 +53,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + q := &ByteFIFOQueue{ byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -65,7 +65,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem name: config.Name, waitOnEmpty: config.WaitOnEmpty, pushed: make(chan struct{}, 1), - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) 
{ + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Name returns the name of this queue @@ -78,6 +88,25 @@ func (q *ByteFIFOQueue) Push(data Data) error { return q.PushFunc(data, nil) } +// PushBack pushes data to the fifo +func (q *ByteFIFOQueue) PushBack(data Data) error { + if !assignableTo(data, q.exemplar) { + return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) + } + json := jsoniter.ConfigCompatibleWithStandardLibrary + bs, err := json.Marshal(data) + if err != nil { + return err + } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + return q.byteFIFO.PushBack(q.terminateCtx, bs) +} + // PushFunc pushes data to the fifo func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { @@ -88,14 +117,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - if q.waitOnEmpty { - defer func() { - select { - case q.pushed <- struct{}{}: - default: - } - }() - } + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } @@ -109,6 +136,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool { return q.byteFIFO.Len(q.terminateCtx) == 0 } +// Flush flushes the ByteFIFOQueue +func (q *ByteFIFOQueue) Flush(timeout time.Duration) error { + select { + case q.pushed <- struct{}{}: + default: + } + return q.WorkerPool.Flush(timeout) +} + // Run runs the bytefifo queue func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) @@ -143,31 +179,64 @@ func (q *ByteFIFOQueue) readToChan() { // Default backoff values backOffTime := time.Millisecond * 100 + backOffTimer := time.NewTimer(0) + if !backOffTimer.Stop() { + select { + case <-backOffTimer.C: + default: + } + } loop: for { - err := q.doPop() - if err == errQueueEmpty { - log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + paused, resumed := q.IsPausedIsResumed() + if paused { + log.Trace("Queue %s pausing", q.name) select { - case <-q.pushed: - // reset backOffTime - backOffTime = 100 * time.Millisecond - continue loop + case <-resumed: + log.Trace("Queue %s resuming", q.name) case <-q.shutdownCtx.Done(): - // Oops we've been shutdown whilst waiting - // Make sure the worker pool is shutdown too + // tell the pool to shutdown. 
q.baseCtxCancel() return + case data := <-q.dataChan: + if err := q.PushBack(data); err != nil { + log.Error("Unable to push back data into queue %s", q.name) + } + atomic.AddInt64(&q.numInQueue, -1) } } - // Reset the backOffTime if there is no error or an unmarshalError - if err == nil || err == errUnmarshal { - backOffTime = 100 * time.Millisecond + // empty the pushed channel + select { + case <-q.pushed: + default: + } + + err := q.doPop() + + if !backOffTimer.Stop() { + select { + case <-backOffTimer.C: + default: + } } if err != nil { + if err == errQueueEmpty && q.waitOnEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + + // reset the backoff time but don't set the timer + backOffTime = 100 * time.Millisecond + } else if err == errUnmarshal { + // reset the timer and backoff + backOffTime = 100 * time.Millisecond + backOffTimer.Reset(backOffTime) + } else { + // backoff + backOffTimer.Reset(backOffTime) + } + // Need to Backoff select { case <-q.shutdownCtx.Done(): @@ -175,8 +244,13 @@ loop: // Make sure the worker pool is shutdown too q.baseCtxCancel() return - case <-time.After(backOffTime): - // OK we've waited - so backoff a bit + case <-q.pushed: + // Data has been pushed to the fifo (or flush has been called) + // reset the backoff time + backOffTime = 100 * time.Millisecond + continue loop + case <-backOffTimer.C: + // Calculate the next backoff time backOffTime += backOffTime / 2 if backOffTime > maxBackOffTime { backOffTime = maxBackOffTime @@ -184,6 +258,10 @@ loop: continue loop } } + + // Reset the backoff time + backOffTime = 100 * time.Millisecond + select { case <-q.shutdownCtx.Done(): // Oops we've been shutdown @@ -288,9 +366,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) - return &ByteFIFOUniqueQueue{ + q := &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), byteFIFO: byteFIFO, typ: typ, shutdownCtx: shutdownCtx, @@ -301,7 +378,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun workers: config.Workers, name: config.Name, }, - }, nil + } + q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) 
{ + if fail := q.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + }, config.WorkerPoolConfiguration) + + return q, nil } // Has checks if the provided data is in the queue diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 911233a5d9a01..2691ab02f51ee 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -94,6 +94,11 @@ func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn fu return fifo.internal.LPush(data) } +// PushBack pushes data to the top of the fifo +func (fifo *LevelQueueByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.internal.RPush(data) +} + // Pop pops data from the start of the fifo func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index a5fb866dc1e11..84ab235d5efbf 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -57,6 +57,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) type redisClient interface { RPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd + LPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd LPop(ctx context.Context, key string) *redis.StringCmd LLen(ctx context.Context, key string) *redis.IntCmd SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd @@ -103,6 +104,11 @@ func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index bb0eb7d950c59..dae32f75a8518 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -93,6 +93,11 @@ func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, return fifo.internal.LPushFunc(data, fn) } +// PushBack pushes data to the top of the fifo +func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error { + return fifo.internal.RPush(data) +} + // Pop pops data from the start of the fifo func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 7474c096655d3..477d5dd81f1d5 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -105,6 +105,18 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn f return fifo.client.RPush(ctx, fifo.queueName, data).Err() } +// PushBack pushes data to the top of the fifo +func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() + if err != nil { + return err + } + if added == 0 { + return ErrAlreadyInQueue + } + return fifo.client.LPush(ctx, fifo.queueName, data).Err() +} + // Pop pops data from the start of the fifo func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { 
data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() From a73c034fa83e69433690bf134e8d8a8564c1ce4b Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 19 May 2021 21:05:15 +0100 Subject: [PATCH 05/14] Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton --- modules/queue/queue_channel.go | 67 +++++++++++++++++++++- modules/queue/queue_disk_channel.go | 58 +++++++++++++------ modules/queue/unique_queue_channel.go | 59 ++++++++++++++++++- modules/queue/unique_queue_disk_channel.go | 33 +++++++++-- 4 files changed, 191 insertions(+), 26 deletions(-) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 4df64b69ee5ee..e9cce7055551c 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -7,6 +7,8 @@ package queue import ( "context" "fmt" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/log" ) @@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro workers: config.Workers, name: config.Name, } + queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data { + unhandled := handle(data...) + if len(unhandled) > 0 { + // We can only pushback to the channel if we're paused. + if queue.IsPaused() { + atomic.AddInt64(&queue.numInQueue, int64(len(unhandled))) + go func() { + for _, datum := range data { + queue.dataChan <- datum + } + }() + return nil + } + } + return unhandled + }, config.WorkerPoolConfiguration) + queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } @@ -81,6 +99,52 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelQueue) Flush(timeout time.Duration) error { + q.lock.Lock() + paused := q.paused + q.lock.Unlock() + if paused { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelQueue: %d Flush", q.qid) + for { + q.lock.Lock() + paused := q.paused + q.lock.Unlock() + if paused { + return nil + } + select { + case data := <-q.dataChan: + if q.IsPaused() { + // we're paused so we should push this back and stop + // (whilst handle will check this too we need to stop the flusher for this to work.) 
+ go func() { + q.dataChan <- data + }() + return nil + } else if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelQueue) Shutdown() { q.lock.Lock() @@ -94,6 +158,7 @@ func (q *ChannelQueue) Shutdown() { log.Trace("ChannelQueue: %s Shutting down", q.name) go func() { log.Trace("ChannelQueue: %s Flushing", q.name) + // We can't use Cleanup here because that will close the channel if err := q.FlushWithContext(q.terminateCtx); err != nil { log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) return diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index c3a1c5781ef09..e508529d8561b 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( } config := configInterface.(PersistableChannelQueueConfiguration) - channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ + queue := &PersistableChannelQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) { + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( DataDir: config.DataDir, } - levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) + levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar) if err == nil { - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - internal: levelQueue.(*LevelQueue), - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + internal: levelQueue.(*LevelQueue), + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( return nil, ErrInvalidConfiguration{cfg: cfg} } - queue := &PersistableChannelQueue{ - channelQueue: channelQueue.(*ChannelQueue), - delayedStarter: delayedStarter{ - cfg: levelCfg, - underlying: LevelQueueType, - timeout: config.Timeout, - maxAttempts: config.MaxAttempts, - name: config.Name, - }, - closed: make(chan struct{}), + queue.channelQueue = channelQueue.(*ChannelQueue) + queue.delayedStarter = delayedStarter{ + cfg: levelCfg, + underlying: LevelQueueType, + timeout: config.Timeout, + maxAttempts: config.MaxAttempts, + name: config.Name, } _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar) return queue, nil @@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error { } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelQueue) PushBack(data Data) error { + select { + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return 
q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Run starts to run the queue func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index ca9e9b9150827..6f6770d44d1ff 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -8,6 +8,8 @@ import ( "context" "fmt" "sync" + "sync/atomic" + "time" "code.gitea.io/gitea/modules/log" ) @@ -69,7 +71,16 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue delete(queue.table, datum) queue.lock.Unlock() if u := handle(datum); u != nil { - unhandled = append(unhandled, u...) + if queue.IsPaused() { + // We can only pushback to the channel if we're paused. + go func() { + if err := queue.Push(u); err != nil { + log.Error("Unable to push back to queue %d", queue.qid) + } + }() + } else { + unhandled = append(unhandled, u...) + } } } return unhandled @@ -131,6 +142,52 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager +func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error { + q.lock.Lock() + paused := q.paused + q.lock.Unlock() + if paused { + return nil + } + ctx, cancel := q.commonRegisterWorkers(1, timeout, true) + defer cancel() + return q.FlushWithContext(ctx) +} + +// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty +func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error { + log.Trace("ChannelUniqueQueue: %d Flush", q.qid) + for { + q.lock.Lock() + paused := q.paused + q.lock.Unlock() + if paused { + return nil + } + select { + case data := <-q.dataChan: + if q.IsPaused() { + // we're paused so we should push this back and stop + // (whilst handle will check this too we need to stop the flusher for this to work.) + go func() { + q.dataChan <- data + }() + return nil + } else if unhandled := q.handle(data); unhandled != nil { + log.Error("Unhandled Data whilst flushing queue %d", q.qid) + } + atomic.AddInt64(&q.numInQueue, -1) + case <-q.baseCtx.Done(): + return q.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } + } +} + // Shutdown processing from this queue func (q *ChannelUniqueQueue) Shutdown() { log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 1243800e28d81..e1d92d1270dcb 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -51,7 +51,20 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac } config := configInterface.(PersistableChannelUniqueQueueConfiguration) - channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{ + queue := &PersistableChannelUniqueQueue{ + closed: make(chan struct{}), + } + + wrappedHandle := func(data ...Data) (failed []Data) { + for _, unhandled := range handle(data...) 
{ + if fail := queue.PushBack(unhandled); fail != nil { + failed = append(failed, fail) + } + } + return + } + + channelUniqueQueue, err := NewChannelUniqueQueue(wrappedHandle, ChannelUniqueQueueConfiguration{ WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, @@ -84,10 +97,7 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac DataDir: config.DataDir, } - queue := &PersistableChannelUniqueQueue{ - channelQueue: channelUniqueQueue.(*ChannelUniqueQueue), - closed: make(chan struct{}), - } + queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue) levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data { for _, datum := range data { @@ -143,6 +153,19 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err } } +// PushBack will push the indexer data to queue +func (q *PersistableChannelUniqueQueue) PushBack(data Data) error { + select { + case <-q.closed: + if pbr, ok := q.internal.(PushBackable); ok { + return pbr.PushBack(data) + } + return q.internal.Push(data) + default: + return q.channelQueue.Push(data) + } +} + // Has will test if the queue has the data func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // This is more difficult... From 88662352e5abf74add2187cfe5e5385775c3858b Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 19 May 2021 21:42:02 +0100 Subject: [PATCH 06/14] Wire in UI for pausing Signed-off-by: Andrew Thornton --- modules/queue/manager.go | 34 +++++++++++++++++++++++++++++++++ options/locale/locale_en-US.ini | 6 ++++++ routers/admin/admin.go | 24 +++++++++++++++++++++++ routers/routes/web.go | 2 ++ templates/admin/queue.tmpl | 29 ++++++++++++++++++++++++++++ 5 files changed, 95 insertions(+) diff --git a/modules/queue/manager.go b/modules/queue/manager.go index f16633b416cc6..9810edabde614 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -310,6 +310,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can return nil } +// Flushable returns true if the queue is flushable +func (q *ManagedQueue) Flushable() bool { + _, ok := q.Managed.(Flushable) + return ok +} + // Flush flushes the queue with a timeout func (q *ManagedQueue) Flush(timeout time.Duration) error { if flushable, ok := q.Managed.(Flushable); ok { @@ -327,6 +333,34 @@ func (q *ManagedQueue) IsEmpty() bool { return true } +// Pausable returns whether the queue is Pausable +func (q *ManagedQueue) Pausable() bool { + _, ok := q.Managed.(Pausable) + return ok +} + +// Pause pauses the queue +func (q *ManagedQueue) Pause() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Pause() + } +} + +// IsPaused reveals if the queue is paused +func (q *ManagedQueue) IsPaused() bool { + if pausable, ok := q.Managed.(Pausable); ok { + return pausable.IsPaused() + } + return false +} + +// Resume resumes the queue +func (q *ManagedQueue) Resume() { + if pausable, ok := q.Managed.(Pausable); ok { + pausable.Resume() + } +} + // NumberOfWorkers returns the number of workers in the queue func (q *ManagedQueue) NumberOfWorkers() int { if pool, ok := q.Managed.(ManagedPool); ok { diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index ac1a0d9726395..6491d7e5da4d8 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -2540,6 +2540,12 @@ monitor.queue.pool.flush.title = Flush Queue monitor.queue.pool.flush.desc = Flush will add a worker that will 
terminate once the queue is empty, or it times out. monitor.queue.pool.flush.submit = Add Flush Worker monitor.queue.pool.flush.added = Flush Worker added for %[1]s +monitor.queue.pool.pause.title = Pause Queue +monitor.queue.pool.pause.desc = Pausing a Queue will prevent it from processing data +monitor.queue.pool.pause.submit = Pause Queue +monitor.queue.pool.resume.title = Resume Queue +monitor.queue.pool.resume.desc = Set this queue to resume work +monitor.queue.pool.resume.submit = Resume Queue monitor.queue.settings.title = Pool Settings monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups. diff --git a/routers/admin/admin.go b/routers/admin/admin.go index c2d94ab9c9710..58df89de09c6f 100644 --- a/routers/admin/admin.go +++ b/routers/admin/admin.go @@ -392,6 +392,30 @@ func Flush(ctx *context.Context) { ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) } +// Pause pauses a queue +func Pause(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { + ctx.Status(404) + return + } + mq.Pause() + ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) +} + +// Resume resumes a queue +func Resume(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { + ctx.Status(404) + return + } + mq.Resume() + ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) +} + // AddWorkers adds workers to a worker group func AddWorkers(ctx *context.Context) { qid := ctx.ParamsInt64("qid") diff --git a/routers/routes/web.go b/routers/routes/web.go index 008c745d6e090..c7e41bc3a8a01 100644 --- a/routers/routes/web.go +++ b/routers/routes/web.go @@ -452,6 +452,8 @@ func RegisterRoutes(m *web.Route) { m.Post("/add", admin.AddWorkers) m.Post("/cancel/{pid}", admin.WorkerCancel) m.Post("/flush", admin.Flush) + m.Post("/pause", admin.Pause) + m.Post("/resume", admin.Resume) }) }) diff --git a/templates/admin/queue.tmpl b/templates/admin/queue.tmpl index 3d9cc95592c58..d2d2c83baf487 100644 --- a/templates/admin/queue.tmpl +++ b/templates/admin/queue.tmpl @@ -92,6 +92,35 @@ + {{if .Queue.Pausable}} + {{if .Queue.IsPaused}} +

<h4 class="ui top attached header">
+		{{.i18n.Tr "admin.monitor.queue.pool.resume.title"}}
+	</h4>
+	<div class="ui attached segment">
+		<p>{{.i18n.Tr "admin.monitor.queue.pool.resume.desc"}}</p>
+		<form method="POST" action="{{.Link}}/resume">
+			{{$.CsrfTokenHtml}}
+			<div class="ui form">
+				<button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.pool.resume.submit"}}</button>
+			</div>
+		</form>
+	</div>
+	{{else}}
+	<h4 class="ui top attached header">
+		{{.i18n.Tr "admin.monitor.queue.pool.pause.title"}}
+	</h4>
+	<div class="ui attached segment">
+		<p>{{.i18n.Tr "admin.monitor.queue.pool.pause.desc"}}</p>
+		<form method="POST" action="{{.Link}}/pause">
+			{{$.CsrfTokenHtml}}
+			<div class="ui form">
+				<button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.pool.pause.submit"}}</button>
+			</div>
+		</form>
+	</div>
+	{{end}}
+	{{end}}
 	<h4 class="ui top attached header">
 		{{.i18n.Tr "admin.monitor.queue.pool.flush.title"}}
 	</h4>
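The patches up to this point change HandlerFunc to return unhandled data and add the Pausable and PushBack plumbing plus the admin pause/resume routes. A minimal sketch of how a consumer of the queue package might use that surface is below; it relies only on the APIs shown in these diffs (queue.CreateQueue, queue.GetManager, the ManagedQueue Pausable/Pause/Resume methods), while the names exampleData, process and pauseBriefly are invented purely for illustration and are not part of the patch series:

package example

import (
	"time"

	"code.gitea.io/gitea/modules/queue"
)

// exampleData is a hypothetical payload type used as the queue exemplar.
type exampleData struct {
	ID int64
}

// process stands in for real per-item work; it is an assumption of this sketch.
func process(datum queue.Data) error {
	_ = datum.(*exampleData) // queue.Data is interface{}, so handlers type-assert their exemplar
	return nil
}

// handler returns any data it failed to handle so the queue can re-queue it:
// bytefifo-backed queues call PushBack on their fifo, and channel queues push
// the data back onto their channel while they are paused.
func handler(data ...queue.Data) (unhandled []queue.Data) {
	for _, datum := range data {
		if err := process(datum); err != nil {
			unhandled = append(unhandled, datum)
		}
	}
	return unhandled
}

var exampleQueue = queue.CreateQueue("example", handler, &exampleData{})

// pauseBriefly pauses every managed queue that implements Pausable and then
// resumes it, mirroring what the new /monitor/queue/{qid}/pause and /resume
// admin routes do for a single queue.
func pauseBriefly() {
	for _, mq := range queue.GetManager().ManagedQueues() {
		if mq.Pausable() {
			mq.Pause()
		}
	}
	time.Sleep(time.Second)
	for _, mq := range queue.GetManager().ManagedQueues() {
		if mq.Pausable() {
			mq.Resume()
		}
	}
}

Whether returned items are redelivered immediately or only once the queue resumes depends on the concrete queue type, as the level/redis PushBack implementations and the channel-queue pushback paths in the diffs above show.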
From 8d02208c4b614c90064c214fdaa46b0592378149 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 20 May 2021 21:32:18 +0100 Subject: [PATCH 07/14] add testcases and fix a few issues Signed-off-by: Andrew Thornton --- modules/queue/manager.go | 2 +- modules/queue/queue_bytefifo.go | 9 +- modules/queue/queue_channel.go | 23 +- modules/queue/queue_channel_test.go | 155 ++++++++++++ modules/queue/queue_disk_channel.go | 42 ++++ modules/queue/queue_disk_channel_test.go | 267 +++++++++++++++++++++ modules/queue/unique_queue_channel.go | 26 +- modules/queue/unique_queue_channel_test.go | 253 +++++++++++++++++++ modules/queue/workerpool.go | 44 +++- 9 files changed, 772 insertions(+), 49 deletions(-) create mode 100644 modules/queue/unique_queue_channel_test.go diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 9810edabde614..40868a7a546b0 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -62,7 +62,7 @@ type Pausable interface { // Resume will resume the pool or queue Resume() // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed - IsPausedIsResumed() (bool, <-chan struct{}) + IsPausedIsResumed() (paused, resumed <-chan struct{}) } // ManagedPool is a simple interface to get certain details from a worker pool diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 6d2b262b9ff45..3f2f8e7e0ff0d 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -186,14 +186,18 @@ func (q *ByteFIFOQueue) readToChan() { default: } } + paused, _ := q.IsPausedIsResumed() loop: for { - paused, resumed := q.IsPausedIsResumed() - if paused { + select { + case <-paused: log.Trace("Queue %s pausing", q.name) + _, resumed := q.IsPausedIsResumed() + select { case <-resumed: + paused, _ = q.IsPausedIsResumed() log.Trace("Queue %s resuming", q.name) case <-q.shutdownCtx.Done(): // tell the pool to shutdown. @@ -205,6 +209,7 @@ loop: } atomic.AddInt64(&q.numInQueue, -1) } + default: } // empty the pushed channel diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index e9cce7055551c..7de9c17c86243 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -101,10 +101,7 @@ func (q *ChannelQueue) Push(data Data) error { // Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager func (q *ChannelQueue) Flush(timeout time.Duration) error { - q.lock.Lock() - paused := q.paused - q.lock.Unlock() - if paused { + if q.IsPaused() { return nil } ctx, cancel := q.commonRegisterWorkers(1, timeout, true) @@ -115,23 +112,13 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error { // FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty func (q *ChannelQueue) FlushWithContext(ctx context.Context) error { log.Trace("ChannelQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() for { - q.lock.Lock() - paused := q.paused - q.lock.Unlock() - if paused { - return nil - } select { + case <-paused: + return nil case data := <-q.dataChan: - if q.IsPaused() { - // we're paused so we should push this back and stop - // (whilst handle will check this too we need to stop the flusher for this to work.) 
- go func() { - q.dataChan <- data - }() - return nil - } else if unhandled := q.handle(data); unhandled != nil { + if unhandled := q.handle(data); unhandled != nil { log.Error("Unhandled Data whilst flushing queue %d", q.qid) } atomic.AddInt64(&q.numInQueue, -1) diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index 01220ca6167df..0a428030a8636 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -5,6 +5,7 @@ package queue import ( + "sync" "testing" "time" @@ -97,3 +98,157 @@ func TestChannelQueue_Batch(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) } + +func TestChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, 
"handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + +} diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index e508529d8561b..defb08d1e364c 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -247,6 +247,48 @@ func (q *PersistableChannelQueue) IsEmpty() bool { return q.internal.IsEmpty() } +// IsPaused returns if the pool is paused +func (q *PersistableChannelQueue) IsPaused() bool { + return q.channelQueue.IsPaused() +} + +// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed +func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { + return q.channelQueue.IsPausedIsResumed() +} + +// Pause pauses the WorkerPool +func (q *PersistableChannelQueue) Pause() { + q.channelQueue.Pause() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Pause() +} + +// Resume resumes the WorkerPool +func (q *PersistableChannelQueue) Resume() { + q.channelQueue.Resume() + q.lock.Lock() + defer q.lock.Unlock() + if q.internal == nil { + return + } + + pausable, ok := q.internal.(Pausable) + if !ok { + return + } + pausable.Resume() +} + // Shutdown processing this queue func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index db8d866f257a3..85d68b387e139 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -6,8 +6,11 @@ package queue import ( "io/ioutil" + "sync" "testing" + "time" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" ) @@ -131,3 +134,267 @@ func TestPersistableChannelQueue(t *testing.T) { } } + +func TestPersistableChannelQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + log.Info("pausing") + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-pause-test-data") + assert.NoError(t, err) + defer util.RemoveAll(tmpDir) + + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(func(shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + err = queue.Push(&test1) + assert.NoError(t, err) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + paused, 
resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = <-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + // Now shutdown the queue + for _, callback := range queueShutdown { + callback() + } + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + + err = queue.Push(&test1) + assert.NoError(t, err) + err = queue.Push(&test2) + assert.NoError(t, err) + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + // terminate the queue + for _, callback := range queueTerminate { + callback() + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + lock.Lock() + pushBack = true + lock.Unlock() + + // Reopen queue + queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 1, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", + }, &testData{}) + assert.NoError(t, err) + pausable, ok = queue.(Pausable) + if !assert.True(t, ok) { + return + } + + paused, _ = pausable.IsPausedIsResumed() + + go queue.Run(func(shutdown func()) { + queueShutdown = append(queueShutdown, shutdown) + }, func(terminate func()) { + queueTerminate = append(queueTerminate, terminate) + }) + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + case <-paused: + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + + pausable.Resume() + + 
result3 := <-handleChan + result4 := <-handleChan + if result4.TestString == test1.TestString { + result3, result4 = result4, result3 + } + assert.Equal(t, test1.TestString, result3.TestString) + assert.Equal(t, test1.TestInt, result3.TestInt) + + assert.Equal(t, test2.TestString, result4.TestString) + assert.Equal(t, test2.TestInt, result4.TestInt) + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } + +} diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 6f6770d44d1ff..d869e471138ab 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -74,8 +74,8 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue if queue.IsPaused() { // We can only pushback to the channel if we're paused. go func() { - if err := queue.Push(u); err != nil { - log.Error("Unable to push back to queue %d", queue.qid) + if err := queue.Push(u[0]); err != nil { + log.Error("Unable to push back to queue %d. Error: %v", queue.qid, err) } }() } else { @@ -144,10 +144,7 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { // Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error { - q.lock.Lock() - paused := q.paused - q.lock.Unlock() - if paused { + if q.IsPaused() { return nil } ctx, cancel := q.commonRegisterWorkers(1, timeout, true) @@ -158,23 +155,16 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error { // FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error { log.Trace("ChannelUniqueQueue: %d Flush", q.qid) + paused, _ := q.IsPausedIsResumed() for { - q.lock.Lock() - paused := q.paused - q.lock.Unlock() - if paused { + select { + case <-paused: return nil + default: } select { case data := <-q.dataChan: - if q.IsPaused() { - // we're paused so we should push this back and stop - // (whilst handle will check this too we need to stop the flusher for this to work.) - go func() { - q.dataChan <- data - }() - return nil - } else if unhandled := q.handle(data); unhandled != nil { + if unhandled := q.handle(data); unhandled != nil { log.Error("Unhandled Data whilst flushing queue %d", q.qid) } atomic.AddInt64(&q.numInQueue, -1) diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go new file mode 100644 index 0000000000000..3c3ff88dd7a3f --- /dev/null +++ b/modules/queue/unique_queue_channel_test.go @@ -0,0 +1,253 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package queue + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestChannelUniqueQueue(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + nilFn := func(_ func()) {} + + queue, err := NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 0, + MaxWorkers: 10, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, + Workers: 0, + Name: "TestChannelQueue", + }, &testData{}) + assert.NoError(t, err) + + assert.Equal(t, queue.(*ChannelUniqueQueue).WorkerPool.boostWorkers, 5) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + go queue.Push(&test1) + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelUniqueQueue_Batch(t *testing.T) { + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + + nilFn := func(_ func()) {} + + queue, err := NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 2, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + + queue.Push(&test1) + go queue.Push(&test2) + + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + result2 := <-handleChan + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + err = queue.Push(test1) + assert.Error(t, err) +} + +func TestChannelUniqueQueue_Pause(t *testing.T) { + lock := sync.Mutex{} + var queue Queue + var err error + pushBack := false + handleChan := make(chan *testData) + handle := func(data ...Data) []Data { + lock.Lock() + if pushBack { + if pausable, ok := queue.(Pausable); ok { + pausable.Pause() + } + pushBack = false + lock.Unlock() + return data + } + lock.Unlock() + + for _, datum := range data { + testDatum := datum.(*testData) + handleChan <- testDatum + } + return nil + } + nilFn := func(_ func()) {} + + queue, err = NewChannelUniqueQueue(handle, + ChannelQueueConfiguration{ + WorkerPoolConfiguration: WorkerPoolConfiguration{ + QueueLength: 20, + BatchLength: 1, + BlockTimeout: 0, + BoostTimeout: 0, + BoostWorkers: 0, + MaxWorkers: 10, + }, + Workers: 1, + }, &testData{}) + assert.NoError(t, err) + + go queue.Run(nilFn, nilFn) + + test1 := testData{"A", 1} + test2 := testData{"B", 2} + queue.Push(&test1) + + pausable, ok := queue.(Pausable) + if !assert.True(t, ok) { + return + } + result1 := <-handleChan + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + + pausable.Pause() + + paused, resumed := pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + queue.Push(&test2) + + var result2 *testData + select { + case result2 = 
<-handleChan: + assert.Fail(t, "handler chan should be empty") + case <-time.After(100 * time.Millisecond): + } + + assert.Nil(t, result2) + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result2 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test2") + } + + assert.Equal(t, test2.TestString, result2.TestString) + assert.Equal(t, test2.TestInt, result2.TestInt) + + lock.Lock() + pushBack = true + lock.Unlock() + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + assert.Fail(t, "Queue should not be paused") + return + case <-resumed: + default: + assert.Fail(t, "Queue is not resumed") + return + } + + queue.Push(&test1) + + select { + case <-paused: + case <-handleChan: + assert.Fail(t, "handler chan should not contain test1") + return + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "queue should be paused") + return + } + + paused, resumed = pausable.IsPausedIsResumed() + + select { + case <-paused: + case <-resumed: + assert.Fail(t, "Queue should not be resumed") + return + default: + assert.Fail(t, "Queue is not paused") + return + } + + pausable.Resume() + + select { + case <-resumed: + default: + assert.Fail(t, "Queue should be resumed") + } + + select { + case result1 = <-handleChan: + case <-time.After(500 * time.Millisecond): + assert.Fail(t, "handler chan should contain test1") + } + assert.Equal(t, test1.TestString, result1.TestString) + assert.Equal(t, test1.TestInt, result1.TestInt) + +} diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0641cb812289c..1361fcf115831 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -22,7 +22,7 @@ type WorkerPool struct { lock sync.Mutex baseCtx context.Context baseCtxCancel context.CancelFunc - paused bool + paused chan struct{} resumed chan struct{} cond *sync.Cond qid int64 @@ -63,6 +63,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo batchLength: config.BatchLength, dataChan: dataChan, resumed: resumed, + paused: make(chan struct{}), handle: handle, blockTimeout: config.BlockTimeout, boostTimeout: config.BoostTimeout, @@ -77,7 +78,15 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() - if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) && !p.paused { + select { + case <-p.paused: + p.lock.Unlock() + p.dataChan <- data + return + default: + } + + if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { if p.numberOfWorkers == 0 { p.zeroBoost() } else { @@ -302,11 +311,16 @@ func (p *WorkerPool) Wait() { func (p *WorkerPool) IsPaused() bool { p.lock.Lock() defer p.lock.Unlock() - return p.paused + select { + case <-p.paused: + return true + default: + return false + } } // IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed -func (p *WorkerPool) IsPausedIsResumed() (bool, <-chan struct{}) { +func (p *WorkerPool) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { p.lock.Lock() defer p.lock.Unlock() return p.paused, p.resumed @@ -316,9 +330,11 @@ func (p *WorkerPool) IsPausedIsResumed() (bool, <-chan struct{}) { func (p *WorkerPool) Pause() { p.lock.Lock() defer p.lock.Unlock() 
- if !p.paused { + select { + case <-p.paused: + default: p.resumed = make(chan struct{}) - p.paused = true + close(p.paused) } } @@ -326,9 +342,11 @@ func (p *WorkerPool) Pause() { func (p *WorkerPool) Resume() { p.lock.Lock() defer p.lock.Unlock() - if p.paused { + select { + case <-p.resumed: + default: + p.paused = make(chan struct{}) close(p.resumed) - p.paused = false } } @@ -400,9 +418,10 @@ func (p *WorkerPool) doWork(ctx context.Context) { } var data = make([]Data, 0, p.batchLength) + paused, _ := p.IsPausedIsResumed() for { - paused, resumed := p.IsPausedIsResumed() - if paused { + select { + case <-paused: log.Trace("Worker for Queue %d Pausing", p.qid) if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) @@ -411,8 +430,10 @@ func (p *WorkerPool) doWork(ctx context.Context) { } atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) } + _, resumed := p.IsPausedIsResumed() select { case <-resumed: + paused, _ = p.IsPausedIsResumed() log.Trace("Worker for Queue %d Resuming", p.qid) if !timer.Stop() { select { @@ -424,8 +445,11 @@ func (p *WorkerPool) doWork(ctx context.Context) { log.Trace("Worker shutting down") return } + default: } select { + case <-paused: + // go back around case <-ctx.Done(): if len(data) > 0 { log.Trace("Handling: %d data, %v", len(data), data) From 2f406c76d5c5e3b994bcc19d20d2240cc5c9cd94 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 23 Jul 2021 11:29:05 +0100 Subject: [PATCH 08/14] fix build Signed-off-by: Andrew Thornton --- services/archiver/archiver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/archiver/archiver.go b/services/archiver/archiver.go index 00c0281306814..c16d0e5c3ba83 100644 --- a/services/archiver/archiver.go +++ b/services/archiver/archiver.go @@ -212,7 +212,7 @@ var archiverQueue queue.UniqueQueue // Init initlize archive func Init() error { - handler := func(data ...queue.Data) { + handler := func(data ...queue.Data) []queue.Data { for _, datum := range data { archiveReq, ok := datum.(*ArchiveRequest) if !ok { @@ -224,6 +224,7 @@ func Init() error { log.Error("Archive %v faild: %v", datum, err) } } + return nil } archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest)) From cd17d66f26c3f77bac600f541cc467a698ac2585 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 23 Jul 2021 16:55:33 +0100 Subject: [PATCH 09/14] prevent "race" in the test Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel_test.go | 32 +++++++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index be6095278cf63..b45edf1bf410c 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -206,8 +206,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.NoError(t, err) go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() queueShutdown = append(queueShutdown, shutdown) }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() queueTerminate = append(queueTerminate, terminate) }) @@ -322,8 +326,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Equal(t, test1.TestString, result1.TestString) assert.Equal(t, test1.TestInt, result1.TestInt) + lock.Lock() + callbacks := make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() // Now shutdown the queue - for _, callback := range queueShutdown { + for _, callback := range callbacks 
{ callback() } @@ -341,7 +349,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { } // terminate the queue - for _, callback := range queueTerminate { + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { callback() } @@ -374,8 +386,12 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { paused, _ = pausable.IsPausedIsResumed() go queue.Run(func(shutdown func()) { + lock.Lock() + defer lock.Unlock() queueShutdown = append(queueShutdown, shutdown) }, func(terminate func()) { + lock.Lock() + defer lock.Unlock() queueTerminate = append(queueTerminate, terminate) }) @@ -415,10 +431,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestInt, result4.TestInt) - for _, callback := range queueShutdown { + lock.Lock() + callbacks = make([]func(), len(queueShutdown)) + copy(callbacks, queueShutdown) + lock.Unlock() + for _, callback := range callbacks { callback() } - for _, callback := range queueTerminate { + lock.Lock() + callbacks = make([]func(), len(queueTerminate)) + copy(callbacks, queueTerminate) + lock.Unlock() + for _, callback := range callbacks { callback() } From 6ca3946e693b78fc9c374c042ad57acc4b1e16dc Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 28 Jul 2021 21:36:37 +0100 Subject: [PATCH 10/14] fix jsoniter mismerge Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index e1da75cd6a46b..ad97fbdf474cb 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -93,7 +93,6 @@ func (q *ByteFIFOQueue) PushBack(data Data) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name) } - json := jsoniter.ConfigCompatibleWithStandardLibrary bs, err := json.Marshal(data) if err != nil { return err From 0f11dacc2f5bef2a4ef8c9ce2591b342a4349573 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 21 Jan 2022 17:22:44 +0000 Subject: [PATCH 11/14] fix conflicts Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel_test.go | 2 +- services/mirror/mirror.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index e52d319467a7c..2068bf7a1a0d3 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -223,7 +223,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { queueShutdown := []func(){} queueTerminate := []func(){} - tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-pause-test-data") + tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data") assert.NoError(t, err) defer util.RemoveAll(tmpDir) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 26432001743f3..6f285ec467c63 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -130,11 +130,12 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { return nil } -func queueHandle(data ...queue.Data) { +func queueHandle(data ...queue.Data) []queue.Data { for _, datum := range data { req := datum.(*SyncRequest) doMirrorSync(graceful.GetManager().ShutdownContext(), req) } + return nil } // InitSyncMirrors initializes a go routine to sync the 
mirrors From 899ed5479497a2e570cded6086609b01f08c0a00 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 21 Jan 2022 17:23:07 +0000 Subject: [PATCH 12/14] fix format Signed-off-by: Andrew Thornton --- modules/queue/queue_channel_test.go | 1 - modules/queue/queue_disk_channel_test.go | 1 - modules/queue/unique_queue_channel_test.go | 1 - modules/queue/workerpool.go | 6 ++++-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index 2d6cffd552b21..b700b28a14a5a 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -250,5 +250,4 @@ func TestChannelQueue_Pause(t *testing.T) { } assert.Equal(t, test1.TestString, result1.TestString) assert.Equal(t, test1.TestInt, result1.TestInt) - } diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 2068bf7a1a0d3..9bbd146efe9be 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -478,5 +478,4 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { for _, callback := range callbacks { callback() } - } diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go index 3c3ff88dd7a3f..ef6752079e149 100644 --- a/modules/queue/unique_queue_channel_test.go +++ b/modules/queue/unique_queue_channel_test.go @@ -249,5 +249,4 @@ func TestChannelUniqueQueue_Pause(t *testing.T) { } assert.Equal(t, test1.TestString, result1.TestString) assert.Equal(t, test1.TestInt, result1.TestInt) - } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 58267b857d91d..9562b2d1db01a 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -37,8 +37,10 @@ type WorkerPool struct { numInQueue int64 } -var _ Flushable = &WorkerPool{} -var _ ManagedPool = &WorkerPool{} +var ( + _ Flushable = &WorkerPool{} + _ ManagedPool = &WorkerPool{} +) // WorkerPoolConfiguration is the basic configuration for a WorkerPool type WorkerPoolConfiguration struct { From ebed5132871e71a3e533a42d003f3d1b6f119c61 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 21 Jan 2022 21:24:31 +0000 Subject: [PATCH 13/14] Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 7 +++++++ modules/queue/setting.go | 20 ++++++++++++++++++++ modules/queue/workerpool.go | 21 +++++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index bb7a9186a6aa7..53432b1671b57 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -197,6 +197,13 @@ loop: case <-resumed: paused, _ = q.IsPausedIsResumed() log.Trace("Queue %s resuming", q.name) + if q.HasNoWorkerScaling() { + log.Warn( + "Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name) + q.Pause() + continue loop + } case <-q.shutdownCtx.Done(): // tell the pool to shutdown. 
q.baseCtxCancel() diff --git a/modules/queue/setting.go b/modules/queue/setting.go index caaf123d42b68..61f156c377e26 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -65,6 +65,16 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { log.Error("Unable to create queue for %s: %v", name, err) return nil } + + // Sanity check configuration + if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) { + log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name) + if pausable, ok := returnable.(Pausable); ok { + log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name) + pausable.Pause() + } + } + return returnable } @@ -103,5 +113,15 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un log.Error("Unable to create unique queue for %s: %v", name, err) return nil } + + // Sanity check configuration + if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) { + log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name) + if pausable, ok := returnable.(Pausable); ok { + log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name) + pausable.Pause() + } + } + return returnable.(UniqueQueue) } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 9562b2d1db01a..3c3d220120c57 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -101,6 +101,17 @@ func (p *WorkerPool) Push(data Data) { } } +// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting +func (p *WorkerPool) HasNoWorkerScaling() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.hasNoWorkerScaling() +} + +func (p *WorkerPool) hasNoWorkerScaling() bool { + return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0) +} + func (p *WorkerPool) zeroBoost() { ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) @@ -291,6 +302,12 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.cond.Broadcast() cancel() } + if p.hasNoWorkerScaling() { + log.Warn( + "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ + "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) + p.pause() + } p.lock.Unlock() }() } @@ -332,6 +349,10 @@ func (p *WorkerPool) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) { func (p *WorkerPool) Pause() { p.lock.Lock() defer p.lock.Unlock() + p.pause() +} + +func (p *WorkerPool) pause() { select { case <-p.paused: default: From cd6dde28e6bfc7947bd90630cdf2e3d83a384b6b Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 22 Jan 2022 15:15:09 +0000 Subject: [PATCH 14/14] Use StopTimer Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 16 ++++------------ modules/queue/workerpool.go | 22 ++++------------------ 2 files changed, 8 insertions(+), 30 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 53432b1671b57..0380497ea675f 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ 
-13,6 +13,7 @@ import ( "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" ) // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue @@ -178,12 +179,8 @@ func (q *ByteFIFOQueue) readToChan() { // Default backoff values backOffTime := time.Millisecond * 100 backOffTimer := time.NewTimer(0) - if !backOffTimer.Stop() { - select { - case <-backOffTimer.C: - default: - } - } + util.StopTimer(backOffTimer) + paused, _ := q.IsPausedIsResumed() loop: @@ -225,12 +222,7 @@ loop: err := q.doPop() - if !backOffTimer.Stop() { - select { - case <-backOffTimer.C: - default: - } - } + util.StopTimer(backOffTimer) if err != nil { if err == errQueueEmpty && q.waitOnEmpty { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 3c3d220120c57..da56216dcb124 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -434,12 +434,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { // Create a common timer - we will use this elsewhere timer := time.NewTimer(0) - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } + util.StopTimer(timer) paused, _ := p.IsPausedIsResumed() data := make([]Data, 0, p.batchLength) @@ -459,12 +454,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { case <-resumed: paused, _ = p.IsPausedIsResumed() log.Trace("Worker for Queue %d Resuming", p.qid) - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } + util.StopTimer(timer) case <-ctx.Done(): log.Trace("Worker shutting down") return @@ -498,12 +488,8 @@ func (p *WorkerPool) doWork(ctx context.Context) { return } data = append(data, datum) - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } + util.StopTimer(timer) + if len(data) >= p.batchLength { log.Trace("Handling: %d data, %v", len(data), data) if unhandled := p.handle(data...); unhandled != nil {