Skip to content

Commit 12c215c

Browse files
committed
sstable: store compressor in layoutWriter
1 parent 1985e8d commit 12c215c

File tree

2 files changed

+19
-22
lines changed

2 files changed

+19
-22
lines changed

sstable/layout.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ type layoutWriter struct {
770770

771771
// options copied from WriterOptions
772772
tableFormat TableFormat
773-
compression block.Compression
773+
compressor block.Compressor
774774
checksumType block.ChecksumType
775775

776776
// Attribute bitset of the sstable, derived from sstable Properties at the time
@@ -796,7 +796,7 @@ func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
796796
writable: w,
797797
cacheOpts: opts.internal.CacheOpts,
798798
tableFormat: opts.TableFormat,
799-
compression: opts.Compression,
799+
compressor: block.MakeCompressor(opts.Compression),
800800
checksumType: opts.Checksum,
801801
buf: blockBuf{
802802
checksummer: block.Checksummer{Type: opts.Checksum},
@@ -815,13 +815,14 @@ func (w *layoutWriter) Abort() {
815815
if w.writable != nil {
816816
w.writable.Abort()
817817
w.writable = nil
818+
w.compressor.Close()
818819
}
819820
}
820821

821822
// WriteDataBlock constructs a trailer for the provided data block and writes
822823
// the block and trailer to the writer. It returns the block's handle.
823824
func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
824-
return w.writeBlock(b, w.compression, buf)
825+
return w.writeBlock(b, &w.compressor, buf)
825826
}
826827

827828
// WritePrecompressedDataBlock writes a pre-compressed data block and its
@@ -836,7 +837,7 @@ func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (blo
836837
// the last-written index block's handle and adds it to the file's meta index
837838
// when the writer is finished.
838839
func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
839-
h, err := w.writeBlock(b, w.compression, &w.buf)
840+
h, err := w.writeBlock(b, &w.compressor, &w.buf)
840841
if err == nil {
841842
w.lastIndexBlockHandle = h
842843
}
@@ -851,42 +852,43 @@ func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err er
851852
if err != nil {
852853
return block.Handle{}, err
853854
}
854-
return w.writeNamedBlock(b, block.NoCompression, f.metaName())
855+
return w.writeNamedBlock(b, block.NoopCompressor, f.metaName())
855856
}
856857

857858
// WritePropertiesBlock constructs a trailer for the provided properties block
858859
// and writes the block and trailer to the writer. It automatically adds the
859860
// properties block to the file's meta index when the writer is finished.
860861
func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
862+
compressor := &w.compressor
861863
// In v6 and earlier, we use a row oriented block with an infinite restart
862864
// interval, which provides very good prefix compression. Since v7, we use the
863865
// columnar format without prefix compression for this block; we enable block
864866
// compression to compensate.
865-
if w.tableFormat >= TableFormatPebblev7 {
866-
return w.writeNamedBlock(b, w.compression, metaPropertiesName)
867+
if w.tableFormat < TableFormatPebblev7 {
868+
compressor = block.NoopCompressor
867869
}
868-
return w.writeNamedBlock(b, block.NoCompression, metaPropertiesName)
870+
return w.writeNamedBlock(b, compressor, metaPropertiesName)
869871
}
870872

871873
// WriteRangeKeyBlock constructs a trailer for the provided range key block and
872874
// writes the block and trailer to the writer. It automatically adds the range
873875
// key block to the file's meta index when the writer is finished.
874876
func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
875-
return w.writeNamedBlock(b, block.NoCompression, metaRangeKeyName)
877+
return w.writeNamedBlock(b, block.NoopCompressor, metaRangeKeyName)
876878
}
877879

878880
// WriteRangeDeletionBlock constructs a trailer for the provided range deletion
879881
// block and writes the block and trailer to the writer. It automatically adds
880882
// the range deletion block to the file's meta index when the writer is
881883
// finished.
882884
func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
883-
return w.writeNamedBlock(b, block.NoCompression, metaRangeDelV2Name)
885+
return w.writeNamedBlock(b, block.NoopCompressor, metaRangeDelV2Name)
884886
}
885887

886888
func (w *layoutWriter) writeNamedBlock(
887-
b []byte, compression block.Compression, name string,
889+
b []byte, compressor *block.Compressor, name string,
888890
) (bh block.Handle, err error) {
889-
bh, err = w.writeBlock(b, compression, &w.buf)
891+
bh, err = w.writeBlock(b, compressor, &w.buf)
890892
if err == nil {
891893
w.recordToMetaindex(name, bh)
892894
}
@@ -915,12 +917,9 @@ func (w *layoutWriter) WriteValueIndexBlock(
915917

916918
// writeBlock checksums, compresses, and writes out a block.
917919
func (w *layoutWriter) writeBlock(
918-
b []byte, compression block.Compression, buf *blockBuf,
920+
b []byte, compressor *block.Compressor, buf *blockBuf,
919921
) (block.Handle, error) {
920-
// TODO(radu): store a compressor in the layoutWriter.
921-
compressor := block.MakeCompressor(compression)
922-
defer compressor.Close()
923-
pb := block.CompressAndChecksum(&buf.dataBuf, b, &compressor, &buf.checksummer)
922+
pb := block.CompressAndChecksum(&buf.dataBuf, b, compressor, &buf.checksummer)
924923
h, err := w.writePrecompressedBlock(pb)
925924
return h, err
926925
}
@@ -1004,7 +1003,7 @@ func (w *layoutWriter) Finish() (size uint64, err error) {
10041003
}
10051004
b = bw.Finish()
10061005
}
1007-
metaIndexHandle, err := w.writeBlock(b, block.NoCompression, &w.buf)
1006+
metaIndexHandle, err := w.writeBlock(b, block.NoopCompressor, &w.buf)
10081007
if err != nil {
10091008
return 0, err
10101009
}
@@ -1025,5 +1024,6 @@ func (w *layoutWriter) Finish() (size uint64, err error) {
10251024

10261025
err = w.writable.Finish()
10271026
w.writable = nil
1027+
w.compressor.Close()
10281028
return w.offset, err
10291029
}

sstable/rowblk_writer.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1932,13 +1932,10 @@ func (w *RawRowWriter) copyDataBlocks(
19321932
// addDataBlock implements RawWriter.
19331933
func (w *RawRowWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
19341934
blockBuf := &w.dataBlockBuf.blockBuf
1935-
// TODO(radu): store a compressor in w.layout.
1936-
compressor := block.MakeCompressor(w.layout.compression)
1937-
defer compressor.Close()
19381935
pb := block.CompressAndChecksum(
19391936
&blockBuf.dataBuf,
19401937
b,
1941-
&compressor,
1938+
&w.layout.compressor,
19421939
&blockBuf.checksummer,
19431940
)
19441941

0 commit comments

Comments
 (0)