This repository has been archived by the owner on Jun 23, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 64
/
conn.go
90 lines (66 loc) · 1.92 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package memsd
import (
"github.com/davyxu/cellmesh/discovery"
"github.com/davyxu/cellmesh/discovery/memsd/model"
"github.com/davyxu/cellmesh/discovery/memsd/proto"
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/peer"
"github.com/davyxu/cellnet/proc"
"time"
)
// clearCache discards everything held in the service cache and the key/value
// cache by replacing each map with a fresh empty one. Each replacement is done
// while holding that cache's own guard; the two guards are never held together.
func (self *memDiscovery) clearCache() {
	freshSvc := make(map[string][]*discovery.ServiceDesc)
	freshKV := make(map[string][]byte)

	// Service descriptors first, then plain key/value entries.
	self.svcCacheGuard.Lock()
	self.svcCache = freshSvc
	self.svcCacheGuard.Unlock()

	self.kvCacheGuard.Lock()
	self.kvCache = freshKV
	self.kvCacheGuard.Unlock()
}
// connect creates the "memsd" tcp.Connector peer for addr, binds the
// "memsd.cli" processor with the discovery event handler, configures the
// socket and reconnect interval, starts the peer, and then blocks the caller,
// polling every 500ms, until the peer reports that it is ready.
func (self *memDiscovery) connect(addr string) {
	connector := peer.NewGenericPeer("tcp.Connector", "memsd", addr, model.Queue)

	// handleEvent reacts to session lifecycle events and memsd protocol messages.
	handleEvent := func(ev cellnet.Event) {
		switch msg := ev.Message().(type) {
		case *cellnet.SessionConnected:
			// Record the new session, drop all cached data, then
			// authenticate using the token currently held.
			ses := ev.Session()

			self.sesGuard.Lock()
			self.ses = ses
			self.sesGuard.Unlock()

			self.clearCache()

			ses.Send(&proto.AuthREQ{Token: self.token})

		case *cellnet.SessionClosed:
			log.Errorf("memsd discovery lost!")

		case *proto.AuthACK:
			// Keep the token returned by the server for later AuthREQs.
			self.token = msg.Token

			// Pulled messages still have to be processed in the queue;
			// initialization only counts as complete once that processing
			// has been confirmed here.
			// NOTE(review): Done is invoked on every AuthACK while initWg is
			// non-nil — presumably the caller clears initWg after the first
			// wait; confirm against the code that sets it.
			if self.initWg != nil {
				self.initWg.Done()
			}

			log.Infof("memsd discovery ready!")

		case *proto.ValueChangeNotifyACK:
			// Non-service keys go to the plain key/value cache.
			if !model.IsServiceKey(msg.Key) {
				self.updateKVCache(msg.Key, msg.Value)
				return
			}
			self.updateSvcCache(msg.SvcName, msg.Value)

		case *proto.ValueDeleteNotifyACK:
			// Non-service keys are removed from the plain key/value cache.
			if !model.IsServiceKey(msg.Key) {
				self.deleteKVCache(msg.Key)
				return
			}
			self.deleteSvcCache(model.GetSvcIDByServiceKey(msg.Key), msg.SvcName)
		}
	}
	proc.BindProcessorHandler(connector, "memsd.cli", handleEvent)

	// 1 MiB for each of the two buffer sizes; the trailing true is the
	// noDelay flag (per the original comment).
	const socketBufferSize = 1024 * 1024
	socketOpt := connector.(cellnet.TCPSocketOption)
	socketOpt.SetSocketBuffer(socketBufferSize, socketBufferSize, true)

	// Reconnect automatically after the connection drops.
	reconnector := connector.(cellnet.TCPConnector)
	reconnector.SetReconnectDuration(5 * time.Second)

	connector.Start()

	// Block until the peer reports ready, checking twice per second.
	readiness := connector.(cellnet.PeerReadyChecker)
	for !readiness.IsReady() {
		time.Sleep(500 * time.Millisecond)
	}
}