Skip to content

Commit

Permalink
ZMQ: add options to configure outbound message high water mark, aka S…
Browse files Browse the repository at this point in the history
…NDHWM

Github-Pull: bitcoin#14060
Rebased-From: a4edb16
  • Loading branch information
mruddy authored and luke-jr committed Dec 24, 2018
1 parent e4b91fb commit c84c99e
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 11 deletions.
1 change: 1 addition & 0 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self):
self.zmqContext = zmq.asyncio.Context()

self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
Expand Down
1 change: 1 addition & 0 deletions contrib/zmq/zmq_sub3.4.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self):
self.zmqContext = zmq.asyncio.Context()

self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
Expand Down
13 changes: 12 additions & 1 deletion doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,21 @@ Currently, the following notifications are supported:
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.

The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:

-zmqpubhashtxhwm=n
-zmqpubhashblockhwm=n
-zmqpubrawblockhwm=n
-zmqpubrawtxhwm=n

The high water mark value must be an integer greater than or equal to 0.

For instance:

$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
-zmqpubhashtxhwm=10000

Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
Expand Down
13 changes: 13 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include <openssl/crypto.h>

#if ENABLE_ZMQ
#include <zmq/zmqabstractnotifier.h>
#include <zmq/zmqnotificationinterface.h>
#include <zmq/zmqrpc.h>
#endif
Expand Down Expand Up @@ -445,13 +446,25 @@ void SetupServerArgs()
gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawwallettx=<address>", "Enable publish raw wallet transaction in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashwallettxhwm=<n>", strprintf("Set publish hash wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawwallettxhwm=<n>", strprintf("Set publish raw wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubhashwallettx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawwallettx=<address>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubhashwallettxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawwallettxhwm=<n>");
#endif

gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST);
Expand Down
1 change: 1 addition & 0 deletions src/zmq/zmqabstractnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <zmq/zmqabstractnotifier.h>
#include <util.h>

const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;

CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
Expand Down
11 changes: 10 additions & 1 deletion src/zmq/zmqabstractnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier
{
public:
CZMQAbstractNotifier() : psocket(nullptr) { }
static const int DEFAULT_ZMQ_SNDHWM {1000};

CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
virtual ~CZMQAbstractNotifier();

template <typename T>
Expand All @@ -28,6 +30,12 @@ class CZMQAbstractNotifier
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
void SetOutboundMessageHighWaterMark(const int sndhwm) {
if (sndhwm >= 0) {
outbound_message_high_water_mark = sndhwm;
}
}

virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
Expand All @@ -40,6 +48,7 @@ class CZMQAbstractNotifier
void *psocket;
std::string type;
std::string address;
int outbound_message_high_water_mark; // aka SNDHWM
};

#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
7 changes: 4 additions & 3 deletions src/zmq/zmqnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier);
}
}
Expand Down Expand Up @@ -108,11 +109,11 @@ bool CZMQNotificationInterface::Initialize()
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
Expand All @@ -139,7 +140,7 @@ void CZMQNotificationInterface::Shutdown()
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_destroy(pcontext);
Expand Down
16 changes: 13 additions & 3 deletions src/zmq/zmqpublishnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false;
}

int rc = zmq_bind(psocket, address.c_str());
if (rc!=0)
LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);

int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
if (rc != 0)
{
zmqError("Failed to set outbound message high water mark");
zmq_close(psocket);
return false;
}

rc = zmq_bind(psocket, address.c_str());
if (rc != 0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
Expand Down Expand Up @@ -124,7 +134,7 @@ void CZMQAbstractPublishNotifier::Shutdown()

if (count == 1)
{
LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
Expand Down
2 changes: 1 addition & 1 deletion src/zmq/zmqpublishnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
private:
uint32_t nSequence; //!< upcounting per message sequence number
uint32_t nSequence {0U}; //!< upcounting per message sequence number

public:

Expand Down
4 changes: 3 additions & 1 deletion src/zmq/zmqrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
"[\n"
" { (json object)\n"
" \"type\": \"pubhashtx\", (string) Type of notification\n"
" \"address\": \"...\" (string) Address of the publisher\n"
" \"address\": \"...\", (string) Address of the publisher\n"
" \"hwm\": n (numeric) Outbound message high water mark\n"
" },\n"
" ...\n"
"]\n"
Expand All @@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
UniValue obj(UniValue::VOBJ);
obj.pushKV("type", n->GetType());
obj.pushKV("address", n->GetAddress());
obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
result.push_back(obj);
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/functional/rpc_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _test_getzmqnotifications(self):

self.restart_node(0, extra_args=["-zmqpubhashtx=%s" % self.address])
assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashtx", "address": self.address},
{"type": "pubhashtx", "address": self.address, 'hwm': 1000},
])


Expand Down

0 comments on commit c84c99e

Please sign in to comment.