Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#279 from ipfs/fix/races
Browse files Browse the repository at this point in the history
fix: races in tests

This commit was moved from ipfs/go-bitswap@159748c
  • Loading branch information
Stebalien committed Mar 6, 2020
2 parents 0fe4542 + f83ad67 commit 8fa9f69
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 123 deletions.
24 changes: 13 additions & 11 deletions bitswap/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ const (
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05

// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
shortTerm = 10 * time.Second

// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
Expand All @@ -96,14 +100,6 @@ const (
blockstoreWorkerCount = 128
)

var (
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
//
// this is only a variable to make testing easier.
shortTerm = 10 * time.Second
)

// Envelope contains a message for a Peer.
type Envelope struct {
// Peer is the intended recipient.
Expand Down Expand Up @@ -161,18 +157,23 @@ type Engine struct {
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int

// how frequently the engine should sample peer usefulness
peerSampleInterval time.Duration

sendDontHaves bool

self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock)
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, maxReplaceSize int) *Engine {
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
maxReplaceSize int, peerSampleInterval time.Duration) *Engine {

e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
Expand All @@ -181,6 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger,
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
peerSampleInterval: peerSampleInterval,
taskWorkerCount: taskWorkerCount,
sendDontHaves: true,
self: self,
Expand Down Expand Up @@ -236,7 +238,7 @@ func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (e *Engine) scoreWorker(ctx context.Context) {
ticker := time.NewTicker(shortTerm)
ticker := time.NewTicker(e.peerSampleInterval)
defer ticker.Stop()

type update struct {
Expand Down
42 changes: 20 additions & 22 deletions bitswap/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ type engineSet struct {
Blockstore blockstore.Blockstore
}

func newTestEngine(ctx context.Context, idStr string) engineSet {
func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Duration) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(ctx, bs, fpt, "localhost", 0)
e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
Expand All @@ -108,8 +108,8 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
func TestConsistentAccounting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sender := newTestEngine(ctx, "Ernie")
receiver := newTestEngine(ctx, "Bert")
sender := newTestEngine(ctx, "Ernie", shortTerm)
receiver := newTestEngine(ctx, "Bert", shortTerm)

// Send messages from Ernie to Bert
for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -143,8 +143,8 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sanfrancisco := newTestEngine(ctx, "sf")
seattle := newTestEngine(ctx, "sea")
sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
seattle := newTestEngine(ctx, "sea", shortTerm)

m := message.New(true)

Expand Down Expand Up @@ -181,7 +181,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0)
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -509,7 +509,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
Expand Down Expand Up @@ -665,7 +665,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

var next envChan
Expand Down Expand Up @@ -850,7 +850,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
Expand All @@ -875,7 +875,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -919,7 +919,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -981,8 +981,8 @@ func TestSendDontHave(t *testing.T) {
func TestTaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
sanfrancisco := newTestEngine(ctx, "sf")
seattle := newTestEngine(ctx, "sea")
sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
seattle := newTestEngine(ctx, "sea", shortTerm)

keys := []string{"a", "b", "c", "d", "e"}
for _, letter := range keys {
Expand All @@ -1007,13 +1007,11 @@ func TestTaggingPeers(t *testing.T) {
}

func TestTaggingUseful(t *testing.T) {
oldShortTerm := shortTerm
shortTerm = 2 * time.Millisecond
defer func() { shortTerm = oldShortTerm }()
peerSampleInterval := 2 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
me := newTestEngine(ctx, "engine")
me := newTestEngine(ctx, "engine", peerSampleInterval)
friend := peer.ID("friend")

block := blocks.NewBlock([]byte("foobar"))
Expand All @@ -1025,21 +1023,21 @@ func TestTaggingUseful(t *testing.T) {
t.Fatal("Peers should be untagged but weren't")
}
me.Engine.MessageSent(friend, msg)
time.Sleep(shortTerm * 2)
time.Sleep(peerSampleInterval * 2)
if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
t.Fatal("Peers should be tagged but weren't")
}
time.Sleep(shortTerm * 8)
time.Sleep(peerSampleInterval * 8)
}

if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 2)
time.Sleep(peerSampleInterval * 2)
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 20)
time.Sleep(peerSampleInterval * 30)
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
t.Fatal("peers should finally be untagged")
}
Expand Down
51 changes: 30 additions & 21 deletions bitswap/internal/messagequeue/donthavetimeoutmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,24 @@ type timeoutRecorder struct {
func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) {
tr.lk.Lock()
defer tr.lk.Unlock()

tr.timedOutKs = append(tr.timedOutKs, tks...)
}

func (tr *timeoutRecorder) timedOutCount() int {
tr.lk.Lock()
defer tr.lk.Unlock()

return len(tr.timedOutKs)
}

func (tr *timeoutRecorder) clear() {
tr.lk.Lock()
defer tr.lk.Unlock()

tr.timedOutKs = nil
}

func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
firstks := testutil.GenerateCids(2)
secondks := append(firstks, testutil.GenerateCids(3)...)
Expand All @@ -75,7 +90,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
time.Sleep(expectedTimeout - 5*time.Millisecond)

// At this stage no keys should have timed out
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}

Expand All @@ -86,20 +101,20 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// At this stage first set of keys should have timed out
if len(tr.timedOutKs) != len(firstks) {
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
}

// Clear the recorded timed out keys
tr.timedOutKs = nil
tr.clear()

// Sleep until the second set of keys should have timed out
time.Sleep(expectedTimeout)

// At this stage all keys should have timed out. The second set included
// the first set of keys, but they were added before the first set timed
// out, so only the remaining keys should have beed added.
if len(tr.timedOutKs) != len(secondks)-len(firstks) {
if tr.timedOutCount() != len(secondks)-len(firstks) {
t.Fatal("expected second set of keys to timeout")
}
}
Expand Down Expand Up @@ -130,7 +145,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
time.Sleep(expectedTimeout)

// At this stage all non-cancelled keys should have timed out
if len(tr.timedOutKs) != len(ks)-cancelCount {
if tr.timedOutCount() != len(ks)-cancelCount {
t.Fatal("expected timeout")
}
}
Expand Down Expand Up @@ -167,15 +182,15 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// At this stage only the key that was never cancelled should have timed out
if len(tr.timedOutKs) != 1 {
if tr.timedOutCount() != 1 {
t.Fatal("expected one key to timeout")
}

// Wait till after added back key should time out
time.Sleep(latency)

// At this stage the key that was added back should also have timed out
if len(tr.timedOutKs) != 2 {
if tr.timedOutCount() != 2 {
t.Fatal("expected added back key to timeout")
}
}
Expand All @@ -202,7 +217,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
time.Sleep(latency + 5*time.Millisecond)

// At this stage all keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
Expand All @@ -229,15 +244,15 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
time.Sleep(expectedTimeout - 5*time.Millisecond)

// At this stage no timeout should have happened yet
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}

// Sleep until after the expected timeout
time.Sleep(10 * time.Millisecond)

// Now the keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
Expand All @@ -263,15 +278,15 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
time.Sleep(defaultTimeout - 5*time.Millisecond)

// At this stage no timeout should have happened yet
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}

// Sleep until after the default timeout
time.Sleep(10 * time.Millisecond)

// Now the keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
Expand All @@ -281,17 +296,11 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
latency := time.Millisecond * 10
latMultiplier := 1
expProcessTime := time.Duration(0)
tr := timeoutRecorder{}
ctx := context.Background()
pc := &mockPeerConn{latency: latency}

var lk sync.Mutex
var timedOutKs []cid.Cid
onTimeout := func(tks []cid.Cid) {
lk.Lock()
defer lk.Unlock()
timedOutKs = append(timedOutKs, tks...)
}
dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()

Expand All @@ -308,7 +317,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// Manager was shut down so timeout should not have fired
if len(timedOutKs) != 0 {
if tr.timedOutCount() != 0 {
t.Fatal("expected no timeout after shutdown")
}
}

0 comments on commit 8fa9f69

Please sign in to comment.