forked from vanadium-archive/go.ref
/
proxy_invoker.go
251 lines (230 loc) · 7.81 KB
/
proxy_invoker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package impl
import (
"fmt"
"io"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/vdl"
"v.io/v23/vdlroot/signature"
"v.io/v23/verror"
)
var (
errCantUpgradeServerCall = verror.Register(pkgPath+".errCantUpgradeServerCall", verror.NoRetry, "{1:}{2:} couldn't upgrade rpc.ServerCall to rpc.StreamServerCall{:_}")
errBadNumberOfResults = verror.Register(pkgPath+".errBadNumberOfResults", verror.NoRetry, "{1:}{2:} unexpected number of result values. Got {3}, want 2.{:_}")
errBadErrorType = verror.Register(pkgPath+".errBadErrorType", verror.NoRetry, "{1:}{2:} unexpected error type. Got {3}, want error.{:_}")
errWantSigInterfaceSlice = verror.Register(pkgPath+".errWantSigInterfaceSlice", verror.NoRetry, "{1:}{2:} unexpected result value type. Got {3}, want []signature.Interface.{:_}")
errWantSigMethod = verror.Register(pkgPath+".errWantSigMethod", verror.NoRetry, "{1:}{2:} unexpected result value type. Got {3}, want signature.Method.{:_}")
errUnknownMethod = verror.Register(pkgPath+".errUnknownMethod", verror.NoRetry, "{1:}{2:} unknown method{:_}")
)
// proxyInvoker is an rpc.Invoker implementation that proxies all requests
// to a remote object, i.e. requests to <suffix> are forwarded to
// <remote> transparently.
//
// remote is the name of the remote object.
// access is the access tag require to access the object.
// desc is used to determine the number of results for a given method.
func newProxyInvoker(remote string, access access.Tag, desc []rpc.InterfaceDesc) *proxyInvoker {
methodNumResults := make(map[string]int)
for _, iface := range desc {
for _, method := range iface.Methods {
methodNumResults[method.Name] = len(method.OutArgs)
}
}
return &proxyInvoker{remote, access, methodNumResults}
}
type proxyInvoker struct {
remote string
access access.Tag
methodNumResults map[string]int
}
var _ rpc.Invoker = (*proxyInvoker)(nil)
func (p *proxyInvoker) Prepare(_ *context.T, method string, numArgs int) (argptrs []interface{}, tags []*vdl.Value, _ error) {
// TODO(toddw): Change argptrs to be filled in with *vdl.Value, to avoid
// unnecessary type lookups.
argptrs = make([]interface{}, numArgs)
for i, _ := range argptrs {
var x interface{}
argptrs[i] = &x
}
tags = []*vdl.Value{vdl.ValueOf(p.access)}
return
}
func (p *proxyInvoker) Invoke(ctx *context.T, inCall rpc.StreamServerCall, method string, argptrs []interface{}) (results []interface{}, err error) {
// We accept any values as argument and pass them through to the remote
// server.
args := make([]interface{}, len(argptrs))
for i, ap := range argptrs {
args[i] = ap
}
client := v23.GetClient(ctx)
outCall, err := client.StartCall(ctx, p.remote, method, args)
if err != nil {
return nil, err
}
// Each RPC has a bi-directional stream, and there is no way to know in
// advance how much data will be sent in either direction, if any.
//
// This method (Invoke) must return when the remote server is done with
// the RPC, which is when outCall.Recv() returns EOF. When that happens,
// we need to call outCall.Finish() to get the return values, and then
// return these values to the client.
//
// While we are forwarding data from the server to the client, we must
// also forward data from the client to the server. This happens in a
// separate goroutine. This goroutine may return after Invoke has
// returned if the client doesn't call CloseSend() explicitly.
//
// Any error, other than EOF, will be returned to the client, if
// possible. The only situation where it is not possible to send an
// error to the client is when the error comes from forwarding data from
// the client to the server and Invoke has already returned or is about
// to return. In this case, the error is lost. So, it is possible that
// the client could successfully Send() data that the server doesn't
// actually receive if the server terminates the RPC while the data is
// in the proxy.
fwd := func(src, dst rpc.Stream, errors chan<- error) {
for {
var obj interface{}
switch err := src.Recv(&obj); err {
case io.EOF:
if call, ok := src.(rpc.ClientCall); ok {
if err := call.CloseSend(); err != nil {
errors <- err
}
}
return
case nil:
break
default:
errors <- err
return
}
if err := dst.Send(obj); err != nil {
errors <- err
return
}
}
}
errors := make(chan error, 2)
go fwd(inCall, outCall, errors)
fwd(outCall, inCall, errors)
select {
case err := <-errors:
return nil, err
default:
}
nResults, err := p.numResults(ctx, method)
if err != nil {
return nil, err
}
// We accept any return values, without type checking, and return them
// to the client.
res := make([]interface{}, nResults)
for i := 0; i < len(res); i++ {
var foo interface{}
res[i] = &foo
}
err = outCall.Finish(res...)
results = make([]interface{}, len(res))
for i, r := range res {
results[i] = *r.(*interface{})
}
return results, err
}
// TODO(toddw): Expose a helper function that performs all error checking based
// on reflection, to simplify the repeated logic processing results.
func (p *proxyInvoker) Signature(ctx *context.T, call rpc.ServerCall) ([]signature.Interface, error) {
streamCall, ok := call.(rpc.StreamServerCall)
if !ok {
return nil, verror.New(errCantUpgradeServerCall, ctx)
}
results, err := p.Invoke(ctx, streamCall, rpc.ReservedSignature, nil)
if err != nil {
return nil, err
}
if len(results) != 2 {
return nil, verror.New(errBadNumberOfResults, ctx, len(results))
}
if results[1] != nil {
err, ok := results[1].(error)
if !ok {
return nil, verror.New(errBadErrorType, ctx, fmt.Sprintf("%T", err))
}
return nil, err
}
var res []signature.Interface
if results[0] != nil {
sig, ok := results[0].([]signature.Interface)
if !ok {
return nil, verror.New(errWantSigInterfaceSlice, ctx, fmt.Sprintf("%T", sig))
}
}
return res, nil
}
func (p *proxyInvoker) MethodSignature(ctx *context.T, call rpc.ServerCall, method string) (signature.Method, error) {
empty := signature.Method{}
streamCall, ok := call.(rpc.StreamServerCall)
if !ok {
return empty, verror.New(errCantUpgradeServerCall, ctx)
}
results, err := p.Invoke(ctx, streamCall, rpc.ReservedMethodSignature, []interface{}{&method})
if err != nil {
return empty, err
}
if len(results) != 2 {
return empty, verror.New(errBadNumberOfResults, ctx, len(results))
}
if results[1] != nil {
err, ok := results[1].(error)
if !ok {
return empty, verror.New(errBadErrorType, ctx, fmt.Sprintf("%T", err))
}
return empty, err
}
var res signature.Method
if results[0] != nil {
sig, ok := results[0].(signature.Method)
if !ok {
return empty, verror.New(errWantSigMethod, ctx, fmt.Sprintf("%T", sig))
}
}
return res, nil
}
func (p *proxyInvoker) Globber() *rpc.GlobState {
return &rpc.GlobState{AllGlobber: p}
}
type call struct {
rpc.GlobServerCall
}
func (c *call) Recv(v interface{}) error {
return io.EOF
}
func (c *call) Send(v interface{}) error {
return c.SendStream().Send(v.(naming.GlobReply))
}
func (p *proxyInvoker) Glob__(ctx *context.T, serverCall rpc.GlobServerCall, g *glob.Glob) error {
pattern := g.String()
p.Invoke(ctx, &call{serverCall}, rpc.GlobMethod, []interface{}{&pattern})
return nil
}
// numResults returns the number of result values for the given method.
func (p *proxyInvoker) numResults(ctx *context.T, method string) (int, error) {
switch method {
case rpc.GlobMethod:
return 1, nil
case rpc.ReservedSignature, rpc.ReservedMethodSignature:
return 2, nil
}
num, ok := p.methodNumResults[method]
if !ok {
return 0, verror.New(errUnknownMethod, ctx, method)
}
return num, nil
}