This repository has been archived by the owner on Jan 12, 2023. It is now read-only.
/
writer.go
307 lines (261 loc) · 8.62 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package io
import (
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
)
// uncheckedParallelMultiWriter is similar to io.MultiWriter, but assumes
// writes never fail, and provides the same buffer to all writers in parallel.
// However, Write will return only when all writes are finished.
type uncheckedParallelMultiWriter struct {
	writers []io.Writer
	// Used by Write to await all per-writer go-routines of a single call.
	wg sync.WaitGroup
}
// ParallelMultiWriter is a writer which dispatches to multiple destinations,
// collecting errors on the way and retaining the first one it encounters
// per writer.
// If a writer fails, it will not be written to anymore until it is replaced
// or reset using SetWriterAtIndex.
type ParallelMultiWriter struct {
	writers []io.Writer
	// Used by Write to await all per-writer go-routines of a single call.
	wg sync.WaitGroup
	// Pre-allocated array, one slot per writer, holding the first error
	// produced by the writer at the same index (nil if none so far).
	results []error
}
// Write hands the same buffer to every writer concurrently and blocks until
// all of them are done. Individual write errors are deliberately ignored.
func (t *uncheckedParallelMultiWriter) Write(b []byte) (n int, err error) {
	t.wg.Add(len(t.writers))
	for i := range t.writers {
		go func(dst io.Writer) {
			defer t.wg.Done()
			dst.Write(b)
		}(t.writers[i])
	}
	t.wg.Wait()
	return len(b), nil
}
// NewUncheckedParallelMultiWriter returns a writer which duplicates each
// write to all given writers in parallel, ignoring individual failures.
// The variadic list is copied, so the caller may reuse its slice.
func NewUncheckedParallelMultiWriter(writers ...io.Writer) io.Writer {
	owned := append(make([]io.Writer, 0, len(writers)), writers...)
	return &uncheckedParallelMultiWriter{writers: owned}
}
// NewParallelMultiWriter returns a writer dispatching to the given writers,
// with one pre-allocated error slot per writer.
// The slice is stored as-is, without copying.
func NewParallelMultiWriter(writers []io.Writer) *ParallelMultiWriter {
	return &ParallelMultiWriter{
		writers: writers,
		results: make([]error, len(writers)),
	}
}
// SetWriterAtIndex places the given writer at the given index and clears any
// previously recorded error for that slot. We don't do bounds checking.
func (p *ParallelMultiWriter) SetWriterAtIndex(i int, w io.Writer) {
	// Reset the error first so a fresh writer never starts out failed.
	p.results[i] = nil
	p.writers[i] = w
}
// AppendWriter appends the given writer, expanding our internal structures
// as needed, and returns the index at which it was stored.
// This is not thread-safe: callers must assure no Write is in flight.
func (p *ParallelMultiWriter) AppendWriter(w io.Writer) int {
	idx := len(p.writers)
	p.writers = append(p.writers, w)
	p.results = append(p.results, nil)
	return idx
}
// AutoInsert stores the given writer in the first free slot and returns its
// index. It will append a new slot if necessary.
func (p *ParallelMultiWriter) AutoInsert(w io.Writer) int {
	for slot := 0; slot < len(p.writers); slot++ {
		if p.writers[slot] != nil {
			continue
		}
		// A free slot's result can be assumed to be nil already.
		p.writers[slot] = w
		return slot
	}
	// No free slot - grow the writer list instead.
	return p.AppendWriter(w)
}
// Capacity returns the maximum amount of writers we can store, which is
// useful for iterating with WriterAtIndex().
func (p *ParallelMultiWriter) Capacity() int {
	return len(p.writers)
}
// WriterAtIndex returns the writer at the given index, and the first error it
// might have caused when writing to it. We perform no bounds checking.
// A nil writer indicates a free slot.
func (p *ParallelMultiWriter) WriterAtIndex(i int) (io.Writer, error) {
	return p.writers[i], p.results[i]
}
// Write always reports success, even if individual writers fail.
// It's up to our user to check the per-writer errors (via WriterAtIndex)
// when the write is finished.
func (p *ParallelMultiWriter) Write(b []byte) (n int, err error) {
	for wid := range p.writers {
		// Skip empty slots as well as writers which failed previously.
		if p.writers[wid] == nil || p.results[wid] != nil {
			continue
		}
		p.wg.Add(1)
		go func(slot int, dst io.Writer) {
			defer p.wg.Done()
			_, p.results[slot] = dst.Write(b)
		}(wid, p.writers[wid])
	}
	p.wg.Wait()
	return len(b), nil
}
// ChannelWriter is used in conjunction with a WriteChannelController, serving
// as front-end communicating with the actual writer that resides in a
// separate go-routine.
type ChannelWriter struct {
	// The controller whose channel we send ourselves through on Write.
	ctrl *WriteChannelController
	// A writer to write to. Must be set before the first Write.
	writer io.Writer
	// bytes to write - it's just a temporary holding the current request
	b []byte
	// amount of bytes written in the previous write operation
	n int
	// error of previous write operation
	e error
	// will let us know when the remote is done
	// NOTE: Would a channel be faster ?
	wg sync.WaitGroup
}
// WriteCloser is like the io.WriteCloser interface, but additionally allows
// to retrieve more information specific to our usage.
type WriteCloser interface {
	io.WriteCloser
	// Writer returns the writer this interface instance contains
	Writer() io.Writer
}
// Writer returns the destination writer we currently dispatch to.
func (c *ChannelWriter) Writer() io.Writer {
	return c.writer
}
// SetWriter sets our writer to be the given one. Allows to reuse
// ChannelWriters for multiple destinations.
func (c *ChannelWriter) SetWriter(w io.Writer) {
	c.writer = w
}
// Write sends bytes down our channel and waits for the writer on the other
// end to be done, retrieving the result.
// The buffer is only borrowed; it must remain valid until Write returns.
func (c *ChannelWriter) Write(b []byte) (n int, err error) {
	c.b = b
	// Add must happen before the send: the processing go-routine calls
	// wg.Done() once it has filled in c.n and c.e ...
	c.wg.Add(1)
	c.ctrl.c <- c
	c.wg.Wait()
	// ... allowing us to return the actual result safely now
	return c.n, c.e
}
// Close counts the finished file in the controller's statistics and closes
// the underlying writer if it implements io.Closer.
func (c *ChannelWriter) Close() error {
	atomic.AddUint32(&c.ctrl.stats.TotalFilesWritten, 1)
	closer, ok := c.writer.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}
// LazyFileWriteCloser is a writer that will create a new file and
// intermediate directories on first write.
// You must call the Close method to finish the writes and release system
// resources.
type LazyFileWriteCloser struct {
	// The path we should open a writer to on first write. This will fail if
	// the file already exists.
	path string
	// The mode the destination file should have when done writing
	mode os.FileMode
	// The file we lazily open on first write; nil until then and after Close.
	writer *os.File
}
// Path returns the currently set path.
func (l *LazyFileWriteCloser) Path() string {
	return l.path
}
// SetPath changes the path to the given one and records the file mode the
// destination should receive when done writing.
// It's an error to set a new path while the previous writer wasn't closed
// yet; doing so panics, as that is a programming error.
func (l *LazyFileWriteCloser) SetPath(p string, mode os.FileMode) {
	if l.writer != nil {
		// Fixed message typo: previously read "wasn't close".
		panic("Previous writer wasn't closed - can't set new path")
	}
	l.path = p
	l.mode = mode
}
// Write lazily creates the destination (including intermediate directories)
// on first call, then writes b to it.
// Symlink destinations (mode has os.ModeSymlink set) are created right away
// from the entire buffer and never open a file handle.
// Returns the amount of bytes written and any error encountered; on error,
// n is 0 as required by the io.Writer contract.
func (l *LazyFileWriteCloser) Write(b []byte) (n int, err error) {
	if l.writer == nil {
		// assure directory exists
		if err = os.MkdirAll(filepath.Dir(l.path), 0777); err != nil {
			return 0, err
		}
		// NOTE: We may rightfully assume we see only one write in case of symlinks !
		// This is because the read-buffer is large enough to hold any symlink.
		// If not, the reader will panic
		if l.mode&os.ModeSymlink == os.ModeSymlink {
			// Symlinks are created right away
			// BUGFIX: previously returned len(b) even when Symlink failed,
			// violating the io.Writer contract (n must be bytes written).
			if err = os.Symlink(string(b), l.path); err != nil {
				return 0, err
			}
			return len(b), nil
		}
		// O_EXCL assures we fail if the file already exists.
		l.writer, err = os.OpenFile(l.path, os.O_EXCL|os.O_WRONLY|os.O_CREATE, l.mode)
		if err != nil {
			return 0, err
		}
	}
	return l.writer.Write(b)
}
// Close finalizes our writer if it was initialized already. Therefore it's
// safe to call this even if Write wasn't called beforehand.
func (l *LazyFileWriteCloser) Close() error {
	if l.writer == nil {
		return nil
	}
	// Drop our handle before closing so the instance can be reused via SetPath.
	f := l.writer
	l.writer = nil
	return f.Close()
}
// WriteChannelController is a utility to help control how parallel we try
// to write.
type WriteChannelController struct {
	// Keeps all write requests. As the ChannelWriter carries all information
	// we could possibly want to write something, it serves as the request
	// itself.
	c chan *ChannelWriter
	// Allows to track the amount of written files and bytes.
	stats *Stats
}
// RootedWriteController is a utility structure to associate trees with a
// writer. That way, writers can be more easily associated with a device
// which hosts a particular tree.
type RootedWriteController struct {
	// The trees the controller should write to
	Trees []string
	// A possibly shared controller which may write to the given trees
	Ctrl WriteChannelController
}
// NewWriteChannelController creates a new controller which deals with writing
// all incoming requests using nprocs go-routines.
// Use channelCap to assure less blocking will occur; a good value depends
// heavily on your algorithm's patterns and should be at least nprocs.
// Panics if nprocs is smaller than 1.
func NewWriteChannelController(nprocs, channelCap int, stats *Stats) WriteChannelController {
	// Validate before allocating anything (previously the channel was
	// created first); also use a keyed literal for clarity.
	if nprocs < 1 {
		panic("Need at least one go routine to process work")
	}
	ctrl := WriteChannelController{
		c:     make(chan *ChannelWriter, channelCap),
		stats: stats,
	}
	for i := 0; i < nprocs; i++ {
		go func() {
			for cw := range ctrl.c {
				atomic.AddUint32(&stats.FilesBeingWritten, 1)
				cw.n, cw.e = cw.writer.Write(cw.b)
				atomic.AddUint64(&stats.BytesWritten, uint64(cw.n))
				// Decrement by adding the two's complement of 1.
				atomic.AddUint32(&stats.FilesBeingWritten, ^uint32(0))
				// protocol mandates the sender has to listen for our reply,
				// channel must not be closed here ... .
				cw.wg.Done()
			} // for each channel writer
		}()
	} // for each routine to create
	// We will only really need this when we are copying data anyway ... .
	return ctrl
}
// InitChannelWriters initializes as many new ChannelWriters as fit into the
// given slice of writers.
// You will have to set each one's destination via SetWriter before using it.
func (w *WriteChannelController) InitChannelWriters(out []ChannelWriter) {
	// One writer per slot, all pointing back at this controller.
	for i := 0; i < len(out); i++ {
		out[i] = ChannelWriter{ctrl: w}
	}
}
// Streams returns the amount of streams we handle in parallel, which is the
// capacity of our request channel.
func (w *WriteChannelController) Streams() int {
	return cap(w.c)
}
// RootedWriteControllers is a list of RootedWriteController instances.
type RootedWriteControllers []RootedWriteController
// Trees returns the amount of trees/destinations we can write to in total,
// summed over all contained controllers.
func (wm RootedWriteControllers) Trees() (n int) {
	for i := range wm {
		n += len(wm[i].Trees)
	}
	return n
}