This repository has been archived by the owner on May 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 346
/
tx_expecter.go
187 lines (173 loc) · 4.93 KB
/
tx_expecter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package rpctest
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/hyperledger/burrow/binary"
"github.com/hyperledger/burrow/event"
"github.com/hyperledger/burrow/execution/exec"
"github.com/hyperledger/burrow/rpc/rpcevents"
)
const (
maximumDurationWithoutProgress = 5 * time.Second
subscriptionBuffer = event.DefaultEventBufferCapacity + 1
logCommits = false
)
type TxExpecter struct {
sync.Mutex
emitter *event.Emitter
subID string
name string
all map[string]struct{}
expected map[string]struct{}
received map[string]struct{}
asserted bool
succeeded chan struct{}
ready chan struct{}
previousTotal int
blockRange *rpcevents.BlockRange
}
// Start listening for blocks and cross off any transactions that were expected.
// Expect can be called multiple times before a single call to AssertCommitted.
// TxExpecter is single-shot - create multiple TxExpecters if you want to call AssertCommitted multiple times.
func ExpectTxs(emitter *event.Emitter, name string) *TxExpecter {
exp := &TxExpecter{
emitter: emitter,
subID: fmt.Sprintf("%s_%s", name, event.GenSubID()),
name: name,
all: make(map[string]struct{}),
expected: make(map[string]struct{}),
received: make(map[string]struct{}),
succeeded: make(chan struct{}),
ready: make(chan struct{}),
blockRange: &rpcevents.BlockRange{},
}
go exp.listen()
<-exp.ready
return exp
}
// Expect a transaction to be committed
func (exp *TxExpecter) Expect(txHash binary.HexBytes) {
exp.Lock()
defer exp.Unlock()
if exp.closed() {
panic(fmt.Errorf("cannot call Expect after AssertCommitted"))
}
key := txHash.String()
exp.expected[key] = struct{}{}
exp.all[key] = struct{}{}
}
// Assert that all expected transactions are committed. Will block until all expected transactions are committed.
// Returns the BlockRange over which the transactions were committed.
func (exp *TxExpecter) AssertCommitted(t testing.TB) *rpcevents.BlockRange {
exp.Lock()
// close() clears subID to indicate this TxExpecter ha been used
if exp.closed() {
panic(fmt.Errorf("cannot call AssertCommitted more than once"))
}
exp.asserted = true
succeeded := exp.reconcile()
exp.Unlock()
defer exp.close()
if succeeded {
return exp.Pass()
}
var err error
for err == nil {
select {
case <-exp.succeeded:
return exp.Pass()
case <-time.After(maximumDurationWithoutProgress):
err = exp.assertMakingProgress()
}
}
t.Fatal(err)
return nil
}
func (exp *TxExpecter) Pass() *rpcevents.BlockRange {
fmt.Printf("%s: Successfully committed %d txs\n", exp.name, len(exp.all))
return exp.blockRange
}
func (exp *TxExpecter) listen() {
numTxs := 0
ch, err := exp.emitter.Subscribe(context.Background(), exp.subID, exec.QueryForBlockExecution(), subscriptionBuffer)
if err != nil {
panic(fmt.Errorf("ExpectTxs(): could not subscribe to blocks: %v", err))
}
close(exp.ready)
for msg := range ch {
be := msg.(*exec.BlockExecution)
blockTxs := len(be.TxExecutions)
numTxs += blockTxs
if logCommits {
fmt.Printf("%s: Total TXs committed at block %v: %v (+%v)\n", exp.name, be.GetHeight(), numTxs, blockTxs)
}
for _, txe := range be.TxExecutions {
// Return if this is the last expected transaction (and we are finished expecting)
if exp.receive(txe) {
close(exp.succeeded)
return
}
}
}
}
func (exp *TxExpecter) close() {
exp.Lock()
defer exp.Unlock()
err := exp.emitter.UnsubscribeAll(context.Background(), exp.subID)
if err != nil {
fmt.Printf("TxExpecter could not unsubscribe: %v\n", err)
}
exp.subID = ""
}
func (exp *TxExpecter) closed() bool {
return exp.subID == ""
}
func (exp *TxExpecter) receive(txe *exec.TxExecution) (done bool) {
exp.Lock()
defer exp.Unlock()
exp.received[txe.TxHash.String()] = struct{}{}
if exp.blockRange.Start == nil {
exp.blockRange.Start = rpcevents.AbsoluteBound(txe.Height)
exp.blockRange.End = rpcevents.AbsoluteBound(txe.Height)
}
exp.blockRange.End.Index = txe.Height
if exp.asserted {
return exp.reconcile()
}
return false
}
func (exp *TxExpecter) reconcile() (done bool) {
for re := range exp.received {
if _, ok := exp.expected[re]; ok {
// Remove from expected
delete(exp.expected, re)
// No longer need to cache in received
delete(exp.received, re)
}
}
total := len(exp.expected)
return total == 0
}
func (exp *TxExpecter) assertMakingProgress() error {
exp.Lock()
defer exp.Unlock()
total := len(exp.expected)
if exp.previousTotal == 0 {
exp.previousTotal = total
return nil
}
// if the total is reducing we are making progress
if total < exp.previousTotal {
return nil
}
committed := len(exp.all) - total
committedString := "none"
if committed != 0 {
committedString = fmt.Sprintf("only %d", committed)
}
return fmt.Errorf("TxExpecter timed out after %v: expecting %d txs to be committed but %s were",
maximumDurationWithoutProgress, total, committedString)
}