Speed up StreamWriter (#825)

Use a goroutine to split up the serial part of writing in StreamWriter, so that another core can work on the table encoding. On my desktop, this writes at a rate of 200 MBps (1.6 Gbps), finishing 1B keys (16-byte keys, 16-byte values) in around 3m20s.

Changes:
* Use a goroutine to split up the serial part of writing in StreamWriter to speed things up.
* Limit to 3 pending requests at a time.
* Use as many goroutines to process requests as the number of streams.
* Update the badger fill tool (badger/cmd/fill.go) to send writes to StreamWriter concurrently (see the usage sketch after this list).
* Do batching based on size instead of count.
* Set the value log head correctly.
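
Below is a minimal usage sketch of the pattern the fill tool now follows: producer goroutines build pb.KVList batches tagged with a StreamId and hand them to a single loop that calls Write (which, per this commit, is not safe for concurrent use), followed by one Flush. The sketch is not part of the commit; writeSorted, the two-way key split, and the fixed payload are invented for illustration, and it assumes db is an already-open *badger.DB from the pre-v2 github.com/dgraph-io/badger module.

    package sketch

    import (
        "encoding/binary"
        "log"
        "sync"

        "github.com/dgraph-io/badger"
        "github.com/dgraph-io/badger/pb"
    )

    // writeSorted streams n keys into db. Each producer owns one stream and
    // emits its keys in sorted order; only the loop at the bottom calls
    // writer.Write, since Write must not be called concurrently.
    func writeSorted(db *badger.DB, n uint64) error {
        writer := db.NewStreamWriter()
        if err := writer.Prepare(); err != nil {
            return err
        }

        writeCh := make(chan *pb.KVList, 3) // small buffer, like the 3 pending requests above
        var wg sync.WaitGroup

        produce := func(start, end uint64, streamId uint32) {
            defer wg.Done()
            kvs := &pb.KVList{}
            for i := start; i < end; i++ {
                key := make([]byte, 8)
                binary.BigEndian.PutUint64(key, i)
                kvs.Kv = append(kvs.Kv, &pb.KV{
                    Key:      key,
                    Value:    []byte("value"),
                    Version:  1,
                    StreamId: streamId,
                })
            }
            writeCh <- kvs
        }

        // Two streams over disjoint, already-sorted key ranges.
        wg.Add(2)
        go produce(0, n/2, 1)
        go produce(n/2, n, 2)
        go func() {
            wg.Wait()
            close(writeCh)
        }()

        for kvs := range writeCh {
            if err := writer.Write(kvs); err != nil {
                return err
            }
        }
        log.Println("Streaming done. Flushing...")
        return writer.Flush()
    }
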
manishrjain committed May 29, 2019
1 parent e9447c9 commit 1725096e24a2bbf932d14808e57d78d296753606
Showing with 129 additions and 57 deletions.
  1. +45 −15 badger/cmd/fill.go
  2. +84 −42 stream_writer.go
badger/cmd/fill.go
@@ -20,6 +20,7 @@ import (
     "encoding/binary"
     "log"
     "math/rand"
+    "sync"
     "time"
 
     "github.com/dgraph-io/badger"
@@ -79,27 +80,56 @@ func fillSorted(db *badger.DB, num uint64) error {
     if err := writer.Prepare(); err != nil {
         return err
     }
-    kvs := &pb.KVList{}
-    for i := uint64(1); i <= num; i++ {
-        key := make([]byte, 8)
-        binary.BigEndian.PutUint64(key, i)
-        kvs.Kv = append(kvs.Kv, &pb.KV{
-            Key:     key,
-            Value:   value,
-            Version: 1,
-        })
-        if len(kvs.Kv) > 1000 {
-            if err := writer.Write(kvs); err != nil {
-                return err
+
+    wg := &sync.WaitGroup{}
+    writeCh := make(chan *pb.KVList, 3)
+    writeRange := func(start, end uint64, streamId uint32) {
+        // end is not included.
+        defer wg.Done()
+        kvs := &pb.KVList{}
+        var sz int
+        for i := start; i < end; i++ {
+            key := make([]byte, 8)
+            binary.BigEndian.PutUint64(key, i)
+            kvs.Kv = append(kvs.Kv, &pb.KV{
+                Key:      key,
+                Value:    value,
+                Version:  1,
+                StreamId: streamId,
+            })
+
+            sz += len(key) + len(value)
+            if sz >= 4<<20 { // 4 MB
+                writeCh <- kvs
+                kvs = &pb.KVList{}
+                sz = 0
             }
-            kvs = &pb.KVList{}
         }
+        writeCh <- kvs
     }
-    if len(kvs.Kv) > 0 {
+
+    // Let's create some streams.
+    width := num / 16
+    streamId := uint32(0)
+    for start := uint64(0); start < num; start += width {
+        end := start + width
+        if end > num {
+            end = num
+        }
+        streamId++
+        wg.Add(1)
+        go writeRange(start, end, streamId)
+    }
+    go func() {
+        wg.Wait()
+        close(writeCh)
+    }()
+    log.Printf("Max StreamId used: %d. Width: %d\n", streamId, width)
+    for kvs := range writeCh {
         if err := writer.Write(kvs); err != nil {
-            return err
+            panic(err)
         }
     }
     log.Println("DONE streaming. Flushing...")
     return writer.Flush()
 }

stream_writer.go
@@ -44,9 +44,9 @@ type StreamWriter struct {
     db         *DB
     done       func()
     throttle   *y.Throttle
-    head       valuePointer
     maxVersion uint64
     writers    map[uint32]*sortedWriter
+    closer     *y.Closer
 }
 
 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -60,6 +60,7 @@ func (db *DB) NewStreamWriter() *StreamWriter {
         // concurrent streams being processed.
         throttle: y.NewThrottle(16),
         writers:  make(map[uint32]*sortedWriter),
+        closer:   y.NewCloser(0),
     }
 }

@@ -74,9 +75,12 @@ func (sw *StreamWriter) Prepare() error {
 }
 
 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
-// would use to demux the writes.
+// would use to demux the writes. Write is not thread safe and it should NOT be called concurrently.
 func (sw *StreamWriter) Write(kvs *pb.KVList) error {
-    var entries []*Entry
+    if len(kvs.GetKv()) == 0 {
+        return nil
+    }
+    streamReqs := make(map[uint32]*request)
     for _, kv := range kvs.Kv {
         var meta, userMeta byte
         if len(kv.Meta) > 0 {
@@ -98,50 +102,28 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
         // If the value can be colocated with the key in LSM tree, we can skip
         // writing the value to value log.
         e.skipVlog = sw.db.shouldWriteValueToLSM(*e)
-        entries = append(entries, e)
+        req := streamReqs[kv.StreamId]
+        if req == nil {
+            req = &request{}
+            streamReqs[kv.StreamId] = req
+        }
+        req.Entries = append(req.Entries, e)
     }
-    req := &request{
-        Entries: entries,
+    var all []*request
+    for _, req := range streamReqs {
+        all = append(all, req)
     }
-    y.AssertTrue(len(kvs.Kv) == len(req.Entries))
-    if err := sw.db.vlog.write([]*request{req}); err != nil {
+    if err := sw.db.vlog.write(all); err != nil {
         return err
     }
 
-    for i, kv := range kvs.Kv {
-        e := req.Entries[i]
-        vptr := req.Ptrs[i]
-        if !vptr.IsZero() {
-            y.AssertTrue(sw.head.Less(vptr))
-            sw.head = vptr
-        }
-
-        writer, ok := sw.writers[kv.StreamId]
+    for streamId, req := range streamReqs {
+        writer, ok := sw.writers[streamId]
         if !ok {
-            writer = sw.newWriter(kv.StreamId)
-            sw.writers[kv.StreamId] = writer
-        }
-
-        var vs y.ValueStruct
-        if e.skipVlog {
-            vs = y.ValueStruct{
-                Value:     e.Value,
-                Meta:      e.meta,
-                UserMeta:  e.UserMeta,
-                ExpiresAt: e.ExpiresAt,
-            }
-        } else {
-            vbuf := make([]byte, vptrSize)
-            vs = y.ValueStruct{
-                Value:     vptr.Encode(vbuf),
-                Meta:      e.meta | bitValuePointer,
-                UserMeta:  e.UserMeta,
-                ExpiresAt: e.ExpiresAt,
-            }
-        }
-        if err := writer.Add(e.Key, vs); err != nil {
-            return err
+            writer = sw.newWriter(streamId)
+            sw.writers[streamId] = writer
         }
+        writer.reqCh <- req
     }
     return nil
 }
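
Write is documented above as not thread safe, so a caller with several producer goroutines has to serialize its calls. The badger fill change earlier in this commit does that with a channel and a single consumer; the fragment below is an illustrative alternative, not part of the commit, that guards a shared writer with a mutex (safeWriter is a made-up name). Each producer must still use its own StreamId(s) and send keys in sorted order, otherwise sortedWriter.Add rejects them with ErrUnsortedKey.

    package sketch

    import (
        "sync"

        "github.com/dgraph-io/badger"
        "github.com/dgraph-io/badger/pb"
    )

    // safeWriter is a hypothetical wrapper, not part of this commit. It
    // serializes calls to StreamWriter.Write so several producers can share
    // one writer; keeping each stream's keys sorted is still the producers' job.
    type safeWriter struct {
        mu sync.Mutex
        sw *badger.StreamWriter
    }

    func (s *safeWriter) Write(kvs *pb.KVList) error {
        s.mu.Lock()
        defer s.mu.Unlock()
        return s.sw.Write(kvs)
    }
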
@@ -150,15 +132,21 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
 // updates Oracle with maxVersion found in all entries (if DB is not managed).
 func (sw *StreamWriter) Flush() error {
     defer sw.done()
+
+    sw.closer.SignalAndWait()
+    var maxHead valuePointer
     for _, writer := range sw.writers {
         if err := writer.Done(); err != nil {
             return err
         }
+        if maxHead.Less(writer.head) {
+            maxHead = writer.head
+        }
     }
 
     // Encode and write the value log head into a new table.
     data := make([]byte, vptrSize)
-    sw.head.Encode(data)
+    maxHead.Encode(data)
     headWriter := sw.newWriter(headStreamId)
     if err := headWriter.Add(
         y.KeyWithTs(head, sw.maxVersion),
@@ -198,20 +186,74 @@ type sortedWriter struct {
     builder  *table.Builder
     lastKey  []byte
     streamId uint32
+    reqCh    chan *request
+    head     valuePointer
 }
 
 func (sw *StreamWriter) newWriter(streamId uint32) *sortedWriter {
-    return &sortedWriter{
+    w := &sortedWriter{
         db:       sw.db,
         streamId: streamId,
         throttle: sw.throttle,
        builder:  table.NewTableBuilder(),
+        reqCh:    make(chan *request, 3),
     }
+    sw.closer.AddRunning(1)
+    go w.handleRequests(sw.closer)
+    return w
 }
 
 // ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add.
 var ErrUnsortedKey = errors.New("Keys not in sorted order")
 
+func (w *sortedWriter) handleRequests(closer *y.Closer) {
+    defer closer.Done()
+
+    process := func(req *request) {
+        for i, e := range req.Entries {
+            vptr := req.Ptrs[i]
+            if !vptr.IsZero() {
+                y.AssertTrue(w.head.Less(vptr))
+                w.head = vptr
+            }
+
+            var vs y.ValueStruct
+            if e.skipVlog {
+                vs = y.ValueStruct{
+                    Value:     e.Value,
+                    Meta:      e.meta,
+                    UserMeta:  e.UserMeta,
+                    ExpiresAt: e.ExpiresAt,
+                }
+            } else {
+                vbuf := make([]byte, vptrSize)
+                vs = y.ValueStruct{
+                    Value:     vptr.Encode(vbuf),
+                    Meta:      e.meta | bitValuePointer,
+                    UserMeta:  e.UserMeta,
+                    ExpiresAt: e.ExpiresAt,
+                }
+            }
+            if err := w.Add(e.Key, vs); err != nil {
+                panic(err)
+            }
+        }
+    }
+
+    for {
+        select {
+        case req := <-w.reqCh:
+            process(req)
+        case <-closer.HasBeenClosed():
+            close(w.reqCh)
+            for req := range w.reqCh {
+                process(req)
+            }
+            return
+        }
+    }
+}
+
 // Add adds key and vs to sortedWriter.
 func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
     if bytes.Compare(key, w.lastKey) <= 0 {
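
The handleRequests loop above is a drain-on-close worker: each sortedWriter consumes from its own reqCh until the closer is signalled, then closes the channel and processes whatever is still queued before returning, which is why Flush can call sw.closer.SignalAndWait() and only afterwards read each writer's head and call writer.Done(). Below is a minimal standalone sketch of the same pattern using only the standard library; drainWorker, jobs, and done are invented names.

    package sketch

    import "sync"

    // drainWorker mirrors the handleRequests loop: it processes jobs until
    // done is signalled, then closes jobs and drains the remainder before
    // returning. Closing jobs here is safe only because, as in the commit,
    // the single producer has stopped sending before done is signalled.
    func drainWorker(jobs chan int, done <-chan struct{}, process func(int), wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case j := <-jobs:
                process(j)
            case <-done:
                close(jobs)
                for j := range jobs {
                    process(j)
                }
                return
            }
        }
    }

In the commit, y.Closer plays the role of done and the wait group combined: newWriter calls sw.closer.AddRunning(1) before starting the goroutine, handleRequests defers closer.Done(), and Flush signals and waits via sw.closer.SignalAndWait().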
