Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var (
numAccounts int
numPrevious int
duration string
stopAll int32
stopAll atomic.Int32
checkStream bool
checkSubscriber bool
verbose bool
Expand Down Expand Up @@ -241,7 +241,7 @@ func seekTotal(txn *badger.Txn) ([]account, error) {
if total != expected {
log.Printf("Balance did NOT match up. Expected: %d. Received: %d",
expected, total)
atomic.AddInt32(&stopAll, 1)
stopAll.Add(1)
return accounts, errFailure
}
return accounts, nil
Expand Down Expand Up @@ -419,7 +419,7 @@ func runTest(cmd *cobra.Command, args []string) error {

// startTs := time.Now()
endTs := time.Now().Add(dur)
var total, errors, reads uint64
var total, errors, reads atomic.Uint64

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -429,15 +429,15 @@ func runTest(cmd *cobra.Command, args []string) error {
defer ticker.Stop()

for range ticker.C {
if atomic.LoadInt32(&stopAll) > 0 {
if stopAll.Load() > 0 {
// Do not proceed.
return
}
// log.Printf("[%6s] Total: %d. Errors: %d Reads: %d.\n",
// time.Since(startTs).Round(time.Second).String(),
// atomic.LoadUint64(&total),
// atomic.LoadUint64(&errors),
// atomic.LoadUint64(&reads))
// total.Load(),
// errors.Load(),
// reads.Load())
if time.Now().After(endTs) {
return
}
Expand All @@ -454,7 +454,7 @@ func runTest(cmd *cobra.Command, args []string) error {
defer ticker.Stop()

for range ticker.C {
if atomic.LoadInt32(&stopAll) > 0 {
if stopAll.Load() > 0 {
// Do not proceed.
return
}
Expand All @@ -467,11 +467,11 @@ func runTest(cmd *cobra.Command, args []string) error {
continue
}
err := moveMoney(db, from, to)
atomic.AddUint64(&total, 1)
total.Add(1)
if err == nil && verbose {
log.Printf("Moved $5. %d -> %d\n", from, to)
} else {
atomic.AddUint64(&errors, 1)
errors.Add(1)
}
}
}()
Expand All @@ -489,7 +489,7 @@ func runTest(cmd *cobra.Command, args []string) error {
log.Printf("Received stream\n")

// Do not proceed.
if atomic.LoadInt32(&stopAll) > 0 || time.Now().After(endTs) {
if stopAll.Load() > 0 || time.Now().After(endTs) {
return
}

Expand Down Expand Up @@ -533,7 +533,7 @@ func runTest(cmd *cobra.Command, args []string) error {
defer ticker.Stop()

for range ticker.C {
if atomic.LoadInt32(&stopAll) > 0 {
if stopAll.Load() > 0 {
// Do not proceed.
return
}
Expand All @@ -546,7 +546,7 @@ func runTest(cmd *cobra.Command, args []string) error {
if err != nil {
log.Printf("Error while calculating total: %v", err)
} else {
atomic.AddUint64(&reads, 1)
reads.Add(1)
}
return nil
}))
Expand Down Expand Up @@ -586,13 +586,13 @@ func runTest(cmd *cobra.Command, args []string) error {
if err != nil {
log.Printf("Error while calculating subscriber DB total: %v", err)
} else {
atomic.AddUint64(&reads, 1)
reads.Add(1)
}
return nil
}))
}

if atomic.LoadInt32(&stopAll) == 0 {
if stopAll.Load() == 0 {
log.Println("Test OK")
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions badger/cmd/read_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ This command reads data from existing Badger database randomly using multiple go
}

var (
sizeRead uint64 // will store size read till now
entriesRead uint64 // will store entries read till now
startTime time.Time // start time of read benchmarking
sizeRead atomic.Uint64 // will store size read till now
entriesRead atomic.Uint64 // will store entries read till now
startTime time.Time // start time of read benchmarking

ro = struct {
blockCacheSize int64
Expand Down Expand Up @@ -91,8 +91,8 @@ func fullScanDB(db *badger.DB) {
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
i := it.Item()
atomic.AddUint64(&entriesRead, 1)
atomic.AddUint64(&sizeRead, uint64(i.EstimatedSize()))
entriesRead.Add(1)
sizeRead.Add(uint64(i.EstimatedSize()))
}
}

Expand Down Expand Up @@ -140,8 +140,8 @@ func printStats(c *z.Closer) {
return
case <-t.C:
dur := time.Since(startTime)
sz := atomic.LoadUint64(&sizeRead)
entries := atomic.LoadUint64(&entriesRead)
sz := sizeRead.Load()
entries := entriesRead.Load()
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("Time elapsed: %s, bytes read: %s, speed: %s/sec, "+
Expand All @@ -160,8 +160,8 @@ func readKeys(db *badger.DB, c *z.Closer, keys [][]byte) {
return
default:
key := keys[r.Int31n(int32(len(keys)))]
atomic.AddUint64(&sizeRead, lookupForKey(db, key))
atomic.AddUint64(&entriesRead, 1)
sizeRead.Add(lookupForKey(db, key))
entriesRead.Add(1)
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ var (
gcDiscardRatio float64
}{}

sizeWritten uint64
gcSuccess uint64
sizeWritten atomic.Uint64
gcSuccess atomic.Uint64
sstCount uint32
vlogCount uint32
files []string
entriesWritten uint64
entriesWritten atomic.Uint64
)

const (
Expand Down Expand Up @@ -161,8 +161,8 @@ func writeRandom(db *badger.DB, num uint64) error {
panic(err)
}

atomic.AddUint64(&entriesWritten, 1)
atomic.AddUint64(&sizeWritten, es)
entriesWritten.Add(1)
sizeWritten.Add(es)
}
return batch.Flush()
}
Expand Down Expand Up @@ -224,8 +224,8 @@ func writeSorted(db *badger.DB, num uint64) error {
badger.KVToBuffer(kv, kvBuf)

sz += es
atomic.AddUint64(&entriesWritten, 1)
atomic.AddUint64(&sizeWritten, uint64(es))
entriesWritten.Add(1)
sizeWritten.Add(uint64(es))

if sz >= 4<<20 { // 4 MB
writeCh <- kvBuf
Expand Down Expand Up @@ -390,8 +390,8 @@ func reportStats(c *z.Closer, db *badger.DB) {
}

dur := time.Since(startTime)
sz := atomic.LoadUint64(&sizeWritten)
entries := atomic.LoadUint64(&entriesWritten)
sz := sizeWritten.Load()
entries := entriesWritten.Load()
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
Expand Down Expand Up @@ -423,7 +423,7 @@ func runGC(c *z.Closer, db *badger.DB) {
return
case <-t.C:
if err := db.RunValueLogGC(wo.gcDiscardRatio); err == nil {
atomic.AddUint64(&gcSuccess, 1)
gcSuccess.Add(1)
} else {
log.Printf("[GC] Failed due to following err %v", err)
}
Expand Down Expand Up @@ -502,8 +502,8 @@ func printReadStats(c *z.Closer, startTime time.Time) {
return
case <-t.C:
dur := time.Since(startTime)
sz := atomic.LoadUint64(&sizeRead)
entries := atomic.LoadUint64(&entriesRead)
sz := sizeRead.Load()
entries := entriesRead.Load()
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("[READ] Time elapsed: %s, bytes read: %s, speed: %s/sec, "+
Expand Down
24 changes: 12 additions & 12 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ type DB struct {
flushChan chan flushTask // For flushing memtables.
closeOnce sync.Once // For closing DB only once.

blockWrites int32
isClosed uint32
blockWrites atomic.Int32
isClosed atomic.Uint32

orc *oracle
bannedNamespaces *lockedKeys
Expand Down Expand Up @@ -531,16 +531,16 @@ func (db *DB) Close() error {
// IsClosed denotes if the badger DB is closed or not. A DB instance should not
// be used after closing it.
func (db *DB) IsClosed() bool {
return atomic.LoadUint32(&db.isClosed) == 1
return db.isClosed.Load() == 1
}

func (db *DB) close() (err error) {
defer db.allocPool.Release()

db.opt.Debugf("Closing database")
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(atomic.LoadInt64(&db.lc.l0stallsMs)))
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))

atomic.StoreInt32(&db.blockWrites, 1)
db.blockWrites.Store(1)

if !db.opt.InMemory {
// Stop value GC first.
Expand Down Expand Up @@ -626,7 +626,7 @@ func (db *DB) close() (err error) {
db.blockCache.Close()
db.indexCache.Close()

atomic.StoreUint32(&db.isClosed, 1)
db.isClosed.Store(1)
db.threshold.close()

if db.opt.InMemory {
Expand Down Expand Up @@ -851,7 +851,7 @@ func (db *DB) writeRequests(reqs []*request) error {
}

func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
if atomic.LoadInt32(&db.blockWrites) == 1 {
if db.blockWrites.Load() == 1 {
return nil, ErrBlockedWrites
}
var count, size int64
Expand Down Expand Up @@ -1604,7 +1604,7 @@ func (db *DB) Flatten(workers int) error {

func (db *DB) blockWrite() error {
// Stop accepting new writes.
if !atomic.CompareAndSwapInt32(&db.blockWrites, 0, 1) {
if !db.blockWrites.CompareAndSwap(0, 1) {
return ErrBlockedWrites
}

Expand All @@ -1619,7 +1619,7 @@ func (db *DB) unblockWrite() {
go db.doWrites(db.closers.writes)

// Resume writes.
atomic.StoreInt32(&db.blockWrites, 0)
db.blockWrites.Store(0)
}

func (db *DB) prepareToDrop() (func(), error) {
Expand Down Expand Up @@ -1709,7 +1709,7 @@ func (db *DB) dropAll() (func(), error) {
if err != nil {
return resume, err
}
db.lc.nextFileID = 1
db.lc.nextFileID.Store(1)
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
db.blockCache.Clear()
db.indexCache.Clear()
Expand Down Expand Up @@ -1906,7 +1906,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
return err
case <-ctx.Done():
c.Done()
atomic.StoreUint64(s.active, 0)
s.active.Store(0)
drain()
db.pub.deleteSubscriber(s.id)
// Delete the subscriber to avoid further updates.
Expand All @@ -1915,7 +1915,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
err := slurp(batch)
if err != nil {
c.Done()
atomic.StoreUint64(s.active, 0)
s.active.Store(0)
drain()
// Delete the subscriber if there is an error by the callback.
db.pub.deleteSubscriber(s.id)
Expand Down
18 changes: 8 additions & 10 deletions integration/testgc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type testSuite struct {
sync.Mutex
vals map[uint64]uint64

count uint64 // Not under mutex lock.
count atomic.Uint64 // Not under mutex lock.
}

func encoded(i uint64) []byte {
Expand All @@ -39,7 +39,7 @@ func (s *testSuite) write(db *badger.DB) error {
// These keys would be overwritten.
keyi := uint64(rand.Int63n(maxValue))
key := encoded(keyi)
vali := atomic.AddUint64(&s.count, 1)
vali := s.count.Add(1)
val := encoded(vali)
val = append(val, suffix...)
if err := txn.SetEntry(badger.NewEntry(key, val)); err != nil {
Expand All @@ -48,7 +48,7 @@ func (s *testSuite) write(db *badger.DB) error {
}
for i := 0; i < 20; i++ {
// These keys would be new and never overwritten.
keyi := atomic.AddUint64(&s.count, 1)
keyi := s.count.Add(1)
if keyi%1000000 == 0 {
log.Printf("Count: %d\n", keyi)
}
Expand All @@ -63,7 +63,7 @@ func (s *testSuite) write(db *badger.DB) error {
}

func (s *testSuite) read(db *badger.DB) error {
max := int64(atomic.LoadUint64(&s.count))
max := int64(s.count.Load())
keyi := uint64(rand.Int63n(max))
key := encoded(keyi)

Expand Down Expand Up @@ -138,11 +138,9 @@ func main() {
}
}()

s := testSuite{
count: uint64(maxValue),
vals: make(map[uint64]uint64),
}
var numLoops uint64
s := testSuite{vals: make(map[uint64]uint64)}
s.count.Store(uint64(maxValue))
var numLoops atomic.Uint64
ticker := time.NewTicker(5 * time.Second)
for i := 0; i < 10; i++ {
go func() {
Expand All @@ -156,7 +154,7 @@ func main() {
log.Fatal(err)
}
}
nl := atomic.AddUint64(&numLoops, 1)
nl := numLoops.Add(1)
select {
case <-closer.HasBeenClosed():
return
Expand Down
Loading