/
io.go
717 lines (630 loc) · 17.5 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
// Package cos provides common low-level types and utilities for all aistore projects
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package cos
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"os/user"
"path/filepath"
"strconv"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
)
// POSIX permissions
const (
	PermRWR   os.FileMode = 0o640 // rw-r-----; default file mode (see CreateFile)
	PermRWRR  os.FileMode = 0o644 // rw-r--r-- (archived)
	PermRWXRX os.FileMode = 0o750 // rwxr-x---

	// mode for directories created via CreateDir
	configDirMode = PermRWXRX | os.ModeDir
)

// ContentLengthUnknown indicates that the content length is not known in advance.
const ContentLengthUnknown = -1
// readers
type (
	// ReadOpenCloser is an io.ReadCloser that can be reopened from the
	// beginning via Open (e.g., to restart or retry a read).
	ReadOpenCloser interface {
		io.ReadCloser
		Open() (ReadOpenCloser, error)
	}

	// ReadSizer is the interface that adds Size method to io.Reader.
	ReadSizer interface {
		io.Reader
		Size() int64
	}

	// ReadCloseSizer is the interface that adds Size method to io.ReadCloser.
	ReadCloseSizer interface {
		io.ReadCloser
		Size() int64
	}

	// ReadOpenCloseSizer is the interface that adds Size method to ReadOpenCloser.
	ReadOpenCloseSizer interface {
		ReadOpenCloser
		Size() int64
	}

	// sizedReader pairs an io.Reader with a size known at construction time.
	sizedReader struct {
		io.Reader
		size int64
	}

	// implementations

	// nopReader "reads" the given number of bytes without producing any data
	// (see its Read: the destination buffer is not written to).
	nopReader struct {
		size   int
		offset int
	}

	// ReadReaderAt combines sequential and random-access reading.
	ReadReaderAt interface {
		io.Reader
		io.ReaderAt
	}

	// deferRCS invokes a callback right after Close.
	deferRCS struct {
		ReadCloseSizer
		cb func()
	}

	// CallbackROC wraps a ReadOpenCloser and reports the number of bytes read
	// via readCallback, taking care not to double-report across reopens.
	CallbackROC struct {
		roc          ReadOpenCloser
		readCallback func(int, error)
		// Number of bytes we've already read, counting from last `Open`.
		readBytes int
		// Since we could possibly reopen a reader we must keep track of the
		// bytes we already reported to `readCallback` so there is no duplications.
		// This value is preserved across all the `Open`'s.
		reportedBytes int
	}

	// ReaderArgs parameterizes ReaderWithArgs: the underlying reader, optional
	// per-Read and at-Close callbacks, and a declared size.
	ReaderArgs struct {
		R       io.Reader
		ReadCb  func(int, error)
		DeferCb func()
		Size    int64
	}

	// ReaderWithArgs is a reader driven by ReaderArgs (note: its Open panics -
	// reopening is not supported).
	ReaderWithArgs struct {
		args ReaderArgs
	}

	// nopOpener adds a no-op Open (returning itself) to an io.ReadCloser.
	nopOpener struct{ io.ReadCloser }
)
// handles (and more readers)
type (
	// FileHandle is an os.File that remembers its fully-qualified name (fqn)
	// so the same file can be reopened via Open.
	FileHandle struct {
		*os.File
		fqn string
	}

	// SectionHandle is a section of reader with optional padding that implements
	// ReadOpenCloser interface.
	SectionHandle struct {
		r         io.ReaderAt
		s         *io.SectionReader
		offset    int64 // slice start
		size      int64 // slice length
		padding   int64 // padding size
		padOffset int64 // offset inside padding when reading a file
	}

	// FileSectionHandle opens a file and reads a section of it with optional
	// padding. It implements the ReadOpenCloser interface.
	FileSectionHandle struct {
		fh  *FileHandle
		sec *SectionHandle
	}

	// ByteHandle is a byte buffer(made from []byte) that implements
	// ReadOpenCloser interface
	ByteHandle struct {
		*bytes.Reader
		b []byte // original bytes, kept to support reopening
	}
)
// writers
type (
	// WriterAt combines sequential and random-access writing.
	WriterAt interface {
		io.Writer
		io.WriterAt
	}

	// WriteSizer is an io.Writer that also reports a size in bytes
	// (semantics defined by the implementation).
	WriteSizer interface {
		io.Writer
		Size() int64
	}

	// WriterMulti fans each Write out to all underlying writers.
	WriterMulti struct{ writers []io.Writer }

	// WriterOnly is a helper struct to hide `io.ReaderFrom` interface implementation
	// As far as http.ResponseWriter (and its underlying tcp conn.), the following are tradeoffs:
	// [-] sendfile (when sending), or
	// [-] copy_file_range (when writing local files)
	// [+] use (reusable) buffer, reduce code path, reduce locking
	WriterOnly struct{ io.Writer }
)
// interface guard: compile-time assertions that the types implement the
// stated interfaces
var (
	_ io.Reader      = (*nopReader)(nil)
	_ ReadOpenCloser = (*FileHandle)(nil)
	_ ReadOpenCloser = (*CallbackROC)(nil)
	_ ReadSizer      = (*sizedReader)(nil)
	_ ReadOpenCloser = (*SectionHandle)(nil)
	_ ReadOpenCloser = (*FileSectionHandle)(nil)
	_ ReadOpenCloser = (*nopOpener)(nil)
	_ ReadOpenCloser = (*ByteHandle)(nil)
)
// IsEOF reports whether err is io.EOF or io.ErrUnexpectedEOF, whether
// direct or wrapped. "Unexpected EOF" is included to accommodate unsized
// streaming and early termination of the other side (prior to sending the
// first byte).
func IsEOF(err error) bool {
	// errors.Is already covers plain equality, so the former
	// `err == io.EOF` / `err == io.ErrUnexpectedEOF` checks were redundant
	return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
}
///////////////
// nopReader //
///////////////
// NopReader returns an io.Reader that pretends to deliver `size` bytes
// without generating any actual data.
func NopReader(size int64) io.Reader {
	return &nopReader{size: int(size)}
}
// Read "reads" up to len(b) bytes: it advances the offset and returns the
// count without writing into b (the buffer content is intentionally left
// as is). Returns io.EOF once the configured size has been consumed.
func (r *nopReader) Read(b []byte) (int, error) {
	left := r.size - r.offset
	if left == 0 {
		return 0, io.EOF
	}
	// builtin `min` (Go 1.21+) for consistency with the rest of this file,
	// instead of the project-local Min helper
	toRead := min(len(b), left)
	r.offset += toRead
	return toRead, nil
}
////////////////
// ByteHandle //
////////////////
// NewByteHandle wraps the given byte slice into a ReadOpenCloser.
func NewByteHandle(bt []byte) *ByteHandle {
	return &ByteHandle{Reader: bytes.NewReader(bt), b: bt}
}

// Close is a no-op: there is nothing to release for an in-memory buffer.
func (*ByteHandle) Close() error { return nil }

// Open returns a fresh handle positioned at the beginning of the same bytes.
func (b *ByteHandle) Open() (ReadOpenCloser, error) {
	return NewByteHandle(b.b), nil
}
///////////////
// nopOpener //
///////////////
// NopOpener wraps an io.ReadCloser with a no-op Open that simply returns
// the receiver (i.e., the stream cannot actually be restarted).
func NopOpener(r io.ReadCloser) ReadOpenCloser { return &nopOpener{ReadCloser: r} }

// Open returns the receiver unchanged.
func (n *nopOpener) Open() (ReadOpenCloser, error) { return n, nil }
////////////////
// FileHandle //
////////////////
// NewFileHandle opens the file at `fqn` (read-only) and remembers the name
// so the handle can later be reopened.
func NewFileHandle(fqn string) (*FileHandle, error) {
	file, err := os.Open(fqn)
	if err != nil {
		return nil, err
	}
	return &FileHandle{File: file, fqn: fqn}, nil
}

// Open opens the same file anew; the original handle remains as is.
func (f *FileHandle) Open() (ReadOpenCloser, error) { return NewFileHandle(f.fqn) }
////////////
// Sized* //
////////////
// NewSizedReader attaches a known size to the given reader.
func NewSizedReader(r io.Reader, size int64) ReadSizer {
	return &sizedReader{Reader: r, size: size}
}

// Size returns the size recorded at construction time.
func (f *sizedReader) Size() int64 { return f.size }
//////////////
// deferRCS //
//////////////
// NewDeferRCS attaches a callback to be invoked right after Close;
// a nil callback returns the reader unwrapped.
func NewDeferRCS(r ReadCloseSizer, cb func()) ReadCloseSizer {
	if cb == nil {
		return r
	}
	return &deferRCS{ReadCloseSizer: r, cb: cb}
}

// Close closes the wrapped reader first, then fires the callback;
// the close error, if any, is returned.
func (r *deferRCS) Close() error {
	err := r.ReadCloseSizer.Close()
	r.cb()
	return err
}
/////////////////
// CallbackROC //
/////////////////
// NewCallbackReadOpenCloser wraps r so that readCb is notified of the number
// of bytes read; the optional reportedBytes seeds the count of bytes already
// reported (used when reopening - see CallbackROC.Open).
func NewCallbackReadOpenCloser(r ReadOpenCloser, readCb func(int, error), reportedBytes ...int) *CallbackROC {
	rb := 0
	if len(reportedBytes) != 0 {
		rb = reportedBytes[0]
	}
	return &CallbackROC{roc: r, readCallback: readCb, reportedBytes: rb}
}
// Read reads from the wrapped reader and reports to readCallback only the
// bytes that have not been reported yet (readBytes can lag reportedBytes
// right after a reopen - see Open).
func (r *CallbackROC) Read(p []byte) (n int, err error) {
	n, err = r.roc.Read(p)
	r.readBytes += n
	if r.readBytes > r.reportedBytes {
		diff := r.readBytes - r.reportedBytes
		r.readCallback(diff, err)
		r.reportedBytes += diff
	}
	return n, err
}

// Open reopens the underlying reader; the new wrapper inherits reportedBytes
// so previously reported bytes are not reported twice.
func (r *CallbackROC) Open() (ReadOpenCloser, error) {
	rc, err := r.roc.Open()
	if err != nil {
		return rc, err
	}
	return NewCallbackReadOpenCloser(rc, r.readCallback, r.reportedBytes), nil
}

// Close closes the underlying reader (no final callback is invoked).
func (r *CallbackROC) Close() error { return r.roc.Close() }
////////////////////
// ReaderWithArgs //
////////////////////
// NewReaderWithArgs constructs a reader driven by the given ReaderArgs.
func NewReaderWithArgs(args ReaderArgs) *ReaderWithArgs { return &ReaderWithArgs{args: args} }

// Size returns the size declared in the args.
func (r *ReaderWithArgs) Size() int64 { return r.args.Size }

// Read delegates to the underlying reader and, if configured, notifies
// ReadCb with the byte count and error of every call.
func (r *ReaderWithArgs) Read(p []byte) (int, error) {
	n, err := r.args.R.Read(p)
	if cb := r.args.ReadCb; cb != nil {
		cb(n, err)
	}
	return n, err
}

// Open is not supported for this reader type.
func (*ReaderWithArgs) Open() (ReadOpenCloser, error) { panic("not supported") }

// Close closes the underlying reader (when it is an io.ReadCloser) and
// then invokes DeferCb, if configured.
func (r *ReaderWithArgs) Close() error {
	var err error
	if rc, ok := r.args.R.(io.ReadCloser); ok {
		err = rc.Close()
	}
	if cb := r.args.DeferCb; cb != nil {
		cb()
	}
	return err
}
///////////////////
// SectionHandle //
///////////////////
// NewSectionHandle constructs a handle over the [offset, offset+size)
// slice of r, optionally followed by `padding` zero bytes.
func NewSectionHandle(r io.ReaderAt, offset, size, padding int64) *SectionHandle {
	return &SectionHandle{
		r:       r,
		s:       io.NewSectionReader(r, offset, size),
		offset:  offset,
		size:    size,
		padding: padding,
	}
}

// Open returns a fresh handle over the same slice, positioned at its start.
func (f *SectionHandle) Open() (ReadOpenCloser, error) {
	return NewSectionHandle(f.r, f.offset, f.size, f.padding), nil
}
// Reads a reader section. When the slice finishes but the buffer is not filled
// yet, act as if it reads a few more bytes from somewhere: the padding is
// returned as zero bytes. io.EOF is reported only when both the slice and
// the padding are exhausted.
func (f *SectionHandle) Read(buf []byte) (n int, err error) {
	var fromPad int64

	// if it is still reading a file from disk - just continue reading
	if f.padOffset == 0 {
		n, err = f.s.Read(buf)
		// if it reads fewer bytes than expected and it does not fail,
		// try to "read" from padding
		if f.padding == 0 || n == len(buf) || (err != nil && err != io.EOF) {
			return n, err
		}
		fromPad = min(int64(len(buf)-n), f.padding)
	} else {
		// slice is already read, keep reading padding bytes
		fromPad = min(int64(len(buf)), f.padding-f.padOffset)
	}

	// either buffer is full or end of padding is reached. Nothing to read
	if fromPad == 0 {
		return n, io.EOF
	}

	// the number of remained bytes in padding is enough to complete read request:
	// zero-fill the tail of the buffer
	for idx := n; idx < n+int(fromPad); idx++ {
		buf[idx] = 0
	}
	n += int(fromPad)
	f.padOffset += fromPad

	if f.padOffset < f.padding {
		return n, nil
	}
	return n, io.EOF
}

// Close is a no-op: SectionHandle does not own the underlying reader.
func (*SectionHandle) Close() error { return nil }
///////////////////////
// FileSectionHandle //
///////////////////////
// NewFileSectionHandle opens file which is expected at `fqn` and defines
// a SectionHandle on it to only read a specified section (no padding).
func NewFileSectionHandle(fqn string, offset, size int64) (*FileSectionHandle, error) {
	fh, err := NewFileHandle(fqn)
	if err != nil {
		return nil, err
	}
	return &FileSectionHandle{fh: fh, sec: NewSectionHandle(fh, offset, size, 0)}, nil
}

// Open reopens the file and recreates the section over it.
func (f *FileSectionHandle) Open() (ReadOpenCloser, error) {
	return NewFileSectionHandle(f.fh.fqn, f.sec.offset, f.sec.size)
}

// Read delegates to the underlying section.
func (f *FileSectionHandle) Read(buf []byte) (int, error) { return f.sec.Read(buf) }

// Close closes the underlying file.
func (f *FileSectionHandle) Close() error { return f.fh.Close() }
/////////////////
// WriterMulti //
/////////////////
// NewWriterMulti creates a writer that duplicates every Write to all of w.
func NewWriterMulti(w ...io.Writer) *WriterMulti { return &WriterMulti{writers: w} }

// Write writes b to every underlying writer in order; it stops at the first
// failed or short write and returns that writer's result.
func (mw *WriterMulti) Write(b []byte) (int, error) {
	for _, w := range mw.writers {
		n, err := w.Write(b)
		if err != nil {
			return n, err
		}
		if n != len(b) {
			return n, io.ErrShortWrite
		}
	}
	return len(b), nil
}
///////////////////////
// misc file and dir //
///////////////////////
// ExpandPath replaces common abbreviations in file path (eg. `~` with absolute
// path to the current user home directory) and cleans the path.
func ExpandPath(path string) string {
	// expand only `~` or `~/...`; anything else (e.g. `~user/...`) is just cleaned
	isHome := len(path) != 0 && path[0] == '~' && (len(path) == 1 || path[1] == '/')
	if !isHome {
		return filepath.Clean(path)
	}
	usr, err := user.Current()
	if err != nil {
		return filepath.Clean(path)
	}
	return filepath.Clean(filepath.Join(usr.HomeDir, path[1:]))
}
// CreateDir creates directory if does not exist.
// If the directory already exists returns nil.
// All intermediate directories are created as well (configDirMode permissions).
func CreateDir(dir string) error {
	return os.MkdirAll(dir, configDirMode)
}
// CreateFile creates a new write-only (O_WRONLY) file with default cos.PermRWR permissions.
// NOTE: if the file pathname doesn't exist it'll be created.
// NOTE: if the file already exists it'll be also silently truncated.
// The parent directory is created first if missing (see CreateDir).
func CreateFile(fqn string) (*os.File, error) {
	if err := CreateDir(filepath.Dir(fqn)); err != nil {
		return nil, err
	}
	return os.OpenFile(fqn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, PermRWR)
}
// Rename renames src to dst
// (creates destination directory if doesn't exist)
func Rename(src, dst string) (err error) {
	err = os.Rename(src, dst)
	// fast path: success, or an error other than "not exist"
	// (NOTE: a missing *source* also surfaces as "not exist" - the retry
	// below then fails with the same error, which is what gets returned)
	if err == nil || !os.IsNotExist(err) {
		return
	}
	// create and retry (slow path)
	err = CreateDir(filepath.Dir(dst))
	if err == nil {
		err = os.Rename(src, dst)
	}
	return
}
// RemoveFile removes path; returns nil upon success or if the path does not exist.
func RemoveFile(path string) error {
	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}
// CopyFile copies src to dst (creating the destination directory if needed
// - see CreateFile) and computes checksum if requested.
// On any failure, the possibly partial destination file is removed.
func CopyFile(src, dst string, buf []byte, cksumType string) (written int64, cksum *CksumHash, err error) {
	var srcFile, dstFile *os.File
	if srcFile, err = os.Open(src); err != nil {
		return
	}
	if dstFile, err = CreateFile(dst); err != nil {
		nlog.Errorln("Failed to create", dst+":", err)
		Close(srcFile)
		return
	}
	written, cksum, err = CopyAndChecksum(dstFile, srcFile, buf, cksumType)
	// the source is no longer needed regardless of the copy outcome
	Close(srcFile)
	// from this point on: remove the destination whenever we end up failing
	// (including a FlushClose failure below)
	defer func() {
		if err == nil {
			return
		}
		if nestedErr := RemoveFile(dst); nestedErr != nil {
			nlog.Errorf("Nested (%v): failed to remove %s, err: %v", err, dst, nestedErr)
		}
	}()
	if err != nil {
		nlog.Errorln("Failed to copy", src, "=>", dst+":", err)
		Close(dstFile)
		return
	}
	if err = FlushClose(dstFile); err != nil {
		nlog.Errorln("Failed to flush and close", dst+":", err)
	}
	return
}
// SaveReaderSafe saves the reader to `tmpfqn` first and only then renames it
// to `fqn`, so that readers of `fqn` never observe a partially written file.
// The temporary file is removed (best effort) if the rename fails.
func SaveReaderSafe(tmpfqn, fqn string, reader io.Reader, buf []byte, cksumType string, size int64) (cksum *CksumHash,
	err error) {
	if cksum, err = SaveReader(tmpfqn, reader, buf, cksumType, size); err != nil {
		return
	}
	if err = Rename(tmpfqn, fqn); err != nil {
		os.Remove(tmpfqn) // cleanup; the rename error is what gets returned
	}
	return
}
// Saves the reader directly to `fqn`, checksums if requested.
// A non-negative `size` both limits the reader to exactly that many bytes
// and is validated against the number of bytes actually written.
// On any failure the (possibly partial) file is removed.
func SaveReader(fqn string, reader io.Reader, buf []byte, cksumType string, size int64) (cksum *CksumHash, err error) {
	var (
		written   int64
		file, erc = CreateFile(fqn)
		writer    = WriterOnly{file} // Hiding `ReadFrom` for `*os.File` introduced in Go1.15.
	)
	if erc != nil {
		return nil, erc
	}
	defer func() {
		if err != nil {
			os.Remove(fqn)
		}
	}()
	if size >= 0 {
		reader = io.LimitReader(reader, size)
	}
	written, cksum, err = CopyAndChecksum(writer, reader, buf, cksumType)
	// close unconditionally; the copy error (if any) takes precedence below,
	// and the close error is reported last
	erc = file.Close()

	if err != nil {
		err = fmt.Errorf("failed to save to %q: %w", fqn, err)
		return
	}
	if size >= 0 && written != size {
		err = fmt.Errorf("wrong size when saving to %q: expected %d, got %d", fqn, size, written)
		return
	}
	if erc != nil {
		err = fmt.Errorf("failed to close %q: %w", fqn, erc)
		return
	}
	return
}
// CopyBuffer is a slightly modified excerpt from https://github.com/golang/go/blob/master/src/io/io.go#L407
// - regular streaming copy with `io.WriteTo` and `io.ReaderFrom` not checked and not used
// - buffer _must_ be provided
// - see also: WriterOnly comment (above)
func CopyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
	var total int64
	for {
		nr, rerr := src.Read(buf)
		if nr > 0 {
			nw, werr := dst.Write(buf[:nr])
			if werr != nil {
				// count the partial write only when the reported size is sane
				if nw > 0 && nw <= nr {
					total += int64(nw)
				}
				return total, werr
			}
			if nw < 0 || nw > nr {
				return total, errors.New("cos.CopyBuffer: invalid write")
			}
			total += int64(nw)
			if nw != nr {
				return total, io.ErrShortWrite
			}
		}
		if rerr != nil {
			if rerr == io.EOF {
				return total, nil // normal termination
			}
			return total, rerr
		}
	}
}
// ReadOneLine reads only the first line of a file.
// Do not use for big files: it reads all the content and then extracts the first
// line. Use for files that may contain a few lines with trailing EOL.
func ReadOneLine(filename string) (string, error) {
	var first string
	err := ReadLines(filename, func(l string) error {
		first = l
		return io.EOF // stop after the first line
	})
	return first, err
}
// ReadOneUint64 reads the first line of a file and parses it as a base-10 uint64.
// Do not use for big files (see ReadOneLine).
func ReadOneUint64(filename string) (uint64, error) {
	line, err := ReadOneLine(filename)
	if err != nil {
		return 0, err
	}
	return strconv.ParseUint(line, 10, 64)
}
// ReadOneInt64 reads the first line of a file and parses it as a base-10 int64.
// Do not use for big files (see ReadOneLine).
func ReadOneInt64(filename string) (int64, error) {
	line, err := ReadOneLine(filename)
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(line, 10, 64)
}
// ReadLines reads a file line by line, invoking cb for each line until the
// file ends or the callback returns io.EOF (early, non-error stop).
// Note: the entire file is read into memory first - not for big files.
func ReadLines(filename string, cb func(string) error) error {
	b, err := os.ReadFile(filename)
	if err != nil {
		return err
	}
	rdr := bufio.NewReader(bytes.NewReader(b))
	for {
		line, _, rerr := rdr.ReadLine()
		if rerr == io.EOF {
			return nil
		}
		if rerr != nil {
			return rerr
		}
		if cerr := cb(string(line)); cerr != nil {
			if cerr == io.EOF {
				return nil // callback requested early stop
			}
			return cerr
		}
	}
}
// CopyAndChecksum reads from `r` and writes to `w`; returns num bytes copied and checksum, or error.
// With no checksum requested it degenerates into plain io.CopyBuffer.
func CopyAndChecksum(w io.Writer, r io.Reader, buf []byte, cksumType string) (n int64, cksum *CksumHash, err error) {
	debug.Assert(w != io.Discard || buf == nil) // io.Discard is io.ReaderFrom (does not need a buffer)
	if cksumType == ChecksumNone || cksumType == "" {
		n, err = io.CopyBuffer(w, r, buf)
		return
	}

	cksum = NewCksumHash(cksumType)
	var mw io.Writer = cksum.H
	if w != io.Discard {
		// tee: feed both the checksum and the destination
		mw = NewWriterMulti(cksum.H, w)
	}
	n, err = io.CopyBuffer(mw, r, buf)
	cksum.Finalize()
	return
}
// ChecksumBytes computes the checksum of the given byte slice
// (by draining it through CopyAndChecksum into io.Discard).
func ChecksumBytes(b []byte, cksumType string) (*Cksum, error) {
	_, h, err := CopyAndChecksum(io.Discard, bytes.NewReader(b), nil, cksumType)
	if err != nil {
		return nil, err
	}
	return &h.Cksum, nil
}
// DrainReader reads and discards all the data from a reader.
// No need for `io.CopyBuffer` as `io.Discard` has efficient `io.ReaderFrom` implementation.
func DrainReader(r io.Reader) {
	if _, err := io.Copy(io.Discard, r); err != nil && !IsEOF(err) {
		debug.AssertNoErr(err) // unexpected (non-EOF) drain failure
	}
}
// FloodWriter writes `n` random bytes to provided writer.
// (NowRand is the package-local pseudo-random source used as the io.Reader here.)
func FloodWriter(w io.Writer, n int64) error {
	_, err := io.CopyN(w, NowRand(), n)
	return err
}
// Close closes the closer; debug builds assert that it succeeded
// (the error is otherwise discarded).
func Close(closer io.Closer) {
	err := closer.Close()
	debug.AssertNoErr(err)
}
// FlushClose flushes (see fflush) and closes the file.
// NOTE: the close error overwrites the flush error; with fsync currently
// disabled (see fsyncDisabled) fflush always returns nil anyway.
func FlushClose(file *os.File) (err error) {
	err = fflush(file)
	debug.AssertNoErr(err)
	err = file.Close()
	debug.AssertNoErr(err)
	return
}
// NOTE:
// - file.Close() is implementation dependent as far as flushing dirty buffers;
// - journaling filesystems, such as xfs, generally provide better guarantees but, again, not 100%
// - see discussion at https://lwn.net/Articles/788938;
// - going forward, some sort of `rename_barrier()` would be a much better alternative
// - doesn't work in testing environment - currently disabled, see #1141 and comments
const fsyncDisabled = true

// fflush fsyncs the file to stable storage - unless fsync is disabled
// (see the NOTE above), in which case it is a no-op returning nil.
func fflush(file *os.File) (err error) {
	if fsyncDisabled {
		return
	}
	return file.Sync()
}