-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.go
228 lines (173 loc) · 4.77 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package gorpc
import (
"github.com/gsdocker/gserrors"
"github.com/gsdocker/gslogger"
)
type handlerState int
const (
handlerRegister handlerState = iota
handlerActived
handlerUnregister
)
// Context channel handler context
type Context interface {
// current handler name
Name() string
// Pipeline current channel pipeline
Pipeline() Pipeline
// OnActive
FireActive()
// Send create new send pipeline message
Send(message *Message)
// Close close current pipeline
Close()
}
// Handler the gorpc channel pipeline handlers
type Handler interface {
// Register when handler had been add into one pipeline,
// system call this function notify handler
Register(context Context) error
// Unregister sysm call this function when handler had been removed from
// pipeline,you can get this pipeline object by Context#Pipeline function
Unregister(context Context)
// Active system call this function when pipline state trans to active
Active(context Context) error
// Inactive system call this function when pipeline state trans to inactive
Inactive(context Context)
// MessageReceived
MessageReceived(context Context, message *Message) (*Message, error)
// MessageSending
MessageSending(context Context, message *Message) (*Message, error)
// Panic handle async pipline method error
Panic(context Context, err error)
}
// SharedHandler this handler will been shared with more than one piplines
type SharedHandler interface {
// Lock lock this handler
HandlerLock()
// Unlock unlock this handler
HandlerUnlock()
}
// HandlerF handler factory
type HandlerF func() Handler
type _Context struct {
gslogger.Log // mixin log
name string // context bound handler name
handler Handler // context bound handler
shared SharedHandler // shared handler
next *_Context // pipeline next handler
prev *_Context // pipeline prev handler
pipeline *_Pipeline // pipeline which handler belongs to
state handlerState // handler state
}
func newContext(name string, handler Handler, pipeline *_Pipeline, prev *_Context) (*_Context, error) {
context := &_Context{
Log: gslogger.Get("handler-context"),
name: name,
handler: handler,
pipeline: pipeline,
prev: prev,
state: handlerUnregister,
}
context.shared, _ = handler.(SharedHandler)
err := context.handler.Register(context)
if err != nil {
return nil, err
}
if context.prev != nil {
context.prev.next = context
}
context.state = handlerRegister
return context, nil
}
func (context *_Context) Close() {
context.pipeline.onClose()
}
func (context *_Context) Name() string {
return context.name
}
func (context *_Context) String() string {
return context.name
}
func (context *_Context) Pipeline() Pipeline {
return context.pipeline
}
func (context *_Context) FireActive() {
go context.pipeline.fireActive(context)
}
func (context *_Context) Send(message *Message) {
context.pipeline.send(context, message)
}
func (context *_Context) onActive() (err error) {
defer func() {
if e := recover(); e != nil {
err = gserrors.Newf(nil, "catch unhandle error :%s", e)
}
}()
if context.state != handlerRegister {
return nil
}
err = context.handler.Active(context)
if err == nil || err == ErrSkip {
context.state = handlerActived
}
return
}
func (context *_Context) onPanic(err error) {
defer func() {
if e := recover(); e != nil {
err = gserrors.Newf(nil, "catch unhandle error :%s", e)
}
}()
context.handler.Panic(context, err)
}
func (context *_Context) onInactive() {
defer func() {
if e := recover(); e != nil {
context.W(
"call handler(%s) onInactive method error\n%s",
context.Name(),
gserrors.Newf(nil, "catch unhandle error :%s", e),
)
}
}()
if context.state != handlerActived {
return
}
context.handler.Inactive(context)
context.state = handlerRegister
}
func (context *_Context) onUnregister() {
defer func() {
if e := recover(); e != nil {
context.W(
"call handler(%s) Unregister method error\n%s",
context.Name(),
gserrors.Newf(nil, "catch unhandle error :%s", e),
)
}
}()
if context.state == handlerUnregister {
return
}
context.handler.Unregister(context)
context.state = handlerUnregister
}
func (context *_Context) onMessageSending(message *Message) (ret *Message, err error) {
defer func() {
if e := recover(); e != nil {
err = gserrors.Newf(nil, "catch unhandle error :%s", e)
}
}()
ret, err = context.handler.MessageSending(context, message)
return
}
func (context *_Context) onMessageReceived(message *Message) (ret *Message, err error) {
defer func() {
if e := recover(); e != nil {
err = gserrors.Newf(nil, "catch unhandle error :%s", e)
}
}()
ret, err = context.handler.MessageReceived(context, message)
return
}