This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
proxy.go
164 lines (131 loc) · 3.25 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package client
import (
"github.com/cr-mao/lorig/cluster"
"github.com/cr-mao/lorig/errors"
"github.com/cr-mao/lorig/packet"
)
var (
ErrClientShut = errors.New("client is shut")
ErrConnectionClosed = errors.New("connection closed")
)
type Proxy interface {
// GetID 获取当前节点ID
GetID() string
// AddRouteHandler 添加路由处理器
AddRouteHandler(route int32, handler RouteHandler)
// SetDefaultRouteHandler 设置默认路由处理器,所有未注册的路由均走默认路由处理器
SetDefaultRouteHandler(handler RouteHandler)
// AddEventListener 添加事件监听器
AddEventListener(event cluster.Event, handler EventHandler)
// Bind 绑定用户ID
Bind(uid int64) error
// Unbind 解绑用户ID
Unbind() error
// Push 推送消息
Push(seq, route int32, message interface{}) error
// Reconnect 重新连接
Reconnect() error
// Disconnect 断开连接
Disconnect() error
}
var _ Proxy = &proxy{}
type proxy struct {
client *Client // 节点
}
func newProxy(client *Client) *proxy {
return &proxy{client: client}
}
// GetID 获取当前节点ID
func (p *proxy) GetID() string {
return p.client.opts.id
}
// AddRouteHandler 添加路由处理器
func (p *proxy) AddRouteHandler(route int32, handler RouteHandler) {
p.client.addRouteHandler(route, handler)
}
// SetDefaultRouteHandler 设置默认路由处理器,所有未注册的路由均走默认路由处理器
func (p *proxy) SetDefaultRouteHandler(handler RouteHandler) {
p.client.setDefaultRouteHandler(handler)
}
// AddEventListener 添加事件监听器
func (p *proxy) AddEventListener(event cluster.Event, handler EventHandler) {
p.client.addEventListener(event, handler)
}
// Bind 绑定用户ID
func (p *proxy) Bind(uid int64) error {
p.client.rw.RLock()
defer p.client.rw.RUnlock()
if p.client.state == cluster.Shut {
return ErrClientShut
}
if p.client.conn == nil {
return ErrConnectionClosed
}
p.client.conn.Bind(uid)
return nil
}
// Unbind 解绑用户ID
func (p *proxy) Unbind() error {
p.client.rw.RLock()
defer p.client.rw.RUnlock()
if p.client.state == cluster.Shut {
return ErrClientShut
}
if p.client.conn == nil {
return ErrConnectionClosed
}
p.client.conn.Unbind()
return nil
}
// Push 推送消息
func (p *proxy) Push(seq, route int32, message interface{}) error {
p.client.rw.RLock()
defer p.client.rw.RUnlock()
if p.client.state == cluster.Shut {
return ErrClientShut
}
if p.client.conn == nil {
return ErrConnectionClosed
}
var (
err error
buffer []byte
)
if message != nil {
buffer, err = p.client.opts.codec.Marshal(message)
if err != nil {
return err
}
}
if p.client.opts.encryptor != nil {
buffer, err = p.client.opts.encryptor.Encrypt(buffer)
if err != nil {
return err
}
}
msg, err := packet.Pack(&packet.Message{
Seq: seq,
Route: route,
Buffer: buffer,
})
if err != nil {
return err
}
return p.client.conn.Push(msg)
}
// Reconnect 重新连接
func (p *proxy) Reconnect() error {
return p.client.dial()
}
// Disconnect 断开连接
func (p *proxy) Disconnect() error {
p.client.rw.RLock()
defer p.client.rw.RUnlock()
if p.client.state == cluster.Shut {
return ErrClientShut
}
if p.client.conn == nil {
return ErrConnectionClosed
}
return p.client.conn.Close()
}