-
Notifications
You must be signed in to change notification settings - Fork 0
/
consume.go
148 lines (138 loc) · 4.19 KB
/
consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package async
import (
"context"
"fmt"
"github.com/json-iterator/go"
"github.com/streadway/amqp"
"github.com/crochee/lirity/logger"
"github.com/crochee/lirity/mq"
"github.com/crochee/lirity/routine"
"github.com/crochee/lirity/validator"
)
func NewTaskConsumer(ctx context.Context, opts ...func(*ConsumerOption)) *taskConsumer {
t := &taskConsumer{
ConsumerOption: ConsumerOption{
Pool: routine.NewPool(ctx, routine.Recover(func(ctx context.Context, i interface{}) {
logger.From(ctx).Sugar().Errorf("%v", i)
})),
Manager: NewManager(),
Marshal: mq.DefaultMarshal{},
JSONHandler: jsoniter.ConfigCompatibleWithStandardLibrary,
ParamPool: NewParamPool(),
Validator: validator.NewValidator(),
},
}
for _, opt := range opts {
opt(&t.ConsumerOption)
}
return t
}
type ConsumerOption struct {
Pool *routine.Pool // goroutine safe run pool
Manager ManagerExecutor // manager executor how to run
Marshal mq.MarshalAPI // mq assemble request or response
JSONHandler jsoniter.API
ParamPool ParamPool // get Param
Validator validator.Validator
}
type taskConsumer struct {
ConsumerOption
}
func (t *taskConsumer) Register(executors ...Executor) error {
return t.Manager.Register(executors...)
}
func (t *taskConsumer) Subscribe(channel Channel, queueName string) error {
t.Pool.Go(func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}
deliveries, err := channel.Consume(
queueName,
// 用来区分多个消费者
"consumer."+queueName,
// 是否自动应答(自动应答确认消息,这里设置为否,在下面手动应答确认)
false,
// 是否具有排他性
false,
// 如果设置为true,表示不能将同一个connection中发送的消息
// 传递给同一个connection的消费者
false,
// 是否为阻塞
false,
nil,
)
if err != nil {
logger.From(ctx).Error(err.Error())
fmt.Println(err)
continue
}
t.handleMessage(ctx, deliveries)
}
})
t.Pool.Wait()
return nil
}
func (t *taskConsumer) handleMessage(ctx context.Context, deliveries <-chan amqp.Delivery) {
for {
select {
case <-ctx.Done():
return
case v := <-deliveries:
t.Pool.Go(func(ctx context.Context) {
if err := t.handle(ctx, v); err != nil {
logger.From(ctx).Error(err.Error())
}
})
}
}
}
// nolint:gocritic
func (t *taskConsumer) handle(ctx context.Context, d amqp.Delivery) error {
msgStruct, err := t.Marshal.Unmarshal(&d)
if err != nil {
logger.From(ctx).Error(err.Error())
// 当requeue为true时,将该消息排队,以在另一个通道上传递给使用者。
// 当requeue为false或服务器无法将该消息排队时,它将被丢弃。
if err = d.Reject(false); err != nil { // nolint:gocritic
return err
}
return nil
}
logger.From(ctx).Sugar().Infof("consume uuid %s body:%s", msgStruct.UUID, msgStruct.Payload)
param := t.ParamPool.Get()
if err = t.JSONHandler.Unmarshal(msgStruct.Payload, param); err != nil {
logger.From(ctx).Error(err.Error())
// 当requeue为true时,将该消息排队,以在另一个通道上传递给使用者。
// 当requeue为false或服务器无法将该消息排队时,它将被丢弃。
if err = d.Reject(false); err != nil { // nolint:gocritic
return err
}
return nil
}
if err = t.Validator.ValidateStruct(param); err != nil {
logger.From(ctx).Error(err.Error())
// 当requeue为true时,将该消息排队,以在另一个通道上传递给使用者。
// 当requeue为false或服务器无法将该消息排队时,它将被丢弃。
if err = d.Reject(false); err != nil { // nolint:gocritic
return err
}
return nil
}
err = t.Manager.Run(ctx, param)
t.ParamPool.Put(param)
if err != nil {
logger.From(ctx).Error(err.Error())
// 当requeue为true时,将该消息排队,以在另一个通道上传递给使用者。
// 当requeue为false或服务器无法将该消息排队时,它将被丢弃。
if err = d.Reject(false); err != nil {
return err
}
return nil
}
// 手动确认收到本条消息, true表示回复当前信道所有未回复的ack,用于批量确认。
// false表示回复当前条目
return d.Ack(false)
}