Skip to content

Commit

Permalink
fix(http): clean up time handling in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed May 10, 2023
1 parent 050a113 commit 6810ff2
Showing 1 changed file with 66 additions and 57 deletions.
123 changes: 66 additions & 57 deletions pkg/retriever/httpretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import (
)

type httpRemote struct {
peer peer.AddrInfo
lsys *linking.LinkSystem
sel ipld.Node
responseDelay time.Duration
malformed bool
peer peer.AddrInfo
lsys *linking.LinkSystem
sel ipld.Node
respondAt time.Time
malformed bool
}

func TestHTTPRetriever(t *testing.T) {
Expand Down Expand Up @@ -92,10 +92,10 @@ func TestHTTPRetriever(t *testing.T) {
remotes: map[cid.Cid][]httpRemote{
cid1: {
{
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()),
sel: allSelector,
responseDelay: time.Millisecond * 40,
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*40),
},
},
},
Expand Down Expand Up @@ -154,18 +154,18 @@ func TestHTTPRetriever(t *testing.T) {
remotes: map[cid.Cid][]httpRemote{
cid1: {
{
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()),
sel: allSelector,
responseDelay: time.Millisecond * 40,
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*40),
},
},
cid2: {
{
peer: cid2Cands[0].MinerPeer,
lsys: makeLsys(tbc2.AllBlocks()),
sel: allSelector,
responseDelay: time.Millisecond * 10,
peer: cid2Cands[0].MinerPeer,
lsys: makeLsys(tbc2.AllBlocks()),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*10),
},
},
},
Expand Down Expand Up @@ -258,25 +258,25 @@ func TestHTTPRetriever(t *testing.T) {
remotes: map[cid.Cid][]httpRemote{
cid1: {
{
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
responseDelay: time.Millisecond * 10,
malformed: true,
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*10),
malformed: true,
},
{
peer: cid1Cands[1].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
responseDelay: time.Millisecond * 10,
malformed: true,
peer: cid1Cands[1].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*20),
malformed: true,
},
{
peer: cid1Cands[2].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
responseDelay: time.Millisecond * 10,
malformed: true,
peer: cid1Cands[2].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*30),
malformed: true,
},
},
},
Expand Down Expand Up @@ -354,24 +354,24 @@ func TestHTTPRetriever(t *testing.T) {
remotes: map[cid.Cid][]httpRemote{
cid1: {
{
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
responseDelay: time.Millisecond * 10,
malformed: true,
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*10),
malformed: true,
},
{
peer: cid1Cands[1].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
responseDelay: time.Millisecond * 10,
malformed: true,
peer: cid1Cands[1].MinerPeer,
lsys: makeLsys(nil),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*20),
malformed: true,
},
{
peer: cid1Cands[2].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()),
sel: allSelector,
responseDelay: time.Millisecond * 10,
peer: cid1Cands[2].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*30),
},
},
},
Expand Down Expand Up @@ -469,10 +469,10 @@ func TestHTTPRetriever(t *testing.T) {
remotes: map[cid.Cid][]httpRemote{
cid1: {
{
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()[0:50]),
sel: allSelector,
responseDelay: time.Millisecond * 40,
peer: cid1Cands[0].MinerPeer,
lsys: makeLsys(tbc1.AllBlocks()[0:50]),
sel: allSelector,
respondAt: startTime.Add(initialPause + time.Millisecond*40),
},
},
},
Expand Down Expand Up @@ -726,13 +726,23 @@ func (c *cannedBytesRoundTripper) RoundTrip(req *http.Request) (*http.Response,
remote := c.getRemote(root, maddr)
c.StartsCh <- remote.peer.ID

sleepFor := c.clock.Until(remote.respondAt)
if sleepFor > 0 {
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
case <-c.clock.After(sleepFor):
}
}

makeBody := func(root cid.Cid, maddr string) io.ReadCloser {
carR, carW := io.Pipe()
statsCh := traverseCar(
c.t,
c.ctx,
remote.peer.ID,
c.clock,
remote.respondAt,
c.remoteBlockDuration,
carW,
remote.malformed,
Expand All @@ -753,8 +763,6 @@ func (c *cannedBytesRoundTripper) RoundTrip(req *http.Request) (*http.Response,
}()
return carR
}

c.clock.Sleep(remote.responseDelay)
return &http.Response{
StatusCode: http.StatusOK,
Body: &deferredReader{root: root, maddr: maddr, makeBody: makeBody, end: func() { c.EndsCh <- remote.peer.ID }},
Expand Down Expand Up @@ -847,13 +855,15 @@ func traverseCar(
ctx context.Context,
id peer.ID,
clock *clock.Mock,
startTime time.Time,
blockDuration time.Duration,
carW io.WriteCloser,
malformed bool,
lsys *linking.LinkSystem,
root cid.Cid,
selNode ipld.Node,
) chan testutil.RemoteStats {

req := require.New(t)

sel, err := selector.CompileSelector(selNode)
Expand Down Expand Up @@ -881,8 +891,6 @@ func traverseCar(
carWriter, err := storage.NewWritable(carW, []cid.Cid{root}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(false))
req.NoError(err)

startTime := clock.Now()

// intercept the StorageReadOpener of the LinkSystem so that for each
// read that the traverser performs, we take that block and Put() it
// to the CARv1 writer.
Expand All @@ -902,12 +910,13 @@ func traverseCar(
stats.ByteCount += uint64(len(byts)) // only the length of the bytes, not the rest of the CAR infrastructure

// ensure there is blockDuration between each block send
sleepFor := clock.Until(startTime.Add(blockDuration * time.Duration(len(stats.Blocks))))
if sleepFor > 0 {
sendAt := startTime.Add(blockDuration * time.Duration(len(stats.Blocks)))
if clock.Until(sendAt) > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-clock.After(sleepFor):
case <-clock.After(clock.Until(sendAt)):
time.Sleep(1 * time.Millisecond) // let em goroutines breathe
}
}
return bytes.NewReader(byts), nil
Expand Down

0 comments on commit 6810ff2

Please sign in to comment.