/
delayServerGoRoutine.go
152 lines (124 loc) · 5.38 KB
/
delayServerGoRoutine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package server
import (
"fmt"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/yerrors"
"time"
)
// GetDelayMsgGoroutine fetches delayed messages and places them in the local
// queue.
//
// It loops until s.IsStop() reports true. On each pass it pauses briefly when
// the local queue reports that it is full (and then still fetches), asks
// s.Next for the next message of the delay group, and hands a successfully
// fetched message to GetDelayMsgGoroutine_UpdateQueue. Errors from s.Next are
// logged unless they match yerrors.ErrTypeEmptyQuery; either way the loop
// simply moves on to the next pass.
//
// After the loop ends it sends a value on getDelayMsgStopChan and logs that
// the goroutine has stopped.
func (s *DelayServer) GetDelayMsgGoroutine() {
	// Pause applied before fetching when the local queue is full.
	const fullQueuePause = 300 * time.Millisecond

	s.logger.InfoWithField("goroutine get_delay_message start", "server", s.delayGroupName)

	for !s.IsStop() {
		if s.queue.IsFull() {
			time.Sleep(fullQueuePause)
		}

		delayMsg, fetchErr := s.Next(s.delayGroupName)
		switch {
		case fetchErr == nil:
			s.logger.DebugWithField(fmt.Sprint("goroutine get_delay_message get delay msg, ", delayMsg), "server", s.delayGroupName)
			s.GetDelayMsgGoroutine_UpdateQueue(delayMsg)
		case !yerrors.IsEqual(fetchErr, yerrors.ErrTypeEmptyQuery):
			// An empty-query error is not reported; anything else is logged.
			s.logger.ErrorWithField(fmt.Sprint("goroutine get_delay_message get msg error, ", fetchErr), "server", s.delayGroupName)
		}
	}

	s.getDelayMsgStopChan <- struct{}{}
	s.logger.InfoWithField("goroutine get_delay_message stop", "server", s.delayGroupName)
}
// GetDelayMsgGoroutine_UpdateQueue inserts msg into the local queue.
//
// When the insert hands back a non-nil message, that message is logged and
// passed to s.SendMsg for the delay group; a failure of that send is logged
// and not propagated.
// NOTE(review): the returned message is presumably the one displaced from a
// full queue — confirm against the queue's Insert implementation.
func (s *DelayServer) GetDelayMsgGoroutine_UpdateQueue(msg message.Message) {
	displaced := s.queue.Insert(msg)
	if displaced == nil {
		return
	}

	s.logger.DebugWithField(fmt.Sprint("goroutine get_delay_message pop msg, ", *displaced), "server", s.delayGroupName)

	if sendErr := s.SendMsg(s.delayGroupName, *displaced); sendErr != nil {
		s.logger.ErrorWithField(fmt.Sprint("goroutine get_delay_message Send msg error: ", sendErr, " [msg=", *displaced, "]"), "server", s.delayGroupName)
	}
}
// GetReadyMsgGoroutine takes messages whose processing time has arrived from
// the local queue and sends them to readyMsgChan.
//
// It loops until s.IsStop() reports true. When the queue yields nothing it
// sleeps briefly and tries again. Each popped message is forwarded through
// GetReadyMsgGoroutine_Send; if that reports an error, the message is handed
// to s.broker.LSend on the delay group's queue instead, and a failure of
// LSend is logged.
//
// After the loop ends it sends a value on getReadyMsgStopChan and logs that
// the goroutine has stopped.
func (s *DelayServer) GetReadyMsgGoroutine() {
	// Wait applied when the local queue has no message to hand out.
	const emptyQueuePause = 300 * time.Millisecond

	s.logger.InfoWithField("goroutine get_ready_message start", "server", s.delayGroupName)

	for !s.IsStop() {
		ready := s.queue.Pop()
		if ready == nil {
			time.Sleep(emptyQueuePause)
			continue
		}

		s.logger.DebugWithField(fmt.Sprint("goroutine get_ready_message get ready msg: ", ready), "server", s.delayGroupName)

		// Per the original author's note, forwarding only fails while the
		// server is stopping.
		err := s.GetReadyMsgGoroutine_Send(*ready)
		if err == nil {
			continue
		}

		if err = s.broker.LSend(s.GetQueueName(s.delayGroupName), *ready); err != nil {
			s.logger.ErrorWithField(fmt.Sprint("goroutine get_ready_message LSend msg error: ", err, " [msg=", ready, "]"), "server", s.delayGroupName)
		}
	}

	s.getReadyMsgStopChan <- struct{}{}
	s.logger.InfoWithField("goroutine get_ready_message stop", "server", s.delayGroupName)
}
// GetReadyMsgGoroutine_Send sends msg on readyMsgChan.
//
// It returns yerrors.ErrServerStop{} without sending when s.IsStop() reports
// true. A panic raised during the send (in Go, sending on a closed channel
// panics) is recovered and returned as err instead of crashing the goroutine.
// NOTE(review): presumably readyMsgChan is closed during shutdown — confirm
// against the code that stops the server.
//
// Returns nil when the message was sent.
func (s *DelayServer) GetReadyMsgGoroutine_Send(msg message.Message) (err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		// A checked assertion is required here: an unchecked e.(error) would
		// itself panic inside this deferred function when the recovered value
		// is not an error (for example panic("some string")).
		if asErr, ok := recovered.(error); ok {
			err = asErr
			return
		}
		err = fmt.Errorf("%v", recovered)
	}()

	if s.IsStop() {
		return yerrors.ErrServerStop{}
	}

	s.readyMsgChan <- msg
	return nil
}
// SendReadyMsgGoroutine reads messages from readyMsgChan and passes them on
// to the inline server's channel for processing.
//
// It runs until readyMsgChan is closed and drained. Each message is logged
// and forwarded through SendReadyMsgGoroutine_Send; if that reports an error,
// the message is handed to s.broker.LSend on the delay group's queue instead,
// and a failure of LSend is logged.
//
// After the loop ends it logs that the goroutine has stopped and then sends a
// value on sendReadyMsgStopChan.
func (s *DelayServer) SendReadyMsgGoroutine() {
	s.logger.InfoWithField("goroutine send_ready_message start", "server", s.delayGroupName)

	for ready := range s.readyMsgChan {
		s.logger.InfoWithField(fmt.Sprint("goroutine send_ready_message send ready msg: ", ready), "server", s.delayGroupName)

		// Per the original author's note, forwarding only fails while the
		// server is stopping.
		err := s.SendReadyMsgGoroutine_Send(ready)
		if err == nil {
			continue
		}

		if err = s.broker.LSend(s.GetQueueName(s.delayGroupName), ready); err != nil {
			s.logger.ErrorWithField(fmt.Sprint("goroutine send_ready_message LSend msg error: ", err, " [msg=", ready, "]"), "server", s.delayGroupName)
		}
	}

	s.logger.InfoWithField("goroutine send_ready_message stop", "server", s.delayGroupName)
	s.sendReadyMsgStopChan <- struct{}{}
}
// SendReadyMsgGoroutine_Send sends msg on inlineServerMsgChan.
//
// It returns yerrors.ErrServerStop{} without sending when s.IsStop() reports
// true. A panic raised during the send (in Go, sending on a closed channel
// panics) is recovered and returned as err instead of crashing the goroutine.
// NOTE(review): presumably inlineServerMsgChan is closed during shutdown —
// confirm against the code that stops the server.
//
// Returns nil when the message was sent.
func (s *DelayServer) SendReadyMsgGoroutine_Send(msg message.Message) (err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		// A checked assertion is required here: an unchecked e.(error) would
		// itself panic inside this deferred function when the recovered value
		// is not an error (for example panic("some string")).
		if asErr, ok := recovered.(error); ok {
			err = asErr
			return
		}
		err = fmt.Errorf("%v", recovered)
	}()

	if s.IsStop() {
		return yerrors.ErrServerStop{}
	}

	s.inlineServerMsgChan <- msg
	return nil
}