Skip to content

Commit 38eb5a1

Browse files
authored
chore(stream): log the estimated size of data to be streamed (#1632)
When you're scanning a lot but sending little data, the stream framework appears to work very slowly. That impression is misleading, though, because the scan itself is fast. We now calculate the rough uncompressed size of the data the stream framework will send, and we also periodically log the amount of data scanned by produceKVs.
1 parent d6666ae commit 38eb5a1

File tree

3 files changed

+29
-5
lines changed

3 files changed

+29
-5
lines changed

db.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,6 +1300,19 @@ func (db *DB) Levels() []LevelInfo {
13001300
return db.lc.getLevelInfo()
13011301
}
13021302

1303+
// EstimateSize can be used to get rough estimate of data size for a given prefix.
1304+
func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
1305+
var onDiskSize, uncompressedSize uint64
1306+
tables := db.Tables()
1307+
for _, ti := range tables {
1308+
if bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix) {
1309+
onDiskSize += uint64(ti.OnDiskSize)
1310+
uncompressedSize += uint64(ti.UncompressedSize)
1311+
}
1312+
}
1313+
return onDiskSize, uncompressedSize
1314+
}
1315+
13031316
// KeySplits can be used to get rough key ranges to divide up iteration over
13041317
// the DB.
13051318
func (db *DB) KeySplits(prefix []byte) []string {

iterator.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,8 @@ type Iterator struct {
429429

430430
lastKey []byte // Used to skip over multiple versions of the same key.
431431

432-
closed bool
432+
closed bool
433+
scanned int // Used to estimate the size of data scanned by iterator.
433434

434435
// ThreadId is an optional value that can be set to identify which goroutine created
435436
// the iterator. It can be used, for example, to uniquely identify each of the
@@ -559,11 +560,11 @@ func (it *Iterator) Close() {
559560
func (it *Iterator) Next() {
560561
// Reuse current item
561562
it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
563+
it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
562564
it.waste.push(it.item)
563565

564566
// Set next item to current
565567
it.item = it.data.pop()
566-
567568
for it.iitr.Valid() {
568569
if it.parseItem() {
569570
// parseItem calls one extra next.

stream.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type Stream struct {
8787
kvChan chan *z.Buffer
8888
nextStreamId uint32
8989
doneMarkers bool
90+
scanned uint64 // used to estimate the ETA for data scan.
9091
}
9192

9293
// SendDoneMarkers when true would send out done markers on the stream. False by default.
@@ -202,11 +203,14 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
202203

203204
// This unique stream id is used to identify all the keys from this iteration.
204205
streamId := atomic.AddUint32(&st.nextStreamId, 1)
206+
var scanned int
205207

206208
sendIt := func() error {
207209
select {
208210
case st.kvChan <- outList:
209211
outList = z.NewBuffer(2 * batchSize)
212+
atomic.AddUint64(&st.scanned, uint64(itr.scanned-scanned))
213+
scanned = itr.scanned
210214
case <-ctx.Done():
211215
return ctx.Err()
212216
}
@@ -227,6 +231,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
227231
if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
228232
break
229233
}
234+
230235
// Check if we should pick this key.
231236
if st.ChooseKey != nil && !st.ChooseKey(item) {
232237
continue
@@ -281,6 +286,10 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
281286
}
282287

283288
func (st *Stream) streamKVs(ctx context.Context) error {
289+
onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix)
290+
st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
291+
st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize))
292+
284293
var bytesSent uint64
285294
t := time.NewTicker(time.Second)
286295
defer t.Stop()
@@ -340,9 +349,10 @@ outer:
340349
continue
341350
}
342351
speed := bytesSent / durSec
343-
344-
st.db.opt.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec, jemalloc: %s\n",
345-
st.LogPrefix, y.FixedDuration(dur), humanize.IBytes(bytesSent),
352+
scanned := atomic.LoadUint64(&st.scanned)
353+
st.db.opt.Infof("%s Time elapsed: %s, scanned: ~%s/%s, bytes sent: %s, speed: %s/sec,"+
354+
"jemalloc: %s\n", st.LogPrefix, y.FixedDuration(dur), humanize.IBytes(scanned),
355+
humanize.IBytes(uncompressedSize), humanize.IBytes(bytesSent),
346356
humanize.IBytes(speed), humanize.IBytes(uint64(z.NumAllocBytes())))
347357

348358
case kvs, ok := <-st.kvChan:

0 commit comments

Comments
 (0)