Skip to content

Commit bb1d12f

Browse files
authored
Maximize disk utilization during Open (#682)
During badger.Open, we read the SSTables from disk and load their index. Previously, this was being done serially, where we open one table at a time, and then we were using 64 goroutines to read the first keys of each block within the SSTable. With this PR, this has been switched around. Using a serial reader to read the block keys provides the same performance as using the 64 goroutines to do random reads (given MemoryMap or LoadToRAM). So, we use a simpler serial reader for `readIndex`. This PR uses 3 goroutines to load multiple tables at a time. Using 3 goroutines has been shown to provide maximum disk read throughput utilization, a sweet spot between SSDs and HDDs. Using 8 goroutines to load tables is worse on HDDs than just using 1, due to too much random IOPS. On internal SSD, average read throughput for master is ~400 MBps. With this PR, the throughput is >500 MBps, which is the same throughput as possible via cat (i.e. best possible). During these tests, care was taken to ensure that disk cache was flushed out, using: ``` #!/bin/sh echo 3 | sudo tee /proc/sys/vm/drop_caches sudo blockdev --flushbufs /dev/sdb1 ``` Changes: * Attempt at making open tables concurrent. But, my testing shows it to be slower than opening one table at a time. Probably largely due to the fact that I'm testing on an external rotational HDD. (Initially 8 goroutines were being used. Using 1 resulted in subpar disk utilization. Using 3 proved ideal in the test set). * Saw a huge perf improvement when reading the tables serially to generate the block index (turned out to be the same). * Use concurrency of 2 to open tables. Serial algorithm for read index (later switched to 3 goroutines). * Improve logging * Self-review
1 parent 10e1ee8 commit bb1d12f

File tree

6 files changed

+121
-67
lines changed

6 files changed

+121
-67
lines changed

badger/cmd/fill.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/dgraph-io/badger"
24+
"github.com/dgraph-io/badger/options"
2425
"github.com/dgraph-io/badger/y"
2526
"github.com/spf13/cobra"
2627
)
@@ -56,6 +57,8 @@ func fill(cmd *cobra.Command, args []string) error {
5657
opts.Truncate = truncate
5758
opts.SyncWrites = false
5859
opts.CompactL0OnClose = force
60+
opts.TableLoadingMode = options.FileIO
61+
opts.ValueLogLoadingMode = options.FileIO
5962

6063
db, err := badger.Open(opts)
6164
if err != nil {

badger/cmd/info.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"github.com/dgraph-io/badger"
29+
"github.com/dgraph-io/badger/options"
2930
"github.com/dgraph-io/badger/table"
3031
"github.com/dgraph-io/badger/y"
3132
humanize "github.com/dustin/go-humanize"
@@ -77,6 +78,7 @@ func dur(src, dst time.Time) string {
7778
func tableInfo(dir, valueDir string) error {
7879
// Open DB
7980
opts := badger.DefaultOptions
81+
opts.TableLoadingMode = options.MemoryMap
8082
opts.Dir = sstDir
8183
opts.ValueDir = vlogDir
8284
opts.ReadOnly = true

badger/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ func main() {
3737
}
3838
}()
3939
runtime.SetBlockProfileRate(100)
40+
runtime.GOMAXPROCS(128)
4041
cmd.Execute()
4142
}

levels.go

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"math/rand"
2323
"os"
2424
"sort"
25+
"sync"
26+
"sync/atomic"
2527
"time"
2628

2729
"golang.org/x/net/trace"
@@ -101,33 +103,89 @@ func newLevelsController(kv *DB, mf *Manifest) (*levelsController, error) {
101103
}
102104

103105
// Some files may be deleted. Let's reload.
106+
var flags uint32 = y.Sync
107+
if kv.opt.ReadOnly {
108+
flags |= y.ReadOnly
109+
}
110+
111+
var mu sync.Mutex
104112
tables := make([][]*table.Table, kv.opt.MaxLevels)
105113
var maxFileID uint64
106-
for fileID, tableManifest := range mf.Tables {
107-
fname := table.NewFilename(fileID, kv.opt.Dir)
108-
var flags uint32 = y.Sync
109-
if kv.opt.ReadOnly {
110-
flags |= y.ReadOnly
111-
}
112-
fd, err := y.OpenExistingFile(fname, flags)
113-
if err != nil {
114-
closeAllTables(tables)
115-
return nil, errors.Wrapf(err, "Opening file: %q", fname)
116-
}
117114

118-
t, err := table.OpenTable(fd, kv.opt.TableLoadingMode)
119-
if err != nil {
120-
closeAllTables(tables)
121-
return nil, errors.Wrapf(err, "Opening table: %q", fname)
115+
// Make errCh non-blocking for iteration over mf.Tables.
116+
errCh := make(chan error, len(mf.Tables))
117+
118+
// We found that using 3 goroutines allows disk throughput to be utilized to its max.
119+
// Disk utilization is the main thing we should focus on, while trying to read the data. That's
120+
// the one factor that remains constant between HDD and SSD.
121+
throttleCh := make(chan struct{}, 3)
122+
flushThrottle := func() error {
123+
close(throttleCh)
124+
for range throttleCh {
125+
}
126+
close(errCh)
127+
for err := range errCh {
128+
if err != nil {
129+
return err
130+
}
122131
}
132+
return nil
133+
}
123134

124-
level := tableManifest.Level
125-
tables[level] = append(tables[level], t)
135+
start := time.Now()
136+
var numOpened int32
137+
tick := time.NewTicker(3 * time.Second)
138+
defer tick.Stop()
126139

140+
for fileID, tableManifest := range mf.Tables {
141+
fname := table.NewFilename(fileID, kv.opt.Dir)
142+
THROTTLE:
143+
for {
144+
select {
145+
case throttleCh <- struct{}{}:
146+
break THROTTLE
147+
case <-tick.C:
148+
Infof("%d tables out of %d opened in %s\n", atomic.LoadInt32(&numOpened),
149+
len(mf.Tables), time.Since(start).Round(time.Millisecond))
150+
case err := <-errCh:
151+
if err != nil {
152+
flushThrottle()
153+
closeAllTables(tables)
154+
return nil, err
155+
}
156+
}
157+
}
127158
if fileID > maxFileID {
128159
maxFileID = fileID
129160
}
161+
go func(fname string, level int) {
162+
defer func() {
163+
<-throttleCh
164+
atomic.AddInt32(&numOpened, 1)
165+
}()
166+
fd, err := y.OpenExistingFile(fname, flags)
167+
if err != nil {
168+
errCh <- errors.Wrapf(err, "Opening file: %q", fname)
169+
return
170+
}
171+
172+
t, err := table.OpenTable(fd, kv.opt.TableLoadingMode)
173+
if err != nil {
174+
errCh <- errors.Wrapf(err, "Opening table: %q", fname)
175+
return
176+
}
177+
178+
mu.Lock()
179+
tables[level] = append(tables[level], t)
180+
mu.Unlock()
181+
}(fname, int(tableManifest.Level))
182+
}
183+
if err := flushThrottle(); err != nil {
184+
closeAllTables(tables)
185+
return nil, err
130186
}
187+
Infof("All %d tables opened in %s\n", atomic.LoadInt32(&numOpened),
188+
time.Since(start).Round(time.Millisecond))
131189
s.nextFileID = maxFileID + 1
132190
for i, tbls := range tables {
133191
s.levels[i].initTables(tbls)

table/table.go

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
package table
1818

1919
import (
20+
"bufio"
21+
"bytes"
2022
"encoding/binary"
2123
"fmt"
24+
"io"
2225
"os"
2326
"path"
2427
"path/filepath"
@@ -150,7 +153,7 @@ func OpenTable(fd *os.File, loadingMode options.FileLoadingMode) (*Table, error)
150153
}
151154
}
152155

153-
if err := t.readIndex(); err != nil {
156+
if err := t.readIndex(loadingMode); err != nil {
154157
return nil, y.Wrap(err)
155158
}
156159

@@ -200,7 +203,7 @@ func (t *Table) readNoFail(off int, sz int) []byte {
200203
return res
201204
}
202205

203-
func (t *Table) readIndex() error {
206+
func (t *Table) readIndex(loadingMode options.FileLoadingMode) error {
204207
readPos := t.tableSize
205208

206209
// Read bloom filter.
@@ -240,54 +243,39 @@ func (t *Table) readIndex() error {
240243
t.blockIndex = append(t.blockIndex, ko)
241244
}
242245

243-
che := make(chan error, len(t.blockIndex))
244-
blocks := make(chan int, len(t.blockIndex))
245-
246-
for i := 0; i < len(t.blockIndex); i++ {
247-
blocks <- i
248-
}
249-
250-
for i := 0; i < 64; i++ { // Run 64 goroutines.
251-
go func() {
252-
var h header
253-
254-
for index := range blocks {
255-
ko := &t.blockIndex[index]
256-
257-
offset := ko.offset
258-
buf, err := t.read(offset, h.Size())
259-
if err != nil {
260-
che <- errors.Wrap(err, "While reading first header in block")
261-
continue
262-
}
263-
264-
h.Decode(buf)
265-
y.AssertTruef(h.plen == 0, "Key offset: %+v, h.plen = %d", *ko, h.plen)
266-
267-
offset += h.Size()
268-
buf = make([]byte, h.klen)
269-
var out []byte
270-
if out, err = t.read(offset, int(h.klen)); err != nil {
271-
che <- errors.Wrap(err, "While reading first key in block")
272-
continue
273-
}
274-
y.AssertTrue(len(buf) == copy(buf, out))
275-
276-
ko.key = buf
277-
che <- nil
278-
}
279-
}()
280-
}
281-
close(blocks) // to stop reading goroutines
282-
283-
var readError error
284-
for i := 0; i < len(t.blockIndex); i++ {
285-
if err := <-che; err != nil && readError == nil {
286-
readError = err
246+
// Execute this index read serially, because all disks are orders of magnitude faster when read
247+
// serially compared to executing random reads.
248+
var h header
249+
var offset int
250+
var r *bufio.Reader
251+
if loadingMode == options.LoadToRAM {
252+
// We already read the table to put it into t.mmap. So, no point reading it again from disk.
253+
// Instead use the read buffer.
254+
r = bufio.NewReader(bytes.NewReader(t.mmap))
255+
} else {
256+
if _, err := t.fd.Seek(0, io.SeekStart); err != nil {
257+
return err
287258
}
259+
r = bufio.NewReader(t.fd)
288260
}
289-
if readError != nil {
290-
return readError
261+
hbuf := make([]byte, h.Size())
262+
for idx := range t.blockIndex {
263+
ko := &t.blockIndex[idx]
264+
if _, err := r.Discard(ko.offset - offset); err != nil {
265+
return err
266+
}
267+
offset = ko.offset
268+
if _, err := io.ReadFull(r, hbuf); err != nil {
269+
return err
270+
}
271+
offset += len(hbuf)
272+
h.Decode(hbuf)
273+
y.AssertTrue(h.plen == 0)
274+
ko.key = make([]byte, h.klen)
275+
if _, err := io.ReadFull(r, ko.key); err != nil {
276+
return err
277+
}
278+
offset += len(ko.key)
291279
}
292280

293281
return nil

util.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package badger
1818

1919
import (
20+
"encoding/hex"
2021
"io/ioutil"
2122
"math/rand"
2223
"sync/atomic"
@@ -77,8 +78,9 @@ func (s *levelHandler) validate() error {
7778

7879
if y.CompareKeys(s.tables[j-1].Biggest(), s.tables[j].Smallest()) >= 0 {
7980
return errors.Errorf(
80-
"Inter: %q vs %q: level=%d j=%d numTables=%d",
81-
string(s.tables[j-1].Biggest()), string(s.tables[j].Smallest()), s.level, j, numTables)
81+
"Inter: Biggest(j-1) \n%s\n vs Smallest(j): \n%s\n: level=%d j=%d numTables=%d",
82+
hex.Dump(s.tables[j-1].Biggest()), hex.Dump(s.tables[j].Smallest()),
83+
s.level, j, numTables)
8284
}
8385

8486
if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 {

0 commit comments

Comments (0)