Skip to content

Commit ae4d7e2

Browse files
committed
sstable: add Compressor to column writer
For future adaptive compressor changes, the columnar writer should have its own compressor so that adaptive compressor state can be persisted 1:1 with a columnar writer.
1 parent 24dc135 commit ae4d7e2

File tree

3 files changed

+23
-19
lines changed

3 files changed

+23
-19
lines changed

sstable/block/compression.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -217,24 +217,21 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
217217
// is used to avoid unnecessary decompression overhead at read time.
218218
func CompressAndChecksum(
219219
dst *[]byte, blockData []byte, compression Compression, checksummer *Checksummer,
220+
) PhysicalBlock {
221+
compressor := GetCompressor(compression)
222+
defer compressor.Close()
223+
return CompressAndChecksumWithCompressor(dst, blockData, compressor, checksummer)
224+
}
225+
226+
func CompressAndChecksumWithCompressor(
227+
dst *[]byte, blockData []byte, compressor Compressor, checksummer *Checksummer,
220228
) PhysicalBlock {
221229
buf := (*dst)[:0]
222230
// Compress the buffer, discarding the result if the improvement isn't at
223231
// least 12.5%.
224-
algo := NoCompressionIndicator
225-
if compression != NoCompression {
226-
compressor := GetCompressor(compression)
227-
defer compressor.Close()
228-
algo, buf = compressor.Compress(buf, blockData)
229-
if len(buf) >= len(blockData)-len(blockData)/8 {
230-
algo = NoCompressionIndicator
231-
}
232-
}
233-
if algo == NoCompressionIndicator {
234-
// We don't want to use the given blockData buffer directly: typically the
235-
// result will be written to disk and that can mangle the buffer, leading to
236-
// fragile code.
237-
buf = append(buf[:0], blockData...)
232+
algo, buf := compressor.Compress(buf, blockData)
233+
if len(buf) >= len(blockData)-len(blockData)/8 {
234+
algo, buf = (noopCompressor{}).Compress(buf, blockData)
238235
}
239236

240237
*dst = buf

sstable/block/compressor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ var _ Compressor = snappyCompressor{}
2626
var _ Compressor = minlzCompressor{}
2727

2828
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
29-
panic("NoCompressionCompressor.Compress() should not be called.")
29+
dst = append(dst[:0], src...)
30+
return NoCompressionIndicator, dst
3031
}
3132
func (noopCompressor) Close() {}
3233

sstable/colblk_writer.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ type RawColumnWriter struct {
103103
validator invariants.Value[*colblk.DataBlockValidator]
104104
disableKeyOrderChecks bool
105105
cpuMeasurer base.CPUMeasurer
106+
107+
// RawColumnWriter writes data sequentially so each writer can have a compressor
108+
compressor block.Compressor
106109
}
107110

108111
// Assert that *RawColumnWriter implements RawWriter.
@@ -184,6 +187,8 @@ func newColumnarWriter(
184187
w.writeQueue.wg.Add(1)
185188
w.cpuMeasurer = cpuMeasurer
186189
go w.drainWriteQueue()
190+
191+
w.compressor = block.GetCompressor(w.opts.Compression)
187192
return w
188193
}
189194

@@ -711,10 +716,10 @@ func (w *RawColumnWriter) enqueueDataBlock(
711716
// Serialize the data block, compress it and send it to the write queue.
712717
cb := compressedBlockPool.Get().(*compressedBlock)
713718
cb.blockBuf.checksummer.Type = w.opts.Checksum
714-
cb.physical = block.CompressAndChecksum(
719+
cb.physical = block.CompressAndChecksumWithCompressor(
715720
&cb.blockBuf.dataBuf,
716721
serializedBlock,
717-
w.opts.Compression,
722+
w.compressor,
718723
&cb.blockBuf.checksummer,
719724
)
720725
return w.enqueuePhysicalBlock(cb, separator)
@@ -1040,6 +1045,7 @@ func (w *RawColumnWriter) Close() (err error) {
10401045
return err
10411046
}
10421047
w.meta.Properties = w.props
1048+
w.compressor.Close()
10431049
// Release any held memory and make any future calls error.
10441050
*w = RawColumnWriter{meta: w.meta, err: errWriterClosed}
10451051
return nil
@@ -1220,10 +1226,10 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
12201226
// Serialize the data block, compress it and send it to the write queue.
12211227
cb := compressedBlockPool.Get().(*compressedBlock)
12221228
cb.blockBuf.checksummer.Type = w.opts.Checksum
1223-
cb.physical = block.CompressAndChecksum(
1229+
cb.physical = block.CompressAndChecksumWithCompressor(
12241230
&cb.blockBuf.dataBuf,
12251231
b,
1226-
w.opts.Compression,
1232+
w.compressor,
12271233
&cb.blockBuf.checksummer,
12281234
)
12291235
if err := w.enqueuePhysicalBlock(cb, sep); err != nil {

0 commit comments

Comments
 (0)