-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc_client.go
106 lines (96 loc) · 2.66 KB
/
grpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package arpcnet
import (
"context"
"io"
"github.com/blachris/arpcnet/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// GRPCClient is an RPC exit module of a Node that relays RPCs as gRPC calls to an actual gRPC server.
type GRPCClient struct {
// conn is the gRPC client connection on which relayed streams are opened.
conn *grpc.ClientConn
// route is the handler returned by Handler; NewGRPCClient builds it with
// handleServerCall as its callback.
route *rpc.MultiRPCHandler
// mountPrefixLen is the start index used when slicing a call's destination
// address to obtain the gRPC full method name.
mountPrefixLen int
}
// NewGRPCClient dials target and returns a GRPCClient associated with the given core.
// The mountPrefixLen parameter is used to split the actual gRPC full method name
// from the ArpcNet address and must be set to the length of the address where
// this route handler is mounted at the core. When insecure is true the
// connection is dialed with grpc.WithInsecure, otherwise without any dial
// options. A dial error causes a panic.
func NewGRPCClient(core *rpc.Core, mountPrefixLen int, target string, insecure bool) *GRPCClient {
	var dialOpts []grpc.DialOption
	if insecure {
		dialOpts = append(dialOpts, grpc.WithInsecure())
	}

	conn, dialErr := grpc.Dial(target, dialOpts...)
	if dialErr != nil {
		panic(dialErr)
	}

	client := &GRPCClient{
		conn:           conn,
		mountPrefixLen: mountPrefixLen,
	}
	// The handler takes the client's own method as its callback, so it can
	// only be attached once the client value exists.
	client.route = rpc.NewServerCallHandler(0, "grpc://"+target, context.Background(), core.MemMan(), client.handleServerCall)
	return client
}
// Close terminates operation of this client: the route handler is closed
// first, then the underlying gRPC connection.
func (gc *GRPCClient) Close() {
	gc.route.Close()
	// The result of closing the connection is discarded; Close has no return
	// value through which it could be reported.
	_ = gc.conn.Close()
}
// Handler exposes this client instance as an rpc.Handler so that it can be
// hooked into a Core.
func (gc *GRPCClient) Handler() rpc.Handler {
	var h rpc.Handler = gc.route
	return h
}
// clientStreamDesc is the stream description passed to grpc.NewClientStream
// for every relayed call; it enables streaming in both directions.
var clientStreamDesc = &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}
// handleServerCall relays a single incoming RPC to the gRPC server behind gc.conn.
//
// The gRPC full method name is taken from the call's destination address,
// sliced starting at gc.mountPrefixLen. If the client stream cannot be
// created, the call is finished with codes.Internal and the function returns.
// Otherwise two goroutines are started, one per direction, and the function
// returns without waiting for them:
//
//   - caller -> server: messages from serverCall.Receive are forwarded with
//     SendMsg until the caller half-closes (CloseSend is issued) or receiving
//     fails (the client stream's context is cancelled).
//   - server -> caller: messages from RecvMsg are forwarded with
//     serverCall.Send. This goroutine reports the final outcome of the stream
//     and always cancels the client stream's context when it exits.
//
// Errors coming back from the gRPC stream are reported to the caller through
// FinishExt with the gRPC status code and description of the error, so a
// failed upstream call is not reported via the plain Finish path.
func (gc *GRPCClient) handleServerCall(serverCall *rpc.ServerCall) {
	clientCtx, cancelClient := context.WithCancel(serverCall.Ctx())
	fullMethodNameAddr := serverCall.RPC().FullID().Dest().Slice(gc.mountPrefixLen, -1)
	clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDesc, gc.conn, ToFullMethodName(fullMethodNameAddr), grpc.CallCustomCodec(&rawCodec{}))
	if err != nil {
		go serverCall.FinishExt(codes.Internal, err.Error(), []string{})
		cancelClient()
		return
	}

	// finishWithStatus ends the call with the status code and description
	// carried by a gRPC stream error. grpc.Code yields codes.Unknown for
	// errors that were not produced by the RPC system.
	finishWithStatus := func(streamErr error) {
		serverCall.FinishExt(grpc.Code(streamErr), grpc.ErrorDesc(streamErr), []string{})
	}

	// Caller -> server direction.
	go func() {
		for {
			data, err := serverCall.Receive()
			if _, ok := err.(rpc.HalfCloseError); ok {
				// The caller is done sending; propagate the half-close.
				clientStream.CloseSend()
				return
			}
			if err != nil {
				cancelClient()
				return
			}

			m := rawMessage{data}
			err = clientStream.SendMsg(&m)
			if err == io.EOF {
				// io.EOF from SendMsg means the stream has already ended and
				// its real status is only available from RecvMsg. The
				// receiving goroutine reports it; finishing here could mask
				// a failure.
				return
			}
			if err != nil {
				// A client-side send failure carries its status directly.
				finishWithStatus(err)
				return
			}
		}
	}()

	// Server -> caller direction.
	go func() {
		defer cancelClient()
		for {
			m := rawMessage{}
			err := clientStream.RecvMsg(&m)
			if err == io.EOF {
				// io.EOF from RecvMsg signals that the stream completed
				// successfully.
				serverCall.Finish()
				return
			}
			if err != nil {
				finishWithStatus(err)
				return
			}

			err = serverCall.Send(m.data)
			if err != nil {
				// NOTE(review): the call is not finished explicitly here;
				// presumably a failed Send means the caller side is already
				// gone — confirm against rpc.ServerCall.
				return
			}
		}
	}()
}