/
notifier.go
121 lines (112 loc) · 3.04 KB
/
notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package twelve
import (
"github.com/hootuu/tome/nd"
"github.com/hootuu/utils/errors"
"github.com/hootuu/utils/sys"
"go.uber.org/zap"
)
const (
NotifierMsgBufSize = 102400
)
type IListener interface {
OnRequest(msg *Letter) *errors.Error
OnPrepare(msg *Letter) *errors.Error
OnCommitted(msg *Letter) *errors.Error
OnConfirmed(msg *Letter) *errors.Error
OnInvariable(msg *Letter) *errors.Error
}
type Notifier struct {
node ITwelveNode
listener IListener
buf chan *Letter
lf *LineFactory
}
func NewNotifier(node ITwelveNode) *Notifier {
if node == nil {
sys.Error("require node")
}
ntf := &Notifier{
node: node,
buf: make(chan *Letter, NotifierMsgBufSize),
lf: NewLineFactory(),
}
node.Register(ntf)
return ntf
}
func (n *Notifier) BindListener(listener IListener) {
n.listener = listener
}
func (n *Notifier) On(letter *Letter) *errors.Error {
if letter == nil {
return errors.Verify("require letter, it is nil")
}
if sys.RunMode.IsRd() {
gLogger.Info("REV LETTER", zap.String("peer", letter.From.S()),
zap.String("here", nd.Here().ID.S()))
}
//if letter.From.IsHere() {
// return nil
//}
select {
case n.buf <- letter:
return nil
default:
return errors.Sys("The buffer is full")
}
}
func (n *Notifier) Notify(letter *Letter) *errors.Error {
return n.node.Notify(letter)
}
func (n *Notifier) Listening() {
go func() {
for {
sys.Info("========>>>>> Listening.....")
letter := <-n.buf
if sys.RunMode.IsRd() {
gLogger.Info("Notifier.Listening", zap.Any("letter", letter))
}
if letter.Arrow == InvariableArrow {
sys.Info("========>>>>> Listening.OnInvariable.1....")
err := n.listener.OnInvariable(letter)
sys.Info("========>>>>> Listening.OnInvariable.2....")
if err != nil {
gLogger.Error("Listening.OnInvariable error", zap.Error(err))
sys.Error("Listening.OnInvariable error", err.Error())
}
continue
}
line := n.lf.MustGet(letter)
err := line.RunOrRegister(letter, func(letter *Letter) *errors.Error {
var innerErr *errors.Error
switch letter.Arrow {
case RequestArrow:
sys.Info("========>>>>> Listening.OnRequest.1....")
innerErr = n.listener.OnRequest(letter)
sys.Info("========>>>>> Listening.OnRequest.2....")
case PrepareArrow:
sys.Info("========>>>>> Listening.OnPrepare.1....")
innerErr = n.listener.OnPrepare(letter)
sys.Info("========>>>>> Listening.OnPrepare.2....")
case CommittedArrow:
sys.Info("========>>>>> Listening.OnCommitted.1....")
innerErr = n.listener.OnCommitted(letter)
sys.Info("========>>>>> Listening.OnCommitted.2....")
case ConfirmedArrow:
sys.Info("========>>>>> Listening.OnConfirmed.1....")
innerErr = n.listener.OnConfirmed(letter)
sys.Info("========>>>>> Listening.OnConfirmed.2....")
}
if innerErr != nil {
return innerErr
}
return nil
})
if err != nil {
gLogger.Error("line.RunOrRegister error", zap.Error(err))
sys.Error("line.RunOrRegister error", err.Error())
//n.buf <- letter
continue
}
}
}()
}