-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
113 lines (90 loc) · 2.27 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package vsrpc
import (
"context"
"github.com/chronos-tachyon/assert"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/emptypb"
)
// BaseStream is the set of methods common to every stream flavour declared
// in this file: accessors for the stream's call, connection, context, ID and
// method, plus Cancel.
type BaseStream interface {
	// Call returns the *Call that this stream wraps.
	Call() *Call

	// Conn returns the *Conn associated with this stream.
	Conn() *Conn

	// Context returns the context.Context associated with this stream.
	Context() context.Context

	// ID returns the stream's ID.
	ID() ID

	// Method returns the Method associated with this stream.
	Method() Method

	// Cancel cancels the stream and reports any resulting error.
	// NOTE(review): the exact cancellation semantics are defined by the
	// implementation (here, Call.Cancel) and are not visible in this file.
	Cancel() error
}
// SendStream is a BaseStream on which messages of type T can be sent.
type SendStream[T proto.Message] interface {
	BaseStream

	// Send transmits the message in on the stream.
	Send(in T) error

	// CloseSend closes the sending side of the stream.
	// NOTE(review): presumably no further Send calls are valid afterwards;
	// confirm against Call.CloseSend.
	CloseSend() error
}
// RecvStream is a BaseStream from which messages of type T can be received.
type RecvStream[T proto.Message] interface {
	BaseStream

	// Recv attempts to receive one message and decode it into out.
	//
	// ok reports whether a message was obtained and done is a separate
	// end-of-stream style flag; err is non-nil if the received message could
	// not be decoded.
	// NOTE(review): the precise meaning of blocking and done comes from the
	// underlying queue implementation, which is not visible in this file;
	// presumably blocking selects between waiting and polling. Confirm.
	Recv(blocking bool, out T) (ok bool, done bool, err error)
}
// BiStream is a bidirectional stream: it sends messages of type T and
// receives messages of type U. It is the union of BaseStream, SendStream[T]
// and RecvStream[U].
type BiStream[T proto.Message, U proto.Message] interface {
	BaseStream
	SendStream[T]
	RecvStream[U]
}
// NewSendStream wraps call in a SendStream that sends messages of type T.
//
// The concrete value is an implStream whose receive-side type parameter is
// *emptypb.Empty; the receive side is not reachable through the returned
// interface.
func NewSendStream[T proto.Message](call *Call) SendStream[T] {
	stream := new(implStream[T, *emptypb.Empty])
	stream.call = call
	return stream
}
// NewRecvStream wraps call in a RecvStream that receives messages of type T.
//
// The concrete value is an implStream whose send-side type parameter is
// *emptypb.Empty; the send side is not reachable through the returned
// interface.
func NewRecvStream[T proto.Message](call *Call) RecvStream[T] {
	stream := new(implStream[*emptypb.Empty, T])
	stream.call = call
	return stream
}
// NewBiStream wraps call in a BiStream that sends messages of type T and
// receives messages of type U.
func NewBiStream[T proto.Message, U proto.Message](call *Call) BiStream[T, U] {
	stream := new(implStream[T, U])
	stream.call = call
	return stream
}
// implStream is the single concrete stream type in this file. T is the type
// of messages sent and U is the type of messages received. It holds no state
// of its own beyond the wrapped *Call.
type implStream[T proto.Message, U proto.Message] struct {
	// call is the underlying call that every method delegates to.
	call *Call
}
// Call returns the *Call wrapped by this stream.
func (stream implStream[T, U]) Call() *Call {
	return stream.call
}
// Conn returns the *Conn reported by the wrapped call.
func (stream implStream[T, U]) Conn() *Conn {
	call := stream.call
	return call.Conn()
}
// Queue returns the *Queue reported by the wrapped call. Recv reads incoming
// payloads from this queue.
func (stream implStream[T, U]) Queue() *Queue {
	call := stream.call
	return call.Queue()
}
// Context returns the context.Context reported by the wrapped call.
func (stream implStream[T, U]) Context() context.Context {
	call := stream.call
	return call.Context()
}
// ID returns the ID reported by the wrapped call.
func (stream implStream[T, U]) ID() ID {
	call := stream.call
	return call.ID()
}
// Method returns the Method reported by the wrapped call.
func (stream implStream[T, U]) Method() Method {
	call := stream.call
	return call.Method()
}
// Cancel forwards to Cancel on the wrapped call and returns its error.
func (stream implStream[T, U]) Cancel() error {
	call := stream.call
	return call.Cancel()
}
// Close forwards to Close on the wrapped call and returns its error.
func (stream implStream[T, U]) Close() error {
	call := stream.call
	return call.Close()
}
// Send packs in into an anypb.Any using MarshalAny and hands the result to
// Send on the wrapped call.
//
// A marshalling error is returned as-is and nothing is sent in that case;
// otherwise the error from the call's Send is returned.
func (stream implStream[T, U]) Send(in T) error {
	// NOTE(review): presumably panics when in holds a nil message; confirm
	// against the assert package's NotNil contract.
	assert.NotNil(&in)

	packed := new(anypb.Any)
	if err := MarshalAny(packed, in); err != nil {
		return err
	}
	return stream.call.Send(packed)
}
// CloseSend forwards to CloseSend on the wrapped call and returns its error.
func (stream implStream[T, U]) CloseSend() error {
	call := stream.call
	return call.CloseSend()
}
// Recv takes one payload from the wrapped call's queue and decodes it into
// out.
//
// out is always reset first, so it holds no stale data when nothing is
// decoded. ok and done are the values returned by the queue's Recv, except
// that ok is forced to false when the payload fails to decode; in that case
// err carries the UnmarshalAny error. A nil payload is not decoded and is not
// treated as an error.
// NOTE(review): the meaning of blocking and done is defined by Queue.Recv,
// which is not visible in this file.
func (stream implStream[T, U]) Recv(blocking bool, out U) (ok bool, done bool, err error) {
	// NOTE(review): presumably panics when out holds a nil message; confirm
	// against the assert package's NotNil contract.
	assert.NotNil(&out)
	proto.Reset(out)

	var payload *anypb.Any
	payload, ok, done = stream.call.Queue().Recv(blocking)
	if !ok || payload == nil {
		// Nothing to decode: report the queue's flags unchanged.
		return ok, done, nil
	}

	if err = UnmarshalAny(out, payload); err != nil {
		// A payload arrived but could not be decoded; do not claim success.
		return false, done, err
	}
	return true, done, nil
}