-
Notifications
You must be signed in to change notification settings - Fork 5
/
server-rpc.go
94 lines (83 loc) · 2.29 KB
/
server-rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package srpc
import (
"context"
"github.com/pkg/errors"
)
// ServerRPC is the server-side state for one in-progress RPC call message
// stream.
//
// It embeds commonRPC, whose fields (mutex, service and method names, data
// queue, writer, context) are used by the methods defined on ServerRPC.
type ServerRPC struct {
	commonRPC

	// invoker dispatches the call to the requested service method once the
	// call start packet has been received.
	invoker Invoker
}
// NewServerRPC builds a ServerRPC session.
//
// The embedded commonRPC is initialized with ctx via initCommonRPC, invoker is
// stored for dispatching the call, and writer is installed as the session's
// packet writer before the session is returned.
func NewServerRPC(ctx context.Context, invoker Invoker, writer PacketWriter) *ServerRPC {
	srv := new(ServerRPC)
	srv.invoker = invoker
	initCommonRPC(ctx, &srv.commonRPC)
	// Set the writer after common initialization, matching the order callers
	// have always observed.
	srv.writer = writer
	return srv
}
// HandlePacketData decodes a raw, still-encoded packet and hands the result to
// HandlePacket.
//
// It returns the decode error if data cannot be unmarshaled into a Packet,
// otherwise whatever HandlePacket returns.
func (r *ServerRPC) HandlePacketData(data []byte) error {
	pkt := new(Packet)
	err := pkt.UnmarshalVT(data)
	if err != nil {
		return err
	}
	return r.HandlePacket(pkt)
}
// HandlePacket processes an already-decoded packet.
//
// A nil packet is ignored. Otherwise the packet is validated and then routed by
// body type:
//   - call start  -> HandleCallStart
//   - call data   -> HandleCallData
//   - call cancel -> HandleCallCancel, only when the cancel flag is true
//
// Any other body type is ignored and nil is returned.
func (r *ServerRPC) HandlePacket(msg *Packet) error {
	if msg == nil {
		return nil
	}

	err := msg.Validate()
	if err != nil {
		return err
	}

	switch body := msg.GetBody().(type) {
	case *Packet_CallStart:
		return r.HandleCallStart(body.CallStart)
	case *Packet_CallData:
		return r.HandleCallData(body.CallData)
	case *Packet_CallCancel:
		// A cancel body with the flag unset is a no-op.
		if !body.CallCancel {
			return nil
		}
		return r.HandleCallCancel()
	}

	// Unrecognized or empty body: nothing to do.
	return nil
}
// HandleCallStart processes the call start packet, which names the service and
// method to invoke and may carry the first data payload.
//
// It returns an error if a service or method has already been recorded on this
// session, and ErrCompleted if the data side is already closed. On success the
// RPC is invoked on a new goroutine and nil is returned.
func (r *ServerRPC) HandleCallStart(pkt *CallStart) error {
	// The mutex is held for the whole method, including the go statement.
	r.mtx.Lock()
	defer r.mtx.Unlock()

	switch {
	case r.method != "" || r.service != "":
		// A previous call start already set the target.
		return errors.New("call start must be sent only once")
	case r.dataClosed:
		return ErrCompleted
	}

	svcID := pkt.GetRpcService()
	methodID := pkt.GetRpcMethod()
	r.service = svcID
	r.method = methodID

	// Queue the initial payload when one is present. An empty payload is still
	// queued if the packet explicitly flags its data as zero-length.
	payload := pkt.GetData()
	if len(payload) != 0 || pkt.GetDataIsZero() {
		r.dataQueue = append(r.dataQueue, payload)
	}

	// Signal the state change, then run the invocation concurrently.
	r.bcast.Broadcast()
	go r.invokeRPC(svcID, methodID)
	return nil
}
// invokeRPC runs the requested method through the invoker after the call start
// packet has been received, then finishes the session.
//
// If the invoker reports that it did not handle the method and returned no
// error, ErrUnimplemented is used as the call result. A final call data packet
// carrying the resulting error (nil on success) is written, the writer is
// closed, and the session context is canceled.
func (r *ServerRPC) invokeRPC(serviceID, methodID string) {
	stream := NewMsgStream(r.ctx, r, r.ctxCancel)

	handled, callErr := r.invoker.InvokeMethod(serviceID, methodID, stream)
	if !handled && callErr == nil {
		callErr = ErrUnimplemented
	}

	// Write and close results are deliberately ignored: this function has no
	// return value through which to report them.
	// NOTE(review): the `true` argument presumably marks the call as complete;
	// confirm against NewCallDataPacket.
	_ = r.writer.WritePacket(NewCallDataPacket(nil, false, true, callErr))
	_ = r.writer.Close()

	r.ctxCancel()
}