Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly flush unique queues on startup #23154

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/queue/queue_disk_channel.go
Expand Up @@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
},
Workers: 0,
},
DataDir: config.DataDir,
DataDir: config.DataDir,
QueueName: config.Name + "-level",
}

levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
Expand Down
27 changes: 19 additions & 8 deletions modules/queue/unique_queue_disk_channel.go
Expand Up @@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
},
Workers: 0,
},
DataDir: config.DataDir,
DataDir: config.DataDir,
QueueName: config.Name + "-level",
}

queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
Expand Down Expand Up @@ -209,17 +210,27 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
atTerminate(q.Terminate)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)

if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
lunny marked this conversation as resolved.
Show resolved Hide resolved
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go luq.Run(func(_ func()) {}, func(_ func()) {})
go func() {
_ = q.internal.Flush(0)
log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
_ = luq.Flush(0)
for !luq.IsEmpty() {
_ = luq.Flush(0)
select {
case <-time.After(100 * time.Millisecond):
case <-luq.shutdownCtx.Done():
log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
return
}
}
log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
luq.Shutdown()
GetManager().Remove(luq.qid)
}()
} else {
log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
_ = q.internal.Flush(0)
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
}
Expand Down Expand Up @@ -286,7 +297,7 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
close(q.channelQueue.dataChan)
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
_ = q.internal.(*LevelUniqueQueue).Push(data)
}
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

Expand Down
224 changes: 224 additions & 0 deletions modules/queue/unique_queue_disk_channel_test.go
@@ -0,0 +1,224 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
"fmt"
"strconv"
"sync"
"testing"
"time"

"code.gitea.io/gitea/modules/log"

"github.com/stretchr/testify/assert"
zeripath marked this conversation as resolved.
Show resolved Hide resolved
)

func TestPersistableChannelUniqueQueue(t *testing.T) {
tmpDir := t.TempDir()
fmt.Printf("TempDir %s\n", tmpDir)
_ = log.NewLogger(1000, "console", "console", `{"level":"trace","stacktracelevel":"NONE","stderr":true}`)

// Common function to create the Queue
newQueue := func(handle func(data ...Data) []Data) Queue {
q, err := NewPersistableChannelUniqueQueue(handle,
PersistableChannelUniqueQueueConfiguration{
Name: "TestPersistableChannelUniqueQueue",
DataDir: tmpDir,
QueueLength: 200,
MaxWorkers: 1,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
Workers: 0,
}, "task-0")
assert.NoError(t, err)
return q
}

// runs the provided queue and provides some timer function
type channels struct {
readyForShutdown chan struct{} // closed when shutdown functions have been assigned
readyForTerminate chan struct{} // closed when terminate functions have been assigned
signalShutdown chan struct{} // Should close to signal shutdown
doneShutdown chan struct{} // closed when shutdown function is done
queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock
}
runQueue := func(q Queue, lock *sync.Mutex) *channels {
returnable := &channels{
readyForShutdown: make(chan struct{}),
readyForTerminate: make(chan struct{}),
signalShutdown: make(chan struct{}),
doneShutdown: make(chan struct{}),
}
go q.Run(func(atShutdown func()) {
go func() {
lock.Lock()
select {
case <-returnable.readyForShutdown:
default:
close(returnable.readyForShutdown)
}
lock.Unlock()
<-returnable.signalShutdown
atShutdown()
close(returnable.doneShutdown)
}()
}, func(atTerminate func()) {
lock.Lock()
defer lock.Unlock()
select {
case <-returnable.readyForTerminate:
default:
close(returnable.readyForTerminate)
}
returnable.queueTerminate = append(returnable.queueTerminate, atTerminate)
})

return returnable
}

// call to shutdown and terminate the queue associated with the channels
shutdownAndTerminate := func(chans *channels, lock *sync.Mutex) {
close(chans.signalShutdown)
<-chans.doneShutdown
<-chans.readyForTerminate

lock.Lock()
callbacks := []func(){}
callbacks = append(callbacks, chans.queueTerminate...)
lock.Unlock()

for _, callback := range callbacks {
callback()
}
}

executedTasks1 := []string{}
hasTasks1 := []string{}

t.Run("Initial Filling", func(t *testing.T) {
lock := sync.Mutex{}

startAt100Queued := make(chan struct{})
stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item

handle := func(data ...Data) []Data {
<-startAt100Queued
for _, datum := range data {
s := datum.(string)
lock.Lock()
executedTasks1 = append(executedTasks1, s)
lock.Unlock()
if s == "task-20" {
close(stopAt20Shutdown)
}
}
return nil
}

q := newQueue(handle)

// add 100 tasks to the queue
for i := 0; i < 100; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
}
close(startAt100Queued)

chans := runQueue(q, &lock)

<-chans.readyForShutdown
<-stopAt20Shutdown
shutdownAndTerminate(chans, &lock)

// check which tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
hasTasks1 = append(hasTasks1, "task-"+strconv.Itoa(i))
}
}
assert.Equal(t, 100, len(executedTasks1)+len(hasTasks1))
})

executedTasks2 := []string{}
hasTasks2 := []string{}
t.Run("Ensure that things will empty on restart", func(t *testing.T) {
lock := sync.Mutex{}
stop := make(chan struct{})

// collect the tasks that have been executed
handle := func(data ...Data) []Data {
lock.Lock()
for _, datum := range data {
t.Logf("executed %s", datum.(string))
executedTasks2 = append(executedTasks2, datum.(string))
if datum.(string) == "task-99" {
close(stop)
}
}
lock.Unlock()
return nil
}

q := newQueue(handle)
chans := runQueue(q, &lock)

<-chans.readyForShutdown
<-stop
shutdownAndTerminate(chans, &lock)

// check which tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
hasTasks2 = append(hasTasks2, "task-"+strconv.Itoa(i))
}
}

assert.Equal(t, 100, len(executedTasks1)+len(executedTasks2))
assert.Equal(t, 0, len(hasTasks2))
})

executedTasks3 := []string{}
hasTasks3 := []string{}

t.Run("refill", func(t *testing.T) {
lock := sync.Mutex{}
stop := make(chan struct{})

handle := func(data ...Data) []Data {
lock.Lock()
for _, datum := range data {
executedTasks3 = append(executedTasks3, datum.(string))
}
lock.Unlock()
return nil
}

q := newQueue(handle)
chans := runQueue(q, &lock)

// re-run all tasks
for i := 0; i < 100; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
}

// wait for a while
time.Sleep(1 * time.Second)

close(stop)
<-chans.readyForShutdown
shutdownAndTerminate(chans, &lock)

// check whether the tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
hasTasks3 = append(hasTasks3, "task-"+strconv.Itoa(i))
}
}
assert.Equal(t, 100, len(executedTasks3)+len(hasTasks3))
})

t.Logf("TestPersistableChannelUniqueQueue completed1=%v, executed2=%v, has2=%v, executed3=%v, has3=%v",
len(executedTasks1), len(executedTasks2), len(hasTasks2), len(executedTasks3), len(hasTasks3))
}