Skip to content

Commit

Permalink
Have concurrent rac reading require an io.ReaderAt
Browse files Browse the repository at this point in the history
  • Loading branch information
nigeltao committed Sep 28, 2019
1 parent b480859 commit a9b9a82
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
11 changes: 10 additions & 1 deletion lib/rac/reader.go
Expand Up @@ -95,7 +95,10 @@ type Reader struct {
// Bigger values often lead to faster throughput, up to a
// hardware-dependent point, but also larger memory requirements.
//
// Zero means a non-concurrent (single-goroutine) reader.
// If positive, then the ReadSeeker must also be an io.ReaderAt.
//
// Non-positive values (including zero) mean a non-concurrent
// (single-goroutine) reader.
Concurrency int

// err is the first error encountered. It is sticky: once a non-nil error
Expand Down Expand Up @@ -175,6 +178,12 @@ func (r *Reader) initialize() error {
}
r.chunkReader.ReadSeeker = r.ReadSeeker
r.chunkReader.CompressedSize = r.CompressedSize
if r.Concurrency > 0 {
if _, ok := r.ReadSeeker.(io.ReaderAt); !ok {
r.err = fmt.Errorf("rac: Concurrency > 0 requires the ReadSeeker to be an io.ReaderAt")
return r.err
}
}
if err := r.chunkReader.initialize(); err != nil {
r.err = err
return r.err
Expand Down
15 changes: 9 additions & 6 deletions lib/raczlib/raczlib_test.go
Expand Up @@ -75,12 +75,13 @@ func racCompress(original []byte, cChunkSize uint64, dChunkSize uint64, resource
return buf.Bytes(), nil
}

func racDecompress(compressed []byte) ([]byte, error) {
func racDecompress(compressed []byte, concurrency int) ([]byte, error) {
buf := &bytes.Buffer{}
r := &rac.Reader{
ReadSeeker: bytes.NewReader(compressed),
CompressedSize: int64(len(compressed)),
CodecReaders: []rac.CodecReader{&CodecReader{}},
Concurrency: concurrency,
}
defer r.Close()
if _, err := io.Copy(buf, r); err != nil {
Expand All @@ -89,8 +90,8 @@ func racDecompress(compressed []byte) ([]byte, error) {
return buf.Bytes(), nil
}

func testReader(tt *testing.T, decoded string, encoded string) {
g, err := racDecompress([]byte(encoded))
func testReader(tt *testing.T, decoded string, encoded string, concurrency int) {
g, err := racDecompress([]byte(encoded), concurrency)
if err != nil {
tt.Fatalf("racDecompress: %v", err)
}
Expand All @@ -99,8 +100,9 @@ func testReader(tt *testing.T, decoded string, encoded string) {
}
}

func TestReaderSansDictionary(tt *testing.T) { testReader(tt, decodedMore, encodedMore) }
func TestReaderWithDictionary(tt *testing.T) { testReader(tt, decodedSheep, encodedSheep) }
func TestReaderSansDictionary(tt *testing.T) { testReader(tt, decodedMore, encodedMore, 0) }
func TestReaderWithDictionary(tt *testing.T) { testReader(tt, decodedSheep, encodedSheep, 0) }
func TestConcurrentReader(tt *testing.T) { testReader(tt, decodedSheep, encodedSheep, 2) }

func TestReaderConcatenation(tt *testing.T) {
// Create a RAC file whose decoding is the concatenation of two other RAC
Expand Down Expand Up @@ -174,6 +176,7 @@ func TestReaderConcatenation(tt *testing.T) {
testReader(tt,
decodedSheep+decodedMore,
encodedSheep+encodedMore+string(buf[:]),
0,
)
}

Expand Down Expand Up @@ -262,7 +265,7 @@ func TestSharedDictionary(tt *testing.T) {
compressedLengths[i] = len(compressed)

// Decompress.
decompressed, err := racDecompress(compressed)
decompressed, err := racDecompress(compressed, 0)
if err != nil {
tt.Fatalf("i=%d: racDecompress: %v", i, err)
}
Expand Down

0 comments on commit a9b9a82

Please sign in to comment.