Commit aadda9a

Authored by Ibrahim Jarif
Buffer pool for decompression (#1308)
This commit uses a sync.Pool to hold decompression buffers. A buffer is returned to the pool only if it was actually used for decompression. Buffers that were not used for decompression must not be pooled: they point into mmap'd SST files, and any write to them would lead to a segfault. Fixes #1239
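The pattern, as a minimal runnable sketch (the helper below is illustrative, not Badger's actual code; only the safety rule is taken from this commit):

package main

import (
	"fmt"
	"sync"
)

// Pooled output buffers. Only buffers we allocated ourselves may enter the
// pool; a future Get may hand them to code that writes into them.
var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 5<<10)
		return &b
	},
}

// releaseBlock is a hypothetical helper showing the rule from this commit:
// recycle a buffer only if it was allocated for decompression output.
func releaseBlock(data []byte, usedForDecompression bool) {
	if !usedForDecompression {
		// data aliases the mmap'd SST file. Pooling it would let a later
		// user write into read-only mapped memory: segfault.
		return
	}
	bufPool.Put(&data)
}

func main() {
	buf := *bufPool.Get().(*[]byte)
	// ... decompress into buf ...
	releaseBlock(buf, true) // heap-owned: safe to recycle
	fmt.Println("recycled a", cap(buf), "byte buffer")
}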
1 parent af22dfd commit aadda9a

File tree: 4 files changed, +78 -20 lines changed

db.go

Lines changed: 3 additions & 0 deletions

@@ -299,6 +299,9 @@ func Open(opt Options) (db *DB, err error) {
 		MaxCost:     int64(float64(opt.MaxCacheSize) * 0.95),
 		BufferItems: 64,
 		Metrics:     true,
+		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
+			table.BlockEvictHandler(value)
+		},
 	}
 	db.blockCache, err = ristretto.NewCache(&config)
 	if err != nil {
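For context, a sketch of how such an eviction hook is wired into a ristretto cache (the Config fields and OnEvict signature match the diff above; the sizing numbers are illustrative):

package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto"
)

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e6,     // illustrative: ~10x expected max items
		MaxCost:     1 << 28, // illustrative: 256 MB cost budget
		BufferItems: 64,
		Metrics:     true,
		// Invoked for every evicted entry. Badger's handler type-asserts
		// the value to *block and drops a reference, which is what lets
		// the block's buffer flow back into the pool.
		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
			fmt.Printf("evicted: %v\n", value)
		},
	})
	if err != nil {
		panic(err)
	}
	// Set is applied asynchronously by ristretto's internal goroutines.
	cache.Set("block-key", []byte("block data"), 10)
}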

table/builder.go

Lines changed: 8 additions & 7 deletions

@@ -118,10 +118,13 @@ func NewTableBuilder(opts Options) *Builder {
 	return b
 }

-var slicePool = sync.Pool{
+var blockPool = &sync.Pool{
 	New: func() interface{} {
-		// Make 4 KB blocks for reuse.
-		b := make([]byte, 0, 4<<10)
+		// Create 5 KB blocks even though the default block size is 4 KB. The
+		// ZSTD decompression library grows the buffer by 2x if it's not big
+		// enough. Using a 5 KB block instead of a 4 KB one avoids that
+		// unnecessary 2x allocation by the decompression library.
+		b := make([]byte, 5<<10)
 		return &b
 	},
 }
@@ -135,9 +138,7 @@ func (b *Builder) handleBlock() {
 	// Compress the block.
 	if b.opt.Compression != options.None {
 		var err error
-
-		dst = slicePool.Get().(*[]byte)
-		*dst = (*dst)[:0]
+		dst = blockPool.Get().(*[]byte)

 		blockBuf, err = b.compressData(*dst, blockBuf)
 		y.Check(err)
@@ -167,7 +168,7 @@
 		item.end = item.start + uint32(len(blockBuf))

 		if dst != nil {
-			slicePool.Put(dst)
+			blockPool.Put(dst)
 		}
 	}
 }
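Why 5 KB with full length rather than 4 KB of spare capacity: the decompressors size their output against the destination's length, and (per the comment above) an undersized ZSTD buffer is grown by 2x. A small worked example of the arithmetic, under those assumptions:

package main

import "fmt"

func main() {
	const blockSize = 4 << 10 // default table block size: 4 KB
	const overhead = 200      // illustrative: entry offsets, checksum, etc.
	need := blockSize + overhead

	// A 4 KB destination is just too small, so a doubling decompressor
	// reallocates 4096 -> 8192: one wasted 8 KB allocation per block.
	dst := 4 << 10
	for dst < need {
		dst *= 2
	}
	fmt.Println("4 KB dst grows to:", dst)

	// A 5 KB destination already fits, so no reallocation happens.
	fmt.Println("5 KB dst fits:", 5<<10 >= need)
}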

table/iterator.go

Lines changed: 12 additions & 1 deletion

@@ -33,13 +33,21 @@ type blockIterator struct {
 	key          []byte
 	val          []byte
 	entryOffsets []uint32
+	block        *block

 	// prevOverlap stores the overlap of the previous key with the base key.
 	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
 	prevOverlap uint16
 }

 func (itr *blockIterator) setBlock(b *block) {
+	// Decrement the ref for the old block. If the old block was compressed, we
+	// might be able to reuse it.
+	itr.block.decrRef()
+	// Increment the ref for the new block.
+	b.incrRef()
+
+	itr.block = b
 	itr.err = nil
 	itr.idx = 0
 	itr.baseKey = itr.baseKey[:0]
@@ -102,7 +110,9 @@ func (itr *blockIterator) Error() error {
 	return itr.err
 }

-func (itr *blockIterator) Close() {}
+func (itr *blockIterator) Close() {
+	itr.block.decrRef()
+}

 var (
 	origin = 0
@@ -172,6 +182,7 @@ func (t *Table) NewIterator(reversed bool) *Iterator {

 // Close closes the iterator (and it must be called).
 func (itr *Iterator) Close() error {
+	itr.bi.Close()
 	return itr.t.DecrRef()
 }
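Put together, the iterator now participates in block reference counting. A usage sketch (import path assumes the v2 module layout; error handling elided):

package main

import "github.com/dgraph-io/badger/v2/table"

// scan walks a table. Each setBlock unpins the previous block and pins the
// next; Close (which must be called) unpins the final block and the table,
// allowing reusable block buffers to return to the pool.
func scan(tbl *table.Table) {
	it := tbl.NewIterator(false)
	defer it.Close()
	for it.Rewind(); it.Valid(); it.Next() {
		_ = it.Key()
		_ = it.Value()
	}
}

func main() {}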

table/table.go

Lines changed: 55 additions & 12 deletions

@@ -169,15 +169,44 @@ func (t *Table) DecrRef() error {
 	return nil
 }

+// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
+func BlockEvictHandler(value interface{}) {
+	if b, ok := value.(*block); ok {
+		b.decrRef()
+	}
+}
+
 type block struct {
 	offset            int
 	data              []byte
 	checksum          []byte
-	entriesIndexStart int // start index of entryOffsets list
-	entryOffsets      []uint32
-	chkLen            int // checksum length
+	entriesIndexStart int      // start index of entryOffsets list
+	entryOffsets      []uint32 // used to binary search an entry in the block.
+	chkLen            int      // checksum length.
+	isReusable        bool     // used to determine if the block should be reused.
+	ref               int32
 }

+func (b *block) incrRef() {
+	atomic.AddInt32(&b.ref, 1)
+}
+func (b *block) decrRef() {
+	if b == nil {
+		return
+	}
+
+	p := atomic.AddInt32(&b.ref, -1)
+	// Insert the []byte into the pool only if the block is reusable. When a
+	// block is reusable, a new []byte was used for decompression and that
+	// []byte can be reused.
+	// In case of an uncompressed block, the []byte is a reference to the
+	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
+	// will lead to a SEGFAULT.
+	if p == 0 && b.isReusable {
+		blockPool.Put(&b.data)
+	}
+	y.AssertTrue(p >= 0)
+}
 func (b *block) size() int64 {
 	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
 		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
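How the references balance out across the pieces above, per this commit's scheme (a self-contained mimic, not Badger's types):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var pool = sync.Pool{
	New: func() interface{} { b := make([]byte, 5<<10); return &b },
}

// refBlock mimics block's scheme: the buffer is recycled only when the last
// holder releases it, and only if the memory is ours (not mmap-backed).
type refBlock struct {
	data       []byte
	ref        int32
	isReusable bool
}

func (b *refBlock) incrRef() { atomic.AddInt32(&b.ref, 1) }

func (b *refBlock) decrRef() {
	if b == nil {
		return
	}
	if atomic.AddInt32(&b.ref, -1) == 0 && b.isReusable {
		pool.Put(&b.data) // last holder gone: recycle the buffer
	}
}

func main() {
	b := &refBlock{data: *pool.Get().(*[]byte), isReusable: true}
	b.incrRef() // the reader that loaded the block
	b.incrRef() // the cache, just before Set (see t.opt.Cache.Set below)
	b.decrRef() // reader done (e.g. blockIterator.Close)
	b.decrRef() // cache eviction (BlockEvictHandler): buffer re-pooled here
	fmt.Println("final ref:", atomic.LoadInt32(&b.ref))
}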
@@ -419,8 +448,7 @@ func (t *Table) block(idx int) (*block, error) {
 		}
 	}

-	blk.data, err = t.decompressData(blk.data)
-	if err != nil {
+	if err = t.decompress(blk); err != nil {
 		return nil, errors.Wrapf(err,
 			"failed to decode compressed data in file: %s at offset: %d, len: %d",
 			t.fd.Name(), blk.offset, ko.Len)
@@ -462,6 +490,7 @@ func (t *Table) block(idx int) (*block, error) {
 	}
 	if t.opt.Cache != nil {
 		key := t.blockCacheKey(idx)
+		blk.incrRef()
 		t.opt.Cache.Set(key, blk, blk.size())
 	}
 	return blk, nil
@@ -563,7 +592,8 @@ func (t *Table) VerifyChecksum() error {
 			return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
 				t.Filename(), i, os.Offset)
 		}
-
+		b.incrRef()
+		defer b.decrRef()
 		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
 		// on block, verification would be done while reading block itself.
 		if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
@@ -629,15 +659,28 @@ func NewFilename(id uint64, dir string) string {
 	return filepath.Join(dir, IDToFilename(id))
 }

-// decompressData decompresses the given data.
-func (t *Table) decompressData(data []byte) ([]byte, error) {
+// decompress decompresses the data stored in a block.
+func (t *Table) decompress(b *block) error {
+	var err error
 	switch t.opt.Compression {
 	case options.None:
-		return data, nil
+		// Nothing to be done here.
 	case options.Snappy:
-		return snappy.Decode(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = snappy.Decode(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
 	case options.ZSTD:
-		return y.ZSTDDecompress(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = y.ZSTDDecompress(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
+	default:
+		return errors.New("Unsupported compression type")
 	}
-	return nil, errors.New("Unsupported compression type")
+	return nil
}
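A round-trip sketch of the Snappy arm of the new decompress path (golang/snappy's real API; ZSTD follows the same shape via y.ZSTDDecompress). When the pooled destination is large enough, Decode writes into it and returns a slice of it, so no allocation happens:

package main

import (
	"fmt"
	"sync"

	"github.com/golang/snappy"
)

var blockPool = &sync.Pool{
	New: func() interface{} { b := make([]byte, 5<<10); return &b },
}

func main() {
	compressed := snappy.Encode(nil, []byte("some block contents"))

	dst := blockPool.Get().(*[]byte)
	data, err := snappy.Decode(*dst, compressed)
	if err != nil {
		panic(err)
	}
	// data aliases *dst when *dst was big enough; it is heap memory we
	// own, so (unlike an mmap-backed block) it is safe to mark reusable.
	fmt.Println(string(data), "aliased:", &data[0] == &(*dst)[0])

	blockPool.Put(&data) // what decrRef does once the ref count hits zero
}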
