fix(test): fix flakiness of TestPersistLFDiscardStats (#1963)
fixes DGRAPHCORE-234

## Problem
The purpose of TestPersistLFDiscardStats is to verify that discardStats
survive a restart: if writes to the DB cause discardStats to be recorded
for the value log and the DB is then closed, reopening the DB must yield
the same stats.

Note that the discardStats are updated when we perform compaction, and
in our test cases we have 4 compactors running concurrently
(with ids 0, 1, 2, and 3). Most of the time the test passed: we captured
a single compaction cycle, closed the DB, and then reopened it.
However, there was a race condition: we waited for some arbitrary
amount of time, then captured the current discardStats, then closed the
DB. Between the time that we captured the discardStats and the time that
we closed the DB, another compaction cycle could start and change them.

Hence, every once in a while, we would find that the saved copy of the discard
stats would not match what we picked up later when we reopened the file.

## Solution
As noted in the problem statement, we ended up waiting an "arbitrary amount
of time" instead of waiting for specific events and then reacting to those events.
Specifically, we wanted to wait until at least "some stats" had been generated,
and then wait for compaction to complete. Unfortunately, there was previously
no clear way to capture the event of "some stats having been generated", nor
was there a way to capture the discardStats at the moment the database is
closed.

The solution was to add two things. First, a test channel (syncChan, set
through a test-only field embedded in Options in options.go) to which the
database logs messages, but only when the channel has been specified.
Second, an "onCloseDiscardCapture" map that the test sets on the DB. If this
map is non-nil, it is populated when we close the db, specifically when we
close the valueLog.
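
A minimal sketch of the capture-on-close idea follows. The `store` type and its fields are hypothetical and only mirror the shape of the change, not Badger's actual types. The point is that the copy is taken inside the close path, under the same lock, so no update can slip in between the capture and the persisted state.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical component with an optional, test-only capture map.
type store struct {
	mu             sync.Mutex
	stats          map[uint64]uint64
	onCloseCapture map[uint64]uint64 // nil in production use
}

// update adds discarded bytes for a file.
func (s *store) update(fid, discard uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stats[fid] += discard
}

// Close copies the final counters into onCloseCapture when it is non-nil.
// A real implementation would also flush s.stats to disk here.
func (s *store) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.onCloseCapture != nil {
		for k, v := range s.stats {
			s.onCloseCapture[k] = v
		}
	}
}

func main() {
	captured := make(map[uint64]uint64)
	s := &store{stats: map[uint64]uint64{}, onCloseCapture: captured}
	s.update(1, 100)
	s.update(2, 50) // a late update is still seen by Close
	s.Close()
	fmt.Println(len(captured), captured[1], captured[2]) // prints 2 100 50
}
```

With the map left nil, Close skips the copy entirely, which is the production behavior the commit message describes.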

We no longer rely on time.Sleep, but instead rely on specific events.
billprovince committed Jun 12, 2023
1 parent 907dd65 commit 3e4a25d
Showing 6 changed files with 139 additions and 8 deletions.
5 changes: 5 additions & 0 deletions db.go
@@ -90,6 +90,8 @@ func (lk *lockedKeys) all() []uint64 {
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
testOnlyDBExtensions

lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

dirLockGuard *directoryLockGuard
@@ -252,6 +254,9 @@ func Open(opt Options) (*DB, error) {
bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
threshold: initVlogThreshold(&opt),
}

db.syncChan = opt.syncChan

// Cleanup all the goroutines started by badger in case of an error.
defer func() {
if err != nil {
36 changes: 36 additions & 0 deletions db_test.go
@@ -40,6 +40,42 @@ import (
"github.com/dgraph-io/ristretto/z"
)

// waitForMessage(ch, expected, count, timeout, t) will block until either
// `timeout` seconds have elapsed or `count` instances of the string `expected`
// have been received on the channel `ch`. We log messages or generate errors using `t`.
func waitForMessage(ch chan string, expected string, count int, timeout int, t *testing.T) {
	if count <= 0 {
		t.Logf("Will skip waiting for %s since expected count <= 0.",
			expected)
		return
	}
	tout := time.NewTimer(time.Duration(timeout) * time.Second)
	remaining := count
	for {
		select {
		case curMsg, ok := <-ch:
			if !ok {
				t.Errorf("Test channel closed while waiting for "+
					"message %s with %d remaining instances expected",
					expected, remaining)
				return
			}
			t.Logf("Found message: %s", curMsg)
			if curMsg == expected {
				remaining--
				if remaining == 0 {
					return
				}
			}
		case <-tout.C:
			t.Errorf("Timed out after %d seconds while waiting on test chan "+
				"for message '%s' with %d remaining instances expected",
				timeout, expected, remaining)
			return
		}
	}
}

// summary is produced when DB is closed. Currently it is used only for testing.
type summary struct {
fileIDs map[uint64]bool
2 changes: 2 additions & 0 deletions options.go
@@ -42,6 +42,8 @@ import (
//
// Each option X is documented on the WithX method.
type Options struct {
testOnlyOptions

// Required options.

Dir string
79 changes: 79 additions & 0 deletions test_extensions.go
@@ -0,0 +1,79 @@
/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package badger

// Important: Do NOT import the "testing" package here; doing so would
// pull imports into the production build that we do not want.

// TODO: Consider using this with specific compilation tags so that it only
// shows up when performing testing (e.g., specify build tag=unit).
// We are not yet ready to do that, as it may impact customer usage as
// well as requiring us to update the CI build flags. Moreover, the
// current model does not actually incur any significant cost.
// If we do this, we will also want to introduce a parallel file that
// overrides some of these structs and functions with empty contents.

// String constants for messages to be pushed to syncChan.
const (
updateDiscardStatsMsg = "updateDiscardStats iteration done"
endVLogInitMsg = "End: vlog.init(db)"
)

// testOnlyOptions specifies an extension to the type Options that we want to
// use only in the context of testing.
type testOnlyOptions struct {
// syncChan is used to listen for specific messages related to activities
// that can occur in a DB instance. Currently, this is only used in
// testing activities.
syncChan chan string
}

// testOnlyDBExtensions specifies an extension to the type DB that we want to
// use only in the context of testing.
type testOnlyDBExtensions struct {
syncChan chan string

// onCloseDiscardCapture will be populated by a DB instance during the
// process of performing the Close operation. Currently, we only consider
// using this during testing.
onCloseDiscardCapture map[uint64]uint64
}

// logToSyncChan sends a message to the DB's syncChan. Note that we expect
// that the DB never closes this channel; the responsibility for
// allocating and closing the channel belongs to the test module.
// If db.syncChan is nil or has never been initialized, this will be
// silently ignored.
func (db *DB) logToSyncChan(msg string) {
	if db.syncChan != nil {
		db.syncChan <- msg
	}
}

// captureDiscardStats will copy the contents of the discardStats file
// maintained by vlog to the onCloseDiscardCapture map set on the DB.
// Of course, if db.onCloseDiscardCapture is nil (as expected
// for a production system as opposed to a test system), this is a no-op.
func (db *DB) captureDiscardStats() {
	if db.onCloseDiscardCapture != nil {
		db.vlog.discardStats.Lock()
		db.vlog.discardStats.Iterate(func(id, val uint64) {
			db.onCloseDiscardCapture[id] = val
		})
		db.vlog.discardStats.Unlock()
	}
}
6 changes: 6 additions & 0 deletions value.go
@@ -554,6 +554,8 @@ func (vlog *valueLog) init(db *DB) {
lf, err := InitDiscardStats(vlog.opt)
y.Check(err)
vlog.discardStats = lf
// See TestPersistLFDiscardStats for purpose of statement below.
db.logToSyncChan(endVLogInitMsg)
}

func (vlog *valueLog) open(db *DB) error {
@@ -640,6 +642,7 @@ func (vlog *valueLog) Close() error {
}
}
if vlog.discardStats != nil {
vlog.db.captureDiscardStats()
if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
err = terr
}
@@ -1103,6 +1106,9 @@ func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
for fid, discard := range stats {
vlog.discardStats.Update(fid, discard)
}
// The following is to coordinate with some test cases where we want to
// verify that at least one iteration of updateDiscardStats has been completed.
vlog.db.logToSyncChan(updateDiscardStatsMsg)
}

type vlogThreshold struct {
19 changes: 11 additions & 8 deletions value_test.go
@@ -26,7 +26,6 @@ import (
 	"reflect"
 	"sync"
 	"testing"
-	"time"

 	humanize "github.com/dustin/go-humanize"
 	"github.com/stretchr/testify/require"
@@ -496,9 +495,14 @@ func TestPersistLFDiscardStats(t *testing.T) {
 	opt.CompactL0OnClose = false
 	opt.MemTableSize = 1 << 15
 	opt.ValueThreshold = 1 << 10
+	tChan := make(chan string, 100)
+	defer close(tChan)
+	opt.syncChan = tChan

 	db, err := Open(opt)
 	require.NoError(t, err)
+	capturedDiscardStats := make(map[uint64]uint64)
+	db.onCloseDiscardCapture = capturedDiscardStats

 	sz := 128 << 10 // 5 entries per value log file.
 	v := make([]byte, sz)
@@ -522,14 +526,11 @@ func TestPersistLFDiscardStats(t *testing.T) {
 		require.NoError(t, err)
 	}

-	time.Sleep(2 * time.Second) // wait for compaction to complete
+	// Wait for invocation of updateDiscardStats at least once -- timeout after 60 seconds.
+	waitForMessage(tChan, updateDiscardStatsMsg, 1, 60, t)

-	persistedMap := make(map[uint64]uint64)
 	db.vlog.discardStats.Lock()
 	require.True(t, db.vlog.discardStats.Len() > 1, "some discardStats should be generated")
-	db.vlog.discardStats.Iterate(func(fid, val uint64) {
-		persistedMap[fid] = val
-	})

 	db.vlog.discardStats.Unlock()
 	require.NoError(t, db.Close())
@@ -539,13 +540,15 @@ func TestPersistLFDiscardStats(t *testing.T) {
 	db, err = Open(opt)
 	require.NoError(t, err)
 	defer db.Close()
-	time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats().
+	waitForMessage(tChan, endVLogInitMsg, 1, 60, t)
 	db.vlog.discardStats.Lock()
 	statsMap := make(map[uint64]uint64)
 	db.vlog.discardStats.Iterate(func(fid, val uint64) {
 		statsMap[fid] = val
 	})
-	require.True(t, reflect.DeepEqual(persistedMap, statsMap), "Discard maps are not equal")
+	require.Truef(t, reflect.DeepEqual(capturedDiscardStats, statsMap),
+		"Discard maps are not equal. On Close: %+v, After Reopen: %+v",
+		capturedDiscardStats, statsMap)
 	db.vlog.discardStats.Unlock()
 }
