forked from zrepl/zrepl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_conn.go
193 lines (167 loc) · 5.03 KB
/
stream_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package stream
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
"github.com/zrepl/zrepl/rpc/dataconn/heartbeatconn"
"github.com/zrepl/zrepl/rpc/dataconn/timeoutconn"
"github.com/zrepl/zrepl/zfs"
)
// Conn layers stream-oriented read and write operations on top of a
// heartbeatconn.Conn. Read operations and write operations are serialized
// independently of each other (see readMtx and writeMtx), so at most one read
// stream and one write stream are in progress at any time.
//
// A Conn is created by Wrap, which also starts the readFrames goroutine.
type Conn struct {
// hc is the underlying connection that all stream helpers operate on.
hc *heartbeatconn.Conn
// whether the per-conn readFrames goroutine completed
// (closed when that goroutine returns; Close blocks on it)
waitReadFramesDone chan struct{}
// filled by per-conn readFrames goroutine
// (and closed by it when it returns)
frameReads chan readFrameResult
// readMtx serializes read stream operations because we inherently only
// support a single stream at a time over hc.
readMtx sync.Mutex
// readClean is read and written with readMtx held. It is updated after every
// read operation from isConnCleanAfterRead; once false, further read
// operations fail immediately with an "unknown state" error.
readClean bool
// writeMtx serializes write stream operations because we inherently only
// support a single stream at a time over hc.
writeMtx sync.Mutex
// writeClean is read and written with writeMtx held. It is updated after
// every write operation from isConnCleanAfterWrite; once false, further
// write operations fail immediately with an "unknown state" error.
writeClean bool
}
// readMessageSentinel is the error with which ReadStreamedMessage closes the
// write side of its internal pipe once readStream has returned. The goroutine
// draining the pipe treats this value as the normal end-of-message signal and
// does not consider it a failure.
var readMessageSentinel = fmt.Errorf("read stream complete")
// writeStreamToErrorUnknownState is the error returned by ReadStreamInto when
// an earlier read operation left the connection in an unknown state.
// It classifies itself as a read error and not as a write error.
type writeStreamToErrorUnknownState struct{}

// Error implements the error interface.
func (writeStreamToErrorUnknownState) Error() string {
	return "dataconn read stream: connection is in unknown state"
}

// IsReadError always reports true.
func (writeStreamToErrorUnknownState) IsReadError() bool {
	return true
}

// IsWriteError always reports false.
func (writeStreamToErrorUnknownState) IsWriteError() bool {
	return false
}
// Wrap builds a Conn on top of nc.
//
// nc is first wrapped into a heartbeatconn.Conn using sendHeartbeatInterval
// and peerTimeout. The returned Conn starts out clean for both reading and
// writing, and a goroutine running readFrames is started before Wrap returns.
// Close waits for that goroutine to finish.
func Wrap(nc timeoutconn.Wire, sendHeartbeatInterval, peerTimeout time.Duration) *Conn {
	// Capacity of the frameReads channel. FIXME constant
	const frameReadsCapacity = 5

	c := new(Conn)
	c.hc = heartbeatconn.Wrap(nc, sendHeartbeatInterval, peerTimeout)
	c.readClean = true
	c.writeClean = true
	c.waitReadFramesDone = make(chan struct{})
	c.frameReads = make(chan readFrameResult, frameReadsCapacity)

	go c.readFrames()
	return c
}
// isConnCleanAfterRead reports whether the connection may be used for another
// read operation after a read that finished with res.
//
// That is the case if there was no error at all, or if the error kind is
// ReadStreamErrorKindSource or ReadStreamErrorKindStreamErrTrailerEncoding.
// Every other error kind leaves the connection unclean.
func isConnCleanAfterRead(res *ReadStreamError) bool {
	if res == nil {
		return true
	}
	switch res.Kind {
	case ReadStreamErrorKindSource, ReadStreamErrorKindStreamErrTrailerEncoding:
		return true
	default:
		return false
	}
}
// isConnCleanAfterWrite reports whether the connection may be used for
// another write operation after a write that finished with err.
// Only a write without any error leaves the connection clean.
func isConnCleanAfterWrite(err error) bool {
	failed := err != nil
	return !failed
}
// ErrReadFramesStopped is an exported sentinel error value.
// NOTE(review): it is not referenced in this part of the file; presumably the
// frame-reading helpers report it once the readFrames goroutine has stopped -
// confirm against their definitions.
var ErrReadFramesStopped = fmt.Errorf("stream: reading frames stopped")
// readFrames is the body of the per-conn goroutine started by Wrap.
// It runs the package-level readFrames helper on c.hc with c.frameReads as
// its output channel. When the helper returns, c.frameReads is closed first
// and c.waitReadFramesDone second, which is what Close waits for.
func (c *Conn) readFrames() {
	defer func() {
		close(c.frameReads)
		close(c.waitReadFramesDone)
	}()
	readFrames(c.frameReads, c.hc)
}
// ReadStreamedMessage reads one stream with frames of type frameType from the
// connection and returns its payload as a single byte slice.
//
// At most maxSize bytes are buffered. If the peer sends more than maxSize
// bytes, the internal pipe is closed from the reading side so that readStream
// observes a write error instead of blocking forever; the resulting
// *ReadStreamError is returned to the caller.
// NOTE(review): which error Kind readStream assigns to a failed write, and
// therefore whether the connection stays clean afterwards, is decided outside
// this function - confirm against readStream.
//
// If a previous read operation left the connection in an unknown state, an
// error of kind ReadStreamErrorKindConn is returned without touching the
// connection. ctx is currently not used.
func (c *Conn) ReadStreamedMessage(ctx context.Context, maxSize uint32, frameType uint32) ([]byte, *ReadStreamError) {
	c.readMtx.Lock()
	defer c.readMtx.Unlock()
	if !c.readClean {
		return nil, &ReadStreamError{
			Kind: ReadStreamErrorKindConn,
			Err:  fmt.Errorf("dataconn read message: connection is in unknown state"),
		}
	}

	// readStream writes into w, the goroutine below drains r into buf.
	r, w := io.Pipe()
	var buf bytes.Buffer
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		lr := io.LimitReader(r, int64(maxSize))
		if _, err := io.Copy(&buf, lr); err != nil && err != readMessageSentinel {
			panic(err)
		}
		// io.Copy also returns once maxSize bytes have been consumed. From then
		// on nobody reads from the pipe, so a pending or subsequent write by
		// readStream would block forever. Closing the read side makes such a
		// write fail instead. If the writer was already closed this is a no-op.
		_ = r.CloseWithError(fmt.Errorf("dataconn read message: message exceeds maximum size of %d bytes", maxSize)) // always returns nil
	}()

	err := readStream(c.frameReads, c.hc, w, frameType)
	c.readClean = isConnCleanAfterRead(err)
	// Signal the regular end of the message to the draining goroutine.
	_ = w.CloseWithError(readMessageSentinel) // always returns nil
	wg.Wait()
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// ReadStreamInto reads a stream with frames of type frameType from Conn and
// writes it to w.
//
// It fails immediately with writeStreamToErrorUnknownState if a previous read
// operation left the connection in an unknown state. Otherwise the result of
// readStream is returned, and the connection's read state is updated from it.
func (c *Conn) ReadStreamInto(w io.Writer, frameType uint32) zfs.StreamCopierError {
	c.readMtx.Lock()
	defer c.readMtx.Unlock()

	if !c.readClean {
		return writeStreamToErrorUnknownState{}
	}

	readErr := readStream(c.frameReads, c.hc, w, frameType)
	c.readClean = isConnCleanAfterRead(readErr)

	if readErr != nil {
		return readErr
	}
	// Return an untyped nil: a nil *ReadStreamError converted to the interface
	// would compare unequal to nil, see https://golang.org/doc/faq#nil_error
	return nil
}
// WriteStreamedMessage sends the contents of buf over the connection as a
// stream with frames of type frameType.
//
// It fails immediately if a previous write operation left the connection in
// an unknown state. An error reported by writeStream for reading from buf
// causes a panic; a connection error is returned to the caller and marks the
// connection as unclean for further writes.
func (c *Conn) WriteStreamedMessage(ctx context.Context, buf io.Reader, frameType uint32) error {
	c.writeMtx.Lock()
	defer c.writeMtx.Unlock()

	if clean := c.writeClean; !clean {
		return fmt.Errorf("dataconn write message: connection is in unknown state")
	}

	sourceErr, connErr := writeStream(ctx, c.hc, buf, frameType)
	if sourceErr != nil {
		panic(sourceErr)
	}

	c.writeClean = isConnCleanAfterWrite(connErr)
	return connErr
}
// SendStream sends the stream produced by src over the connection using
// frames of type frameType.
//
// If src is itself an io.Reader it is handed to writeStream directly.
// Otherwise src.WriteStreamTo is run in a goroutine that feeds an io.Pipe
// which writeStream reads from.
//
// It fails immediately if a previous write operation left the connection in
// an unknown state. Errors are reported in this order of precedence: an error
// from src that classifies itself as a read error, the stream error from
// writeStream, the connection error from writeStream, and finally any other
// error from src.
func (c *Conn) SendStream(ctx context.Context, src zfs.StreamCopier, frameType uint32) error {
	c.writeMtx.Lock()
	defer c.writeMtx.Unlock()

	if !c.writeClean {
		return fmt.Errorf("dataconn send stream: connection is in unknown state")
	}

	type writeStreamRes struct {
		errStream, errConn error
	}

	var (
		source io.Reader
		pipeW  *io.PipeWriter // stays nil if no pipe is needed
	)
	copierDone := make(chan zfs.StreamCopierError, 1)

	// avoid io.Pipe if zfs.StreamCopier is an io.Reader
	if direct, isReader := src.(io.Reader); isReader {
		source = direct
		// There is no copier goroutine in this case: report "no error" up front.
		copierDone <- nil
		close(copierDone)
	} else {
		source, pipeW = io.Pipe()
		go func() {
			copierDone <- src.WriteStreamTo(pipeW)
			// NOTE(review): a plain Close makes the reading side see EOF even if
			// WriteStreamTo returned an error - confirm that this is intended.
			pipeW.Close()
		}()
	}

	writerDone := make(chan writeStreamRes, 1)
	go func() {
		var outcome writeStreamRes
		outcome.errStream, outcome.errConn = writeStream(ctx, c.hc, source, frameType)
		if pipeW != nil {
			// Unblocks a copier goroutine that is still writing into the pipe.
			_ = pipeW.CloseWithError(outcome.errStream) // always returns nil
		}
		writerDone <- outcome
	}()

	written := <-writerDone
	copierErr := <-copierDone
	c.writeClean = isConnCleanAfterWrite(written.errConn) // TODO correct?

	switch {
	case copierErr != nil && copierErr.IsReadError():
		return copierErr // something on our side is bad
	case written.errStream != nil:
		return written.errStream
	case written.errConn != nil:
		return written.errConn
	default:
		// TODO combined error?
		return copierErr
	}
}
// Close shuts down the underlying heartbeat connection and then blocks until
// the readFrames goroutine started by Wrap has returned.
// The error returned is the one reported by the shutdown.
func (c *Conn) Close() error {
	shutdownErr := c.hc.Shutdown()

	// readFrames closes this channel on exit.
	<-c.waitReadFramesDone

	return shutdownErr
}