Skip to content

Commit 134168d

Browse files
committed
sstable: remove checksummer from blockBuf
The compressor and checksummer are used together but in many code paths we use a checksummer in a blockBuf. Remove this and always use a checksummer initialized alongside the compressor.
1 parent 4fa1e6a commit 134168d

File tree

5 files changed

+41
-49
lines changed

5 files changed

+41
-49
lines changed

sstable/colblk_writer.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ type RawColumnWriter struct {
112112
disableKeyOrderChecks bool
113113
cpuMeasurer base.CPUMeasurer
114114

115-
// RawColumnWriter writes data sequentially so each writer can have a compressor
116-
compressor block.Compressor
115+
// RawColumnWriter writes data sequentially so each writer can have a
116+
// compressor and checksummer.
117+
compressor block.Compressor
118+
checksummer block.Checksummer
117119
}
118120

119121
// Assert that *RawColumnWriter implements RawWriter.
@@ -137,9 +139,9 @@ func newColumnarWriter(
137139
SmallestSeqNum: math.MaxUint64,
138140
},
139141
opts: o,
140-
layout: makeLayoutWriter(writable, o),
141142
disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
142143
}
144+
w.layout.Init(writable, o)
143145
w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
144146
w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
145147
w.dataBlock.Init(o.KeySchema)
@@ -150,7 +152,7 @@ func newColumnarWriter(
150152
if !o.DisableValueBlocks {
151153
w.valueBlock = valblk.NewWriter(
152154
block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
153-
&w.compressor, w.opts.Checksum, func(compressedSize int) {})
155+
&w.compressor, &w.checksummer, func(compressedSize int) {})
154156
}
155157
if o.FilterPolicy != base.NoFilterPolicy {
156158
switch o.FilterType {
@@ -197,6 +199,7 @@ func newColumnarWriter(
197199
go w.drainWriteQueue()
198200

199201
w.compressor = block.MakeCompressor(w.opts.Compression)
202+
w.checksummer.Init(w.opts.Checksum)
200203
return w
201204
}
202205

@@ -724,13 +727,12 @@ func (w *RawColumnWriter) enqueueDataBlock(
724727

725728
// Serialize the data block, compress it and send it to the write queue.
726729
cb := compressedBlockPool.Get().(*compressedBlock)
727-
cb.blockBuf.checksummer.Type = w.opts.Checksum
728730
cb.physical = block.CompressAndChecksum(
729731
&cb.blockBuf.dataBuf,
730732
serializedBlock,
731733
blockkind.SSTableData,
732734
&w.compressor,
733-
&cb.blockBuf.checksummer,
735+
&w.checksummer,
734736
)
735737
return w.enqueuePhysicalBlock(cb, separator)
736738
}
@@ -1270,13 +1272,12 @@ func (w *RawColumnWriter) copyDataBlocks(
12701272
func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
12711273
// Serialize the data block, compress it and send it to the write queue.
12721274
cb := compressedBlockPool.Get().(*compressedBlock)
1273-
cb.blockBuf.checksummer.Type = w.opts.Checksum
12741275
cb.physical = block.CompressAndChecksum(
12751276
&cb.blockBuf.dataBuf,
12761277
b,
12771278
blockkind.SSTableData,
12781279
&w.compressor,
1279-
&cb.blockBuf.checksummer,
1280+
&w.checksummer,
12801281
)
12811282
if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
12821283
return err

sstable/layout.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -810,9 +810,10 @@ type layoutWriter struct {
810810
cacheOpts sstableinternal.CacheOptions
811811

812812
// options copied from WriterOptions
813-
tableFormat TableFormat
814-
compressor block.Compressor
815-
checksumType block.ChecksumType
813+
tableFormat TableFormat
814+
815+
compressor block.Compressor
816+
checksummer block.Checksummer
816817

817818
// Attribute bitset of the sstable, derived from sstable Properties at the time
818819
// of writing.
@@ -832,17 +833,13 @@ type layoutWriter struct {
832833
buf blockBuf
833834
}
834835

835-
func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
836-
return layoutWriter{
837-
writable: w,
838-
cacheOpts: opts.internal.CacheOpts,
839-
tableFormat: opts.TableFormat,
840-
compressor: block.MakeCompressor(opts.Compression),
841-
checksumType: opts.Checksum,
842-
buf: blockBuf{
843-
checksummer: block.Checksummer{Type: opts.Checksum},
844-
},
845-
}
836+
func (w *layoutWriter) Init(writable objstorage.Writable, opts WriterOptions) {
837+
*w = layoutWriter{}
838+
w.writable = writable
839+
w.cacheOpts = opts.internal.CacheOpts
840+
w.tableFormat = opts.TableFormat
841+
w.compressor = block.MakeCompressor(opts.Compression)
842+
w.checksummer.Init(opts.Checksum)
846843
}
847844

848845
type metaIndexHandle struct {
@@ -971,7 +968,7 @@ func (w *layoutWriter) WriteValueIndexBlock(
971968

972969
// writeBlock checksums, compresses, and writes out a block.
973970
func (w *layoutWriter) writeBlock(b []byte, kind block.Kind, buf *blockBuf) (block.Handle, error) {
974-
pb := block.CompressAndChecksum(&buf.dataBuf, b, kind, &w.compressor, &buf.checksummer)
971+
pb := block.CompressAndChecksum(&buf.dataBuf, b, kind, &w.compressor, &w.checksummer)
975972
h, err := w.writePrecompressedBlock(pb)
976973
return h, err
977974
}
@@ -980,7 +977,7 @@ func (w *layoutWriter) writeBlock(b []byte, kind block.Kind, buf *blockBuf) (blo
980977
func (w *layoutWriter) writeBlockUncompressed(
981978
b []byte, kind block.Kind, buf *blockBuf,
982979
) (block.Handle, error) {
983-
pb := block.CopyAndChecksum(&buf.dataBuf, b, kind, &w.compressor, &buf.checksummer)
980+
pb := block.CopyAndChecksum(&buf.dataBuf, b, kind, &w.compressor, &w.checksummer)
984981
h, err := w.writePrecompressedBlock(pb)
985982
return h, err
986983
}
@@ -1072,7 +1069,7 @@ func (w *layoutWriter) Finish() (size uint64, err error) {
10721069
// Write the table footer.
10731070
footer := footer{
10741071
format: w.tableFormat,
1075-
checksum: w.checksumType,
1072+
checksum: w.checksummer.Type,
10761073
metaindexBH: metaIndexHandle,
10771074
indexBH: w.lastIndexBlockHandle,
10781075
attributes: w.attributes,

sstable/rowblk_writer.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ type RawRowWriter struct {
5656
isStrictObsolete bool
5757
writingToLowestLevel bool
5858
restartInterval int
59-
checksumType block.ChecksumType
6059
// disableKeyOrderChecks disables the checks that keys are added to an
6160
// sstable in order. It is intended for internal use only in the construction
6261
// of invalid sstables for testing. See tool/make_test_sstables.go.
@@ -411,8 +410,7 @@ type blockBuf struct {
411410
// compression is not used) for storing a copy of the data. It is re-used over
412411
// the lifetime of the blockBuf, avoiding the allocation of a temporary buffer
413412
// for each block.
414-
dataBuf []byte
415-
checksummer block.Checksummer
413+
dataBuf []byte
416414
}
417415

418416
func (b *blockBuf) clear() {
@@ -477,19 +475,20 @@ var dataBlockBufPool = sync.Pool{
477475
},
478476
}
479477

480-
func newDataBlockBuf(restartInterval int, checksumType block.ChecksumType) *dataBlockBuf {
478+
func newDataBlockBuf(restartInterval int) *dataBlockBuf {
481479
d := dataBlockBufPool.Get().(*dataBlockBuf)
482480
d.dataBlock.RestartInterval = restartInterval
483-
d.checksummer.Type = checksumType
484481
return d
485482
}
486483

487484
func (d *dataBlockBuf) finish() {
488485
d.uncompressed = d.dataBlock.Finish()
489486
}
490487

491-
func (d *dataBlockBuf) compressAndChecksum(compressor *block.Compressor) {
492-
d.physical = block.CompressAndChecksum(&d.dataBuf, d.uncompressed, blockkind.SSTableData, compressor, &d.checksummer)
488+
func (d *dataBlockBuf) compressAndChecksum(
489+
compressor *block.Compressor, checksummer *block.Checksummer,
490+
) {
491+
d.physical = block.CompressAndChecksum(&d.dataBuf, d.uncompressed, blockkind.SSTableData, compressor, checksummer)
493492
}
494493

495494
func (d *dataBlockBuf) shouldFlush(
@@ -994,7 +993,7 @@ func (w *RawRowWriter) flush(key InternalKey) error {
994993
}
995994
w.dataBlockBuf.finish()
996995
w.maybeIncrementTombstoneDenseBlocks()
997-
w.dataBlockBuf.compressAndChecksum(&w.layout.compressor)
996+
w.dataBlockBuf.compressAndChecksum(&w.layout.compressor, &w.layout.checksummer)
998997
// Since dataBlockEstimates.addInflightDataBlock was never called, the
999998
// inflightSize is set to 0.
1000999
w.coordination.sizeEstimate.dataBlockCompressed(w.dataBlockBuf.physical.LengthWithoutTrailer(), 0)
@@ -1053,7 +1052,7 @@ func (w *RawRowWriter) flush(key InternalKey) error {
10531052

10541053
w.dataBlockBuf = nil
10551054
err = w.coordination.writeQueue.addSync(writeTask)
1056-
w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1055+
w.dataBlockBuf = newDataBlockBuf(w.restartInterval)
10571056

10581057
return err
10591058
}
@@ -1679,7 +1678,6 @@ func newRowWriter(writable objstorage.Writable, o WriterOptions) *RawRowWriter {
16791678
}
16801679
o = o.ensureDefaults()
16811680
w := &RawRowWriter{
1682-
layout: makeLayoutWriter(writable, o),
16831681
meta: WriterMetadata{
16841682
SmallestSeqNum: math.MaxUint64,
16851683
},
@@ -1694,7 +1692,6 @@ func newRowWriter(writable objstorage.Writable, o WriterOptions) *RawRowWriter {
16941692
isStrictObsolete: o.IsStrictObsolete,
16951693
writingToLowestLevel: o.WritingToLowestLevel,
16961694
restartInterval: o.BlockRestartInterval,
1697-
checksumType: o.Checksum,
16981695
disableKeyOrderChecks: o.internal.DisableKeyOrderChecks,
16991696
indexBlock: newIndexBlockBuf(),
17001697
rangeDelBlock: rowblk.Writer{RestartInterval: 1},
@@ -1704,25 +1701,22 @@ func newRowWriter(writable objstorage.Writable, o WriterOptions) *RawRowWriter {
17041701
numDeletionsThreshold: o.NumDeletionsThreshold,
17051702
deletionSizeRatioThreshold: o.DeletionSizeRatioThreshold,
17061703
}
1704+
w.layout.Init(writable, o)
17071705
w.dataFlush = block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
17081706
w.indexFlush = block.MakeFlushGovernor(o.IndexBlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses)
17091707
if w.tableFormat >= TableFormatPebblev3 {
17101708
w.shortAttributeExtractor = o.ShortAttributeExtractor
17111709
if !o.DisableValueBlocks {
17121710
w.valueBlockWriter = valblk.NewWriter(
17131711
block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
1714-
&w.layout.compressor, w.checksumType, func(compressedSize int) {
1712+
&w.layout.compressor, &w.layout.checksummer, func(compressedSize int) {
17151713
w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0)
17161714
},
17171715
)
17181716
}
17191717
}
17201718

1721-
w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType)
1722-
1723-
w.blockBuf = blockBuf{
1724-
checksummer: block.Checksummer{Type: o.Checksum},
1725-
}
1719+
w.dataBlockBuf = newDataBlockBuf(w.restartInterval)
17261720

17271721
w.coordination.init(w)
17281722
defer func() {
@@ -1929,7 +1923,7 @@ func (w *RawRowWriter) copyDataBlocks(
19291923
func (w *RawRowWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
19301924
blockBuf := &w.dataBlockBuf.blockBuf
19311925
pb := block.CompressAndChecksum(
1932-
&blockBuf.dataBuf, b, blockkind.SSTableData, &w.layout.compressor, &blockBuf.checksummer,
1926+
&blockBuf.dataBuf, b, blockkind.SSTableData, &w.layout.compressor, &w.layout.checksummer,
19331927
)
19341928

19351929
// layout.WriteDataBlock keeps layout.offset up-to-date for us.

sstable/valblk/writer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type Writer struct {
2121
blockFinishedFunc func(compressedSize int)
2222

2323
compressor *block.Compressor
24-
checksummer block.Checksummer
24+
checksummer *block.Checksummer
2525
// buf is the current block being written to (uncompressed).
2626
buf *block.TempBuffer
2727
// Sequence of blocks that are finished.
@@ -47,18 +47,18 @@ var valueBlockWriterPool = sync.Pool{
4747
func NewWriter(
4848
flushGovernor block.FlushGovernor,
4949
compressor *block.Compressor,
50-
checksumType block.ChecksumType,
50+
checksummer *block.Checksummer,
5151
// compressedSize should exclude the block trailer.
5252
blockFinishedFunc func(compressedSize int),
5353
) *Writer {
5454
w := valueBlockWriterPool.Get().(*Writer)
5555
*w = Writer{
5656
flush: flushGovernor,
5757
compressor: compressor,
58+
checksummer: checksummer,
5859
blockFinishedFunc: blockFinishedFunc,
5960
blocks: w.blocks[:0],
6061
}
61-
w.checksummer.Init(checksumType)
6262
w.buf = block.NewTempBuffer()
6363
return w
6464
}
@@ -93,7 +93,7 @@ func (w *Writer) Size() uint64 {
9393

9494
func (w *Writer) compressAndFlush() {
9595
physicalBlock, bufHandle := block.CompressAndChecksumToTempBuffer(
96-
w.buf.Data(), blockkind.SSTableValue, w.compressor, &w.checksummer,
96+
w.buf.Data(), blockkind.SSTableValue, w.compressor, w.checksummer,
9797
)
9898
w.buf.Reset()
9999
bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.LengthWithoutTrailer())}
@@ -170,7 +170,7 @@ func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (Inde
170170
if len(b) != 0 {
171171
panic("incorrect length calculation")
172172
}
173-
pb, bufHandle := block.CopyAndChecksumToTempBuffer(w.buf.Data(), blockkind.Metadata, w.compressor, &w.checksummer)
173+
pb, bufHandle := block.CopyAndChecksumToTempBuffer(w.buf.Data(), blockkind.Metadata, w.compressor, w.checksummer)
174174
if _, err := layout.WriteValueIndexBlock(pb, h); err != nil {
175175
return IndexHandle{}, err
176176
}

sstable/writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ func TestBlockBufClear(t *testing.T) {
600600

601601
func TestClearDataBlockBuf(t *testing.T) {
602602
defer leaktest.AfterTest(t)()
603-
d := newDataBlockBuf(1, block.ChecksumTypeCRC32c)
603+
d := newDataBlockBuf(1)
604604
d.blockBuf.dataBuf = make([]byte, 1)
605605
require.NoError(t, d.dataBlock.Add(ikey("apple"), nil))
606606
require.NoError(t, d.dataBlock.Add(ikey("banana"), nil))

0 commit comments

Comments
 (0)