forked from RichardKnop/machinery
/
broker.go
139 lines (118 loc) · 3.48 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package common
import (
"errors"
"sync"
"github.com/Michael-LiK/machinery/v1/brokers/iface"
"github.com/Michael-LiK/machinery/v1/config"
"github.com/Michael-LiK/machinery/v1/log"
"github.com/Michael-LiK/machinery/v1/retry"
"github.com/Michael-LiK/machinery/v1/tasks"
)
type registeredTaskNames struct {
sync.RWMutex
items []string
}
// Broker represents a base broker structure
type Broker struct {
cnf *config.Config
registeredTaskNames registeredTaskNames
retry bool
retryFunc func(chan int)
retryStopChan chan int
stopChan chan int
}
// NewBroker creates new Broker instance
func NewBroker(cnf *config.Config) Broker {
return Broker{
cnf: cnf,
retry: true,
stopChan: make(chan int),
retryStopChan: make(chan int),
}
}
// GetConfig returns config
func (b *Broker) GetConfig() *config.Config {
return b.cnf
}
// GetRetry ...
func (b *Broker) GetRetry() bool {
return b.retry
}
// GetRetryFunc ...
func (b *Broker) GetRetryFunc() func(chan int) {
return b.retryFunc
}
// GetRetryStopChan ...
func (b *Broker) GetRetryStopChan() chan int {
return b.retryStopChan
}
// GetStopChan ...
func (b *Broker) GetStopChan() chan int {
return b.stopChan
}
// Publish places a new message on the default queue
func (b *Broker) Publish(signature *tasks.Signature) error {
return errors.New("Not implemented")
}
// SetRegisteredTaskNames sets registered task names
func (b *Broker) SetRegisteredTaskNames(names []string) {
b.registeredTaskNames.Lock()
defer b.registeredTaskNames.Unlock()
b.registeredTaskNames.items = names
}
// IsTaskRegistered returns true if the task is registered with this broker
func (b *Broker) IsTaskRegistered(name string) bool {
b.registeredTaskNames.RLock()
defer b.registeredTaskNames.RUnlock()
for _, registeredTaskName := range b.registeredTaskNames.items {
if registeredTaskName == name {
return true
}
}
return false
}
// GetPendingTasks returns a slice of task.Signatures waiting in the queue
func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
return nil, errors.New("Not implemented")
}
// GetDelayedTasks returns a slice of task.Signatures that are scheduled, but not yet in the queue
func (b *Broker) GetDelayedTasks() ([]*tasks.Signature, error) {
return nil, errors.New("Not implemented")
}
// StartConsuming is a common part of StartConsuming method
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) {
if b.retryFunc == nil {
b.retryFunc = retry.Closure()
}
}
// StopConsuming is a common part of StopConsuming
func (b *Broker) StopConsuming() {
// Do not retry from now on
b.retry = false
// Stop the retry closure earlier
select {
case b.retryStopChan <- 1:
log.WARNING.Print("Stopping retry closure.")
default:
}
// Notifying the stop channel stops consuming of messages
close(b.stopChan)
log.WARNING.Print("Stop channel")
}
// GetRegisteredTaskNames returns registered tasks names
func (b *Broker) GetRegisteredTaskNames() []string {
b.registeredTaskNames.RLock()
defer b.registeredTaskNames.RUnlock()
items := b.registeredTaskNames.items
return items
}
// AdjustRoutingKey makes sure the routing key is correct.
// If the routing key is an empty string:
// a) set it to binding key for direct exchange type
// b) set it to default queue name
func (b *Broker) AdjustRoutingKey(s *tasks.Signature) {
if s.RoutingKey != "" {
return
}
s.RoutingKey = b.GetConfig().DefaultQueue
}