2727
2828global mininode_lock
2929
30+ def wait_until (predicate , attempts = float ('inf' ), timeout = float ('inf' )):
31+ attempt = 0
32+ elapsed = 0
33+
34+ while attempt < attempts and elapsed < timeout :
35+ with mininode_lock :
36+ if predicate ():
37+ return True
38+ attempt += 1
39+ elapsed += 0.05
40+ time .sleep (0.05 )
41+
42+ return False
43+
3044class TestNode (NodeConnCB ):
3145
3246 def __init__ (self , block_store , tx_store ):
@@ -43,6 +57,10 @@ def __init__(self, block_store, tx_store):
4357 # a response
4458 self .pingMap = {}
4559 self .lastInv = []
60+ self .closed = False
61+
62+ def on_close (self , conn ):
63+ self .closed = True
4664
4765 def add_connection (self , conn ):
4866 self .conn = conn
@@ -132,61 +150,48 @@ class TestManager(object):
132150 def __init__ (self , testgen , datadir ):
133151 self .test_generator = testgen
134152 self .connections = []
153+ self .test_nodes = []
135154 self .block_store = BlockStore (datadir )
136155 self .tx_store = TxStore (datadir )
137156 self .ping_counter = 1
138157
139158 def add_all_connections (self , nodes ):
140159 for i in range (len (nodes )):
141160 # Create a p2p connection to each node
142- self .connections .append (NodeConn ('127.0.0.1' , p2p_port (i ),
143- nodes [i ], TestNode (self .block_store , self .tx_store )))
161+ test_node = TestNode (self .block_store , self .tx_store )
162+ self .test_nodes .append (test_node )
163+ self .connections .append (NodeConn ('127.0.0.1' , p2p_port (i ), nodes [i ], test_node ))
144164 # Make sure the TestNode (callback class) has a reference to its
145165 # associated NodeConn
146- self .connections [- 1 ].cb .add_connection (self .connections [- 1 ])
166+ test_node .add_connection (self .connections [- 1 ])
167+
168+ def wait_for_disconnections (self ):
169+ def disconnected ():
170+ return all (node .closed for node in self .test_nodes )
171+ return wait_until (disconnected , timeout = 10 )
147172
148173 def wait_for_verack (self ):
149- sleep_time = 0.05
150- max_tries = 10 / sleep_time # Wait at most 10 seconds
151- while max_tries > 0 :
152- done = True
153- with mininode_lock :
154- for c in self .connections :
155- if c .cb .verack_received is False :
156- done = False
157- break
158- if done :
159- break
160- time .sleep (sleep_time )
174+ def veracked ():
175+ return all (node .verack_received for node in self .test_nodes )
176+ return wait_until (veracked , timeout = 10 )
161177
162178 def wait_for_pings (self , counter ):
163- received_pongs = False
164- while received_pongs is not True :
165- time .sleep (0.05 )
166- received_pongs = True
167- with mininode_lock :
168- for c in self .connections :
169- if c .cb .received_ping_response (counter ) is not True :
170- received_pongs = False
171- break
179+ def received_pongs ():
180+ return all (node .received_ping_response (counter ) for node in self .test_nodes )
181+ return wait_until (received_pongs )
172182
173183 # sync_blocks: Wait for all connections to request the blockhash given
174184 # then send get_headers to find out the tip of each node, and synchronize
175185 # the response by using a ping (and waiting for pong with same nonce).
176186 def sync_blocks (self , blockhash , num_blocks ):
177- # Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
178- max_tries = 20 * num_blocks
179- while max_tries > 0 :
180- with mininode_lock :
181- results = [ blockhash in c .cb .block_request_map and
182- c .cb .block_request_map [blockhash ] for c in self .connections ]
183- if False not in results :
184- break
185- time .sleep (0.05 )
186- max_tries -= 1
187+ def blocks_requested ():
188+ return all (
189+ blockhash in node .block_request_map and node .block_request_map [blockhash ]
190+ for node in self .test_nodes
191+ )
187192
188193 # --> error if not requested
189- if max_tries == 0 :
194+ if not wait_until ( blocks_requested , attempts = 20 * num_blocks ) :
190195 # print [ c.cb.block_request_map for c in self.connections ]
191196 raise AssertionError ("Not all nodes requested block" )
192197 # --> Answer request (we did this inline!)
@@ -202,18 +207,14 @@ def sync_blocks(self, blockhash, num_blocks):
202207 # Analogous to sync_block (see above)
203208 def sync_transaction (self , txhash , num_events ):
204209 # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
205- max_tries = 20 * num_events
206- while max_tries > 0 :
207- with mininode_lock :
208- results = [ txhash in c .cb .tx_request_map and
209- c .cb .tx_request_map [txhash ] for c in self .connections ]
210- if False not in results :
211- break
212- time .sleep (0.05 )
213- max_tries -= 1
210+ def transaction_requested ():
211+ return all (
212+ txhash in node .tx_request_map and node .tx_request_map [txhash ]
213+ for node in self .test_nodes
214+ )
214215
215216 # --> error if not requested
216- if max_tries == 0 :
217+ if not wait_until ( transaction_requested , attempts = 20 * num_events ) :
217218 # print [ c.cb.tx_request_map for c in self.connections ]
218219 raise AssertionError ("Not all nodes requested transaction" )
219220 # --> Answer request (we did this inline!)
@@ -336,6 +337,7 @@ def run(self):
336337 print "Test %d: PASS" % test_number , [ c .rpc .getblockcount () for c in self .connections ]
337338 test_number += 1
338339
340+ [ c .disconnect_node () for c in self .connections ]
341+ self .wait_for_disconnections ()
339342 self .block_store .close ()
340343 self .tx_store .close ()
341- [ c .disconnect_node () for c in self .connections ]
0 commit comments