Skip to content

Commit

Permalink
Merge a1e6b78 into ef0e552
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed May 17, 2023
2 parents ef0e552 + a1e6b78 commit d2507f0
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 7 deletions.
13 changes: 12 additions & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ func (sw *StreamWriter) PrepareIncremental() error {
}
sw.done = func() { once.Do(done) }

mts, decr := sw.db.getMemTables()
defer decr()
for _, m := range mts {
if !m.sl.Empty() {
return fmt.Errorf("Unable to do incremental writes because MemTable has data")
}
}

isEmptyDB := true
for _, level := range sw.db.Levels() {
if level.NumTables > 0 {
Expand All @@ -117,7 +125,10 @@ func (sw *StreamWriter) PrepareIncremental() error {
return nil
}
if sw.prevLevel == 0 {
return fmt.Errorf("Unable to do incremental writes because L0 has data")
if err := sw.db.Flatten(3); err != nil {
return errors.Wrapf(err, "error during flatten in StreamWriter")
}
sw.prevLevel = len(sw.db.Levels()) - 1
}
return nil
}
Expand Down
87 changes: 81 additions & 6 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
}

func TestStreamWriterIncremental(t *testing.T) {
addIncremtal := func(t *testing.T, db *DB, keys [][]byte) {
addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
buf := z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
for _, key := range keys {
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestStreamWriterIncremental(t *testing.T) {
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

addIncremtal(t, db, [][]byte{[]byte("key-2")})
addIncremental(t, db, [][]byte{[]byte("key-2")})

txn := db.NewTransaction(false)
defer txn.Discard()
Expand All @@ -646,7 +646,7 @@ func TestStreamWriterIncremental(t *testing.T) {

t.Run("incremental on empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db, [][]byte{[]byte("key-1")})
addIncremental(t, db, [][]byte{[]byte("key-1")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
Expand All @@ -656,9 +656,9 @@ func TestStreamWriterIncremental(t *testing.T) {

t.Run("multiple incremental", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremtal(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremtal(t, db, [][]byte{[]byte("a3"), []byte("c3")})
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
Expand All @@ -675,4 +675,79 @@ func TestStreamWriterIncremental(t *testing.T) {
require.NoError(t, err)
})
})

t.Run("write between incremental writes", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("a3"), []byte("c3"))
}))

sw := db.NewStreamWriter()
defer sw.Cancel()
require.EqualError(t, sw.PrepareIncremental(), "Unable to do incremental writes because MemTable has data")

txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
require.NoError(t, err)
_, err = txn.Get([]byte("c1"))
require.NoError(t, err)
_, err = txn.Get([]byte("a3"))
require.NoError(t, err)
})
})

t.Run("incremental writes > #levels", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
addIncremental(t, db, [][]byte{[]byte("a4"), []byte("c4")})
addIncremental(t, db, [][]byte{[]byte("a5"), []byte("c5")})
addIncremental(t, db, [][]byte{[]byte("a6"), []byte("c6")})
addIncremental(t, db, [][]byte{[]byte("a7"), []byte("c7")})
addIncremental(t, db, [][]byte{[]byte("a8"), []byte("c8")})
addIncremental(t, db, [][]byte{[]byte("a9"), []byte("c9")})

txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("a1"))
require.NoError(t, err)
_, err = txn.Get([]byte("c1"))
require.NoError(t, err)
_, err = txn.Get([]byte("a2"))
require.NoError(t, err)
_, err = txn.Get([]byte("c2"))
require.NoError(t, err)
_, err = txn.Get([]byte("a3"))
require.NoError(t, err)
_, err = txn.Get([]byte("c3"))
require.NoError(t, err)
_, err = txn.Get([]byte("a4"))
require.NoError(t, err)
_, err = txn.Get([]byte("c4"))
require.NoError(t, err)
_, err = txn.Get([]byte("a5"))
require.NoError(t, err)
_, err = txn.Get([]byte("c5"))
require.NoError(t, err)
_, err = txn.Get([]byte("a6"))
require.NoError(t, err)
_, err = txn.Get([]byte("c6"))
require.NoError(t, err)
_, err = txn.Get([]byte("a7"))
require.NoError(t, err)
_, err = txn.Get([]byte("c7"))
require.NoError(t, err)
_, err = txn.Get([]byte("a8"))
require.NoError(t, err)
_, err = txn.Get([]byte("c8"))
require.NoError(t, err)
_, err = txn.Get([]byte("a9"))
require.NoError(t, err)
_, err = txn.Get([]byte("c9"))
require.NoError(t, err)
})
})
}

0 comments on commit d2507f0

Please sign in to comment.