/
streamclient.go
95 lines (92 loc) · 2.29 KB
/
streamclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package proxy
import (
"context"
"fmt"
"io"
"reflect"
"runtime/debug"
"github.com/LLKennedy/mercury/httpapi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// handleClientStream proxies a client-streaming call: many request messages
// in, one response message out.
//
// caller is invoked through reflection with ctx as its only argument, and its
// first two results are read as the upstream stream and an error. Each request
// received from srv is unmarshalled into a fresh value of the type taken by the
// stream's Send method and forwarded with SendMsg. Once srv.Recv reports
// io.EOF (or SendMsg does), the send side is closed, the reply is fetched via
// wrapRecv on the stream's CloseAndRecv method, marshalled, and sent back
// through srv. Any other error ends the call and is returned as-is.
//
// procType is not used by this function. A panic raised anywhere in the body
// is recovered, its stack trace is printed to stdout, and it is reported to the
// caller as a codes.Internal status.
func (s *Server) handleClientStream(ctx context.Context, procType reflect.Type, caller reflect.Value, srv httpapi.ExposedService_ProxyStreamServer) (err error) {
	defer func() {
		if recovered := recover(); recovered != nil {
			err = status.Errorf(codes.Internal, "caught panic for client stream: %v", recovered)
			fmt.Printf("%s\n", debug.Stack())
		}
	}()

	// Opening the upstream stream takes only the context and yields (stream, error).
	results := caller.Call([]reflect.Value{reflect.ValueOf(ctx)})

	// Interpret the first result as the upstream stream.
	var (
		upstream   grpc.ClientStream
		convertErr error
	)
	stream := results[0]
	if stream.CanInterface() {
		if cs, ok := stream.Interface().(grpc.ClientStream); ok {
			upstream = cs
		} else {
			convertErr = status.Errorf(codes.Internal, "response message could not be converted to grpc.ClientStream interface")
		}
	}

	// Interpret the second result as the error from opening the stream.
	if results[1].CanInterface() {
		if openErr, ok := results[1].Interface().(error); ok {
			err = openErr
		}
	}

	// The error reported by the call takes precedence over a failed conversion.
	switch {
	case err != nil:
		if _, isStatus := status.FromError(err); !isStatus {
			err = status.Errorf(codes.Internal, "non-gRPC error returned when initiating stream: %v", err)
		}
		return err
	case convertErr != nil:
		return convertErr
	}

	// The stream is open; discover the concrete request type from Send's parameter.
	sendMethod := stream.MethodByName("Send")
	closeAndRecv := stream.MethodByName("CloseAndRecv")
	reqType := sendMethod.Type().In(0).Elem()

	// Relay request messages until receiving, decoding or sending fails.
	for {
		var incoming *httpapi.StreamedRequest
		if incoming, err = srv.Recv(); err != nil {
			break
		}
		outgoing := reflect.New(reqType).Interface().(proto.Message)
		if err = unmarshaller.Unmarshal(incoming.GetRequest(), outgoing); err != nil {
			break
		}
		if err = upstream.SendMsg(outgoing); err != nil {
			break
		}
	}

	// Only io.EOF marks a clean end of the request stream.
	if err != io.EOF {
		return err
	}

	// Finish the upstream call and forward its single reply.
	if err = upstream.CloseSend(); err != nil {
		return err
	}
	var reply proto.Message
	if reply, err = wrapRecv(closeAndRecv); err != nil {
		return err
	}
	var payload []byte
	if payload, err = marshaller.Marshal(reply); err != nil {
		return err
	}
	err = srv.Send(&httpapi.StreamedResponse{
		Response: payload,
	})
	return err
}