Skip to content

Commit bef9490

Browse files
committed
block: remove CompressAndChecksum variant without compressor
We now require a compressor for `CompressAndChecksum`.
1 parent 207e6cd commit bef9490

File tree

6 files changed

+19
-16
lines changed

6 files changed

+19
-16
lines changed

sstable/blob/blob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
282282
{
283283
indexBlock := w.indexEncoder.Finish()
284284
var compressedBuf []byte
285-
pb := block.CompressAndChecksum(&compressedBuf, indexBlock, block.NoCompression, &w.checksummer)
285+
pb := block.CompressAndChecksum(&compressedBuf, indexBlock, block.NoopCompressor, &w.checksummer)
286286
if _, w.err = pb.WriteTo(w.w); w.err != nil {
287287
err = w.err
288288
if w.w != nil {

sstable/block/compression.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,6 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
260260
// the compressed payload is discarded and the original, uncompressed block data
261261
// is used to avoid unnecessary decompression overhead at read time.
262262
func CompressAndChecksum(
263-
dst *[]byte, blockData []byte, c Compression, checksummer *Checksummer,
264-
) PhysicalBlock {
265-
compressor := GetCompressor(c)
266-
defer compressor.Close()
267-
return CompressAndChecksumWithCompressor(dst, blockData, compressor, checksummer)
268-
}
269-
270-
func CompressAndChecksumWithCompressor(
271263
dst *[]byte, blockData []byte, compressor Compressor, checksummer *Checksummer,
272264
) PhysicalBlock {
273265
buf := (*dst)[:0]
@@ -296,7 +288,7 @@ func CompressAndChecksumToTempBuffer(
296288
) (PhysicalBlock, *TempBuffer) {
297289
// Grab a buffer to use as the destination for compression.
298290
compressedBuf := NewTempBuffer()
299-
pb := CompressAndChecksumWithCompressor(&compressedBuf.b, blockData, compressor, checksummer)
291+
pb := CompressAndChecksum(&compressedBuf.b, blockData, compressor, checksummer)
300292
return pb, compressedBuf
301293
}
302294

sstable/colblk_writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ func (w *RawColumnWriter) enqueueDataBlock(
720720
// Serialize the data block, compress it and send it to the write queue.
721721
cb := compressedBlockPool.Get().(*compressedBlock)
722722
cb.blockBuf.checksummer.Type = w.opts.Checksum
723-
cb.physical = block.CompressAndChecksumWithCompressor(
723+
cb.physical = block.CompressAndChecksum(
724724
&cb.blockBuf.dataBuf,
725725
serializedBlock,
726726
w.compressor,
@@ -1247,7 +1247,7 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
12471247
// Serialize the data block, compress it and send it to the write queue.
12481248
cb := compressedBlockPool.Get().(*compressedBlock)
12491249
cb.blockBuf.checksummer.Type = w.opts.Checksum
1250-
cb.physical = block.CompressAndChecksumWithCompressor(
1250+
cb.physical = block.CompressAndChecksum(
12511251
&cb.blockBuf.dataBuf,
12521252
b,
12531253
w.compressor,

sstable/layout.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,10 @@ func (w *layoutWriter) WriteValueIndexBlock(
916916
func (w *layoutWriter) writeBlock(
917917
b []byte, compression block.Compression, buf *blockBuf,
918918
) (block.Handle, error) {
919-
pb := block.CompressAndChecksum(&buf.dataBuf, b, compression, &buf.checksummer)
919+
// TODO(radu): store a compressor in the layoutWriter.
920+
compressor := block.GetCompressor(compression)
921+
defer compressor.Close()
922+
pb := block.CompressAndChecksum(&buf.dataBuf, b, compressor, &buf.checksummer)
920923
h, err := w.writePrecompressedBlock(pb)
921924
return h, err
922925
}

sstable/rowblk_writer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,10 @@ func (d *dataBlockBuf) finish() {
489489
}
490490

491491
func (d *dataBlockBuf) compressAndChecksum(c block.Compression) {
492-
d.physical = block.CompressAndChecksum(&d.dataBuf, d.uncompressed, c, &d.checksummer)
492+
// TODO(radu): pass a Compressor here.
493+
compressor := block.GetCompressor(c)
494+
defer compressor.Close()
495+
d.physical = block.CompressAndChecksum(&d.dataBuf, d.uncompressed, compressor, &d.checksummer)
493496
}
494497

495498
func (d *dataBlockBuf) shouldFlush(
@@ -1929,10 +1932,13 @@ func (w *RawRowWriter) copyDataBlocks(
19291932
// addDataBlock implements RawWriter.
19301933
func (w *RawRowWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
19311934
blockBuf := &w.dataBlockBuf.blockBuf
1935+
// TODO(radu): store a compressor in w.layout.
1936+
compressor := block.GetCompressor(w.layout.compression)
1937+
defer compressor.Close()
19321938
pb := block.CompressAndChecksum(
19331939
&blockBuf.dataBuf,
19341940
b,
1935-
w.layout.compression,
1941+
compressor,
19361942
&blockBuf.checksummer,
19371943
)
19381944

sstable/suffix_rewriter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ func rewriteDataBlocksInParallel(
170170
// We'll assume all blocks are _roughly_ equal so round-robin static partition
171171
// of each worker doing every ith block is probably enough.
172172
err := func() error {
173+
compressor := block.GetCompressor(opts.Compression)
174+
defer compressor.Close()
173175
for i := worker; i < len(input); i += concurrency {
174176
bh := input[i]
175177
var err error
@@ -190,7 +192,7 @@ func rewriteDataBlocksInParallel(
190192
return err
191193
}
192194
compressedBuf = compressedBuf[:cap(compressedBuf)]
193-
finished := block.CompressAndChecksum(&compressedBuf, outputBlock, opts.Compression, &checksummer)
195+
finished := block.CompressAndChecksum(&compressedBuf, outputBlock, compressor, &checksummer)
194196
output[i].physical = finished.CloneWithByteAlloc(&blockAlloc)
195197
}
196198
return nil

0 commit comments

Comments
 (0)