# Using the hmb_messaging Python module

The HMB protocol was developed by GFZ/GEOFON for sharing seismological data in real time, although the protocol itself places no particular requirements on the type of data it transmits. This module is a client library for Python which:
* opens sessions on the HMB server.
* reconnects automatically in the event of a connection error.
* sends messages in the correct format.
* receives streams of messages from the server.
* handles messages in both the JSON and BSON formats.
* ...

HMB uses a publish-subscribe pattern. A central server contains several '**buses**', each of which contains many '**queues**'. Messages are published to a particular **queue**; queues are created dynamically as needed. Clients choose a **bus** but can subscribe to multiple **queues** and optionally filter the messages within each **queue**.

Each message in a **queue** has a sequence number. This lets clients connect to the server sporadically and retrieve messages without missing any, or retrieve older messages that are cached on the server.
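As a sketch of this resume pattern, the hypothetical helpers below (they are not part of hmb_messaging) record the highest sequence number a client has processed and build a parameter dictionary, in the `hmbparam` layout used in the examples below, that starts just after it. This assumes that sequence numbers within a queue are consecutive integers, as in the outputs in this notebook, and that a non-negative `'seq'` selects the message with that sequence number.

```python
def latest_seq(last_seq, msgs):
    """Return the highest 'seq' seen so far, or None if nothing has been seen."""
    for msg in msgs:
        if last_seq is None or msg['seq'] > last_seq:
            last_seq = msg['seq']
    return last_seq


def resume_params(queue, last_seq, keep=False):
    """Build an hmbparam dict (hypothetical helper) that starts just after last_seq.

    With no history (last_seq is None), seq = -1 starts with the next new message.
    """
    seq = -1 if last_seq is None else last_seq + 1
    return {'queue': {queue: {'seq': seq, 'keep': keep}}}


# Usage: remember the last sequence number between sporadic connections.
last = latest_seq(None, [{'seq': 1047}, {'seq': 1050}])
print(resume_params('SYSTEM_ALERT', last))
```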

Here is an example that uses the library to return the last 4 messages on the SYSTEM_ALERT **queue** of the MTtest_rule1 **bus**.

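```python
from IPython.display import display

# Adjust the import to wherever hmb_messaging is installed; the original
# notebook used a project-local path:
# import Traffic_Rob4.traffic.lib.hmb_messaging as hmb
import hmb_messaging as hmb
```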

```python
# Choose the bus to subscribe to:
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule0' # CLOSEST
hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule1' # FASTER
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule2' # STRONGEST
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule3' # HIGHEST

hmbparam = {
        'queue': {
            'SYSTEM_ALERT': {  # choose the queue to listen on
                    'seq': -5,  # choose the message to start at
                    'keep': True  # make the request blocking
                    }
            }
        }
# Each message in the queue has a sequence number. Setting:
# seq = -1 => start with the next new message
# seq = -2 => start with the most recent message
# seq = -3 => start with the message before the most recent one
# ...
# seq = 10 => start with the message whose sequence number is 10

hmbconn = hmb.Hmbsession(url=hmbbus, param=hmbparam, use_bson=False)

msgs = hmbconn.recv()
for msg in msgs:
    msg['data'] = '...'  # hide the payload to keep the printout short
    print(msg)
```

```
2018-10-09 07:40:19,309 - INFO - hmb_messaging.60063 - hmbsession opened, sid=AAV3xtl4DE5KM-YZ, cid=-5qVa_B1wiRJMIDe
2018-10-09 07:40:19,312 - INFO - hmb_messaging.60063 - hmbsession parameters are: {'queue': {'SYSTEM_ALERT': {'seq': 1047, 'keep': True}}, 'cid': u'-5qVa_B1wiRJMIDe'}

{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'm75Ofx33WWC89c9U', u'seq': 1047}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'nfdJqi2y5PV88Q_G', u'seq': 1048}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'mY9mlFG4xF1TjdJz', u'seq': 1049}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'RCSWw4HvJDrYlOJp', u'seq': 1050}
```
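The negative `seq` values described in the comments above can be turned into absolute sequence numbers. `resolve_seq` below is a hypothetical helper, not part of hmb_messaging, and its rule (absolute = `endseq` + `seq` + 1 for negative values, where `endseq` is the sequence number the next new message will receive) is inferred from the logged parameters in this notebook: with `endseq` at 1051, `'seq': -5` became 1047 here, and `'seq': -2` and `'seq': -25` became 1050 and 1027 in the later examples.

```python
def resolve_seq(seq, endseq):
    """Convert an hmbparam 'seq' value to an absolute sequence number.

    ASSUMPTION (inferred from this notebook's logs): `endseq` is the sequence
    number of the next new message, and negative values count back from it:
    -1 -> endseq (next new message), -2 -> endseq - 1 (most recent), ...
    Non-negative values are already absolute.
    """
    if seq < 0:
        return endseq + seq + 1
    return seq


print(resolve_seq(-5, 1051))  # 1047
```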


With `'keep': True`, the call to `hmbconn.recv()` blocks until at least one message (or heartbeat) is received. With `'keep': False`, the call returns immediately if there are no new messages, which is useful if the process has other work to do while waiting for the next HMB message.
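With `'keep': False`, a client can alternate between draining messages and doing other work. In the sketch below, `poll`, `handle_msg` and `do_other_work` are hypothetical names; the only library behaviour assumed is that `recv()` on an `Hmbsession` opened with `'keep': False` returns a (possibly empty) list of message dictionaries without waiting. The short sleep when nothing arrived is a design choice to avoid querying the server in a tight loop.

```python
import time


def poll(session, handle_msg, do_other_work, rounds, idle_sleep=1.0):
    """Alternate between handling HMB messages and other work (hypothetical helper).

    Assumes session.recv() returns immediately with a (possibly empty) list of
    message dicts, as for a session opened with 'keep': False.
    """
    for _ in range(rounds):
        msgs = session.recv()
        for msg in msgs:
            handle_msg(msg)
        do_other_work()
        if not msgs:
            time.sleep(idle_sleep)  # nothing new: back off before asking again
```

For example, `poll(hmbconn, handle_msg=my_handler, do_other_work=my_task, rounds=100)`, where `my_handler` and `my_task` are your own functions.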

Now a more advanced example:

```python
# Choose the bus to subscribe to:
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule0' # CLOSEST
hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule1' # FASTER
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule2' # STRONGEST
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule3' # HIGHEST

hmbparam = {
        'heartbeat': 10,  # heartbeat interval in seconds
        'queue': {
            'SYSTEM_ALERT': {  # choose the queue to listen on
                    'seq': -2,  # choose the message to start at
                    'keep': True,  # make the request blocking
                    'topics': ['*'],  # filter messages on the optional topic field; patterns accept wildcards
                    #'filter': ...,  # optional MongoDB-style message filter (see the HMB documentation)
                    }
            }
        }

# timeout=(connect, read): set the timeouts just longer than the heartbeat interval.
# retry_wait: delay in seconds between retries.
hmbconn = hmb.Hmbsession(url=hmbbus, param=hmbparam, use_bson=True, retry_wait=2, timeout=(6.05, 11), autocreate_queues=True)

i = 0
limit = 1  # stop after this many messages (demo only)
while True:
    # On an error or timeout, recv() retries by opening a new connection
    # to the server and trying again.
    msgs = hmbconn.recv(retries=10)

    # NB: msgs may be empty because heartbeat messages are filtered out
    if len(msgs) == 0:
        print('heartbeat')

    for msg in msgs:
        msg['data'] = '...'
        print(msg)

    # limiter
    i += len(msgs)
    if i >= limit:
        break
```

```
2018-10-09 07:40:19,390 - INFO - hmb_messaging.60063 - hmbsession opened, sid=AAV3xtl5TF2tjIU1, cid=-DAPt-XLxWqrr8FK
2018-10-09 07:40:19,393 - INFO - hmb_messaging.60063 - hmbsession parameters are: {'queue': {'SYSTEM_ALERT': {'topics': ['*'], 'seq': 1050L, 'keep': True}}, 'heartbeat': 10, 'cid': u'-DAPt-XLxWqrr8FK'}

{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'RCSWw4HvJDrYlOJp', u'seq': 1050L}
```


This example uses a 10 s heartbeat, configured in the `hmbparam` dictionary, to make sure that the connection has not died. Also in `hmbparam`, the `'topics'` field takes a list of message topics (wildcards allowed) to select from that particular queue, and the commented-out `'filter'` field would hold a MongoDB-style message filter. See the HMB documentation for details on these and other filtering options.

We have also configured timeouts on the underlying HTTP connection using the `timeout` parameter of the `Hmbsession` init; extra parameters like this are passed directly to the `requests` library (see http://docs.python-requests.org/en/master/user/advanced/#timeouts). Here we have a 6.05 s connection timeout and an 11 s timeout on reading data.
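The timeout values follow from the heartbeat interval: the read timeout only needs to exceed the 10 s heartbeat, so that a silent connection is detected soon after a heartbeat is missed. The `requests` documentation also recommends a connect timeout slightly larger than a multiple of 3 s (the default TCP retransmission window), which explains a value like 6.05 s. The helper below is a hypothetical illustration of that rule, not part of hmb_messaging; the 1 s margin is an assumption chosen to reproduce `(6.05, 11)`.

```python
def timeouts_for_heartbeat(heartbeat, connect=6.05, margin=1):
    """Return a (connect, read) timeout tuple in seconds for `requests`.

    The read timeout is the heartbeat interval plus a small margin, so a
    connection that stops delivering heartbeats times out shortly after.
    """
    return (connect, heartbeat + margin)


print(timeouts_for_heartbeat(10))  # (6.05, 11)
```

The result could then be passed as `timeout=timeouts_for_heartbeat(hmbparam['heartbeat'])` when creating the `Hmbsession`.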

This example uses BSON (`use_bson=True`) to transfer messages from the server to the client. The `Hmbsession` object automatically decodes them into a sequence of dictionaries.

When receiving messages in this example, the call `hmbconn.recv(retries=10)` requests up to 10 attempts to connect to the HMB server, with a 2 s pause between retries set by the `retry_wait` parameter of the `Hmbsession` init.
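The retry behaviour can be pictured with a generic sketch. `call_with_retries` below is a hypothetical helper, not the hmb_messaging implementation: it treats `retries` as the total number of attempts, sleeps `retry_wait` seconds between them and re-raises the last error. Inside `Hmbsession`, a retry also reopens the connection to the server, which `func` would have to do itself here.

```python
import time


def call_with_retries(func, retries=10, retry_wait=2, exceptions=(Exception,)):
    """Call func() up to `retries` times, sleeping `retry_wait` s between attempts.

    Re-raises the last exception if every attempt fails.
    """
    if retries < 1:
        raise ValueError('retries must be at least 1')
    for attempt in range(retries):
        try:
            return func()
        except exceptions:
            if attempt == retries - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(retry_wait)
```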

Lastly, this listener uses the `autocreate_queues` option to create the queue if it is missing; otherwise an error would be raised on every attempt of the client to connect to the server until the queue was created by another process.

We can also query the HMB server for information:

```python
display(hmbconn.info())
display(hmbconn.features())
display(hmbconn.status())
```

```
{u'queue': {u'ANNOUNCEMENT': {u'endseq': 3,
   u'endtime': None,
   u'startseq': 0,
   u'starttime': None,
   u'topic': {u'': {u'endtime': None, u'starttime': None}}},
  u'SYSTEM_ALERT': {u'endseq': 1051,
   u'endtime': None,
   u'startseq': 951,
   u'starttime': None,
   u'topic': {u'': {u'endtime': None, u'starttime': None}}},
  u'SYSTEM_ALERT100': {u'endseq': 1,
   u'endtime': None,
   u'startseq': 0,
   u'starttime': None,
   u'topic': {u'': {u'endtime': None, u'starttime': None}}},
  u'myqueue': {u'endseq': 1,
   u'endtime': None,
   u'startseq': 0,
   u'starttime': None,
   u'topic': {u'': {u'endtime': None, u'starttime': None}}}}}

{u'capabilities': [u'JSON',
  u'BSON',
  u'INFO',
  u'STREAM',
  u'WINDOW',
  u'FILTER',
  u'OOD'],
 u'functions': [],
 u'software': u'httpmsgbus v0.16 (2017.119)'}

{u'session': {}}
```
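The `info()` and `features()` results are plain dictionaries, so they are easy to inspect programmatically. The hypothetical helpers below summarise which sequence numbers are cached for each queue and check for a capability such as OOD. They assume that `endseq` is the sequence number the next message will receive (inferred from the outputs above: SYSTEM_ALERT reports `endseq` 1051 while the newest message received earlier was 1050), so a queue caches `endseq - startseq` messages.

```python
def summarize_queues(info):
    """Map each queue name to its first/last cached seq and message count.

    ASSUMPTION: 'endseq' is exclusive (the seq of the next new message).
    """
    summary = {}
    for name, q in info['queue'].items():
        summary[name] = {
            'first': q['startseq'],
            'last': q['endseq'] - 1,
            'count': q['endseq'] - q['startseq'],
        }
    return summary


def supports(features, capability):
    """Return True if the server lists `capability` (e.g. 'OOD') in features()."""
    return capability in features.get('capabilities', [])
```

For example, `summarize_queues(hmbconn.info())['SYSTEM_ALERT']` would describe the cached SYSTEM_ALERT messages, and `supports(hmbconn.features(), 'OOD')` tells whether out-of-order sending is available.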

Here is an example of retrieving a particular range of HMB messages:

```python
# Choose the bus to subscribe to:
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule0' # CLOSEST
hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule1' # FASTER
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule2' # STRONGEST
#hmbbus='http://cerf.emsc-csem.org:80/MTtest_rule3' # HIGHEST

hmbparam = {
        'queue': {
            'SYSTEM_ALERT': {  # choose the queue to listen on
                    'seq': -25,  # choose the message to start at
                    'endseq': -10,  # choose the message to end at
                    'keep': False,  # non-blocking: return immediately if there are no new messages
                    'topics': ['*']  # filter messages on the optional topic field; patterns accept wildcards
                    }
            }
        }

hmbconn = hmb.Hmbsession(url=hmbbus, param=hmbparam, use_bson=False)

# Note that if we request many messages from the server, we might not receive
# them all with one call to recv(), so we should be prepared to call recv() as
# many times as necessary to get all of the messages. When we have received all
# of the messages, we will see an EOF message.

# The method recv_all() handles this for us (it should only be used when 'keep' is False):
msgs = hmbconn.recv_all()
for msg in msgs:
    msg['data'] = '...'
    print(msg)
```


```
2018-10-09 07:45:07,900 - INFO - hmb_messaging.60063 - hmbsession opened, sid=AAV3xuqrlAeNAbb1, cid=aRcbW2rTJnKBtHVz
2018-10-09 07:45:07,902 - INFO - hmb_messaging.60063 - hmbsession parameters are: {'queue': {'SYSTEM_ALERT': {'endseq': -10, 'topics': ['*'], 'seq': 1027, 'keep': False}}, 'cid': u'aRcbW2rTJnKBtHVz'}

{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'mM4dXzAeNnNzGvG5', u'seq': 1027}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'5Dk48BFexiT4zF7x', u'seq': 1028}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'C_Cdl0x003AH7b3o', u'seq': 1029}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'OZLcgR87IEo8v4qW', u'seq': 1030}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'GSUoWHQX96MWfPOh', u'seq': 1031}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'gClL8_XbfmW0B5A8', u'seq': 1032}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'MF5KocqtgPHs2DcI', u'seq': 1033}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'sender': u'JmINGf3w31G7rroD', u'seq': 1034}
{u'queue': u'SYSTEM_ALERT', u'data': '...', u'type': u'SYSTEM_ALERT', u'
```
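The looping that `recv_all()` takes care of can be sketched as follows. This is not the library's code: `recv_until_eof` and `is_eof` are hypothetical, and the assumption that the EOF message carries `'type': 'EOF'` is unverified, so check the HMB documentation before relying on it. The `max_calls` cap is a safeguard against looping forever if no EOF arrives.

```python
def is_eof(msg):
    # ASSUMPTION: the end of the requested range is signalled by a message
    # whose 'type' field is 'EOF'; verify against the HMB documentation.
    return msg.get('type') == 'EOF'


def recv_until_eof(session, max_calls=1000):
    """Call session.recv() repeatedly, collecting messages until EOF is seen."""
    collected = []
    for _ in range(max_calls):
        for msg in session.recv():
            if is_eof(msg):
                return collected
            collected.append(msg)
    raise RuntimeError('no EOF message after %d calls to recv()' % max_calls)
```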

Next, let's see how to send HMB messages. Messages should be Python dictionaries and must contain at least the `'type'`, `'queue'` and `'data'` fields to be accepted by the HMB server. They can also contain `'topic'`, `'starttime'` and `'endtime'` fields. (The format of the time fields is unclear: it appears to be epoch time here, but ISO-format strings when providing search parameters.) It is also possible to send messages out of order, if the HMB server supports the OOD capability, in which case an integer `'seq'` field should be defined.
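A message with the fields described above is just a dictionary. The hypothetical `make_msg` helper below is not `send_msg()` and does not send anything: it adds the optional fields only when they are given and insists on an integer `'seq'` for out-of-order messages. The usage line passes epoch seconds for `'starttime'`, following the (uncertain) observation about time formats above.

```python
import numbers
import time


def make_msg(mtype, queue, data, topic=None, starttime=None, endtime=None, seq=None):
    """Assemble an HMB message dict (hypothetical helper; nothing is sent)."""
    msg = {'type': mtype, 'queue': queue, 'data': data}  # required fields
    optional = {'topic': topic, 'starttime': starttime, 'endtime': endtime}
    for key, value in optional.items():
        if value is not None:
            msg[key] = value
    if seq is not None:
        # Out-of-order (OOD) messages need an integer sequence number.
        if not isinstance(seq, numbers.Integral):
            raise TypeError("'seq' must be an integer")
        msg['seq'] = seq
    return msg


msg = make_msg('TEST', 'myqueue', {'msg': 'this is a test'}, starttime=time.time())
```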

The `send_msg()` method creates a message in the correct format and sends it. If more control is required, or to send several messages at once, the lower-level `send msg()` method can be used.

```python
hmbconn.send_msg(
            mtype = 'TEST',  # message types are just strings; possible values depend on the application
            queue = 'myqueue',  # queues are created as needed, i.e. as soon as a message is sent to them
            data  = {'field1':'value1','msg':'this is a test'},  # anything that JSON/BSON can encode
            topic = None,  # optional field that listeners can use to filter messages
            retries = 1,
            )
```

```
2018-10-09 08:19:58,741 - INFO - hmb_messaging.60063 - hmbsession opened, sid=AAV3x2dLT63W8nVM, cid=aRcbW2rTJnKBtHVz
2018-10-09 08:19:58,743 - INFO - hmb_messaging.60063 - hmbsession parameters are: {'queue': {'SYSTEM_ALERT': {'endseq': -10, 'topics': ['*'], 'seq': 1051, 'keep': False}}, 'cid': u'aRcbW2rTJnKBtHVz'}
```


### Dependencies
hmb_messaging relies on the following Python packages (beyond those in the standard library):
* requests
* pymongo (for the bson module)
