Skip to content

Commit

Permalink
zmq: Add support to listen on multiple interfaces
Browse files Browse the repository at this point in the history
Summary:
> This PR adds support for ZeroMQ to listen on multiple interfaces, just like the RPC server.
> Currently, if you specify more than one e.g. zmqpubhashblock paramter, only the first one will be used. Therefore a user may be forced to listen on all interfaces (e.g. zmqpubhashblock=0.0.0.0:28332), which can result in an increased attack surface.
> With this PR a user can specify multiple interfaces to listen on, e.g.
> -zmqpubhashblock=tcp://127.0.0.1:28332 -zmqpubhashblock=tcp://192.168.1.123:28332.

This is a backport of [[bitcoin/bitcoin#18309 | core#18309]]

Test Plan: `ninja all check-all`

Reviewers: #bitcoin_abc, Fabien

Reviewed By: #bitcoin_abc, Fabien

Differential Revision: https://reviews.bitcoinabc.org/D10339
  • Loading branch information
n-thumann authored and PiRK committed Oct 14, 2021
1 parent e5c8cc0 commit 8b7ad52
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 16 deletions.
2 changes: 2 additions & 0 deletions doc/release-notes/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ This release includes the following features and fixes:
if the transaction passes validation.
- A "sequence" notifier is added to ZeroMQ notifications, enabling client-side mempool
tracking.
- The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple
times to publish the same notification to different ZeroMQ sockets.
2 changes: 2 additions & 0 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Currently, the following notifications are supported:

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
The same notification can be specified more than once.

The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:
Expand All @@ -81,6 +82,7 @@ The high water mark value must be an integer greater than or equal to 0.
For instance:

$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
-zmqpubhashtxhwm=10000

Expand Down
5 changes: 2 additions & 3 deletions src/zmq/zmqnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ CZMQNotificationInterface *CZMQNotificationInterface::Create() {
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto &entry : factories) {
std::string arg("-zmq" + entry.first);
if (gArgs.IsArgSet(arg)) {
const auto &factory = entry.second;
const std::string address = gArgs.GetArg(arg, "");
const auto &factory = entry.second;
for (const std::string &address : gArgs.GetArgs(arg)) {
std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
Expand Down
29 changes: 16 additions & 13 deletions src/zmq/zmqpublishnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command,

bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) {
BlockHash hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(),
this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++) {
data[31 - i] = hash.begin()[i];
Expand All @@ -196,7 +197,8 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) {
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(
const CTransaction &transaction) {
TxId txid = transaction.GetId();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", txid.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", txid.GetHex(),
this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++) {
data[31 - i] = txid.begin()[i];
Expand All @@ -205,8 +207,8 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(
}

bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) {
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n",
pindex->GetBlockHash().GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n",
pindex->GetBlockHash().GetHex(), this->address);

const Config &config = GetConfig();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
Expand All @@ -228,7 +230,8 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) {
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(
const CTransaction &transaction) {
TxId txid = transaction.GetId();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", txid.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", txid.GetHex(),
this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
Expand All @@ -238,8 +241,8 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(
const CBlockIndex *pindex) {
BlockHash hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n",
hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n",
hash.GetHex(), this->address);
char data[sizeof(BlockHash) + 1];
for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
Expand All @@ -252,8 +255,8 @@ bool CZMQPublishSequenceNotifier::NotifyBlockConnect(
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(
const CBlockIndex *pindex) {
BlockHash hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n",
hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n",
hash.GetHex(), this->address);
char data[sizeof(BlockHash) + 1];
for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
Expand All @@ -266,8 +269,8 @@ bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(
const CTransaction &transaction, uint64_t mempool_sequence) {
TxId txid = transaction.GetId();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n",
txid.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n",
txid.GetHex(), this->address);
uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
for (unsigned int i = 0; i < sizeof(TxId); i++) {
data[sizeof(TxId) - 1 - i] = txid.begin()[i];
Expand All @@ -281,8 +284,8 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(
const CTransaction &transaction, uint64_t mempool_sequence) {
TxId txid = transaction.GetId();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n",
txid.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n",
txid.GetHex(), this->address);
uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
for (unsigned int i = 0; i < sizeof(TxId); i++) {
data[sizeof(TxId) - 1 - i] = txid.begin()[i];
Expand Down
29 changes: 29 additions & 0 deletions test/functional/interface_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def run_test(self):
self.test_sequence()
self.test_mempool_sync()
self.test_reorg()
self.test_multiple_interfaces()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
Expand Down Expand Up @@ -619,6 +620,34 @@ def test_mempool_sync(self):

self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)

def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
subscribers = []
for i in range(2):
address = f"tcp://127.0.0.1:{28334 + i}"
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})

self.restart_node(
0,
[f'-zmqpub{subscriber["hashblock"].topic.decode()}={subscriber["address"]}'
for subscriber in subscribers])

# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)

# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE)

# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(),
subscribers[0]['hashblock'].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(),
subscribers[1]['hashblock'].receive().hex())


if __name__ == '__main__':
ZMQTest().main()

0 comments on commit 8b7ad52

Please sign in to comment.