-
Notifications
You must be signed in to change notification settings - Fork 5
/
stream-pipe.go
86 lines (76 loc) · 1.85 KB
/
stream-pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package srpc
import (
"context"
"io"
"sync"
)
// pipeStream is one end of an in-memory stream.
// Two ends are linked to each other through their other fields.
// It is intended for testing.
type pipeStream struct {
	// dataCh holds the encoded messages that MsgRecv reads on this end.
	dataCh chan []byte
	// other is the opposite end of the stream.
	other *pipeStream
	// closeOnce makes sure the other end's data channel is closed only once.
	closeOnce sync.Once

	// ctx is the context returned by Context.
	ctx context.Context
	// ctxCancel cancels ctx.
	ctxCancel context.CancelFunc
}
// NewPipeStream builds a connected pair of in-memory streams.
// Messages sent on one returned Stream are queued for the other one.
// Each end gets its own cancelable context derived from ctx and a data
// channel that buffers up to 5 messages.
func NewPipeStream(ctx context.Context) (Stream, Stream) {
	// newEnd creates a single, not yet linked, end of the pipe.
	newEnd := func() *pipeStream {
		end := &pipeStream{dataCh: make(chan []byte, 5)}
		end.ctx, end.ctxCancel = context.WithCancel(ctx)
		return end
	}

	left := newEnd()
	right := newEnd()
	left.other, right.other = right, left
	return left, right
}
// Context returns the context of this end of the stream.
// It is canceled when the Stream is no longer valid: when Close is called on
// this end, or when the context passed to NewPipeStream is canceled.
func (p *pipeStream) Context() context.Context {
return p.ctx
}
// MsgSend sends the message to the remote.
//
// The message is encoded with MarshalVT and queued on the remote end's data
// channel, blocking while that channel's buffer is full.
//
// It returns the MarshalVT error if encoding fails, context.Canceled if the
// stream context is done, and io.ErrClosedPipe if the send direction of this
// end was already closed (the remote data channel is closed) while the
// context is still live. It never panics on a closed channel.
func (p *pipeStream) MsgSend(msg Message) (err error) {
	data, err := msg.MarshalVT()
	if err != nil {
		return err
	}

	// Check cancellation up front: once the remote channel is closed and the
	// context is done, both select cases below are ready and the runtime may
	// pick the (panicking) send.
	if p.ctx.Err() != nil {
		return context.Canceled
	}

	// Resolve the channel before installing the recover below so that an
	// unrelated panic (such as a nil other) is not swallowed.
	remoteCh := p.other.dataCh

	// closeRemote closes remoteCh, and sending on a closed channel panics.
	// Turn that panic into an error instead of crashing the caller.
	defer func() {
		if recover() == nil {
			return
		}
		if p.ctx.Err() != nil {
			err = context.Canceled
			return
		}
		err = io.ErrClosedPipe
	}()

	select {
	case <-p.ctx.Done():
		return context.Canceled
	case remoteCh <- data:
		return nil
	}
}
// MsgRecv receives an incoming message from the remote.
// The received bytes are decoded into msg with UnmarshalVT.
//
// It blocks until data arrives, the data channel of this end is closed
// (io.EOF), or the stream context is done (context.Canceled).
func (p *pipeStream) MsgRecv(msg Message) error {
	var (
		payload []byte
		open    bool
	)

	select {
	case payload, open = <-p.dataCh:
	case <-p.ctx.Done():
		return context.Canceled
	}

	// A closed channel means the remote will not send anything else.
	if !open {
		return io.EOF
	}
	return msg.UnmarshalVT(payload)
}
// CloseSend signals to the remote that we will no longer send any messages.
// It closes the remote end's data channel (at most once, via closeRemote), so
// the remote's MsgRecv reports io.EOF once that channel has been drained.
// The context of this end is left untouched. It always returns nil.
func (p *pipeStream) CloseSend() error {
p.closeRemote()
return nil
}
// Close closes the stream.
// It cancels the context of this end and then closes the remote end's data
// channel, so the remote's MsgRecv reports io.EOF once that channel has been
// drained. The remote end's context is not canceled here.
// Calling Close more than once is safe. It always returns nil.
func (p *pipeStream) Close() error {
// Cancel first so calls blocked on this end's context are released.
p.ctxCancel()
p.closeRemote()
return nil
}
// closeRemote closes the data channel of the other end.
// closeOnce guards the close, so repeated calls are no-ops.
func (p *pipeStream) closeRemote() {
	shutPeer := func() {
		close(p.other.dataCh)
	}
	p.closeOnce.Do(shutPeer)
}
// Compile-time check that *pipeStream implements Stream.
var _ Stream = (*pipeStream)(nil)