// Source file: queue_inmem.go (63 lines, 54 loc, 1.37 KB)

package taskqueueworker
import (
"context"
"sync"
"github.com/golangid/candi/candishared"
)
// inMemQueue is a QueueStorage implementation that keeps its queues in
// process memory. It holds one candishared.Queue of strings per map key;
// the methods in this file key the map by task name and store job IDs.
type inMemQueue struct {
// mu is locked by every method in this file that reads or writes queue.
mu sync.Mutex
// queue maps a task name to that task's queue; entries are created lazily
// by PushJob, PopJob and NextJob, and replaced by Clear.
queue map[string]*candishared.Queue[string]
}
// NewInMemQueue builds an in-memory QueueStorage. The returned value starts
// with an empty, non-nil task-name-to-queue map, so it is usable immediately.
func NewInMemQueue() QueueStorage {
	return &inMemQueue{
		queue: map[string]*candishared.Queue[string]{},
	}
}
// PushJob adds job.ID to the queue stored under job.TaskName, creating that
// queue first when the map has no (or a nil) entry for the task. It returns
// the queue's length as reported by Len after the push. ctx is not used.
func (i *inMemQueue) PushJob(ctx context.Context, job *Job) (n int64) {
	i.mu.Lock()
	defer i.mu.Unlock()

	// Resolve the task's queue once and reuse the pointer below.
	taskQueue := i.queue[job.TaskName]
	if taskQueue == nil {
		taskQueue = candishared.NewQueue[string]()
		i.queue[job.TaskName] = taskQueue
	}

	taskQueue.Push(job.ID)
	return int64(taskQueue.Len())
}
// PopJob pops one element from the queue stored under taskName and returns
// it. When the task has no queue yet, an empty one is created and stored
// before popping. ctx is not used.
func (i *inMemQueue) PopJob(ctx context.Context, taskName string) string {
	i.mu.Lock()
	defer i.mu.Unlock()

	taskQueue := i.queue[taskName]
	if taskQueue == nil {
		taskQueue = candishared.NewQueue[string]()
		i.queue[taskName] = taskQueue
	}

	// The second value returned by Pop is deliberately discarded; callers
	// only receive the popped string (presumably "" for an empty queue —
	// confirm against candishared.Queue).
	jobID, _ := taskQueue.Pop()
	return jobID
}
// NextJob returns the element reported by Peek on the queue stored under
// taskName. When the task has no queue yet, an empty one is created and
// stored before peeking. ctx is not used.
func (i *inMemQueue) NextJob(ctx context.Context, taskName string) string {
	i.mu.Lock()
	defer i.mu.Unlock()

	taskQueue := i.queue[taskName]
	if taskQueue == nil {
		taskQueue = candishared.NewQueue[string]()
		i.queue[taskName] = taskQueue
	}

	// The second value returned by Peek is deliberately discarded; callers
	// only receive the peeked string (presumably "" for an empty queue —
	// confirm against candishared.Queue).
	jobID, _ := taskQueue.Peek()
	return jobID
}
// Clear resets the queue stored under taskName by replacing it with a newly
// created empty queue; the previous queue is not drained, only dropped from
// the map. ctx is not used.
func (i *inMemQueue) Clear(ctx context.Context, taskName string) {
	i.mu.Lock()
	defer i.mu.Unlock()

	emptyQueue := candishared.NewQueue[string]()
	i.queue[taskName] = emptyQueue
}
// Ping always returns nil; this implementation performs no check.
func (i *inMemQueue) Ping() error {
	// Nothing to probe here, so report success unconditionally.
	return nil
}
// Type returns the fixed display name of this queue implementation.
func (i *inMemQueue) Type() string {
	const storageLabel = "In Memory Queue"
	return storageLabel
}