/
proxystream.go
68 lines (64 loc) · 2.1 KB
/
proxystream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package proxy
import (
"fmt"
"reflect"
"runtime/debug"
"github.com/LLKennedy/httpgrpc/httpapi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// ProxyStream streams requests and responses in both directions in any order
func (s *Server) ProxyStream(srv httpapi.ExposedService_ProxyStreamServer) (err error) {
wrapErr := func(code codes.Code, err error) error {
if err == nil {
return nil
}
return status.Error(code, fmt.Sprintf("httpgrpc: %v", err))
}
defer func() {
if r := recover(); r != nil {
err = wrapErr(codes.Internal, fmt.Errorf("caught panic %v", r))
fmt.Printf("%s\n", debug.Stack())
}
}()
ctx := srv.Context()
initMsg, err := srv.Recv()
if err != nil {
return wrapErr(codes.InvalidArgument, fmt.Errorf("could not receive initial routing message in ProxyClientStream: %v", err))
}
switch initMsg.GetMessageType().(type) {
case *httpapi.StreamedRequest_Init:
break
case *httpapi.StreamedRequest_Request:
return wrapErr(codes.InvalidArgument, fmt.Errorf("first message in ProxyClientStream must be Init"))
}
msg := initMsg.GetMessageType().(*httpapi.StreamedRequest_Init).Init
procType, caller, pattern, err := s.findProc(msg.GetMethod(), msg.GetProcedure())
if err != nil {
return wrapErr(codes.Unimplemented, err)
}
switch pattern {
case apiMethodPatternStreamStream:
err = s.handleDualStream(ctx, procType, caller, srv)
case apiMethodPatternStreamStruct:
err = s.handleClientStream(ctx, procType, caller, srv)
case apiMethodPatternStructStream:
err = s.handleServerStream(ctx, procType, caller, srv)
case apiMethodPatternStructStruct:
err = wrapErr(codes.Unimplemented, fmt.Errorf("ProxyStream called for non-stream RPC"))
case apiMethodPatternUnknown:
fallthrough
default:
err = wrapErr(codes.Unimplemented, fmt.Errorf("nonstandard grpc signature not implemented"))
}
return err
}
func wrapRecv(recv reflect.Value) (proto.Message, error) {
resVals := recv.Call(nil)
errVal := resVals[1].Interface()
if errVal != nil {
return nil, errVal.(error)
}
return resVals[0].Interface().(proto.Message), nil
}