-
Notifications
You must be signed in to change notification settings - Fork 2
/
lrpc_client.go
105 lines (92 loc) · 1.78 KB
/
lrpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package lrpc
import (
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
)
// RPCFunc is the callback type invoked when the reply to a remote call
// arrives. It receives the values carried in the reply's Return field, or no
// arguments when that field is nil.
type RPCFunc func(arg ...interface{})
// RPCClient is a TCP client that sends calls to a remote server and routes
// replies back to the callbacks registered for them.
type RPCClient struct {
// count is the sequence-number counter; CallReply increments it atomically
// for every outgoing call.
count int32
// addr and port are joined as "addr:port" to form the dial target.
addr string
port string
// buffer holds the first value returned by handleResultByte in the read
// loop — presumably the undecoded remainder of the stream; TODO confirm
// against handleResultByte.
buffer []byte
// conn is the server connection; it is nil until Dial has been called.
conn net.Conn
// funcMap maps a call's Seq to the RPCFunc waiting for its reply. Entries
// are stored by CallReply and removed by handleResults.
funcMap *sync.Map
}
// NewRPCClient returns a client configured for the server at addr:port.
// It does not open a connection; Dial must be called before any call is sent.
func NewRPCClient(addr, port string) *RPCClient {
	client := new(RPCClient) // count starts at its zero value
	client.addr = addr
	client.port = port
	client.buffer = make([]byte, 0)
	client.funcMap = new(sync.Map)
	return client
}
// Dial opens a TCP connection to addr:port and starts a background goroutine
// that reads replies from it and hands decoded results to handleResults.
//
// Dial panics if the connection cannot be established. The reader goroutine
// stops on the first read error: it prints the error to standard output and
// closes the connection.
func (c *RPCClient) Dial() {
	var err error
	c.conn, err = net.Dial("tcp", c.addr+":"+c.port)
	if err != nil {
		panic(err)
	}

	// Read from the connection this call opened rather than re-reading
	// c.conn on every iteration, so a later Dial cannot redirect this
	// goroutine onto a different connection.
	conn := c.conn
	go func() {
		defer conn.Close()
		for {
			// A fresh slice per read: handleResultByte is defined elsewhere and
			// the remainder it returns may alias its input, so the backing
			// array is not reused.
			b := make([]byte, 512)
			n, err := conn.Read(b)

			// Per the io.Reader contract, bytes returned alongside an error
			// must be processed before the error is considered.
			if n > 0 {
				var results []*RPCResult
				// NOTE(review): the stored remainder is never passed back into
				// handleResultByte here — confirm that helper copes with
				// replies split across reads.
				c.buffer, results = handleResultByte(b[:n])
				if results != nil {
					c.handleResults(results)
				}
			}
			if err != nil {
				fmt.Println(err)
				return
			}
		}
	}()
}
// handleResults dispatches each result to the callback registered under its
// Seq and then removes that callback from funcMap. Results whose Seq has no
// registered callback are skipped.
func (c *RPCClient) handleResults(results []*RPCResult) {
	for _, res := range results {
		pending, found := c.funcMap.Load(res.Seq)
		if !found {
			continue
		}
		callback := pending.(RPCFunc)
		// Spreading a nil slice into a variadic parameter passes nil, exactly
		// as calling with no arguments does, so one call covers both cases.
		callback(res.Return...)
		c.funcMap.Delete(res.Seq)
	}
}
// Call performs a remote call without registering a reply callback.
// method has the form "class.method".
func (c *RPCClient) Call(method string, args []interface{}) {
	var noReply RPCFunc // nil: CallReply registers nothing
	c.CallReply(method, args, noReply)
}
// CallReply performs a remote call and, when fn is non-nil, registers fn to
// be invoked with the reply's return values.
//
// method has the form "class.method"; CallReply panics if it contains no ".".
// args are sent as the call's arguments. The connection must already have
// been opened with Dial. If writing the request fails, the error is printed
// to standard output and the callback is unregistered.
func (c *RPCClient) CallReply(method string, args []interface{}, fn RPCFunc) {
	parts := strings.Split(method, ".")
	if len(parts) < 2 {
		panic(fmt.Sprintf("lrpc: method %q is not in class.method form", method))
	}

	// Take the sequence number from the increment itself. A separate load
	// after the add lets a concurrent caller bump the counter in between, so
	// two calls could end up sharing one Seq.
	seq := int(atomic.AddInt32(&c.count, 1))
	call := &RPCCall{
		Class:  parts[0],
		Method: parts[1],
		Args:   args,
		Seq:    seq,
	}
	if fn != nil {
		c.funcMap.Store(call.Seq, fn)
	}

	b := AssembleBuffer(EncodeCallMsg(call))
	if _, err := c.conn.Write(b); err != nil {
		// The request did not go out intact, so drop the callback instead of
		// leaving it in the map indefinitely.
		if fn != nil {
			c.funcMap.Delete(call.Seq)
		}
		fmt.Println(err)
	}
}