/
inflator_reader.go
71 lines (57 loc) · 1.63 KB
/
inflator_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package shared
import (
"io"
"sync"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
)
// ReadSeekStarter implements io.Reader and allows the caller to seek to
// the start of the reader
type ReadSeekStarter interface {
io.Reader
SeekStart() error
}
// inflatorReader wraps the MultiReader returned by padreader so that we can
// add a SeekStart method. It's used for example when there is an error
// reading from the reader and we need to return to the start.
type inflatorReader struct {
readSeeker io.ReadSeeker
payloadSize uint64
targetSize abi.UnpaddedPieceSize
lk sync.RWMutex
paddedReader io.Reader
}
var _ ReadSeekStarter = (*inflatorReader)(nil)
func NewInflatorReader(readSeeker io.ReadSeeker, payloadSize uint64, targetSize abi.UnpaddedPieceSize) (*inflatorReader, error) {
paddedReader, err := padreader.NewInflator(readSeeker, payloadSize, targetSize)
if err != nil {
return nil, err
}
return &inflatorReader{
readSeeker: readSeeker,
paddedReader: paddedReader,
payloadSize: payloadSize,
targetSize: targetSize,
}, nil
}
func (r *inflatorReader) Read(p []byte) (n int, err error) {
r.lk.RLock()
defer r.lk.RUnlock()
return r.paddedReader.Read(p)
}
func (r *inflatorReader) SeekStart() error {
r.lk.Lock()
defer r.lk.Unlock()
// Seek to the start of the underlying reader
_, err := r.readSeeker.Seek(0, io.SeekStart)
if err != nil {
return err
}
// Recreate the padded reader
paddedReader, err := padreader.NewInflator(r.readSeeker, r.payloadSize, r.targetSize)
if err != nil {
return err
}
r.paddedReader = paddedReader
return nil
}