-
Notifications
You must be signed in to change notification settings - Fork 0
/
writer_to_reader_adapter.go
187 lines (171 loc) · 5.47 KB
/
writer_to_reader_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package io2
import (
"io"
)
// This class adapts an io.Reader into a io.WriteCloser
// It allows anyone to structure a zlib decompression as
// a write operation, rather than a read, making it more
// convenient to fit into a pipeline
type WriterToReaderAdapter struct {
workChan chan *[]byte
doneWorkChan chan *[]byte
errChan chan error
closed bool
}
// This is the reader interface that transforms the output of
// the internal io.Reader and lets that be io.Copy'd into the io.Writer output
type privateReaderAdapter struct {
bufferToBeRead []byte
workReceipt *[]byte
Writer WriterToReaderAdapter
closeReceived bool
}
// This makes a io.Writer from a io.Reader and begins reading and writing data
// The returned class is not thread safe and public methods must be called from a single thread
func NewWriterToReaderAdapter(toBeAdapted func(io.Reader) (io.Reader, error),
output io.Writer,
shouldCloseDownstream bool) io.WriteCloser {
retval := privateReaderAdapter{
Writer: WriterToReaderAdapter{
workChan: make(chan *[]byte),
doneWorkChan: make(chan *[]byte),
errChan: make(chan error, 3),
closed: false,
}}
go copyDataToOutput(toBeAdapted, &retval, output, shouldCloseDownstream)
return &retval.Writer
}
// this is the private Read implementation, to be used with io.Copy
// on the read thread
func (rself *privateReaderAdapter) Read(data []byte) (int, error) {
lenToCopy := len(rself.bufferToBeRead)
if lenToCopy == 0 {
rself.workReceipt = <-rself.Writer.workChan
if rself.workReceipt == nil {
// no more data to consume
rself.closeReceived = true
return 0, io.EOF
}
rself.bufferToBeRead = *rself.workReceipt
lenToCopy = len(rself.bufferToBeRead)
}
if lenToCopy > len(data) {
lenToCopy = len(data)
}
copy(data[:lenToCopy], rself.bufferToBeRead[:lenToCopy])
rself.bufferToBeRead = rself.bufferToBeRead[lenToCopy:]
if len(rself.bufferToBeRead) == 0 {
rself.Writer.doneWorkChan <- rself.workReceipt
rself.workReceipt = nil
}
return lenToCopy, nil
}
// this is the public Write interface that presents any data to the
// companion goroutine (copyDataToOutput)
// This function is unbuffered and blocks until the companion goroutine
// consumes the data and returns a receipt.
// This means that there are no extraneous allocations since the receipt is the data
// that was consumed (sent) and the Write can now return
func (wrself *WriterToReaderAdapter) Write(data []byte) (int, error) {
if len(data) == 0 {
return 0, nil
}
wrself.workChan <- &data
var err error
select {
case err = <-wrself.errChan:
default:
}
receipt := <-wrself.doneWorkChan
if receipt != &data {
panic("Only one thread allowed to use io.Writer")
}
return len(data), err
}
func (wrself *WriterToReaderAdapter) getErrors() (err error) {
for item := range wrself.errChan {
if err == nil {
err = item
}
}
return
}
// Close must be called, even if there's an error during Write,
// to clean up all goroutines and resources
// This function shuts down the Writer, which will deliver
// an io.EOF to the reader class. It then blocks until the
// downstream writer has been passed a close and returns any errors
// from the downstream Close (or any pending errors from final reads
// that were triggered by the io.Reader to be adapted)
func (wrself *WriterToReaderAdapter) Close() error {
if wrself.closed {
panic("Double close on WriterToReaderAdapter")
}
wrself.workChan <- nil
close(wrself.workChan)
wrself.closed = true
closeErr := wrself.getErrors()
close(wrself.doneWorkChan) // once they've sent closing err, they won't be touching this
return closeErr
}
// drain
//
// This is the final function called when the wrapped io.Reader shuts down and
// stops accepting more input.
//
// this is because readers like zlib don't validate the CRC32
// (the last 4 bytes) in the normal codepath and leave the final buffer unconsumed
func (rself *privateReaderAdapter) drain() {
if rself.closeReceived {
return // we have already drained
}
if len(rself.bufferToBeRead) != 0 {
if rself.workReceipt == nil {
panic("Logic error: if there's data to be read, we must still have the receipt")
}
rself.Writer.doneWorkChan <- rself.workReceipt
rself.workReceipt = nil
} else {
if rself.workReceipt != nil {
panic("Logic error: work receipt should be nil if there's no buffer to drain")
}
}
for toDrain := range rself.Writer.workChan {
if toDrain == nil {
break
} else {
rself.Writer.doneWorkChan <- toDrain
}
}
}
// This io.Copy's as much data as possible from the wrapped reader
// to the corresponding writer output.
// When finished it closes the downstream and drains the upstream
// writer. Finally it sends any remaining errors to the errChan and
// closes that channel
func copyDataToOutput(inputFactory func(io.Reader) (io.Reader, error),
adaptedInput *privateReaderAdapter,
output io.Writer,
shouldCloseDownstream bool) {
input, err := inputFactory(adaptedInput)
if err != nil {
adaptedInput.Writer.errChan <- err
} else {
_, err = io.Copy(output, input)
if err != nil {
adaptedInput.Writer.errChan <- err
}
}
writeCloser, ok := output.(io.WriteCloser)
if ok && shouldCloseDownstream {
closeErr := writeCloser.Close()
if closeErr != nil {
adaptedInput.Writer.errChan <- closeErr
}
}
// pulls all the data from the writer until EOF is reached
// this is because readers like zlib don't validate the CRC32
// (the last 4 bytes) in the normal codepath
adaptedInput.drain()
close(adaptedInput.Writer.errChan)
}