Skip to content

Commit

Permalink
stats: fix races finally, imp tests
Browse files Browse the repository at this point in the history
  • Loading branch information
EugeneOne1 committed Aug 4, 2022
1 parent c63f5f4 commit 7fc3ff1
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 13 deletions.
72 changes: 72 additions & 0 deletions internal/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/AdguardTeam/AdGuardHome/internal/aghtest"
"github.com/AdguardTeam/golibs/testutil"
Expand Down Expand Up @@ -161,3 +163,73 @@ func TestStatsCollector(t *testing.T) {
}
})
}

func TestStats_races(t *testing.T) {
const unitUpdIvl = time.Second / 2

idGen := func() (id uint32) { return uint32(time.Now().UnixNano() / int64(unitUpdIvl)) }

conf := Config{
UnitID: idGen,
Filename: "./stats.db",
LimitDays: 1,
}

s, err := New(conf)
require.NoError(t, err)

s.Start()
startTime := time.Now()
testutil.CleanupAndRequireSuccess(t, func() (err error) {
s.clear()
s.Close()

return os.Remove(conf.Filename)
})

writeFunc := func(wg *sync.WaitGroup, i int) {
e := Entry{
Domain: fmt.Sprintf("example-%d.org", i),
Client: fmt.Sprintf("client_%d", i),
Result: Result(i)%(resultLast-1) + 1,
Time: uint32(time.Since(startTime).Milliseconds()),
}

wg.Done()
wg.Wait()

s.Update(e)
}
readFunc := func(wg *sync.WaitGroup) {
wg.Done()
wg.Wait()

_, _ = s.getData()
}

const (
roundsNum = 3
roundsIvl = time.Second

writersNum = 10
readersNum = 5
)

for round := 0; round < roundsNum; round++ {
time.Sleep(roundsIvl)

wg := &sync.WaitGroup{}
wg.Add(writersNum + readersNum)

name := fmt.Sprintf("round-%d_%s_since_start", round+1, time.Since(startTime))
t.Run(name, func(_ *testing.T) {
for i := 0; i < writersNum; i++ {
go writeFunc(wg, i)
}

for i := 0; i < readersNum; i++ {
go readFunc(wg)
}
})
}
}
25 changes: 12 additions & 13 deletions internal/stats/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (s *StatsCtx) database() (db *bbolt.DB) {
// swapDatabase swaps the database with another one and returns it. It's safe
// for concurrent use.
func (s *StatsCtx) swapDatabase(with *bbolt.DB) (old *bbolt.DB) {
s.currMu.Lock()
defer s.currMu.Unlock()
s.dbMu.Lock()
defer s.dbMu.Unlock()

old, s.db = s.db, with

Expand Down Expand Up @@ -402,8 +402,6 @@ func (s *StatsCtx) periodicFlush() {

nu := newUnit(id)
u := s.swapCurrent(nu)
// This use of serialize is safe since no one else accesses the unit
// that has just been swapped.
udb := u.serialize()

if tx == nil {
Expand Down Expand Up @@ -459,8 +457,11 @@ func convertSliceToMap(a []countPair) map[string]uint64 {
return m
}

// serialize converts u to the *unitDB.
// serialize converts u to the *unitDB. It's safe for concurrent use.
func (u *unit) serialize() (udb *unitDB) {
u.mu.RLock()
defer u.mu.RUnlock()

var timeAvg uint32 = 0
if u.nTotal != 0 {
timeAvg = uint32(u.timeSum / u.nTotal)
Expand All @@ -477,11 +478,15 @@ func (u *unit) serialize() (udb *unitDB) {
}

// deserealize assigns the appropriate values from udb to u. u must not be nil.
// It's safe for concurrent use.
func (u *unit) deserialize(udb *unitDB) {
if udb == nil {
return
}

u.mu.Lock()
defer u.mu.Unlock()

u.nTotal = udb.NTotal
u.nResult = make([]uint64, resultLast)
copy(u.nResult, udb.NResult)
Expand Down Expand Up @@ -562,12 +567,10 @@ func (s *StatsCtx) WriteDiskConfig(dc *DiskConfig) {

func (s *StatsCtx) Close() {
u := s.swapCurrent(nil)
// This use of serialize is safe since no one else accesses the unit
// that has just been swapped.
udb := u.serialize()

db := s.database()
if tx := beginTxn(db, true); tx != nil {
udb := u.serialize()
if flushUnitToDB(tx, u.id, udb) {
s.commitTxn(tx)
} else {
Expand Down Expand Up @@ -652,11 +655,7 @@ func (s *StatsCtx) loadUnits(limit uint32) ([]*unitDB, uint32) {
}

cur := s.ongoing()

cur.mu.RLock()
defer cur.mu.RUnlock()

curID := cur.id
curID := atomic.LoadUint32(&cur.id)

// Per-hour units.
units := []*unitDB{}
Expand Down

0 comments on commit 7fc3ff1

Please sign in to comment.