Skip to content

Commit fba9d91

Browse files
committed
block: remove NoopCompressor
Replace the NoopCompressor with new variants for `ChecksumAndCompress` and inform the `block.Compressor` of any uncompressed blocks.
1 parent 9f09afa commit fba9d91

File tree

5 files changed

+69
-28
lines changed

5 files changed

+69
-28
lines changed

sstable/blob/blob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
293293
{
294294
indexBlock := w.indexEncoder.Finish()
295295
var compressedBuf []byte
296-
pb := block.CompressAndChecksum(&compressedBuf, indexBlock, blockkind.Metadata, block.NoopCompressor, &w.checksummer)
296+
pb := block.CopyAndChecksum(&compressedBuf, indexBlock, blockkind.Metadata, &w.compressor, &w.checksummer)
297297
if _, w.err = pb.WriteTo(w.w); w.err != nil {
298298
err = w.err
299299
if w.w != nil {

sstable/block/compression.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,6 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
340340
// CompressAndChecksum compresses and checksums the provided block, returning
341341
// the compressed block and its trailer. The result is appended to the dst
342342
// argument.
343-
//
344-
// If the compressed block is not sufficiently smaller than the original block,
345-
// the compressed payload is discarded and the original, uncompressed block data
346-
// is used to avoid unnecessary decompression overhead at read time.
347343
func CompressAndChecksum(
348344
dst *[]byte, blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
349345
) PhysicalBlock {
@@ -358,6 +354,27 @@ func CompressAndChecksum(
358354
return pb
359355
}
360356

357+
// CopyAndChecksum copies the provided block (without compressing it) and
358+
// checksums it, returning the physical block. The result is appended to the dst
359+
// argument.
360+
//
361+
// Note that we still need to provide a Compressor so we can inform it of the
362+
// uncompressed block (for statistics).
363+
func CopyAndChecksum(
364+
dst *[]byte, blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
365+
) PhysicalBlock {
366+
buf := *dst
367+
buf = append(buf[:0], blockData...)
368+
*dst = buf
369+
370+
// Calculate the checksum.
371+
pb := PhysicalBlock{data: buf}
372+
checksum := checksummer.Checksum(buf, byte(NoCompressionIndicator))
373+
pb.trailer = MakeTrailer(byte(NoCompressionIndicator), checksum)
374+
compressor.UncompressedBlock(len(blockData), blockKind)
375+
return pb
376+
}
377+
361378
// CompressAndChecksumToTempBuffer compresses and checksums the provided block
362379
// into a TempBuffer. The caller should Release() the TempBuffer once it is no
363380
// longer necessary.
@@ -370,6 +387,18 @@ func CompressAndChecksumToTempBuffer(
370387
return pb, compressedBuf
371388
}
372389

390+
// CopyAndChecksumToTempBuffer copies (without compressing) and checksums
391+
// the provided block into a TempBuffer. The caller should Release() the
392+
// TempBuffer once it is no longer necessary.
393+
func CopyAndChecksumToTempBuffer(
394+
blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
395+
) (PhysicalBlock, *TempBuffer) {
396+
// Grab a buffer to use as the destination for compression.
397+
compressedBuf := NewTempBuffer()
398+
pb := CopyAndChecksum(&compressedBuf.b, blockData, blockKind, compressor, checksummer)
399+
return pb, compressedBuf
400+
}
401+
373402
// TempBuffer is a buffer that is used temporarily and is released back to a
374403
// pool for reuse.
375404
type TempBuffer struct {

sstable/block/compressor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,11 @@ func (c *Compressor) Compress(dst, src []byte, kind Kind) (CompressionIndicator,
8888
return compressionIndicatorFromAlgorithm(setting.Algorithm), out
8989
}
9090

91-
// NoopCompressor is a Compressor that does not compress data. It does not have
92-
// any state and can be used in parallel.
93-
var NoopCompressor = &noopCompressor
94-
95-
var noopCompressor = MakeCompressor(NoCompression)
91+
// UncompressedBlock informs the compressor that a block of the given size and
92+
// kind was written uncompressed. This is used so that the final statistics are
93+
// complete.
94+
func (c *Compressor) UncompressedBlock(size int, kind Kind) {
95+
}
9696

9797
type Decompressor = compression.Decompressor
9898

sstable/layout.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ func (w *layoutWriter) Abort() {
858858
// WriteDataBlock constructs a trailer for the provided data block and writes
859859
// the block and trailer to the writer. It returns the block's handle.
860860
func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
861-
return w.writeBlock(b, blockkind.SSTableData, &w.compressor, buf)
861+
return w.writeBlock(b, blockkind.SSTableData, buf)
862862
}
863863

864864
// WritePrecompressedDataBlock writes a pre-compressed data block and its
@@ -873,7 +873,7 @@ func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (blo
873873
// the last-written index block's handle and adds it to the file's meta index
874874
// when the writer is finished.
875875
func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
876-
h, err := w.writeBlock(b, blockkind.SSTableIndex, &w.compressor, &w.buf)
876+
h, err := w.writeBlock(b, blockkind.SSTableIndex, &w.buf)
877877
if err == nil {
878878
w.lastIndexBlockHandle = h
879879
}
@@ -888,51 +888,56 @@ func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err er
888888
if err != nil {
889889
return block.Handle{}, err
890890
}
891-
return w.writeNamedBlock(b, blockkind.Filter, block.NoopCompressor, f.metaName())
891+
return w.writeNamedBlockUncompressed(b, blockkind.Filter, f.metaName())
892892
}
893893

894894
// WritePropertiesBlock constructs a trailer for the provided properties block
895895
// and writes the block and trailer to the writer. It automatically adds the
896896
// properties block to the file's meta index when the writer is finished.
897-
func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
898-
compressor := &w.compressor
897+
func (w *layoutWriter) WritePropertiesBlock(b []byte) (bh block.Handle, err error) {
899898
// In v6 and earlier, we use a row oriented block with an infinite restart
900899
// interval, which provides very good prefix compression. Since v7, we use the
901900
// columnar format without prefix compression for this block; we enable block
902901
// compression to compensate.
903902
if w.tableFormat < TableFormatPebblev7 {
904-
compressor = block.NoopCompressor
903+
bh, err = w.writeBlockUncompressed(b, blockkind.Metadata, &w.buf)
904+
} else {
905+
bh, err = w.writeBlock(b, blockkind.Metadata, &w.buf)
906+
}
907+
if err == nil {
908+
w.recordToMetaindex(metaPropertiesName, bh)
905909
}
906-
return w.writeNamedBlock(b, blockkind.Metadata, compressor, metaPropertiesName)
910+
return bh, err
907911
}
908912

909913
// WriteRangeKeyBlock constructs a trailer for the provided range key block and
910914
// writes the block and trailer to the writer. It automatically adds the range
911915
// key block to the file's meta index when the writer is finished.
912916
func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
913-
return w.writeNamedBlock(b, blockkind.RangeKey, block.NoopCompressor, metaRangeKeyName)
917+
return w.writeNamedBlockUncompressed(b, blockkind.RangeKey, metaRangeKeyName)
914918
}
915919

916920
// WriteBlobRefIndexBlock constructs a trailer for the provided blob reference
917921
// index block and writes the block and trailer to the writer. It automatically
918922
// adds the blob reference index block to the file's meta index when the writer
919923
// is finished.
920924
func (w *layoutWriter) WriteBlobRefIndexBlock(b []byte) (block.Handle, error) {
921-
return w.writeNamedBlock(b, blockkind.BlobReferenceValueLivenessIndex, &w.compressor, metaBlobRefIndexName)
925+
return w.writeNamedBlockUncompressed(b, blockkind.BlobReferenceValueLivenessIndex, metaBlobRefIndexName)
922926
}
923927

924928
// WriteRangeDeletionBlock constructs a trailer for the provided range deletion
925929
// block and writes the block and trailer to the writer. It automatically adds
926930
// the range deletion block to the file's meta index when the writer is
927931
// finished.
928932
func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
929-
return w.writeNamedBlock(b, blockkind.RangeDel, block.NoopCompressor, metaRangeDelV2Name)
933+
return w.writeNamedBlockUncompressed(b, blockkind.RangeDel, metaRangeDelV2Name)
930934
}
931935

932-
func (w *layoutWriter) writeNamedBlock(
933-
b []byte, kind block.Kind, compressor *block.Compressor, name string,
936+
// writeNamedBlockUncompressed writes a block without compressing it and adds it to the metaindex.
937+
func (w *layoutWriter) writeNamedBlockUncompressed(
938+
b []byte, kind block.Kind, name string,
934939
) (bh block.Handle, err error) {
935-
bh, err = w.writeBlock(b, kind, compressor, &w.buf)
940+
bh, err = w.writeBlockUncompressed(b, kind, &w.buf)
936941
if err == nil {
937942
w.recordToMetaindex(name, bh)
938943
}
@@ -960,10 +965,17 @@ func (w *layoutWriter) WriteValueIndexBlock(
960965
}
961966

962967
// writeBlock checksums, compresses, and writes out a block.
963-
func (w *layoutWriter) writeBlock(
964-
b []byte, kind block.Kind, compressor *block.Compressor, buf *blockBuf,
968+
func (w *layoutWriter) writeBlock(b []byte, kind block.Kind, buf *blockBuf) (block.Handle, error) {
969+
pb := block.CompressAndChecksum(&buf.dataBuf, b, kind, &w.compressor, &buf.checksummer)
970+
h, err := w.writePrecompressedBlock(pb)
971+
return h, err
972+
}
973+
974+
// writeBlock checksums and writes out a block.
975+
func (w *layoutWriter) writeBlockUncompressed(
976+
b []byte, kind block.Kind, buf *blockBuf,
965977
) (block.Handle, error) {
966-
pb := block.CompressAndChecksum(&buf.dataBuf, b, kind, compressor, &buf.checksummer)
978+
pb := block.CopyAndChecksum(&buf.dataBuf, b, kind, &w.compressor, &buf.checksummer)
967979
h, err := w.writePrecompressedBlock(pb)
968980
return h, err
969981
}
@@ -1047,7 +1059,7 @@ func (w *layoutWriter) Finish() (size uint64, err error) {
10471059
}
10481060
b = bw.Finish()
10491061
}
1050-
metaIndexHandle, err := w.writeBlock(b, blockkind.Metadata, block.NoopCompressor, &w.buf)
1062+
metaIndexHandle, err := w.writeBlockUncompressed(b, blockkind.Metadata, &w.buf)
10511063
if err != nil {
10521064
return 0, err
10531065
}

sstable/valblk/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (Inde
170170
if len(b) != 0 {
171171
panic("incorrect length calculation")
172172
}
173-
pb, bufHandle := block.CompressAndChecksumToTempBuffer(w.buf.Data(), blockkind.Metadata, block.NoopCompressor, &w.checksummer)
173+
pb, bufHandle := block.CopyAndChecksumToTempBuffer(w.buf.Data(), blockkind.Metadata, w.compressor, &w.checksummer)
174174
if _, err := layout.WriteValueIndexBlock(pb, h); err != nil {
175175
return IndexHandle{}, err
176176
}

0 commit comments

Comments
 (0)