-
Notifications
You must be signed in to change notification settings - Fork 3
/
node_connection.go
95 lines (80 loc) · 2.13 KB
/
node_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package cluster
import (
"code.google.com/p/go.net/websocket"
"encoding/gob"
"github.com/bixi/kylin/message"
"log"
"sync"
)
// NodeConnection is a message connection to a node, carried over a websocket.
// The implementation in this file is nodeConnection, built by
// newNodeConnection.
type NodeConnection interface {
// Info returns the NodeInfo associated with the connection.
Info() NodeInfo
// Start starts the connection. nodeConnection delegates this to its
// message transporter and returns the transporter's result.
Start() error
// Stop stops the connection. nodeConnection delegates this to its
// message transporter and returns the transporter's result.
Stop() error
// AddMessageHandler registers a handler that is offered dispatched
// messages after any handlers registered earlier.
AddMessageHandler(MessageHandler)
// Send hands message to the connection's transporter for sending.
Send(message interface{})
// Conn returns the underlying websocket connection.
Conn() *websocket.Conn
// WaitForDone delegates to the transporter's WaitForDone.
// NOTE(review): presumably blocks until the transporter has finished;
// confirm against message.Transporter.
WaitForDone()
}
// nodeConnection is the NodeConnection implementation returned by
// newNodeConnection.
type nodeConnection struct {
// Mutex guards messageHandlers.
sync.Mutex
// gobEncoder writes gob-encoded values to conn.
gobEncoder *gob.Encoder
// gobDecoder reads gob-encoded values from conn.
gobDecoder *gob.Decoder
// localNode is the Node passed to newNodeConnection; nothing in this file
// reads it after construction.
localNode Node
// info is the value returned by Info.
info NodeInfo
// transporter is created with this connection supplied for all four
// arguments of message.NewTransporter.
// NOTE(review): presumably it calls back into Encode, Decode, OnError and
// Dispatch; confirm against the message package.
transporter message.Transporter
// conn is the websocket connection returned by Conn.
conn *websocket.Conn
// messageHandlers are tried in order by Dispatch and appended to by
// AddMessageHandler.
messageHandlers []MessageHandler
}
// MessageHandler is offered a dispatched message and reports whether it
// handled it. Dispatch stops at the first handler that returns true.
type MessageHandler func(message interface{}) (handled bool)
// newNodeConnection builds a nodeConnection for conn. It records localNode and
// info, attaches a gob encoder and decoder to conn, and creates the message
// transporter with the new connection supplied for all four of
// message.NewTransporter's arguments. The transporter is not started here;
// callers use Start for that.
func newNodeConnection(localNode Node, conn *websocket.Conn, info NodeInfo) NodeConnection {
	nc := &nodeConnection{
		info:       info,
		conn:       conn,
		localNode:  localNode,
		gobEncoder: gob.NewEncoder(conn),
		gobDecoder: gob.NewDecoder(conn),
	}
	// The transporter needs the finished connection, so it is wired in last.
	nc.transporter = message.NewTransporter(nc, nc, nc, nc)
	return nc
}
// Conn returns the websocket connection this node connection was built on.
func (nc *nodeConnection) Conn() *websocket.Conn {
	ws := nc.conn
	return ws
}
// Info returns the NodeInfo this connection was constructed with.
func (nc *nodeConnection) Info() NodeInfo {
	nodeInfo := nc.info
	return nodeInfo
}
// Start starts the connection's message transporter and returns whatever
// error the transporter reports.
func (nc *nodeConnection) Start() error {
	err := nc.transporter.Start()
	return err
}
// Stop stops the connection's message transporter and returns whatever error
// the transporter reports.
func (nc *nodeConnection) Stop() error {
	err := nc.transporter.Stop()
	return err
}
// Send passes message to the connection's transporter for sending.
func (nc *nodeConnection) Send(message interface{}) {
	tr := nc.transporter
	tr.Send(message)
}
// AddMessageHandler appends handler to the list consulted by Dispatch. The
// list is modified while holding the connection's mutex, so handlers can be
// registered from any goroutine.
func (nc *nodeConnection) AddMessageHandler(handler MessageHandler) {
	nc.Lock()
	nc.messageHandlers = append(nc.messageHandlers, handler)
	nc.Unlock()
}
// WaitForDone delegates to the transporter's WaitForDone and returns when it
// does.
func (nc *nodeConnection) WaitForDone() {
	tr := nc.transporter
	tr.WaitForDone()
}
// Encode gob-encodes message onto the connection.
//
// The encoder is given a pointer to the interface value rather than the value
// itself, so gob transmits it as an interface value (carrying its concrete
// type) instead of as the bare concrete type. Concrete types other than gob's
// pre-registered basic types must therefore be registered with gob.Register.
func (nc *nodeConnection) Encode(message interface{}) error {
	boxed := &message
	return nc.gobEncoder.Encode(boxed)
}
// Decode reads the next gob-encoded value from the connection into message.
// message is handed to the gob decoder unchanged, so it has to be a pointer to
// the destination value.
func (nc *nodeConnection) Decode(message interface{}) error {
	err := nc.gobDecoder.Decode(message)
	return err
}
// OnError logs err together with the remote address of the connection. It
// takes no other action.
func (nc *nodeConnection) OnError(err error) {
	remote := nc.Conn().RemoteAddr()
	log.Printf("node %v error:%v \n", remote, err)
}
// Dispatch offers message to the registered handlers in registration order and
// stops at the first one that reports it handled the message. It always
// returns nil, including when no handler accepts the message.
//
// The handler list is snapshotted under the lock and the handlers are invoked
// after the lock is released. sync.Mutex is not reentrant, so running handlers
// with the lock held would deadlock any handler that calls AddMessageHandler
// on this connection, and a slow handler would block registration. Handlers
// registered while a dispatch is in progress are first consulted on the next
// message.
func (nc *nodeConnection) Dispatch(message interface{}) error {
	// AddMessageHandler only ever appends, so the elements visible through
	// this slice header are never rewritten and are safe to read unlocked.
	nc.Lock()
	handlers := nc.messageHandlers
	nc.Unlock()

	for _, handle := range handlers {
		if handle(message) {
			break
		}
	}
	return nil
}