Skip to content

Commit 335af5d

Browse files
committed
sstable: free physical buffers on write
This change moves the responsibility of releasing the `PhysicalBlock`s to the code that writes out the block. This makes the higher level code simpler and more robust. To avoid misuse, we add an `OwnedPhysicalBlock` type which is used to indicate that once passed, the function takes responsibility for releasing the block. The only way to create an `OwnedPhysicalBlock` clears the `PhysicalBlock` so it can't be incorrectly used again.
1 parent 1520f91 commit 335af5d

File tree

7 files changed

+101
-72
lines changed

7 files changed

+101
-72
lines changed

sstable/blob/blob.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (w *FileWriter) drainWriteQueue() {
229229
// Call once to initialize the CPU measurer.
230230
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineBlobFileSecondary)
231231
for cb := range w.writeQueue.ch {
232-
_, err := cb.pb.WriteTo(w.w)
232+
length, err := block.WriteAndReleasePhysicalBlock(cb.pb.Take(), w.w)
233233
// Report to the CPU measurer immediately after writing (note that there
234234
// may be a time lag until the next block is available to write).
235235
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineBlobFileSecondary)
@@ -239,10 +239,8 @@ func (w *FileWriter) drainWriteQueue() {
239239
}
240240
w.indexEncoder.AddBlockHandle(block.Handle{
241241
Offset: cb.off,
242-
Length: uint64(cb.pb.LengthWithoutTrailer()),
242+
Length: uint64(length.WithoutTrailer()),
243243
})
244-
// We're done with this physical block.
245-
cb.pb.Release()
246244
}
247245
}
248246

@@ -281,16 +279,17 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
281279
{
282280
indexBlock := w.indexEncoder.Finish()
283281
pb := w.physBlockMaker.Make(indexBlock, blockkind.Metadata, block.DontCompress)
284-
defer pb.Release()
285-
if _, w.err = pb.WriteTo(w.w); w.err != nil {
282+
length, err := block.WriteAndReleasePhysicalBlock(pb.Take(), w.w)
283+
if err != nil {
284+
w.err = err
286285
if w.w != nil {
287286
w.w.Abort()
288287
}
289-
return FileWriterStats{}, w.err
288+
return FileWriterStats{}, err
290289
}
291290
indexBlockHandle.Offset = stats.FileLen
292-
indexBlockHandle.Length = uint64(pb.LengthWithoutTrailer())
293-
stats.FileLen += uint64(pb.LengthWithTrailer())
291+
indexBlockHandle.Length = uint64(length.WithoutTrailer())
292+
stats.FileLen += uint64(length.WithTrailer())
294293
}
295294

296295
// Write the footer.

sstable/block/physical.go

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,22 @@ func MakeTrailer(blockType byte, checksum uint32) (t Trailer) {
3636
return t
3737
}
3838

39-
// LengthWithTrailer returns the length of the data block, including the trailer.
40-
func (b PhysicalBlock) LengthWithTrailer() int {
41-
return b.tb.Size()
39+
// PhysicalBlockLength represents the length of a physical block.
40+
type PhysicalBlockLength struct {
41+
lenWithTrailer int
42+
}
43+
44+
func (l PhysicalBlockLength) WithTrailer() int {
45+
return l.lenWithTrailer
46+
}
47+
48+
func (l PhysicalBlockLength) WithoutTrailer() int {
49+
return invariants.SafeSub(l.lenWithTrailer, TrailerLen)
50+
}
51+
52+
// Length returns the length of the data block.
53+
func (b PhysicalBlock) Length() PhysicalBlockLength {
54+
return PhysicalBlockLength{lenWithTrailer: b.tb.Size()}
4255
}
4356

4457
// LengthWithoutTrailer returns the length of the data block, excluding the trailer.
@@ -48,8 +61,10 @@ func (b PhysicalBlock) LengthWithoutTrailer() int {
4861

4962
// Release the underlying TempBuffer. The PhysicalBlock should not be used again.
5063
func (b *PhysicalBlock) Release() {
51-
b.tb.Release()
52-
b.tb = nil
64+
if b.tb != nil {
65+
b.tb.Release()
66+
b.tb = nil
67+
}
5368
}
5469

5570
// AlreadyEncodedPhysicalBlock creates a PhysicalBlock from the provided on-disk
@@ -61,21 +76,37 @@ func AlreadyEncodedPhysicalBlock(dataWithTrailer []byte) PhysicalBlock {
6176
return PhysicalBlock{tb: tb}
6277
}
6378

64-
// WriteTo writes the block (including its trailer) to the provided Writable. If
65-
// err == nil, n is the number of bytes successfully written to the Writable.
66-
//
67-
// WriteTo might mangle the block data.
68-
func (b PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
69-
if err := w.Write(b.tb.Data()); err != nil {
70-
return 0, err
71-
}
79+
// OwnedPhysicalBlock is a wrapper around PhysicalBlock which indicates that
80+
// whoever is holding it is responsible for releasing the buffer.
81+
type OwnedPhysicalBlock struct {
82+
pb PhysicalBlock
83+
}
7284

73-
// WriteTo is allowed to mangle the data. Mangle it ourselves some of the time
74-
// in invariant builds to catch callers that don't handle this.
75-
if invariants.Enabled && invariants.Sometimes(1) {
76-
invariants.Mangle(b.tb.Data())
85+
// Length returns the length of the data block.
86+
func (b OwnedPhysicalBlock) Length() PhysicalBlockLength {
87+
return PhysicalBlockLength{lenWithTrailer: b.pb.tb.Size()}
88+
}
89+
90+
// Take moves the physical block to an OwnedPhysicalBlock. The receiver must not
91+
// be used again. This is used to make the ownership transfer more explicit.
92+
func (b *PhysicalBlock) Take() OwnedPhysicalBlock {
93+
pb := *b
94+
b.tb = nil
95+
return OwnedPhysicalBlock{pb: pb}
96+
}
97+
98+
// WriteAndReleasePhysicalBlock writes the block (including its trailer) to the
99+
// provided Writable and releases the block. The block is released even in error
100+
// cases.
101+
func WriteAndReleasePhysicalBlock(
102+
b OwnedPhysicalBlock, w objstorage.Writable,
103+
) (length PhysicalBlockLength, err error) {
104+
defer b.pb.Release()
105+
// Note that Write can mangle the buffer, but we're releasing it anyway.
106+
if err := w.Write(b.pb.tb.Data()); err != nil {
107+
return PhysicalBlockLength{}, err
77108
}
78-
return b.tb.Size(), nil
109+
return b.Length(), nil
79110
}
80111

81112
// PhysicalBlockMaker is used to create physical blocks from logical block data.

sstable/colblk_writer.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ type RawColumnWriter struct {
9999

100100
writeQueue struct {
101101
wg sync.WaitGroup
102-
ch chan block.PhysicalBlock
102+
ch chan block.OwnedPhysicalBlock
103103
err error
104104
}
105105
layout layoutWriter
@@ -189,7 +189,7 @@ func newColumnarWriter(
189189
w.props.KeySchemaName = o.KeySchema.Name
190190
w.props.MergerName = o.MergerName
191191

192-
w.writeQueue.ch = make(chan block.PhysicalBlock)
192+
w.writeQueue.ch = make(chan block.OwnedPhysicalBlock)
193193
w.writeQueue.wg.Add(1)
194194
w.cpuMeasurer = cpuMeasurer
195195
go w.drainWriteQueue()
@@ -710,15 +710,17 @@ func (w *RawColumnWriter) enqueueDataBlock(
710710

711711
// Compress and checksum the data block and send it to the write queue.
712712
pb := w.layout.physBlockMaker.Make(serializedBlock, blockkind.SSTableData, block.NoFlags)
713-
return w.enqueuePhysicalBlock(pb, separator)
713+
return w.enqueuePhysicalBlock(pb.Take(), separator)
714714
}
715715

716716
// enqueuePhysicalBlock enqueues a physical block to the write queue; the
717717
// physical block will be automatically released.
718-
func (w *RawColumnWriter) enqueuePhysicalBlock(pb block.PhysicalBlock, separator []byte) error {
718+
func (w *RawColumnWriter) enqueuePhysicalBlock(
719+
pb block.OwnedPhysicalBlock, separator []byte,
720+
) error {
719721
dataBlockHandle := block.Handle{
720722
Offset: w.queuedDataSize,
721-
Length: uint64(pb.LengthWithoutTrailer()),
723+
Length: uint64(pb.Length().WithoutTrailer()),
722724
}
723725
w.queuedDataSize += dataBlockHandle.Length + block.TrailerLen
724726
w.writeQueue.ch <- pb
@@ -881,7 +883,6 @@ func (w *RawColumnWriter) drainWriteQueue() {
881883
if _, err := w.layout.WritePrecompressedDataBlock(pb); err != nil {
882884
w.writeQueue.err = err
883885
}
884-
pb.Release()
885886
// Report to the CPU measurer immediately after writing (note that there
886887
// may be a time lag until the next block is available to write).
887888
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineSSTableSecondary)
@@ -1125,7 +1126,7 @@ func (w *RawColumnWriter) rewriteSuffixes(
11251126
w.separatorBuf = w.comparer.Successor(w.separatorBuf[:0], blocks[i].end.UserKey)
11261127
separator = w.separatorBuf
11271128
}
1128-
if err := w.enqueuePhysicalBlock(blocks[i].physical, separator); err != nil {
1129+
if err := w.enqueuePhysicalBlock(blocks[i].physical.Take(), separator); err != nil {
11291130
return err
11301131
}
11311132
}
@@ -1207,7 +1208,7 @@ func (w *RawColumnWriter) copyDataBlocks(
12071208
offsetDiff := blocks[i].bh.Offset - blocks[firstBlockIdx].bh.Offset
12081209
dataWithTrailer := buf[offsetDiff : offsetDiff+blocks[i].bh.Length+block.TrailerLen]
12091210
pb := block.AlreadyEncodedPhysicalBlock(dataWithTrailer)
1210-
if err := w.enqueuePhysicalBlock(pb, blocks[i].sep); err != nil {
1211+
if err := w.enqueuePhysicalBlock(pb.Take(), blocks[i].sep); err != nil {
12111212
return err
12121213
}
12131214
}
@@ -1242,7 +1243,7 @@ func (w *RawColumnWriter) copyDataBlocks(
12421243
func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProperties) error {
12431244
// Compress and checksum the data block and send it to the write queue.
12441245
pb := w.layout.physBlockMaker.Make(b, blockkind.SSTableData, block.NoFlags)
1245-
if err := w.enqueuePhysicalBlock(pb, sep); err != nil {
1246+
if err := w.enqueuePhysicalBlock(pb.Take(), sep); err != nil {
12461247
return err
12471248
}
12481249
return nil

sstable/layout.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -859,10 +859,12 @@ func (w *layoutWriter) WriteDataBlock(b []byte) (block.Handle, error) {
859859
return w.writeBlock(b, blockkind.SSTableData)
860860
}
861861

862-
// WritePrecompressedDataBlock writes a pre-compressed data block and its
863-
// pre-computed trailer to the writer, returning its block handle. It can mangle
864-
// the block data.
865-
func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (block.Handle, error) {
862+
// WritePrecompressedDataBlock writes a pre-compressed data block (including the
863+
// trailer) and releases it, returning its block handle. Releases the block even
864+
// in error cases.
865+
func (w *layoutWriter) WritePrecompressedDataBlock(
866+
blk block.OwnedPhysicalBlock,
867+
) (block.Handle, error) {
866868
return w.writePhysicalBlock(blk)
867869
}
868870

@@ -943,15 +945,15 @@ func (w *layoutWriter) writeNamedBlockUncompressed(
943945
}
944946

945947
// WriteValueBlock writes a pre-finished value block (with the trailer) to the
946-
// writer. It can mangle the block data.
947-
func (w *layoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
948+
// writer and releases the buffer. The buffer is releaseed even in error cases.
949+
func (w *layoutWriter) WriteValueBlock(blk block.OwnedPhysicalBlock) (block.Handle, error) {
948950
return w.writePhysicalBlock(blk)
949951
}
950952

951-
// WriteValueIndexBlock writes a value index block and adds it to the meta
952-
// index. It can mangle the block data.
953+
// WriteValueIndexBlock writes a value index block, releases it, ands updates
954+
// the meta index. The block is released even in error cases.
953955
func (w *layoutWriter) WriteValueIndexBlock(
954-
blk block.PhysicalBlock, vbih valblk.IndexHandle,
956+
blk block.OwnedPhysicalBlock, vbih valblk.IndexHandle,
955957
) (block.Handle, error) {
956958
h, err := w.writePhysicalBlock(blk)
957959
if err != nil {
@@ -965,32 +967,31 @@ func (w *layoutWriter) WriteValueIndexBlock(
965967
// writeBlock checksums, compresses, and writes out a block.
966968
func (w *layoutWriter) writeBlock(b []byte, kind block.Kind) (block.Handle, error) {
967969
pb := w.physBlockMaker.Make(b, kind, block.NoFlags)
968-
defer pb.Release()
969-
h, err := w.writePhysicalBlock(pb)
970+
h, err := w.writePhysicalBlock(pb.Take())
970971
return h, err
971972
}
972973

973974
// writeBlock checksums and writes out a block.
974975
func (w *layoutWriter) writeBlockUncompressed(b []byte, kind block.Kind) (block.Handle, error) {
975976
pb := w.physBlockMaker.Make(b, kind, block.DontCompress)
976-
defer pb.Release()
977-
h, err := w.writePhysicalBlock(pb)
977+
h, err := w.writePhysicalBlock(pb.Take())
978978
return h, err
979979
}
980980

981-
// writePhysicalBlock writes a physical block to the writer, returning its block
982-
// handle. Does not release the physical block.
981+
// writePhysicalBlock writes a physical block to the writer and releases it,
982+
// returning its block handle. Releases the physical block even in error cases.
983983
//
984984
// writePhysicalBlock might mangle the block data.
985-
func (w *layoutWriter) writePhysicalBlock(blk block.PhysicalBlock) (block.Handle, error) {
985+
func (w *layoutWriter) writePhysicalBlock(blk block.OwnedPhysicalBlock) (block.Handle, error) {
986986
w.clearFromCache(w.offset)
987-
// Write the bytes to the file.
988-
n, err := blk.WriteTo(w.writable)
987+
// Write the bytes to the file. Note that this call can mangle the block data,
988+
// but it should not matter since we are releasing the TempBuffer right away.
989+
length, err := block.WriteAndReleasePhysicalBlock(blk, w.writable)
989990
if err != nil {
990991
return block.Handle{}, err
991992
}
992-
bh := block.Handle{Offset: w.offset, Length: uint64(blk.LengthWithoutTrailer())}
993-
w.offset += uint64(n)
993+
bh := block.Handle{Offset: w.offset, Length: uint64(length.WithoutTrailer())}
994+
w.offset += uint64(length.WithTrailer())
994995
return bh, nil
995996
}
996997

sstable/rowblk_writer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,7 +1810,7 @@ func (w *RawRowWriter) rewriteSuffixes(
18101810

18111811
for i := range blocks {
18121812
// Write the rewritten block to the file.
1813-
bh, err := w.layout.WritePrecompressedDataBlock(blocks[i].physical)
1813+
bh, err := w.layout.WritePrecompressedDataBlock(blocks[i].physical.Take())
18141814
blocks[i].physical.Release()
18151815
if err != nil {
18161816
return err
@@ -1922,8 +1922,7 @@ func (w *RawRowWriter) addDataBlock(b, sep []byte, bhp block.HandleWithPropertie
19221922

19231923
// layout.writePhysicalBlock keeps layout.offset up-to-date for us.
19241924
// Note that this can mangle the pb data.
1925-
bh, err := w.layout.writePhysicalBlock(pb)
1926-
pb.Release()
1925+
bh, err := w.layout.writePhysicalBlock(pb.Take())
19271926
if err != nil {
19281927
return err
19291928
}

sstable/valblk/writer.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ func (w *Writer) Size() uint64 {
9191
func (w *Writer) compressAndFlush() {
9292
physicalBlock := w.physBlockMaker.Make(w.buf.Data(), blockkind.SSTableValue, block.NoFlags)
9393
w.buf.Reset()
94-
bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.LengthWithoutTrailer())}
95-
w.totalBlockBytes += uint64(physicalBlock.LengthWithTrailer())
94+
bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.Length().WithoutTrailer())}
95+
w.totalBlockBytes += uint64(physicalBlock.Length().WithTrailer())
9696
// blockFinishedFunc length excludes the block trailer.
9797
w.blockFinishedFunc(physicalBlock.LengthWithoutTrailer())
9898
w.blocks = append(w.blocks, bufferedValueBlock{
@@ -114,7 +114,7 @@ func (w *Writer) Finish(layout LayoutWriter, fileOffset uint64) (IndexHandle, Wr
114114
largestOffset := uint64(0)
115115
largestLength := uint64(0)
116116
for i := range w.blocks {
117-
_, err := layout.WriteValueBlock(w.blocks[i].block)
117+
_, err := layout.WriteValueBlock(w.blocks[i].block.Take())
118118
if err != nil {
119119
return IndexHandle{}, WriterStats{}, err
120120
}
@@ -165,8 +165,7 @@ func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (Inde
165165
panic("incorrect length calculation")
166166
}
167167
pb := w.physBlockMaker.Make(w.buf.Data(), blockkind.Metadata, block.DontCompress)
168-
defer pb.Release()
169-
if _, err := layout.WriteValueIndexBlock(pb, h); err != nil {
168+
if _, err := layout.WriteValueIndexBlock(pb.Take(), h); err != nil {
170169
return IndexHandle{}, err
171170
}
172171
return h, nil
@@ -200,10 +199,10 @@ type WriterStats struct {
200199
// LayoutWriter defines the interface for a writer that writes out serialized
201200
// value and value index blocks.
202201
type LayoutWriter interface {
203-
// WriteValueBlock writes a pre-finished value block (with the trailer) to
204-
// the writer.
205-
WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error)
206-
// WriteValueIndexBlock writes a pre-finished value block index to the
207-
// writer.
208-
WriteValueIndexBlock(blk block.PhysicalBlock, vbih IndexHandle) (block.Handle, error)
202+
// WriteValueBlock writes a pre-finished value block (with the trailer) to the
203+
// writer and releases the block. The block is released even in error cases.
204+
WriteValueBlock(blk block.OwnedPhysicalBlock) (block.Handle, error)
205+
// WriteValueIndexBlock writes a pre-finished value block index to the writer
206+
// and releases the block. The block is released even in error cases.
207+
WriteValueIndexBlock(blk block.OwnedPhysicalBlock, vbih IndexHandle) (block.Handle, error)
209208
}

sstable/write_queue.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ func newWriteQueue(size int, writer *RawRowWriter) *writeQueue {
6060
}
6161

6262
func (w *writeQueue) performWrite(task *writeTask) error {
63-
handle, err := w.writer.layout.WritePrecompressedDataBlock(task.buf.physical)
64-
task.buf.physical.Release()
63+
handle, err := w.writer.layout.WritePrecompressedDataBlock(task.buf.physical.Take())
6564
if err != nil {
6665
return err
6766
}

0 commit comments

Comments
 (0)