-
Notifications
You must be signed in to change notification settings - Fork 127
/
queue.go
138 lines (114 loc) · 4.19 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package repository
import (
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client/queue"
)
// queueHashKey is the key of the Redis hash that holds all queues. Within this file each
// hash field is a queue name and each value is that queue marshalled as a protobuf
// api.Queue message.
const queueHashKey = "Queue"
// ErrQueueNotFound is the error returned when a queue is looked up by name but no such
// queue is stored.
type ErrQueueNotFound struct {
	// QueueName is the name of the queue that could not be found.
	QueueName string
}

// Error implements the error interface. The queue name is rendered quoted (%q).
func (e *ErrQueueNotFound) Error() string {
	const format = "could not find queue %q"
	return fmt.Sprintf(format, e.QueueName)
}
// ErrQueueAlreadyExists is the error returned when creating a queue whose name is already
// taken by a stored queue.
type ErrQueueAlreadyExists struct {
	// QueueName is the name of the queue that already exists.
	QueueName string
}

// Error implements the error interface. The queue name is rendered verbatim (%s).
func (e *ErrQueueAlreadyExists) Error() string {
	const format = "queue %s already exists"
	return fmt.Sprintf(format, e.QueueName)
}
// QueueRepository is the storage interface for queue definitions. RedisQueueRepository in
// this file is a Redis-backed implementation; the per-method notes below describe what
// that implementation does.
type QueueRepository interface {
	// GetAllQueues returns every stored queue.
	GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)

	// GetQueue returns the queue stored under name. The Redis implementation returns
	// *ErrQueueNotFound when there is no such queue.
	GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)

	// CreateQueue stores a new queue. The Redis implementation returns
	// *ErrQueueAlreadyExists when a queue with the same name is already stored.
	CreateQueue(ctx *armadacontext.Context, q queue.Queue) error

	// UpdateQueue replaces the stored definition of an existing queue. The Redis
	// implementation returns *ErrQueueNotFound when there is no such queue.
	UpdateQueue(ctx *armadacontext.Context, q queue.Queue) error

	// DeleteQueue removes the queue stored under name.
	DeleteQueue(ctx *armadacontext.Context, name string) error
}
// RedisQueueRepository stores queues in a single Redis hash (see queueHashKey).
type RedisQueueRepository struct {
	// db is the Redis client every operation is issued through.
	db redis.UniversalClient
}

// NewRedisQueueRepository returns a RedisQueueRepository that uses db for all Redis access.
func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository {
	repo := new(RedisQueueRepository)
	repo.db = db
	return repo
}
// GetAllQueues reads every field of the queue hash, unmarshals each value into an
// api.Queue and converts it with queue.NewQueue.
//
// The first failure aborts the call: Redis and unmarshalling errors are returned with a
// stack trace attached, and errors from queue.NewQueue are returned unchanged. The order
// of the returned slice follows map iteration and is therefore unspecified.
func (r *RedisQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error) {
	entries, err := r.db.HGetAll(ctx, queueHashKey).Result()
	if err != nil {
		return nil, errors.WithStack(err)
	}

	all := make([]queue.Queue, 0, len(entries))
	for _, raw := range entries {
		decoded := new(api.Queue)
		if unmarshalErr := proto.Unmarshal([]byte(raw), decoded); unmarshalErr != nil {
			return nil, errors.WithStack(unmarshalErr)
		}

		q, convErr := queue.NewQueue(decoded)
		if convErr != nil {
			return nil, convErr
		}
		all = append(all, q)
	}
	return all, nil
}
// GetQueue looks up the queue stored under name in the queue hash.
//
// It returns *ErrQueueNotFound when the hash has no field called name. Any other Redis
// error, and any failure to unmarshal the stored bytes into an api.Queue, is returned
// wrapped with context. On success the result of queue.NewQueue is returned.
func (r *RedisQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error) {
	result, err := r.db.HGet(ctx, queueHashKey, name).Result()
	if err == redis.Nil {
		// redis.Nil is how the client reports a missing field, not a real failure.
		return queue.Queue{}, &ErrQueueNotFound{QueueName: name}
	}
	if err != nil {
		return queue.Queue{}, fmt.Errorf("[RedisQueueRepository.GetQueue] error reading from database: %w", err)
	}

	apiQueue := &api.Queue{}
	// Report the unmarshalling error itself; err from the read above is nil at this point.
	if err := proto.Unmarshal([]byte(result), apiQueue); err != nil {
		return queue.Queue{}, fmt.Errorf("[RedisQueueRepository.GetQueue] error unmarshalling queue: %w", err)
	}
	return queue.NewQueue(apiQueue)
}
// CreateQueue marshals queue and stores it in the queue hash under queue.Name, but only
// if no field with that name exists yet.
//
// It returns *ErrQueueAlreadyExists when the name is already taken, and a descriptive
// error when marshalling or the Redis write fails.
func (r *RedisQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error {
	serialised, marshalErr := proto.Marshal(queue.ToAPI())
	if marshalErr != nil {
		return fmt.Errorf("[RedisQueueRepository.CreateQueue] error marshalling queue: %s", marshalErr)
	}

	// HSetNX writes the field only when it is absent; created is false when the field
	// already existed and nothing was written.
	created, writeErr := r.db.HSetNX(ctx, queueHashKey, queue.Name, serialised).Result()
	switch {
	case writeErr != nil:
		return fmt.Errorf("[RedisQueueRepository.CreateQueue] error writing to database: %s", writeErr)
	case !created:
		return &ErrQueueAlreadyExists{QueueName: queue.Name}
	default:
		return nil
	}
}
// UpdateQueue replaces the stored definition of an existing queue with queue.
//
// Redis has no built-in "set hash field only if it exists" command, so the existence check
// and the write are done by one Lua script. Redis runs a script atomically, which means a
// queue deleted concurrently can no longer be re-added by an update that raced with the
// delete.
//
// It returns *ErrQueueNotFound when no queue named queue.Name is stored, and a wrapped
// error when marshalling or the Redis call fails.
func (r *RedisQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error {
	// KEYS[1] is the hash key, ARGV[1] the queue name, ARGV[2] the marshalled queue.
	// The script returns 1 if the field existed and was overwritten, 0 otherwise.
	// Integers are returned (rather than Lua booleans) because a Lua false is converted
	// to a nil reply, which the client would surface as redis.Nil.
	const updateIfExistsScript = `
if redis.call("HEXISTS", KEYS[1], ARGV[1]) == 1 then
	redis.call("HSET", KEYS[1], ARGV[1], ARGV[2])
	return 1
end
return 0`

	data, err := proto.Marshal(queue.ToAPI())
	if err != nil {
		return fmt.Errorf("[RedisQueueRepository.UpdateQueue] error marshalling queue: %w", err)
	}

	updated, err := r.db.Eval(ctx, updateIfExistsScript, []string{queueHashKey}, queue.Name, data).Int()
	if err != nil {
		return fmt.Errorf("[RedisQueueRepository.UpdateQueue] error writing to database: %w", err)
	}
	if updated == 0 {
		return &ErrQueueNotFound{QueueName: queue.Name}
	}
	return nil
}
// DeleteQueue issues an HDEL for the field called name in the queue hash.
//
// Only a Redis error is reported. The count of removed fields returned by HDEL is not
// inspected.
func (r *RedisQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error {
	if delErr := r.db.HDel(ctx, queueHashKey, name).Err(); delErr != nil {
		return fmt.Errorf("[RedisQueueRepository.DeleteQueue] error deleting queue: %s", delErr)
	}
	return nil
}