Skip to content

Commit

Permalink
Merge pull request #6509
Browse files Browse the repository at this point in the history
45a6cce Fix race condition on test node shutdown (Casey Rodarmor)
  • Loading branch information
laanwj committed Aug 17, 2015
2 parents 6feeec1 + 45a6cce commit bb4faee
Showing 1 changed file with 48 additions and 46 deletions.
94 changes: 48 additions & 46 deletions qa/rpc-tests/test_framework/comptool.py
Expand Up @@ -27,6 +27,20 @@


global mininode_lock global mininode_lock


def wait_until(predicate, attempts=float('inf'), timeout=float('inf')):
attempt = 0
elapsed = 0

while attempt < attempts and elapsed < timeout:
with mininode_lock:
if predicate():
return True
attempt += 1
elapsed += 0.05
time.sleep(0.05)

return False

class TestNode(NodeConnCB): class TestNode(NodeConnCB):


def __init__(self, block_store, tx_store): def __init__(self, block_store, tx_store):
Expand All @@ -43,6 +57,10 @@ def __init__(self, block_store, tx_store):
# a response # a response
self.pingMap = {} self.pingMap = {}
self.lastInv = [] self.lastInv = []
self.closed = False

def on_close(self, conn):
self.closed = True


def add_connection(self, conn): def add_connection(self, conn):
self.conn = conn self.conn = conn
Expand Down Expand Up @@ -132,61 +150,48 @@ class TestManager(object):
def __init__(self, testgen, datadir): def __init__(self, testgen, datadir):
self.test_generator = testgen self.test_generator = testgen
self.connections = [] self.connections = []
self.test_nodes = []
self.block_store = BlockStore(datadir) self.block_store = BlockStore(datadir)
self.tx_store = TxStore(datadir) self.tx_store = TxStore(datadir)
self.ping_counter = 1 self.ping_counter = 1


def add_all_connections(self, nodes): def add_all_connections(self, nodes):
for i in range(len(nodes)): for i in range(len(nodes)):
# Create a p2p connection to each node # Create a p2p connection to each node
self.connections.append(NodeConn('127.0.0.1', p2p_port(i), test_node = TestNode(self.block_store, self.tx_store)
nodes[i], TestNode(self.block_store, self.tx_store))) self.test_nodes.append(test_node)
self.connections.append(NodeConn('127.0.0.1', p2p_port(i), nodes[i], test_node))
# Make sure the TestNode (callback class) has a reference to its # Make sure the TestNode (callback class) has a reference to its
# associated NodeConn # associated NodeConn
self.connections[-1].cb.add_connection(self.connections[-1]) test_node.add_connection(self.connections[-1])

def wait_for_disconnections(self):
def disconnected():
return all(node.closed for node in self.test_nodes)
return wait_until(disconnected, timeout=10)


def wait_for_verack(self): def wait_for_verack(self):
sleep_time = 0.05 def veracked():
max_tries = 10 / sleep_time # Wait at most 10 seconds return all(node.verack_received for node in self.test_nodes)
while max_tries > 0: return wait_until(veracked, timeout=10)
done = True
with mininode_lock:
for c in self.connections:
if c.cb.verack_received is False:
done = False
break
if done:
break
time.sleep(sleep_time)


def wait_for_pings(self, counter): def wait_for_pings(self, counter):
received_pongs = False def received_pongs():
while received_pongs is not True: return all(node.received_ping_response(counter) for node in self.test_nodes)
time.sleep(0.05) return wait_until(received_pongs)
received_pongs = True
with mininode_lock:
for c in self.connections:
if c.cb.received_ping_response(counter) is not True:
received_pongs = False
break


# sync_blocks: Wait for all connections to request the blockhash given # sync_blocks: Wait for all connections to request the blockhash given
# then send get_headers to find out the tip of each node, and synchronize # then send get_headers to find out the tip of each node, and synchronize
# the response by using a ping (and waiting for pong with same nonce). # the response by using a ping (and waiting for pong with same nonce).
def sync_blocks(self, blockhash, num_blocks): def sync_blocks(self, blockhash, num_blocks):
# Wait for nodes to request block (50ms sleep * 20 tries * num_blocks) def blocks_requested():
max_tries = 20*num_blocks return all(
while max_tries > 0: blockhash in node.block_request_map and node.block_request_map[blockhash]
with mininode_lock: for node in self.test_nodes
results = [ blockhash in c.cb.block_request_map and )
c.cb.block_request_map[blockhash] for c in self.connections ]
if False not in results:
break
time.sleep(0.05)
max_tries -= 1


# --> error if not requested # --> error if not requested
if max_tries == 0: if not wait_until(blocks_requested, attempts=20*num_blocks):
# print [ c.cb.block_request_map for c in self.connections ] # print [ c.cb.block_request_map for c in self.connections ]
raise AssertionError("Not all nodes requested block") raise AssertionError("Not all nodes requested block")
# --> Answer request (we did this inline!) # --> Answer request (we did this inline!)
Expand All @@ -202,18 +207,14 @@ def sync_blocks(self, blockhash, num_blocks):
# Analogous to sync_block (see above) # Analogous to sync_block (see above)
def sync_transaction(self, txhash, num_events): def sync_transaction(self, txhash, num_events):
# Wait for nodes to request transaction (50ms sleep * 20 tries * num_events) # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
max_tries = 20*num_events def transaction_requested():
while max_tries > 0: return all(
with mininode_lock: txhash in node.tx_request_map and node.tx_request_map[txhash]
results = [ txhash in c.cb.tx_request_map and for node in self.test_nodes
c.cb.tx_request_map[txhash] for c in self.connections ] )
if False not in results:
break
time.sleep(0.05)
max_tries -= 1


# --> error if not requested # --> error if not requested
if max_tries == 0: if not wait_until(transaction_requested, attempts=20*num_events):
# print [ c.cb.tx_request_map for c in self.connections ] # print [ c.cb.tx_request_map for c in self.connections ]
raise AssertionError("Not all nodes requested transaction") raise AssertionError("Not all nodes requested transaction")
# --> Answer request (we did this inline!) # --> Answer request (we did this inline!)
Expand Down Expand Up @@ -336,6 +337,7 @@ def run(self):
print "Test %d: PASS" % test_number, [ c.rpc.getblockcount() for c in self.connections ] print "Test %d: PASS" % test_number, [ c.rpc.getblockcount() for c in self.connections ]
test_number += 1 test_number += 1


[ c.disconnect_node() for c in self.connections ]
self.wait_for_disconnections()
self.block_store.close() self.block_store.close()
self.tx_store.close() self.tx_store.close()
[ c.disconnect_node() for c in self.connections ]

0 comments on commit bb4faee

Please sign in to comment.