Merge pull request #6094
2a22d4b Fix comptool send_message call when MAX_INV_SZ reached (Suhas Daftuar)
574db48 Fix potential race conditions in p2p testing framework (Suhas Daftuar)
5487975 Don't run invalidblockrequest.py in travis until race condition is fixed (Suhas Daftuar)
ef32817 Fix mininode disconnections to work with select (Suhas Daftuar)
laanwj committed May 4, 2015
2 parents 90c37bc + 2a22d4b commit 82d06e2
Showing 3 changed files with 91 additions and 70 deletions.
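The race-condition fixes below all follow one pattern: instead of a per-callback cbLock and a per-connection sendbufLock, the framework takes a single module-level RLock (mininode_lock) around any state shared between the asyncore networking thread and the thread running the test logic, and the test thread polls that state under the lock. A minimal sketch of that pattern, with hypothetical names (shared_lock, responses, wait_for_all) standing in for mininode_lock and the NodeConnCB members:

    import threading
    import time

    shared_lock = threading.RLock()    # plays the role of mininode_lock
    responses = {}                     # state written by the networking thread

    def network_thread():
        # Stand-in for the asyncore loop delivering messages to callbacks.
        for i in range(5):
            time.sleep(0.05)
            with shared_lock:          # writer side: mutate only under the lock
                responses[i] = True

    def wait_for_all(count, timeout=10.0):
        # Test-thread side: check under the lock, but sleep outside it so the
        # networking thread is never starved while we wait.
        deadline = time.time() + timeout
        while time.time() < deadline:
            with shared_lock:
                if all(i in responses for i in range(count)):
                    return True
            time.sleep(0.05)
        return False

    t = threading.Thread(target=network_thread)
    t.start()
    assert wait_for_all(5)
    t.join()

This is the same shape as the reworked wait_for_verack, wait_for_pings, sync_blocks and sync_transaction loops in comptool.py below.
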
85 changes: 48 additions & 37 deletions qa/rpc-tests/comptool.py
@@ -25,6 +25,8 @@
# on_getheaders: provide headers via BlockStore
# on_getdata: provide blocks via BlockStore

global mininode_lock

class TestNode(NodeConnCB):

def __init__(self, block_store, tx_store):
@@ -148,10 +150,11 @@ def wait_for_verack(self):
max_tries = 10 / sleep_time # Wait at most 10 seconds
while max_tries > 0:
done = True
for c in self.connections:
if c.cb.verack_received is False:
done = False
break
with mininode_lock:
for c in self.connections:
if c.cb.verack_received is False:
done = False
break
if done:
break
time.sleep(sleep_time)
@@ -161,10 +164,11 @@ def wait_for_pings(self, counter):
while received_pongs is not True:
time.sleep(0.05)
received_pongs = True
for c in self.connections:
if c.cb.received_ping_response(counter) is not True:
received_pongs = False
break
with mininode_lock:
for c in self.connections:
if c.cb.received_ping_response(counter) is not True:
received_pongs = False
break

# sync_blocks: Wait for all connections to request the blockhash given
# then send get_headers to find out the tip of each node, and synchronize
@@ -173,8 +177,9 @@ def sync_blocks(self, blockhash, num_blocks):
# Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
max_tries = 20*num_blocks
while max_tries > 0:
results = [ blockhash in c.cb.block_request_map and
c.cb.block_request_map[blockhash] for c in self.connections ]
with mininode_lock:
results = [ blockhash in c.cb.block_request_map and
c.cb.block_request_map[blockhash] for c in self.connections ]
if False not in results:
break
time.sleep(0.05)
@@ -199,8 +204,9 @@ def sync_transaction(self, txhash, num_events):
# Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
max_tries = 20*num_events
while max_tries > 0:
results = [ txhash in c.cb.tx_request_map and
c.cb.tx_request_map[txhash] for c in self.connections ]
with mininode_lock:
results = [ txhash in c.cb.tx_request_map and
c.cb.tx_request_map[txhash] for c in self.connections ]
if False not in results:
break
time.sleep(0.05)
@@ -221,19 +227,21 @@ def sync_transaction(self, txhash, num_events):
self.ping_counter += 1

# Sort inv responses from each node
[ c.cb.lastInv.sort() for c in self.connections ]
with mininode_lock:
[ c.cb.lastInv.sort() for c in self.connections ]

# Verify that the tip of each connection all agree with each other, and
# with the expected outcome (if given)
def check_results(self, blockhash, outcome):
for c in self.connections:
if outcome is None:
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
with mininode_lock:
for c in self.connections:
if outcome is None:
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
return False
elif ((c.cb.bestblockhash == blockhash) != outcome):
# print c.cb.bestblockhash, blockhash, outcome
return False
elif ((c.cb.bestblockhash == blockhash) != outcome):
# print c.cb.bestblockhash, blockhash, outcome
return False
return True
return True

# Either check that the mempools all agree with each other, or that
# txhash's presence in the mempool matches the outcome specified.
@@ -242,16 +250,17 @@ def check_results(self, blockhash, outcome):
# perhaps it would be useful to add the ability to check explicitly that
# a particular tx's existence in the mempool is the same across all nodes.
def check_mempool(self, txhash, outcome):
for c in self.connections:
if outcome is None:
# Make sure the mempools agree with each other
if c.cb.lastInv != self.connections[0].cb.lastInv:
# print c.rpc.getrawmempool()
with mininode_lock:
for c in self.connections:
if outcome is None:
# Make sure the mempools agree with each other
if c.cb.lastInv != self.connections[0].cb.lastInv:
# print c.rpc.getrawmempool()
return False
elif ((txhash in c.cb.lastInv) != outcome):
# print c.rpc.getrawmempool(), c.cb.lastInv
return False
elif ((txhash in c.cb.lastInv) != outcome):
# print c.rpc.getrawmempool(), c.cb.lastInv
return False
return True
return True

def run(self):
# Wait until verack is received
@@ -272,9 +281,10 @@ def run(self):
block = b_or_t
block_outcome = outcome
# Add to shared block_store, set as current block
self.block_store.add_block(block)
for c in self.connections:
c.cb.block_request_map[block.sha256] = False
with mininode_lock:
self.block_store.add_block(block)
for c in self.connections:
c.cb.block_request_map[block.sha256] = False
# Either send inv's to each node and sync, or add
# to invqueue for later inv'ing.
if (test_instance.sync_every_block):
@@ -288,10 +298,11 @@ def run(self):
assert(isinstance(b_or_t, CTransaction))
tx = b_or_t
tx_outcome = outcome
# Add to shared tx store
self.tx_store.add_transaction(tx)
for c in self.connections:
c.cb.tx_request_map[tx.sha256] = False
# Add to shared tx store and clear map entry
with mininode_lock:
self.tx_store.add_transaction(tx)
for c in self.connections:
c.cb.tx_request_map[tx.sha256] = False
# Again, either inv to all nodes or save for later
if (test_instance.sync_every_tx):
[ c.cb.send_inv(tx) for c in self.connections ]
@@ -302,7 +313,7 @@ def run(self):
invqueue.append(CInv(1, tx.sha256))
# Ensure we're not overflowing the inv queue
if len(invqueue) == MAX_INV_SZ:
[ c.sb.send_message(msg_inv(invqueue)) for c in self.connections ]
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
invqueue = []

# Do final sync if we weren't syncing on every block or every tx.
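The last hunk in run() is the fix from 2a22d4b: the flush when invqueue fills up previously went through a non-existent c.sb attribute, so any test that queued MAX_INV_SZ entries hit an AttributeError; c.send_message() is the actual NodeConn method. A rough sketch of the accumulate-and-flush behaviour that call restores, using a stand-in send callable instead of a real connection:

    MAX_INV_SZ = 50000   # same limit mininode.py defines for one inv message

    def queue_and_flush(items, send, invqueue=None):
        # Collect inv entries and flush a full batch as soon as the protocol
        # limit is reached, so no single inv message exceeds MAX_INV_SZ.
        if invqueue is None:
            invqueue = []
        for item in items:
            invqueue.append(item)
            if len(invqueue) == MAX_INV_SZ:
                send(invqueue)   # comptool does: c.send_message(msg_inv(invqueue))
                invqueue = []
        return invqueue          # leftovers are sent by the caller's final sync

    # usage sketch: leftover = queue_and_flush(range(120000), lambda batch: None)
    # -> two full batches of 50000 are flushed, 20000 entries remain queued
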
9 changes: 5 additions & 4 deletions qa/rpc-tests/maxblocksinflight.py
@@ -61,10 +61,11 @@ def run(self):
time.sleep(2)

total_requests = 0
for key in self.blockReqCounts:
total_requests += self.blockReqCounts[key]
if self.blockReqCounts[key] > 1:
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
with mininode_lock:
for key in self.blockReqCounts:
total_requests += self.blockReqCounts[key]
if self.blockReqCounts[key] > 1:
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
if total_requests > MAX_REQUESTS:
raise AssertionError("Error, too many blocks (%d) requested" % total_requests)
print "Round %d: success (total requests: %d)" % (count, total_requests)
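blockReqCounts is filled in by the test's on_getdata handler, which runs on the networking thread, while the loop above reads it from the test thread, so both sides now go through mininode_lock. A tiny illustration of the two sides sharing one lock (names are illustrative, not the test's actual members):

    import threading

    lock = threading.RLock()   # plays the role of mininode_lock
    req_counts = {}            # plays the role of blockReqCounts

    def on_getdata(requested_hashes):
        # Networking-thread side: count every requested hash under the lock.
        with lock:
            for h in requested_hashes:
                req_counts[h] = req_counts.get(h, 0) + 1

    def each_block_requested_once():
        # Test-thread side: snapshot under the same lock before asserting,
        # so a concurrent getdata can't change the dict mid-iteration.
        with lock:
            counts = dict(req_counts)
        return all(n == 1 for n in counts.values())
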
67 changes: 38 additions & 29 deletions qa/rpc-tests/mininode.py
@@ -26,7 +26,7 @@
import random
import cStringIO
import hashlib
from threading import Lock
from threading import RLock
from threading import Thread
import logging
import copy
@@ -37,6 +37,19 @@

MAX_INV_SZ = 50000

# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to work around an issue with closing an asyncore socket when
# using select)
mininode_socket_map = dict()

# One lock for synchronizing all data access between the networking thread (see
# NetworkThread below) and the thread running the test logic. For simplicity,
# NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
# and whenever adding anything to the send buffer (in send_message()). This
# lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the NodeConnCB or NodeConn.
mininode_lock = RLock()

# Serialization/deserialization tools
def sha256(s):
return hashlib.new('sha256', s).digest()
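mininode_lock is an RLock rather than a plain Lock because the same thread sometimes re-acquires it: deliver() (further down) holds it while running a message callback, and callbacks routinely reply via NodeConn.send_message(), which takes the lock again. A minimal demonstration of why the re-entrant variant matters, with a hypothetical handler rather than the framework's own:

    import threading

    lock = threading.RLock()   # with threading.Lock() this example deadlocks

    def deliver(handler, msg):
        # Networking thread: the lock is held for the whole callback ...
        with lock:
            handler(msg)

    def on_ping(msg):
        # ... and the callback replies, which acquires the same lock again,
        # just as NodeConnCB handlers do when they call send_message().
        with lock:
            print("pong for %r" % msg)

    deliver(on_ping, "ping")
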
@@ -975,10 +988,6 @@ def __repr__(self):
# Reimplement the on_* functions to provide handling for events
class NodeConnCB(object):
def __init__(self):
# Acquire on all callbacks -- overkill for now since asyncore is
# single-threaded, but may be useful for synchronizing access to
# member variables in derived classes.
self.cbLock = Lock()
self.verack_received = False

# Derived classes should call this function once to set the message map
@@ -1004,7 +1013,7 @@ def create_callback_map(self):
}

def deliver(self, conn, message):
with self.cbLock:
with mininode_lock:
try:
self.cbmap[message.command](conn, message)
except:
@@ -1076,7 +1085,7 @@ class NodeConn(asyncore.dispatcher):
}

def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"):
asyncore.dispatcher.__init__(self)
asyncore.dispatcher.__init__(self, map=mininode_socket_map)
self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport))
self.dstaddr = dstaddr
self.dstport = dstport
@@ -1089,7 +1098,6 @@ def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"):
self.state = "connecting"
self.network = net
self.cb = callback
self.sendbufLock = Lock() # for protecting the sendbuffer
self.disconnect = False

# stuff version msg into sendbuf
@@ -1140,24 +1148,18 @@ def readable(self):
return True

def writable(self):
if self.disconnect:
self.handle_close()
return False
else:
self.sendbufLock.acquire()
with mininode_lock:
length = len(self.sendbuf)
self.sendbufLock.release()
return (length > 0)
return (length > 0)

def handle_write(self):
self.sendbufLock.acquire()
try:
sent = self.send(self.sendbuf)
except:
self.handle_close()
return
self.sendbuf = self.sendbuf[sent:]
self.sendbufLock.release()
with mininode_lock:
try:
sent = self.send(self.sendbuf)
except:
self.handle_close()
return
self.sendbuf = self.sendbuf[sent:]

def got_data(self):
while True:
@@ -1201,7 +1203,6 @@ def got_data(self):
def send_message(self, message, pushbuf=False):
if self.state != "connected" and not pushbuf:
return
self.sendbufLock.acquire()
self.show_debug_msg("Send %s" % repr(message))
command = message.command
data = message.serialize()
@@ -1214,9 +1215,9 @@
h = sha256(th)
tmsg += h[:4]
tmsg += data
self.sendbuf += tmsg
self.last_sent = time.time()
self.sendbufLock.release()
with mininode_lock:
self.sendbuf += tmsg
self.last_sent = time.time()

def got_message(self, message):
if message.command == "version":
@@ -1229,12 +1230,20 @@ def got_message(self, message):

def disconnect_node(self):
self.disconnect = True
self.send_message(self.messagemap['ping']())


class NetworkThread(Thread):
def run(self):
asyncore.loop(0.1, True)
while mininode_socket_map:
# We check for whether to disconnect outside of the asyncore
# loop to work around the behavior of asyncore when using
# select
disconnected = []
for fd, obj in mininode_socket_map.items():
if obj.disconnect:
disconnected.append(obj)
[ obj.handle_close() for obj in disconnected ]
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)


# An exception we can raise if we detect a potential disconnect
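The rewritten NetworkThread.run() is the heart of ef32817: closing an asyncore socket from another thread while select() is blocked on it is unreliable, so disconnect_node() only sets a flag, and the loop runs asyncore one pass at a time over a private socket map so flagged connections can be closed between passes. A stripped-down sketch of that loop shape (it assumes the dispatchers carry a disconnect flag and are registered in the private map, as NodeConn is):

    import asyncore

    socket_map = dict()   # private map, like mininode_socket_map

    def network_loop():
        # Run asyncore a single iteration at a time so disconnect requests made
        # by the test thread are honoured outside of select()/poll().
        while socket_map:
            flagged = [obj for obj in socket_map.values()
                       if getattr(obj, 'disconnect', False)]
            for obj in flagged:
                obj.handle_close()   # drops the dispatcher from socket_map
            asyncore.loop(timeout=0.1, use_poll=True, map=socket_map, count=1)

Once every connection has been closed, socket_map empties and the loop, and with it the NetworkThread, exits on its own.
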
