Skip to content

Commit

Permalink
fix: tests
Browse files Browse the repository at this point in the history
  • Loading branch information
metacertain committed Jun 7, 2021
1 parent 984f107 commit 2f83408
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
1 change: 1 addition & 0 deletions pkg/p2p/streamtest/streamtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Head
return nil, err
}
}

recordIn := newRecord()
recordOut := newRecord()
streamOut := newStream(recordIn, recordOut)
Expand Down
18 changes: 9 additions & 9 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,15 +445,6 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C

ps.metrics.TotalSent.Inc()

// if you manage to get a tag, just increment the respective counter
t, err := ps.tagger.Get(ch.TagID())
if err == nil && t != nil {
err = t.Inc(tags.StateSent)
if err != nil {
return nil, true, fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
}
}

var receipt pb.Receipt
if err := r.ReadMsgWithContext(ctx, &receipt); err != nil {
_ = streamer.Reset()
Expand All @@ -470,6 +461,15 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
return nil, true, err
}

// if you manage to get a tag, just increment the respective counter
t, err := ps.tagger.Get(ch.TagID())
if err == nil && t != nil {
err = t.Inc(tags.StateSent)
if err != nil {
return nil, true, fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
}
}

return &receipt, true, nil
}

Expand Down
34 changes: 21 additions & 13 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func TestPushChunkToNextClosest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if ta2.Get(tags.StateSent) != 2 {
if ta2.Get(tags.StateSent) != 1 {
t.Fatalf("tags error")
}

Expand Down Expand Up @@ -940,10 +940,8 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
)
defer storerPeer4.Close()

var (
fail = true
lock sync.Mutex
)
triggerCount := 0
var lock sync.Mutex

recorder := streamtest.New(
streamtest.WithPeerProtocols(
Expand All @@ -954,15 +952,25 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
peer4.String(): psPeer4.Protocol(),
},
),
streamtest.WithStreamError(
func(addr swarm.Address, _, _, _ string) error {
lock.Lock()
defer lock.Unlock()
if fail && addr.String() != peer4.String() {
return errors.New("peer not reachable")
}
streamtest.WithMiddlewares(
func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
lock.Lock()
defer lock.Unlock()

return nil
if triggerCount < 9 {
triggerCount++
stream.Close()
return errors.New("fmt")
}

if err := h(ctx, peer, stream); err != nil {
return err
}
// close stream after all previous middlewares wrote to it
// so that the receiving peer can get all the post messages
return stream.Close()
}
},
),
streamtest.WithBaseAddr(pivotNode),
Expand Down

0 comments on commit 2f83408

Please sign in to comment.