/
storage.go
62 lines (49 loc) · 1.82 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package storage
import (
"context"
"time"
)
// TODO(wtlangford): Check if the value of these keys are Sidekiq-compatible
const (
RetryKey = "goretry"
ScheduledJobsKey = "schedule"
)
// StorageError is used to return errors from the storage layer
type StorageError string
func (e StorageError) Error() string { return string(e) }
// list of known errors
const (
NoMessage = StorageError("no message")
)
// Stats has all the stats related to a manager
type Stats struct {
Processed int64
Failed int64
RetryCount int64
Enqueued map[string]int64
}
// Retries has the list of messages in the retry queue
type Retries struct {
TotalRetryCount int64
RetryJobs []string
}
// Store is the interface for storing and retrieving data
type Store interface {
// General queue operations
CreateQueue(ctx context.Context, queue string) error
ListMessages(ctx context.Context, queue string) ([]string, error)
AcknowledgeMessage(ctx context.Context, queue string, message string) error
EnqueueMessage(ctx context.Context, queue string, priority float64, message string) error
EnqueueMessageNow(ctx context.Context, queue string, message string) error
DequeueMessage(ctx context.Context, queue string, inprogressQueue string, timeout time.Duration) (string, error)
// Special purpose queue operations
EnqueueScheduledMessage(ctx context.Context, priority float64, message string) error
DequeueScheduledMessage(ctx context.Context, priority float64) (string, error)
EnqueueRetriedMessage(ctx context.Context, priority float64, message string) error
DequeueRetriedMessage(ctx context.Context, priority float64) (string, error)
// Stats
IncrementStats(ctx context.Context, metric string) error
GetAllStats(ctx context.Context, queues []string) (*Stats, error)
// Retries
GetAllRetries(ctx context.Context) (*Retries, error)
}