-
Notifications
You must be signed in to change notification settings - Fork 3
/
polleroptions.go
75 lines (69 loc) · 2.81 KB
/
polleroptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package messagequeue
import (
"time"
"github.com/beaconsoftwarellc/gadget/v2/errors"
"github.com/beaconsoftwarellc/gadget/v2/log"
)
// Bounds and default for PollerOptions.ConcurrentMessageHandlers. Only a lower
// bound is defined for this option.
const (
	defaultConcurrentMessageHandlers = 10
	minimumConcurrentMessageHandlers = 1
)

// Bounds and default for PollerOptions.WaitForBatch.
const (
	defaultWaitForBatch = 30 * time.Second
	minimumWaitForBatch = time.Second
	maximumWaitForBatch = time.Hour
)

// Bounds and default for PollerOptions.DequeueCount.
const (
	defaultDequeueCount = 10
	minimumDequeueCount = 1
	maximumDequeueCount = 10
)

// Bounds and default for PollerOptions.QueueOperationTimeout.
const (
	defaultQueueOperationTimeout = 5 * time.Second
	minimumQueueOperationTimeout = time.Second
	maximumQueueOperationTimeout = time.Hour
)
// PollerOptions holds the settings used to initialize a Poller. Defaults are
// available from NewPollerOptions and the values can be checked with Validate.
type PollerOptions struct {
// Logger to use for reporting errors. Validate rejects a nil Logger.
Logger log.Logger
// ConcurrentMessageHandlers that should be running at any given time.
// Validate requires at least minimumConcurrentMessageHandlers; no upper
// bound is enforced.
ConcurrentMessageHandlers int
// WaitForBatch the specified duration before prematurely returning with less
// than the desired number of messages. Validate requires a value in
// [minimumWaitForBatch, maximumWaitForBatch].
WaitForBatch time.Duration
// DequeueCount is the number of messages to attempt to dequeue per request.
// The maximum will vary by implementation; independently of that, Validate
// restricts the value to [minimumDequeueCount, maximumDequeueCount].
DequeueCount int
// QueueOperationTimeout is presumably the time limit applied to a single
// queue operation (TODO confirm against the Poller implementation). Validate
// requires a value in [minimumQueueOperationTimeout,
// maximumQueueOperationTimeout].
QueueOperationTimeout time.Duration
}
// NewPollerOptions builds a PollerOptions seeded with the package defaults: the
// logger returned by log.Global plus the default handler count, batch wait,
// queue operation timeout and dequeue count. The numeric defaults lie inside
// the ranges enforced by Validate, so the result can be used to initialize a
// new Poller.
func NewPollerOptions() *PollerOptions {
	options := new(PollerOptions)
	options.Logger = log.Global()
	options.ConcurrentMessageHandlers = defaultConcurrentMessageHandlers
	options.WaitForBatch = defaultWaitForBatch
	options.QueueOperationTimeout = defaultQueueOperationTimeout
	options.DequeueCount = defaultDequeueCount
	return options
}
// Validate reports whether the options are complete and inside the bounds
// required for operation. Fields are checked in a fixed order (Logger,
// ConcurrentMessageHandlers, WaitForBatch, QueueOperationTimeout,
// DequeueCount) and the error for the first offending field is returned; nil
// is returned when every field passes.
func (po *PollerOptions) Validate() error {
	switch {
	case po.Logger == nil:
		// a logger is mandatory
		return errors.New("PollerOptions.Logger cannot be nil")

	case po.ConcurrentMessageHandlers < minimumConcurrentMessageHandlers:
		// only a lower bound applies to the handler count
		return errors.New("PollerOptions.ConcurrentMessageHandlers(%d) was out of bounds [%d, -)",
			po.ConcurrentMessageHandlers, minimumConcurrentMessageHandlers)

	case po.WaitForBatch < minimumWaitForBatch, po.WaitForBatch > maximumWaitForBatch:
		return errors.New("PollerOptions.WaitForBatch(%s) was out of bounds [%s, %s]",
			po.WaitForBatch, minimumWaitForBatch, maximumWaitForBatch)

	case po.QueueOperationTimeout < minimumQueueOperationTimeout,
		po.QueueOperationTimeout > maximumQueueOperationTimeout:
		return errors.New("PollerOptions.QueueOperationTimeout(%s) was out of bounds [%s, %s]",
			po.QueueOperationTimeout, minimumQueueOperationTimeout, maximumQueueOperationTimeout)

	case po.DequeueCount < minimumDequeueCount, po.DequeueCount > maximumDequeueCount:
		return errors.New("PollerOptions.DequeueCount(%d) was out of bounds [%d, %d]",
			po.DequeueCount, minimumDequeueCount, maximumDequeueCount)
	}
	return nil
}