forked from RichardKnop/machinery
/
broker.go
119 lines (100 loc) · 3.2 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package brokers
import (
"errors"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/retry"
"github.com/RichardKnop/machinery/v1/tasks"
)
// Broker is the base broker structure. It carries the state used by the
// methods in this file: configuration, registered task names, and the
// retry/stop signalling fields.
type Broker struct {
	// cnf is the configuration passed to New and returned by GetConfig.
	cnf *config.Config
	// registeredTaskNames is the list set by SetRegisteredTaskNames and
	// searched by IsTaskRegistered.
	registeredTaskNames []string

	// retry is set to true by New and to false by stopConsuming.
	// NOTE(review): presumably read by concrete brokers to decide whether to
	// retry; no reader is visible in this file.
	retry bool
	// retryFunc defaults to retry.Closure() in startConsuming when it is nil.
	retryFunc func(chan int)

	// Both channels are (re)created by startConsuming and signalled by
	// stopConsuming: retryStopChan with a non-blocking send, stopChan with a
	// blocking send.
	retryStopChan, stopChan chan int
}
// New builds a Broker value that holds cnf and has its retry flag set to true.
// All other fields are left at their zero values.
func New(cnf *config.Config) Broker {
	b := Broker{retry: true}
	b.cnf = cnf
	return b
}
// GetConfig returns the *config.Config stored on the broker (the value
// passed to New). The pointer is returned as-is, without copying.
func (b *Broker) GetConfig() *config.Config {
	return b.cnf
}
// StartConsuming is meant to enter a loop and wait for incoming messages.
// The base broker does not implement it: every call returns false together
// with a "Not implemented" error, and none of the arguments are used.
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error) {
	errNotImplemented := errors.New("Not implemented")
	return false, errNotImplemented
}
// Publish is meant to place a new message on the default queue. The base
// broker does not implement it: signature is ignored and a "Not implemented"
// error is always returned.
func (b *Broker) Publish(signature *tasks.Signature) error {
	errNotImplemented := errors.New("Not implemented")
	return errNotImplemented
}
// StopConsuming is meant to quit the consuming loop. The base broker's
// implementation is a no-op.
func (b *Broker) StopConsuming() {
	// Nothing to do in the base broker.
}
// SetRegisteredTaskNames replaces the broker's list of registered task names
// with names. The slice is stored as-is (not copied), so the broker and the
// caller share the same backing array afterwards.
func (b *Broker) SetRegisteredTaskNames(names []string) {
	b.registeredTaskNames = names
}
// IsTaskRegistered reports whether name exactly matches one of the task names
// previously stored with SetRegisteredTaskNames. It returns false when no
// names have been registered.
func (b *Broker) IsTaskRegistered(name string) bool {
	registered := b.registeredTaskNames
	for i := range registered {
		if registered[i] == name {
			return true
		}
	}
	return false
}
// GetPendingTasks is meant to return the task signatures waiting in queue.
// The base broker does not implement it: queue is ignored and the result is
// always a nil slice together with a "Not implemented" error.
func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
	var pending []*tasks.Signature
	errNotImplemented := errors.New("Not implemented")
	return pending, errNotImplemented
}
// startConsuming is the common part of StartConsuming. It installs
// retry.Closure() as the retry function when none has been set, and creates
// fresh unbuffered stop and retry-stop channels. Neither argument is used
// here.
func (b *Broker) startConsuming(consumerTag string, taskProcessor TaskProcessor) {
	// Keep a retry function that was already set; only fill in the default.
	if b.retryFunc == nil {
		b.retryFunc = retry.Closure()
	}

	// Every call replaces both signalling channels with new ones.
	b.stopChan, b.retryStopChan = make(chan int), make(chan int)
}
// stopConsuming is the common part of StopConsuming. It turns the retry flag
// off, asks a waiting retry closure to stop, and then signals the stop
// channel.
//
// The send on stopChan is blocking: the call does not return until something
// receives from that channel. If stopChan is still nil (in this file it is
// only created by startConsuming), there is nothing to stop and the method
// returns instead of blocking forever on a nil channel.
func (b *Broker) stopConsuming() {
	// Do not retry from now on
	b.retry = false

	// Stop the retry closure earlier. The send is non-blocking: if nothing is
	// receiving on retryStopChan right now (or the channel is nil), the
	// default branch is taken and no message is logged.
	select {
	case b.retryStopChan <- 1:
		log.WARNING.Print("Stopping retry closure.")
	default:
	}

	// A send on a nil channel never completes, so bail out when consuming was
	// never started.
	if b.stopChan == nil {
		return
	}

	// Notifying the stop channel stops consuming of messages
	b.stopChan <- 1
}
// GetRegisteredTaskNames returns the registered task names. The broker's own
// slice is returned directly (not a copy); it is nil if no names were set.
func (b *Broker) GetRegisteredTaskNames() []string {
	return b.registeredTaskNames
}
// AdjustRoutingKey fills in s.RoutingKey when it is empty and leaves a
// non-empty routing key untouched. For an empty key:
//
//	a) if b is an AMQP broker whose config has a non-nil AMQP section with
//	   exchange type "direct", the AMQP binding key is used;
//	b) otherwise the configured default queue name is used.
func AdjustRoutingKey(b Interface, s *tasks.Signature) {
	switch {
	case s.RoutingKey != "":
		// A routing key is already set; keep it.
	case IsAMQP(b) && b.GetConfig().AMQP != nil && b.GetConfig().AMQP.ExchangeType == "direct":
		// The routing algorithm behind a direct exchange is simple - a message
		// goes to the queues whose binding key exactly matches the routing key
		// of the message.
		s.RoutingKey = b.GetConfig().AMQP.BindingKey
	default:
		s.RoutingKey = b.GetConfig().DefaultQueue
	}
}
// IsAMQP reports whether the dynamic type of b is *AMQPBroker. Any other
// type, including a nil interface value, yields false.
func IsAMQP(b Interface) bool {
	switch b.(type) {
	case *AMQPBroker:
		return true
	default:
		return false
	}
}