Skip to content

Commit

Permalink
zmq test: deduplicate test setup code (node restart, topics subscript…
Browse files Browse the repository at this point in the history
…ion)

Summary: This is a backport of [[bitcoin/bitcoin#20953 | core#20953]]

Test Plan:
```
test/functional/test_runner.py interface_zmq
```

Reviewers: #bitcoin_abc, Fabien

Reviewed By: #bitcoin_abc, Fabien

Differential Revision: https://reviews.bitcoinabc.org/D10588
  • Loading branch information
theStack authored and PiRK committed Nov 30, 2021
1 parent f8e2580 commit f6f7b13
Showing 1 changed file with 45 additions and 81 deletions.
126 changes: 45 additions & 81 deletions test/functional/interface_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,39 +88,49 @@ def run_test(self):
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)

# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, recv_timeout * 1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))

self.restart_node(
0,
self.extra_args[0] + [
f"-zmqpub{topic}={address}" for topic, address in services
])

if connect_nodes:
self.connect_nodes(0, 1)

for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])

# Relax so that the subscribers are ready before publishing zmq
# messages
sleep(0.2)

return subscribers

def test_basic(self):

# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])

address = 'tcp://127.0.0.1:28332'
sockets = []
subs = []
services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"]
for service in services:
sockets.append(self.ctx.socket(zmq.SUB))
sockets[-1].set(zmq.RCVTIMEO, 60000)
subs.append(ZMQSubscriber(sockets[-1], service))

# Subscribe to all available topics.
subs = self.setup_zmq_test(
[(topic, address)
for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
connect_nodes=True)

hashblock = subs[0]
hashtx = subs[1]
rawblock = subs[2]
rawtx = subs[3]

self.restart_node(
0,
self.extra_args[0] + [
f"-zmqpub{sub.topic.decode()}={address}" for sub in [
hashblock, hashtx, rawblock, rawtx]]
)

self.connect_nodes(0, 1)
for socket in sockets:
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)

num_blocks = 5
self.log.info(
"Generate {0} blocks (and {0} coinbase txes)".format(num_blocks))
Expand Down Expand Up @@ -188,27 +198,11 @@ def test_reorg(self):

address = 'tcp://127.0.0.1:28333'

services = [b"hashblock", b"hashtx"]
sockets = []
subs = []
for service in services:
sockets.append(self.ctx.socket(zmq.SUB))
# 2 second timeout to check end of notifications
sockets[-1].set(zmq.RCVTIMEO, 2000)
subs.append(ZMQSubscriber(sockets[-1], service))

# Subscribe to all available topics.
hashblock = subs[0]
hashtx = subs[1]

# Should only notify the tip if a reorg occurs
self.restart_node(
0, self.extra_args[0] + [f'-zmqpub{sub.topic.decode()}={address}'
for sub in [hashblock, hashtx]])
for socket in sockets:
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
# 2 second timeout to check end of notifications
recv_timeout=2)

# Generate 1 block in nodes[0] with 1 mempool tx and receive all
# notifications
Expand Down Expand Up @@ -307,16 +301,7 @@ def test_sequence(self):
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
"""
self.log.info("Testing 'sequence' publisher")
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')

self.restart_node(
0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}'])
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])

# Mempool sequence number starts at 1
seq_num = 1
Expand Down Expand Up @@ -507,17 +492,8 @@ def test_mempool_sync(self):
return

self.log.info("Testing 'mempool sync' usage of sequence notifier")
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')

self.restart_node(
0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}'])
self.connect_nodes(0, 1)
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")],
connect_nodes=True)

# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(
Expand Down Expand Up @@ -617,31 +593,19 @@ def test_mempool_sync(self):

def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
subscribers = []
for i in range(2):
address = f"tcp://127.0.0.1:{28334 + i}"
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})

self.restart_node(
0,
[f'-zmqpub{subscriber["hashblock"].topic.decode()}={subscriber["address"]}'
for subscriber in subscribers])

# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
])

# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)

# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(),
subscribers[0]['hashblock'].receive().hex())
subscribers[0].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(),
subscribers[1]['hashblock'].receive().hex())
subscribers[1].receive().hex())


if __name__ == '__main__':
Expand Down

0 comments on commit f6f7b13

Please sign in to comment.