/
stream.go
122 lines (101 loc) · 3.07 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package grpcext
import (
"encoding/json"
"errors"
"fmt"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
)
// Stream is a wrapper around grpc.ClientStream that adds helpers for
// sending JSON-encoded requests and receiving responses converted to
// generic maps.
type Stream struct {
// method is not read by any of the methods visible in this file.
// NOTE(review): presumably the full gRPC method name - confirm against the constructor.
method string
// methodDescriptor supplies the Input and Output message descriptors used
// to build the dynamic request and response messages.
methodDescriptor protoreflect.MethodDescriptor
// raw is the underlying gRPC client stream; RecvMsg, SendMsg and CloseSend
// are delegated to it.
raw grpc.ClientStream
// marshaler holds the protojson options used when converting received
// messages to JSON.
marshaler protojson.MarshalOptions
}
// ErrCanceled is the sentinel error returned when receiving from the stream
// fails with the gRPC status code Canceled. Its message attributes the
// cancellation to the client (k6).
var ErrCanceled = errors.New("canceled by client (k6)")
// ReceiveConverted reads the next message from the stream and returns it
// converted to a map[string]interface{}.
//
// Error contract:
//   - any receive error other than io.EOF is returned as-is with a nil map
//     (a cancelled stream surfaces as ErrCanceled);
//   - a conversion failure is returned with a nil map;
//   - when the stream has been closed successfully, io.EOF is returned
//     together with the converted form of the message object used for that
//     final receive, so callers must inspect both return values.
func (s *Stream) ReceiveConverted() (map[string]interface{}, error) {
	received, recvErr := s.receive()

	// Only a clean result or end-of-stream may proceed to conversion.
	proceed := recvErr == nil || errors.Is(recvErr, io.EOF)
	if !proceed {
		return nil, recvErr
	}

	converted, convErr := s.convert(received)
	if convErr != nil {
		return nil, convErr
	}

	// recvErr is either nil or io.EOF at this point.
	return converted, recvErr
}
// receive reads one message from the underlying stream into a fresh dynamic
// message built from the method's output descriptor.
//
// It returns the message with a nil error on success, the message together
// with the io.EOF error when the stream has been closed successfully,
// ErrCanceled when the failure carries the gRPC status code Canceled, and
// the original error otherwise.
func (s *Stream) receive() (*dynamicpb.Message, error) {
	out := dynamicpb.NewMessage(s.methodDescriptor.Output())
	recvErr := s.raw.RecvMsg(out)

	switch {
	case recvErr == nil, errors.Is(recvErr, io.EOF):
		// io.EOF means that the stream has been closed successfully.
		return out, recvErr
	case status.Convert(recvErr).Code() == codes.Canceled:
		return nil, ErrCanceled
	default:
		return nil, recvErr
	}
}
// convert turns a dynamic message into a map[string]interface{} that can be
// handed back to the JS side.
//
// The message is marshaled to JSON with the stream's protojson options and
// then unmarshaled into a generic map. This round trip is deliberate: passing
// the dynamic message along directly would make the default marshaller strip
// zero/default values from the JSON. For example, given
//
//	message Point {
//	  double x = 1;
//	  double y = 2;
//	  double z = 3;
//	}
//
// and the value
//
//	msg := Point{X: 6, Y: 4, Z: 0}
//
// the default output would be
//
//	{"x":6,"y":4}
//
// rather than the desired
//
//	{"x":6,"y":4,"z":0}
//
// Either step failing yields a nil map and a wrapped error.
func (s *Stream) convert(msg *dynamicpb.Message) (map[string]interface{}, error) {
	// TODO(olegbespalov): add the test that checks that message is not nil
	encoded, err := s.marshaler.Marshal(msg)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal the message: %w", err)
	}

	result := make(map[string]interface{})
	if err := json.Unmarshal(encoded, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal the message: %w", err)
	}

	return result, nil
}
// CloseSend calls CloseSend on the underlying gRPC client stream and returns
// its error unchanged.
func (s *Stream) CloseSend() error {
	closeErr := s.raw.CloseSend()
	return closeErr
}
// buildMessage decodes the protojson-encoded bytes in b into a new dynamic
// message built from the method's input descriptor. A decoding failure is
// returned as a wrapped error with a nil message.
func (s *Stream) buildMessage(b []byte) (*dynamicpb.Message, error) {
	req := dynamicpb.NewMessage(s.methodDescriptor.Input())

	err := protojson.Unmarshal(b, req)
	if err != nil {
		return nil, fmt.Errorf("can't serialise request object to protocol buffer: %w", err)
	}

	return req, nil
}
// Send decodes the protojson-encoded request in b into the method's input
// message type and writes it to the underlying stream. It returns the
// decoding error if b cannot be converted, otherwise whatever SendMsg
// returns.
func (s *Stream) Send(b []byte) error {
	req, buildErr := s.buildMessage(b)
	if buildErr != nil {
		return buildErr
	}

	sendErr := s.raw.SendMsg(req)
	return sendErr
}