-
Notifications
You must be signed in to change notification settings - Fork 13
/
require.go
163 lines (139 loc) · 4.79 KB
/
require.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package p2ptest
import (
"context"
"errors"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dashpay/tenderdash/internal/p2p"
"github.com/dashpay/tenderdash/types"
)
// RequireEmpty requires that the given channel is empty.
func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
t.Helper()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
iter := p2p.MergedChannelIterator(ctx, channels...)
count := 0
for iter.Next(ctx) {
count++
e := iter.Envelope()
require.Nil(t, e, "received unexpected message %v", e.Message)
}
require.Zero(t, count)
require.Error(t, ctx.Err())
}
// RequireReceive requires that the given envelope is received on the channel.
func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) {
t.Helper()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
iter := channel.Receive(ctx)
count := 0
for iter.Next(ctx) {
count++
envelope := iter.Envelope()
require.Equal(t, expect.From, envelope.From)
require.Equal(t, expect.Message, envelope.Message)
}
if !assert.True(t, count >= 1) {
require.NoError(t, ctx.Err(), "timed out waiting for message %v", expect)
}
}
// RequireReceiveUnordered requires that the given envelopes are all received on
// the channel, ignoring order.
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
var actual []*p2p.Envelope
iter := channel.Receive(ctx)
for iter.Next(ctx) && len(actual) < len(expect) {
actual = append(actual, iter.Envelope())
}
require.ElementsMatch(t, expect, actual, "len=%d", len(actual))
}
// RequireSend requires that the given envelope is sent on the channel.
func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) {
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err := channel.Send(tctx, envelope)
switch {
case errors.Is(err, context.DeadlineExceeded):
require.Fail(t, "timed out sending message to %q", envelope.To)
default:
require.NoError(t, err, "unexpected error")
}
}
// RequireSendReceive requires that a given Protobuf message is sent to the
// given peer, and then that the given response is received back.
func RequireSendReceive(
ctx context.Context,
t *testing.T,
channel p2p.Channel,
peerID types.NodeID,
send proto.Message,
) {
RequireSend(ctx, t, channel, p2p.Envelope{To: peerID, Message: send})
RequireReceive(ctx, t, channel, p2p.Envelope{From: peerID, Message: send})
}
// RequireNoUpdates requires that a PeerUpdates subscription is empty.
func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUpdates) {
t.Helper()
select {
case update := <-peerUpdates.Updates():
if ctx.Err() == nil {
require.Fail(t, "unexpected peer updates", "got %v", update)
}
case <-ctx.Done():
default:
}
}
// RequireError requires that the given peer error is submitted for a peer.
func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) {
tctx, tcancel := context.WithTimeout(ctx, time.Second)
defer tcancel()
err := channel.SendError(tctx, peerError)
switch {
case errors.Is(err, context.DeadlineExceeded):
require.Fail(t, "timed out reporting error", "%v for %q", peerError, channel.String())
default:
require.NoError(t, err, "unexpected error")
}
}
// RequireUpdate requires that a PeerUpdates subscription yields the given update.
func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUpdate) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
select {
case update := <-peerUpdates.Updates():
require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
require.Equal(t, expect.Status, update.Status, "statuses did not match")
case <-timer.C:
require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
}
}
// RequireUpdates requires that a PeerUpdates subscription yields the given updates
// in the given order.
func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.PeerUpdate) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
actual := []p2p.PeerUpdate{}
for {
select {
case update := <-peerUpdates.Updates():
actual = append(actual, update)
if len(actual) == len(expect) {
for idx := range expect {
require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
require.Equal(t, expect[idx].Status, actual[idx].Status)
}
return
}
case <-timer.C:
require.Equal(t, expect, actual, "did not receive expected peer updates")
return
}
}
}