Skip to content

Commit

Permalink
Implement zmq notifications for chainlocked blocks (#2899)
Browse files Browse the repository at this point in the history
* Unify zmq message order

* Implement `zmqpubhashchainlock` and `zmqpubrawchainlock`
  • Loading branch information
UdjinM6 committed May 8, 2019
1 parent 66a2cde commit 89f6f75
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 39 deletions.
30 changes: 19 additions & 11 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ def __init__(self):

self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
Expand All @@ -69,38 +71,44 @@ async def handle(self) :
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashchainlock":
print('- HASH CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtx":
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtxlock":
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawchainlock":
print('- RAW CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernancevote":
print('- RAW GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernanceobject":
print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())
Expand Down
30 changes: 19 additions & 11 deletions contrib/zmq/zmq_sub3.4.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ def __init__(self):

self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
Expand All @@ -72,38 +74,44 @@ def handle(self) :
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashchainlock":
print('- HASH CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtx":
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtxlock":
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawchainlock":
print('- RAW CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernancevote":
print('- RAW GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernanceobject":
print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())
Expand Down
12 changes: 7 additions & 5 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,20 @@ the command line or in the configuration file.

Currently, the following notifications are supported:

-zmqpubhashblock=address
-zmqpubhashchainlock=address
-zmqpubhashtx=address
-zmqpubhashtxlock=address
-zmqpubhashblock=address
-zmqpubhashgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubhashinstantsenddoublespend=address
-zmqpubrawblock=address
-zmqpubrawchainlock=address
-zmqpubrawtx=address
-zmqpubrawtxlock=address
-zmqpubhashgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubrawgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubhashinstantsenddoublespend=address

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
Expand Down
5 changes: 5 additions & 0 deletions src/zmq/zmqabstractnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
return true;
}

bool CZMQAbstractNotifier::NotifyChainLock(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}

bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;
Expand Down
1 change: 1 addition & 0 deletions src/zmq/zmqabstractnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class CZMQAbstractNotifier
virtual void Shutdown() = 0;

virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyChainLock(const CBlockIndex *pindex);
virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyTransactionLock(const CTransaction &transaction);
virtual bool NotifyGovernanceVote(const CGovernanceVote &vote);
Expand Down
19 changes: 19 additions & 0 deletions src/zmq/zmqnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
std::list<CZMQAbstractNotifier*> notifiers;

factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashchainlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashChainLockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubhashtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionLockNotifier>;
factories["pubhashgovernancevote"] = CZMQAbstractNotifier::Create<CZMQPublishHashGovernanceVoteNotifier>;
factories["pubhashgovernanceobject"] = CZMQAbstractNotifier::Create<CZMQPublishHashGovernanceObjectNotifier>;
factories["pubhashinstantsenddoublespend"] = CZMQAbstractNotifier::Create<CZMQPublishHashInstantSendDoubleSpendNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawchainlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawChainLockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubrawtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionLockNotifier>;
factories["pubrawgovernancevote"] = CZMQAbstractNotifier::Create<CZMQPublishRawGovernanceVoteNotifier>;
Expand Down Expand Up @@ -152,6 +154,23 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
}
}

void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyChainLock(pindex))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}

void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
Expand Down
1 change: 1 addition & 0 deletions src/zmq/zmqnotificationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CZMQNotificationInterface : public CValidationInterface
// CValidationInterface
void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void NotifyChainLock(const CBlockIndex *pindex) override;
void NotifyTransactionLock(const CTransaction &tx) override;
void NotifyGovernanceVote(const CGovernanceVote& vote) override;
void NotifyGovernanceObject(const CGovernanceObject& object) override;
Expand Down
57 changes: 45 additions & 12 deletions src/zmq/zmqpublishnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@

static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;

static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_HASHGVOTE = "hashgovernancevote";
static const char *MSG_HASHGOBJ = "hashgovernanceobject";
static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHCHAINLOCK = "hashchainlock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_HASHGVOTE = "hashgovernancevote";
static const char *MSG_HASHGOBJ = "hashgovernanceobject";
static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWCHAINLOCK = "rawchainlock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";

// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
Expand Down Expand Up @@ -159,6 +161,16 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
return SendMessage(MSG_HASHBLOCK, data, 32);
}

bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashchainlock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHCHAINLOCK, data, 32);
}

bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
Expand Down Expand Up @@ -234,6 +246,27 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex)
{
LogPrint("zmq", "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex());

const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex, consensusParams))
{
zmqError("Can't read block from disk");
return false;
}

ss << block;
}

return SendMessage(MSG_RAWCHAINLOCK, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
Expand Down
12 changes: 12 additions & 0 deletions src/zmq/zmqpublishnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
bool NotifyBlock(const CBlockIndex *pindex) override;
};

class CZMQPublishHashChainLockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyChainLock(const CBlockIndex *pindex) override;
};

class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
Expand Down Expand Up @@ -72,6 +78,12 @@ class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
bool NotifyBlock(const CBlockIndex *pindex) override;
};

class CZMQPublishRawChainLockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyChainLock(const CBlockIndex *pindex) override;
};

class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
Expand Down

0 comments on commit 89f6f75

Please sign in to comment.