This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
185 lines (153 loc) · 3.62 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package client
import (
"context"
"sync"
"github.com/cr-mao/lorig/cluster"
"github.com/cr-mao/lorig/component"
"github.com/cr-mao/lorig/log"
"github.com/cr-mao/lorig/network"
"github.com/cr-mao/lorig/packet"
)
type (
	// RouteHandler processes a single inbound message that was dispatched
	// to it by route.
	RouteHandler func(req Request)

	// EventHandler is called with the client's proxy when a cluster event
	// (connect, reconnect, disconnect) is dispatched.
	EventHandler func(proxy Proxy)
)
// Client is a cluster client component. It stores the registered route and
// event handlers and the connection most recently handed to handleConnect.
type Client struct {
component.Base
// opts holds the options assembled in NewClient.
opts *options
// ctx is derived from opts.ctx in NewClient; cancel cancels it.
ctx context.Context
cancel context.CancelFunc
// routes maps a message route to the handler registered for it.
routes map[int32]RouteHandler
// events maps a cluster event to the handler registered for it.
events map[cluster.Event]EventHandler
// defaultRouteHandler receives messages whose route has no entry in routes; it may be nil.
defaultRouteHandler RouteHandler
// proxy is the value passed to event handlers and returned by Proxy.
proxy *proxy
// rw is held by Destroy, handleConnect and dial while they access state or conn.
rw sync.RWMutex
// state is cluster.Shut after NewClient and Destroy, and cluster.Work after Init.
state cluster.State
// conn is the connection last passed to handleConnect; Destroy resets it to nil.
conn network.Conn
}
// NewClient builds a Client from the default options with every supplied
// Option applied in order. The returned client starts in the cluster.Shut
// state, has empty route and event tables, and owns a cancellable context
// derived from the configured one.
func NewClient(opts ...Option) *Client {
	cfg := defaultOptions()
	for i := range opts {
		opts[i](cfg)
	}

	// The proxy is created right after opts is set and before the remaining
	// fields are filled in.
	client := &Client{opts: cfg}
	client.proxy = newProxy(client)
	client.routes = map[int32]RouteHandler{}
	client.events = map[cluster.Event]EventHandler{}
	client.state = cluster.Shut
	client.ctx, client.cancel = context.WithCancel(cfg.ctx)

	return client
}
// Name returns the component name taken from the client options.
func (c *Client) Name() string {
return c.opts.name
}
// Init validates the injected plugins and switches the client to the
// cluster.Work state.
//
// A missing network client or codec plugin is reported through log.Fatal.
// The state is written while holding c.rw so the write follows the same
// locking discipline as the locked read in dial and the locked write in
// Destroy.
func (c *Client) Init() {
	if c.opts.client == nil {
		log.Fatal("client plugin is not injected")
	}

	if c.opts.codec == nil {
		log.Fatal("codec plugin is not injected")
	}

	c.rw.Lock()
	c.state = cluster.Work
	c.rw.Unlock()
}
// Start registers the connect, disconnect and receive callbacks on the
// configured network client and then dials the server. A dial failure is
// reported through log.Fatalf.
func (c *Client) Start() {
	netClient := c.opts.client
	netClient.OnConnect(c.handleConnect)
	netClient.OnDisconnect(c.handleDisconnect)
	netClient.OnReceive(c.handleReceive)

	err := c.dial()
	if err == nil {
		return
	}

	log.Fatalf("connect server failed: %v", err)
}
// Destroy tears the component down: it drops the stored connection, puts the
// client back into the cluster.Shut state and cancels the context created in
// NewClient.
//
// Cancelling releases the derived context from its parent; without it the
// child context stays registered until the parent itself is cancelled.
// Calling a CancelFunc more than once is a no-op, so repeated Destroy calls
// are safe.
func (c *Client) Destroy() {
	c.rw.Lock()
	c.conn = nil
	c.state = cluster.Shut
	c.rw.Unlock()

	// cancel is nil only for a Client that was not built by NewClient.
	if c.cancel != nil {
		c.cancel()
	}
}
// Proxy returns the proxy that was created for this client in NewClient.
func (c *Client) Proxy() Proxy {
return c.proxy
}
// handleConnect is the connection-opened callback. It stores conn and then
// dispatches one event handler: when a connection was already stored, the
// cluster.Reconnect handler is preferred and cluster.Connect is the fallback;
// otherwise only cluster.Connect is considered. Nothing is called when no
// matching handler is registered.
func (c *Client) handleConnect(conn network.Conn) {
	c.rw.Lock()
	reconnected := c.conn != nil
	c.conn = conn
	c.rw.Unlock()

	if reconnected {
		if onReconnect, found := c.events[cluster.Reconnect]; found {
			onReconnect(c.proxy)
			return
		}
	}

	if onConnect, found := c.events[cluster.Connect]; found {
		onConnect(c.proxy)
	}
}
// handleDisconnect is the connection-closed callback. It invokes the
// cluster.Disconnect handler with the client's proxy when one is registered
// and does nothing otherwise.
func (c *Client) handleDisconnect(_ network.Conn) {
	if onDisconnect, found := c.events[cluster.Disconnect]; found {
		onDisconnect(c.proxy)
	}
}
// handleReceive is the data-received callback. It unpacks data into a
// message and hands it to the handler registered for the message's route,
// falling back to the default route handler when the route is unknown.
// Unpack failures and unroutable messages are logged and dropped.
func (c *Client) handleReceive(_ network.Conn, data []byte) {
	msg, err := packet.Unpack(data)
	if err != nil {
		log.Errorf("unpack message failed: %v", err)
		return
	}

	switch routeHandler, registered := c.routes[msg.Route]; {
	case registered:
		routeHandler(&request{client: c, message: msg})
	case c.defaultRouteHandler != nil:
		c.defaultRouteHandler(&request{client: c, message: msg})
	default:
		log.Errorf("route handler is not registered, route:%v", msg.Route)
	}
}
// dial connects to the server through the configured network client.
// It returns ErrClientShut without dialing when the client is in the
// cluster.Shut state; otherwise it returns whatever error Dial reports.
func (c *Client) dial() error {
	// Snapshot the state under the read lock; the lock is released before
	// Dial is invoked.
	c.rw.RLock()
	current := c.state
	c.rw.RUnlock()

	if current == cluster.Shut {
		return ErrClientShut
	}

	_, err := c.opts.client.Dial()

	return err
}
// addRouteHandler registers handler for route. Registration is accepted only
// while the client is in the cluster.Shut state; otherwise a warning is
// logged and the route table is left unchanged.
//
// The state check and the table write run under c.rw, matching how dial and
// Destroy access the state.
func (c *Client) addRouteHandler(route int32, handler RouteHandler) {
	c.rw.Lock()
	defer c.rw.Unlock()

	if c.state != cluster.Shut {
		log.Warnf("client is working, can't add route handler")
		return
	}

	c.routes[route] = handler
}
// setDefaultRouteHandler installs the handler used for messages whose route
// has no registered handler. It is accepted only while the client is in the
// cluster.Shut state; otherwise a warning is logged and the current default
// handler is kept.
//
// The state check and the assignment run under c.rw, matching how dial and
// Destroy access the state.
func (c *Client) setDefaultRouteHandler(handler RouteHandler) {
	c.rw.Lock()
	defer c.rw.Unlock()

	if c.state != cluster.Shut {
		log.Warnf("client is working, can't set default route handler")
		return
	}

	c.defaultRouteHandler = handler
}
// addEventListener registers handler for event. Registration is accepted
// only while the client is in the cluster.Shut state; otherwise a warning is
// logged and the event table is left unchanged.
//
// The state check and the table write run under c.rw, matching how dial and
// Destroy access the state.
func (c *Client) addEventListener(event cluster.Event, handler EventHandler) {
	c.rw.Lock()
	defer c.rw.Unlock()

	if c.state != cluster.Shut {
		log.Warnf("client is working, can't add event handler")
		return
	}

	c.events[event] = handler
}