forked from apache/incubator-seata-go
/
getty_client_session_manager.go
101 lines (87 loc) · 2.3 KB
/
getty_client_session_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package rpc_client
import (
"sync"
"sync/atomic"
"time"
)
import (
getty "github.com/apache/dubbo-getty"
)
var (
MAX_CHECK_ALIVE_RETRY = 600
CHECK_ALIVE_INTERNAL = 100
allSessions = sync.Map{}
// serverAddress -> rpc_client.Session -> bool
serverSessions = sync.Map{}
sessionSize int32 = 0
clientSessionManager = &GettyClientSessionManager{}
)
type GettyClientSessionManager struct{}
func (sessionManager *GettyClientSessionManager) AcquireGettySession() getty.Session {
// map 遍历是随机的
var session getty.Session
allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
if session != nil {
return session
}
if sessionSize == 0 {
ticker := time.NewTicker(time.Duration(CHECK_ALIVE_INTERNAL) * time.Millisecond)
defer ticker.Stop()
for i := 0; i < MAX_CHECK_ALIVE_RETRY; i++ {
<-ticker.C
allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
if session != nil {
return session
}
}
}
return nil
}
func (sessionManager *GettyClientSessionManager) AcquireGettySessionByServerAddress(serverAddress string) getty.Session {
m, _ := serverSessions.LoadOrStore(serverAddress, &sync.Map{})
sMap := m.(*sync.Map)
var session getty.Session
sMap.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
return session
}
func (sessionManager *GettyClientSessionManager) ReleaseGettySession(session getty.Session) {
allSessions.Delete(session)
if !session.IsClosed() {
m, _ := serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Delete(session)
session.Close()
}
atomic.AddInt32(&sessionSize, -1)
}
func (sessionManager *GettyClientSessionManager) RegisterGettySession(session getty.Session) {
allSessions.Store(session, true)
m, _ := serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Store(session, true)
atomic.AddInt32(&sessionSize, 1)
}