forked from RichardKnop/machinery
/
broker.go
117 lines (98 loc) · 2.89 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package common
import (
"errors"
"github.com/RichardKnop/machinery/v1/brokers/iface"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/retry"
"github.com/RichardKnop/machinery/v1/tasks"
)
// Broker is the base broker structure. It carries the state that the
// methods in this file read and update.
type Broker struct {
	// cnf is the configuration handed to NewBroker.
	cnf *config.Config
	// registeredTaskNames is the list stored by SetRegisteredTaskNames.
	registeredTaskNames []string
	// retry is set to true by NewBroker and cleared by StopConsuming.
	retry bool
	// retryFunc is filled with retry.Closure() by StartConsuming when it is
	// still nil at that point.
	retryFunc func(chan int)
	// retryStopChan is created by StartConsuming; StopConsuming attempts a
	// non-blocking send on it.
	retryStopChan chan int
	// stopChan is created by StartConsuming; StopConsuming sends on it.
	stopChan chan int
}
// NewBroker returns a Broker value that uses cnf as its configuration and
// has the retry flag switched on. All other fields keep their zero values.
func NewBroker(cnf *config.Config) Broker {
	broker := Broker{retry: true}
	broker.cnf = cnf
	return broker
}
// GetConfig returns the configuration pointer this broker was created with.
func (b *Broker) GetConfig() *config.Config {
	return b.cnf
}
// GetRetry reports the current value of the retry flag. The flag is true
// after NewBroker and becomes false once StopConsuming has been called.
func (b *Broker) GetRetry() bool {
	return b.retry
}
// GetRetryFunc returns the retry function currently stored on the broker.
// The result is nil until one has been set; StartConsuming installs
// retry.Closure() when none is present.
func (b *Broker) GetRetryFunc() func(chan int) {
	return b.retryFunc
}
// GetRetryStopChan returns the channel on which StopConsuming signals the
// retry closure to stop. It is nil until StartConsuming has created it.
func (b *Broker) GetRetryStopChan() chan int {
	return b.retryStopChan
}
// GetStopChan returns the channel on which StopConsuming sends its stop
// signal. It is nil until StartConsuming has created it.
func (b *Broker) GetStopChan() chan int {
	return b.stopChan
}
// Publish is a placeholder on the base broker: it ignores signature and
// always returns a "Not implemented" error.
func (b *Broker) Publish(signature *tasks.Signature) error {
	err := errors.New("Not implemented")
	return err
}
// SetRegisteredTaskNames replaces the broker's list of registered task
// names with names. The slice is stored as given, without copying.
func (b *Broker) SetRegisteredTaskNames(names []string) {
	b.registeredTaskNames = names
}
// IsTaskRegistered reports whether name appears in the broker's list of
// registered task names. The comparison is an exact string match.
func (b *Broker) IsTaskRegistered(name string) bool {
	registered := b.registeredTaskNames
	for i := 0; i < len(registered); i++ {
		if registered[i] == name {
			return true
		}
	}
	return false
}
// GetPendingTasks is a placeholder on the base broker: it ignores queue and
// always returns a nil slice together with a "Not implemented" error.
func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
	var pending []*tasks.Signature
	return pending, errors.New("Not implemented")
}
// StartConsuming is the common part of a broker's StartConsuming method.
// It installs retry.Closure() as the retry function when none is set yet
// and creates fresh, unbuffered stop and retry-stop channels. The
// consumerTag, concurrency and taskProcessor arguments are not used here.
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) {
	if fn := b.retryFunc; fn == nil {
		b.retryFunc = retry.Closure()
	}

	b.stopChan, b.retryStopChan = make(chan int), make(chan int)
}
// StopConsuming is the common part of a broker's StopConsuming method. It
// clears the retry flag, makes a non-blocking attempt to signal the retry
// closure, and then sends on the stop channel.
//
// The stop channel is unbuffered, so the final send blocks until a receiver
// takes the value. If the stop channel is still nil (for example because
// StartConsuming has not been called), the method returns after clearing
// the retry flag: a send on a nil channel would block forever.
func (b *Broker) StopConsuming() {
	// Do not retry from now on.
	b.retry = false

	// Stop the retry closure earlier. The send is non-blocking, so nothing
	// happens when no one is currently receiving on retryStopChan.
	select {
	case b.retryStopChan <- 1:
		log.WARNING.Print("Stopping retry closure.")
	default:
	}

	// No stop channel exists yet, so there is nothing to notify; sending on
	// the nil channel would deadlock the caller.
	if b.stopChan == nil {
		return
	}

	// Notifying the stop channel stops consuming of messages.
	b.stopChan <- 1
}
// GetRegisteredTaskNames returns the broker's list of registered task
// names. The stored slice is returned directly, not a copy.
func (b *Broker) GetRegisteredTaskNames() []string {
	return b.registeredTaskNames
}
// AdjustRoutingKey fills in a missing routing key. When s.RoutingKey is the
// empty string it is set to the DefaultQueue from the broker's config; a
// non-empty routing key is left untouched.
func (b *Broker) AdjustRoutingKey(s *tasks.Signature) {
	if s.RoutingKey == "" {
		s.RoutingKey = b.cnf.DefaultQueue
	}
}