Permalink
Browse files

Merge #7762: [ZMQ] append a message sequence number to every ZMQ noti…

…fication

0b25a9f [ZMQ] append a message sequence number to every ZMQ notification (Jonas Schnelli)
de821d5 [ZMQ] refactor message string (Jonas Schnelli)
  • Loading branch information...
laanwj committed Apr 19, 2016
2 parents 0e6fd5e + 0b25a9f commit a1eb344ba8f4ef7dade755c823a9d001f837ae7d
Showing with 74 additions and 13 deletions.
  1. +9 −5 contrib/zmq/zmq_sub.py
  2. +9 −0 doc/release-notes.md
  3. +5 −0 doc/zmq.md
  4. +13 −0 qa/rpc-tests/zmq_test.py
  5. +26 −8 src/zmq/zmqpublishnotifier.cpp
  6. +12 −0 src/zmq/zmqpublishnotifier.h
View
@@ -3,6 +3,7 @@
import array
import binascii
import zmq
import struct
port = 28332
@@ -19,18 +20,21 @@
msg = zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
sequence = "Unknown";
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if topic == "hashblock":
print "- HASH BLOCK -"
print '- HASH BLOCK ('+sequence+') -'
print binascii.hexlify(body)
elif topic == "hashtx":
print '- HASH TX -'
print '- HASH TX ('+sequence+') -'
print binascii.hexlify(body)
elif topic == "rawblock":
print "- RAW BLOCK HEADER -"
print '- RAW BLOCK HEADER ('+sequence+') -'
print binascii.hexlify(body[:80])
elif topic == "rawtx":
print '- RAW TX -'
print '- RAW TX ('+sequence+') -'
print binascii.hexlify(body)
except KeyboardInterrupt:
View
@@ -52,6 +52,15 @@ The following outputs are affected by this change:
- REST `/rest/block/` (JSON format when including extended tx details)
- `bitcoin-tx -json`
### ZMQ
Each ZMQ notification now contains an up-counting sequence number that allows
listeners to detect lost notifications.
The sequence number is always the last element in a multi-part ZMQ notification and
therefore backward compatible.
Each message type has its own counter.
(https://github.com/bitcoin/bitcoin/pull/7762)
### Configuration and command-line options
### Block and transaction handling
View
@@ -99,3 +99,8 @@ using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip.
There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type your are
using. Bitcoind appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.
View
@@ -11,6 +11,7 @@
from test_framework.util import *
import zmq
import binascii
import struct
try:
import http.client as httplib
@@ -47,11 +48,17 @@ def run_test(self):
print "listen..."
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
assert_equal(topic, b"hashtx")
body = msg[1]
nseq = msg[2]
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, 0) #must be sequence 0 on hashtx
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, 0) #must be sequence 0 on hashblock
blkhash = bytes_to_hex_str(body)
assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq
@@ -61,12 +68,16 @@ def run_test(self):
self.sync_all()
zmqHashes = []
blockcount = 0
for x in range(0,n*2):
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
if topic == b"hashblock":
zmqHashes.append(bytes_to_hex_str(body))
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, blockcount+1)
blockcount += 1
for x in range(0,n):
assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq
@@ -82,6 +93,8 @@ def run_test(self):
hashZMQ = ""
if topic == b"hashtx":
hashZMQ = bytes_to_hex_str(body)
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, blockcount+1)
assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq
@@ -9,6 +9,11 @@
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
@@ -118,15 +123,31 @@ void CZMQAbstractPublishNotifier::Shutdown()
psocket = 0;
}
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
{
assert(psocket);
/* send three parts, command & data & a LE 4byte sequence number */
unsigned char msgseq[sizeof(uint32_t)];
WriteLE32(&msgseq[0], nSequence);
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
if (rc == -1)
return false;
/* increment memory only sequence number after sending */
nSequence++;
return true;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
return rc == 0;
return SendMessage(MSG_HASHBLOCK, data, 32);
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@@ -136,8 +157,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
return rc == 0;
return SendMessage(MSG_HASHTX, data, 32);
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
@@ -158,8 +178,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
ss << block;
}
int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
return rc == 0;
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@@ -168,6 +187,5 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
return rc == 0;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
@@ -11,7 +11,19 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
private:
uint32_t nSequence; //! upcounting per message sequence number
public:
/* send zmq multipart message
parts:
* command
* data
* message sequence number
*/
bool SendMessage(const char *command, const void* data, size_t size);
bool Initialize(void *pcontext);
void Shutdown();
};

0 comments on commit a1eb344

Please sign in to comment.