From 4a81889d810d409ed42fcf07a0fa6a4ac97db72b Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Sat, 13 Apr 2024 01:04:38 -0700 Subject: [PATCH] fix: use new orchestrator queue --- .github/workflows/build.yml | 43 ++++++ .../{build-and-test.yml => test.yml} | 18 +-- internal/orchestrator/orchestrator.go | 49 +++---- internal/orchestrator/scheduledtaskheap.go | 11 ++ internal/queue/genheap_test.go | 6 + internal/queue/timepriorityqueue.go | 67 ++++++++- internal/queue/timepriorityqueue_test.go | 138 ++++++++++++++++-- internal/queue/timequeue.go | 31 +++- 8 files changed, 303 insertions(+), 60 deletions(-) create mode 100644 .github/workflows/build.yml rename .github/workflows/{build-and-test.yml => test.yml} (79%) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 00000000..20246036 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,43 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Build Snapshot Relaese + +on: + push: + branches: ["main"] + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: "1.21" + + - name: Setup NodeJS + uses: actions/setup-node@v4 + with: + node-version: "20" + + - name: Build + uses: goreleaser/goreleaser-action@v5 + with: + distribution: goreleaser + version: latest + args: release --snapshot --clean + + - name: Upload Artifacts + uses: actions/upload-artifact@v3 + with: + name: backrest-snapshot-builds + path: | + dist/*.tar.gz + dist/*.zip diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/test.yml similarity index 79% rename from .github/workflows/build-and-test.yml rename to .github/workflows/test.yml index 2040e3cd..76a2434c 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/test.yml @@ -1,7 +1,7 @@ # This workflow will build a golang project # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go -name: Build and Test +name: Test on: push: @@ -11,14 +11,11 @@ on: workflow_dispatch: jobs: - build-nix: + test-nix: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - name: Set up Go uses: actions/setup-go@v4 with: @@ -29,17 +26,16 @@ jobs: with: node-version: "20" + - name: Generate + run: go generate ./... + - name: Build - uses: goreleaser/goreleaser-action@v5 - with: - distribution: goreleaser - version: latest - args: release --snapshot --clean + run: go build ./... - name: Test run: PATH=$(pwd):$PATH go test ./... --race - build-win: + test-win: runs-on: windows-latest steps: - uses: actions/checkout@v3 diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 6a95e10b..bff51b30 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -13,6 +13,7 @@ import ( "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/oplog" + "github.com/garethgeorge/backrest/internal/queue" "github.com/garethgeorge/backrest/internal/rotatinglog" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -40,7 +41,7 @@ type Orchestrator struct { config *v1.Config OpLog *oplog.OpLog repoPool *resticRepoPool - taskQueue taskQueue + taskQueue *queue.TimePriorityQueue[scheduledTask] hookExecutor *hook.HookExecutor logStore *rotatinglog.RotatingLog @@ -59,10 +60,8 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt OpLog: oplog, config: cfg, // repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value. - repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}), - taskQueue: newTaskQueue(func() time.Time { - return o.curTime() - }), + repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}), + taskQueue: queue.NewTimePriorityQueue[scheduledTask](), hookExecutor: hook.NewHookExecutor(oplog, logStore), logStore: logStore, } @@ -194,29 +193,23 @@ func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationSta running.cancel() } - tasks := o.taskQueue.Reset() - remaining := make([]scheduledTask, 0, len(tasks)) - - for _, t := range tasks { - if t.task.OperationId() == operationId { - if err := t.task.Cancel(status); err != nil { - return fmt.Errorf("cancel task %q: %w", t.task.Name(), err) - } - - // check if the task has a next after it's current 'runAt' time, if it does then we will schedule the next run. - if nextTime := t.task.Next(t.runAt); nextTime != nil { - remaining = append(remaining, scheduledTask{ - task: t.task, - runAt: *nextTime, - }) - } - } else { - remaining = append(remaining, *t) - } + allTasks := o.taskQueue.GetAll() + idx := slices.IndexFunc(allTasks, func(t scheduledTask) bool { + return t.task.OperationId() == operationId + }) + if idx == -1 { + return nil } - o.taskQueue.Push(remaining...) - + t := allTasks[idx] + o.taskQueue.Remove(t) + if err := t.task.Cancel(status); err != nil { + return fmt.Errorf("cancel task %q: %w", t.task.Name(), err) + } + if nextTime := t.task.Next(t.runAt.Add(1 * time.Second)); nextTime != nil { + t.runAt = *nextTime + o.taskQueue.Enqueue(*nextTime, t.priority, t) + } return nil } @@ -231,7 +224,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) { } t := o.taskQueue.Dequeue(mainCtx) - if t == nil { + if t.task == nil { continue } @@ -272,7 +265,7 @@ func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(erro return } zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339))) - o.taskQueue.Push(scheduledTask{ + o.taskQueue.Enqueue(*nextRun, priority, scheduledTask{ task: t, runAt: *nextRun, priority: priority, diff --git a/internal/orchestrator/scheduledtaskheap.go b/internal/orchestrator/scheduledtaskheap.go index 03abf5d0..1944b311 100644 --- a/internal/orchestrator/scheduledtaskheap.go +++ b/internal/orchestrator/scheduledtaskheap.go @@ -164,6 +164,17 @@ type scheduledTask struct { config *v1.Config } +func (s *scheduledTask) Less(other *scheduledTask) bool { + if s.priority != other.priority { + return s.priority > other.priority + } + return s.runAt.Before(other.runAt) +} + +func (s scheduledTask) Eq(other scheduledTask) bool { + return s.task == other.task && s.runAt.Equal(other.runAt) && s.priority == other.priority && s.config == other.config +} + type scheduledTaskHeap struct { tasks []*scheduledTask comparator func(i, j *scheduledTask) bool diff --git a/internal/queue/genheap_test.go b/internal/queue/genheap_test.go index 34527b1c..61bee966 100644 --- a/internal/queue/genheap_test.go +++ b/internal/queue/genheap_test.go @@ -13,7 +13,12 @@ func (v val) Less(other val) bool { return v.v < other.v } +func (v val) Eq(other val) bool { + return v.v == other.v +} + func TestGenericHeapInit(t *testing.T) { + t.Parallel() genHeap := genericHeap[val]{{v: 3}, {v: 2}, {v: 1}} heap.Init(&genHeap) @@ -30,6 +35,7 @@ func TestGenericHeapInit(t *testing.T) { } func TestGenericHeapPushPop(t *testing.T) { + t.Parallel() genHeap := genericHeap[val]{} // empty heap heap.Push(&genHeap, val{v: 3}) heap.Push(&genHeap, val{v: 2}) diff --git a/internal/queue/timepriorityqueue.go b/internal/queue/timepriorityqueue.go index 8f235733..b5f9d373 100644 --- a/internal/queue/timepriorityqueue.go +++ b/internal/queue/timepriorityqueue.go @@ -8,13 +8,13 @@ import ( ) // TimePriorityQueue is a priority queue that dequeues elements at (or after) a specified time, and prioritizes elements based on a priority value. It is safe for concurrent use. -type TimePriorityQueue[T any] struct { +type TimePriorityQueue[T equals[T]] struct { mu sync.Mutex tqueue TimeQueue[priorityEntry[T]] ready genericHeap[priorityEntry[T]] } -func NewTimePriorityQueue[T any]() *TimePriorityQueue[T] { +func NewTimePriorityQueue[T equals[T]]() *TimePriorityQueue[T] { return &TimePriorityQueue[T]{ tqueue: TimeQueue[priorityEntry[T]]{}, ready: genericHeap[priorityEntry[T]]{}, @@ -23,33 +23,80 @@ func NewTimePriorityQueue[T any]() *TimePriorityQueue[T] { func (t *TimePriorityQueue[T]) Len() int { t.mu.Lock() + t.tqueue.mu.Lock() defer t.mu.Unlock() - return t.tqueue.Len() + t.ready.Len() + defer t.tqueue.mu.Unlock() + return t.tqueue.heap.Len() + t.ready.Len() } func (t *TimePriorityQueue[T]) Peek() T { t.mu.Lock() + t.tqueue.mu.Lock() defer t.mu.Unlock() + defer t.tqueue.mu.Unlock() if t.ready.Len() > 0 { return t.ready.Peek().v } - return t.tqueue.Peek().v + if t.tqueue.heap.Len() > 0 { + return t.tqueue.heap.Peek().v.v + } + var zero T + return zero } func (t *TimePriorityQueue[T]) Reset() []T { t.mu.Lock() + t.tqueue.mu.Lock() defer t.mu.Unlock() + defer t.tqueue.mu.Unlock() + var res []T for t.ready.Len() > 0 { res = append(res, heap.Pop(&t.ready).(priorityEntry[T]).v) } - for t.tqueue.Len() > 0 { + for t.tqueue.heap.Len() > 0 { res = append(res, heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]]).v.v) } return res } +func (t *TimePriorityQueue[T]) GetAll() []T { + t.mu.Lock() + t.tqueue.mu.Lock() + defer t.mu.Unlock() + defer t.tqueue.mu.Unlock() + res := make([]T, 0, t.tqueue.heap.Len()+t.ready.Len()) + for _, entry := range t.tqueue.heap { + res = append(res, entry.v.v) + } + for _, entry := range t.ready { + res = append(res, entry.v) + } + return res +} + +func (t *TimePriorityQueue[T]) Remove(v T) { + t.mu.Lock() + t.tqueue.mu.Lock() + defer t.mu.Unlock() + defer t.tqueue.mu.Unlock() + + for idx := 0; idx < t.tqueue.heap.Len(); idx++ { + if t.tqueue.heap[idx].v.v.Eq(v) { + heap.Remove(&t.tqueue.heap, idx) + return + } + } + + for idx := 0; idx < t.ready.Len(); idx++ { + if t.ready[idx].v.Eq(v) { + heap.Remove(&t.ready, idx) + return + } + } +} + func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) { t.mu.Lock() t.tqueue.Enqueue(at, priorityEntry[T]{at, priority, v}) @@ -59,8 +106,9 @@ func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) { func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T { t.mu.Lock() for { + t.tqueue.mu.Lock() for t.tqueue.heap.Len() > 0 { - thead := t.tqueue.Peek() // peek at the head of the time queue + thead := t.tqueue.heap.Peek() // peek at the head of the time queue if thead.at.Before(time.Now()) { tqe := heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]]) heap.Push(&t.ready, tqe.v) @@ -68,6 +116,7 @@ func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T { break } } + t.tqueue.mu.Unlock() if t.ready.Len() > 0 { defer t.mu.Unlock() return heap.Pop(&t.ready).(priorityEntry[T]).v @@ -80,7 +129,7 @@ func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T { } } -type priorityEntry[T any] struct { +type priorityEntry[T equals[T]] struct { at time.Time priority int v T @@ -89,3 +138,7 @@ type priorityEntry[T any] struct { func (t priorityEntry[T]) Less(other priorityEntry[T]) bool { return t.priority > other.priority } + +func (t priorityEntry[T]) Eq(other priorityEntry[T]) bool { + return t.at == other.at && t.priority == other.priority && t.v.Eq(other.v) +} diff --git a/internal/queue/timepriorityqueue_test.go b/internal/queue/timepriorityqueue_test.go index 375d744d..c3850e2b 100644 --- a/internal/queue/timepriorityqueue_test.go +++ b/internal/queue/timepriorityqueue_test.go @@ -3,6 +3,7 @@ package queue import ( "context" "math/rand" + "slices" "testing" "time" ) @@ -10,11 +11,11 @@ import ( // TestTPQEnqueue tests that enqueued elements are retruned highest priority first. func TestTPQPriority(t *testing.T) { t.Parallel() - tpq := NewTimePriorityQueue[int]() + tpq := NewTimePriorityQueue[val]() now := time.Now().Add(-time.Second) for i := 0; i < 100; i++ { - tpq.Enqueue(now, i, i) + tpq.Enqueue(now, i, val{i}) } if tpq.Len() != 100 { @@ -23,7 +24,7 @@ func TestTPQPriority(t *testing.T) { for i := 99; i >= 0; i-- { v := tpq.Dequeue(context.Background()) - if v != i { + if v.v != i { t.Errorf("expected %d, got %d", i, v) } } @@ -31,14 +32,14 @@ func TestTPQPriority(t *testing.T) { func TestTPQMixedReadinessStates(t *testing.T) { t.Parallel() - tpq := NewTimePriorityQueue[int]() + tpq := NewTimePriorityQueue[val]() now := time.Now() for i := 0; i < 100; i++ { - tpq.Enqueue(now.Add(-100*time.Millisecond), i, i) + tpq.Enqueue(now.Add(-100*time.Millisecond), i, val{i}) } for i := 0; i < 100; i++ { - tpq.Enqueue(now.Add(100*time.Millisecond), i, i) + tpq.Enqueue(now.Add(100*time.Millisecond), i, val{i}) } if tpq.Len() != 200 { @@ -48,7 +49,7 @@ func TestTPQMixedReadinessStates(t *testing.T) { for j := 0; j < 2; j++ { for i := 99; i >= 0; i-- { v := tpq.Dequeue(context.Background()) - if v != i { + if v.v != i { t.Errorf("pass %d expected %d, got %d", j, i, v) } } @@ -56,7 +57,8 @@ func TestTPQMixedReadinessStates(t *testing.T) { } func TestTPQStress(t *testing.T) { - tpq := NewTimePriorityQueue[int]() + t.Parallel() + tpq := NewTimePriorityQueue[val]() start := time.Now() totalEnqueued := 0 @@ -65,8 +67,8 @@ func TestTPQStress(t *testing.T) { go func() { ctx, _ := context.WithDeadline(context.Background(), start.Add(1*time.Second)) for ctx.Err() == nil { - v := rand.Intn(100) - tpq.Enqueue(time.Now().Add(time.Duration(rand.Intn(1000)-500)*time.Millisecond), rand.Intn(5), v) + v := rand.Intn(100) + 1 + tpq.Enqueue(time.Now().Add(time.Duration(rand.Intn(1000)-500)*time.Millisecond), rand.Intn(5), val{v}) totalEnqueuedSum += v totalEnqueued++ } @@ -76,11 +78,123 @@ func TestTPQStress(t *testing.T) { totalDequeued := 0 sum := 0 for ctx.Err() == nil || totalDequeued < totalEnqueued { - sum += tpq.Dequeue(ctx) - totalDequeued++ + v := tpq.Dequeue(ctx) + if v.v != 0 { + totalDequeued++ + sum += v.v + } + } + + if totalDequeued != totalEnqueued { + t.Errorf("expected totalDequeued to be %d, got %d", totalEnqueued, totalDequeued) } if sum != totalEnqueuedSum { t.Errorf("expected sum to be %d, got %d", totalEnqueuedSum, sum) } } + +func TestTPQRemove(t *testing.T) { + t.Parallel() + tpq := NewTimePriorityQueue[val]() + + now := time.Now().Add(-time.Second) // make sure the time is in the past + for i := 0; i < 100; i++ { + tpq.Enqueue(now, -i, val{i}) + } + + if tpq.Len() != 100 { + t.Errorf("expected length to be 100, got %d", tpq.Len()) + } + + // remove all even numbers, dequeue the odd numbers + for i := 0; i < 100; i += 2 { + tpq.Remove(val{i}) + v := tpq.Dequeue(context.Background()) + if v.v != i+1 { + t.Errorf("expected %d, got %d", i+1, v) + } + } + + if tpq.Len() != 0 { + t.Errorf("expected length to be 0, got %d", tpq.Len()) + } +} + +func TestTPQReset(t *testing.T) { + t.Parallel() + tpq := NewTimePriorityQueue[val]() + + now := time.Now() // make sure the time is in the past + for i := 0; i < 50; i++ { + tpq.Enqueue(now.Add(time.Second), i, val{i}) + } + for i := 50; i < 100; i++ { + tpq.Enqueue(now.Add(-time.Second), i, val{i}) + } + + if tpq.Len() != 100 { + t.Errorf("expected length to be 100, got %d", tpq.Len()) + } + + dv := tpq.Dequeue(context.Background()) + if dv.v != 99 { + t.Errorf("expected 99, got %d", dv.v) + } + + vals := tpq.Reset() + + if len(vals) != 99 { + t.Errorf("expected length to be 100, got %d", len(vals)) + } + + slices.SortFunc(vals, func(i, j val) int { + if i.v > j.v { + return 1 + } + return -1 + }) + + for i := 0; i < 99; i++ { + if vals[i].v != i { + t.Errorf("expected %d, got %d", i, vals[i].v) + } + } + + if tpq.Len() != 0 { + t.Errorf("expected length to be 0, got %d", tpq.Len()) + } +} + +func TestTPQGetAll(t *testing.T) { + t.Parallel() + tpq := NewTimePriorityQueue[val]() + now := time.Now() + + for i := 0; i < 100; i++ { + tpq.Enqueue(now.Add(time.Second), i, val{i}) + } + + if tpq.Len() != 100 { + t.Errorf("expected length to be 100, got %d", tpq.Len()) + } + + vals := tpq.GetAll() + + if len(vals) != 100 { + t.Errorf("expected length to be 100, got %d", len(vals)) + } + + slices.SortFunc(vals, func(i, j val) int { + if i.v > j.v { + return 1 + } + return -1 + }) + + for i := 0; i < 100; i++ { + if vals[i].v != i { + t.Errorf("expected %d, got %d", i, vals[i].v) + } + } +} diff --git a/internal/queue/timequeue.go b/internal/queue/timequeue.go index ef512cac..842f561d 100644 --- a/internal/queue/timequeue.go +++ b/internal/queue/timequeue.go @@ -9,7 +9,7 @@ import ( ) // TimeQueue is a priority queue that dequeues elements at (or after) a specified time. It is safe for concurrent use. -type TimeQueue[T any] struct { +type TimeQueue[T equals[T]] struct { heap genericHeap[timeQueueEntry[T]] dequeueMu sync.Mutex @@ -17,7 +17,7 @@ type TimeQueue[T any] struct { notify atomic.Pointer[chan struct{}] } -func NewTimeQueue[T any]() *TimeQueue[T] { +func NewTimeQueue[T equals[T]]() *TimeQueue[T] { return &TimeQueue[T]{ heap: genericHeap[timeQueueEntry[T]]{}, } @@ -63,6 +63,29 @@ func (t *TimeQueue[T]) Reset() []T { return res } +func (t *TimeQueue[T]) Remove(v T) { + t.mu.Lock() + defer t.mu.Unlock() + + for idx := 0; idx < t.heap.Len(); idx++ { + if t.heap[idx].v.Eq(v) { + heap.Remove(&t.heap, idx) + return + } + } +} + +func (t *TimeQueue[T]) GetAll() []T { + t.mu.Lock() + defer t.mu.Unlock() + + res := make([]T, 0, t.heap.Len()) + for _, entry := range t.heap { + res = append(res, entry.v) + } + return res +} + func (t *TimeQueue[T]) Dequeue(ctx context.Context) T { t.dequeueMu.Lock() defer t.dequeueMu.Unlock() @@ -130,3 +153,7 @@ func (t timeQueueEntry[T]) Less(other timeQueueEntry[T]) bool { func (t timeQueueEntry[T]) Eq(other timeQueueEntry[T]) bool { return t.at.Equal(other.at) } + +type equals[T any] interface { + Eq(other T) bool +}