# INTRODUCTION TO LIVE DATA FEEDS 
**via the STOMP PROTOCOL**

<div class='alert alert-success'><b>NOTE:</b> This Jupyter notebook is slide-enabled. If you have <b>RISE</b> installed, you can enter slideshow mode by pressing the slideshow button shown below.</div>
<img src='./images/rise-slideshow.png'>

Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic. Pub/sub messaging can be used to enable event-driven architectures, or to decouple applications in order to increase performance, reliability and scalability.
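To make the pub/sub model concrete, here is a minimal, synchronous, in-memory sketch with no network or real broker involved. `TinyBroker` and its methods are hypothetical names chosen for illustration; the point is that the publisher never addresses subscribers directly, yet every subscriber to a topic receives each message published to it.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str], None]


class TinyBroker:
    """Hypothetical in-memory broker illustrating publish/subscribe (synchronous)."""

    def __init__(self) -> None:
        # topic name -> callbacks of everyone subscribed to that topic
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: str) -> int:
        """Deliver message to every subscriber of topic; return how many got it."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


broker = TinyBroker()
inbox_a: List[str] = []
inbox_b: List[str] = []
broker.subscribe('td_wessex', inbox_a.append)
broker.subscribe('td_wessex', inbox_b.append)

delivered = broker.publish('td_wessex', 'train 428I moved to berth 0510')
print(delivered, inbox_a, inbox_b)
```

Publishing to a topic with no subscribers simply reaches nobody (`publish` returns 0). A real broker, such as the one behind Network Rail's feeds, also delivers messages asynchronously over the network, which this sketch leaves out.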

In this hands-on Beginners Machine Learning online workshop, you will learn how to listen to live feeds, and parse and save the received messages. For this workshop we will be using Network Rail's live data feeds, accessible [here](https://www.networkrail.co.uk/who-we-are/transparency-and-ethics/transparency/open-data-feeds/).

<div class='alert alert-info'>
    Information about the current Network Rail Feeds can be found 
    <a href='https://wiki.openraildata.com/index.php/About_the_Network_Rail_feeds'><b>here</b></a>
    </div>
    
<img src='./images/nr-feeds.png'>

During the workshop we will listen to Network Rail's live train describer (TD) data in this Jupyter notebook. We will first learn about the STOMP protocol, then look at how to listen to these messages, and finally define our own behaviours to parse and save the messages received.

At the end of this tutorial, you are required to complete a set of exercises to consolidate your understanding of the content covered in this workshop.

<div class='alert alert-danger'>
    <b>Prerequisite:</b> Before you proceed with this tutorial, you will need to register for an account on the <a href = 'https://datafeeds.networkrail.co.uk/ntrod/login'>Network Rail data feed portal</a>. Once your account has been activated, you will be able to subscribe to feeds via the portal. Instructions on how to do this are in the <code>account-creation.ipynb</code> notebook. It may take anywhere from a few hours to a few days for your account to be activated.
</div>

### What is STOMP?
[STOMP](https://stomp.github.io/stomp-specification-1.2.html#Abstract) (Simple Text Oriented Messaging Protocol) is a text-based protocol for communicating with message-oriented middleware (message brokers), and it is also used in IoT (Internet of Things) applications. It defines a text-based wire format for the messages passed between STOMP clients and servers.

The main philosophies driving the design of STOMP are simplicity and interoperability.

Like other broker-based protocols, STOMP supports a publish-subscribe mechanism: clients subscribe to destinations such as topics (similar to requesting a web path in HTTP) and are notified by the broker whenever a message is sent to that topic.

```python
import stomp 

# Define the connection settings: broker host, STOMP port and virtual host
c = stomp.Connection(host_and_ports=[('datafeeds.networkrail.co.uk', 61618)],
                     vhost='datafeeds.networkrail.co.uk')
```

```python
# start a connection and authenticate
c.connect(username='username',
          passcode='password',
          wait=True)

# Subscribe to topics
c.subscribe('/topic/TD_WESS_SIG_AREA', id='test-td')
```

The main difference between HTTP and STOMP is the communication pattern. HTTP works within a request-response constraint, whereas STOMP is less constraining: for instance, a client can keep a connection open to a server and receive messages as the server broadcasts them.

### Why was STOMP created?
STOMP arose from a need to connect to enterprise message brokers from scripting languages such as Python. It distinguishes itself by covering a small subset of commonly used messaging operations rather than providing a comprehensive messaging API.

The STOMP architecture is shown below:
<img src='./images/STOMP-protocol-architecture.jpg' width='60%' style='margin: 0 auto'>

STOMP is designed to be a lightweight protocol that is easy to implement both on the client and server side in a wide range of languages. This implies, in particular, that there are not many constraints on the architecture of servers and many features such as destination naming and reliability semantics are implementation specific.


### What are STOMP frames?
STOMP is a **frame-based** protocol which assumes a reliable 2-way streaming network protocol (such as TCP) underneath. Think of **protocols** as established rules of communication between servers and clients. The client and server will communicate using STOMP frames sent over the stream. 

<div class= 'alert alert-info'>
All STOMP clients and servers MUST support UTF-8 encoding and decoding. Therefore, for maximum interoperability in a heterogeneous computing environment, it is RECOMMENDED that text based content be encoded with UTF-8.
    </div>
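The frame layout described above can be sketched in plain Python. `encode_frame` is a hypothetical helper written for illustration (stomp.py builds frames for you internally); it follows the STOMP 1.2 layout of a command line, `header:value` lines, a blank line, the body and a terminating NULL byte, and it encodes everything as UTF-8.

```python
from typing import Dict


def encode_frame(command: str, headers: Dict[str, str], body: str = '') -> bytes:
    """Serialise a STOMP 1.2 frame: command, header lines, blank line, body, NULL byte."""

    def escape(value: str) -> str:
        # STOMP 1.2 header escaping; CONNECT and CONNECTED frames are exempt
        if command in ('CONNECT', 'CONNECTED'):
            return value
        return (value.replace('\\', '\\\\')  # backslash first so later escapes are not doubled
                     .replace('\r', '\\r')
                     .replace('\n', '\\n')
                     .replace(':', '\\c'))

    head = [command] + [f'{escape(k)}:{escape(str(v))}' for k, v in headers.items()]
    # A blank line separates headers from body; a NULL byte terminates the frame
    return ('\n'.join(head) + '\n\n').encode('utf-8') + body.encode('utf-8') + b'\x00'


send = encode_frame('SEND', {'destination': '/topic/feed_topic',
                             'content-type': 'text/plain'}, 'hello')
print(send)

subscribe = encode_frame('SUBSCRIBE', {'destination': '/topic/TD_WESS_SIG_AREA',
                                       'id': 'test-td', 'ack': 'client-individual'})
print(subscribe)
```

Because the format is plain text, you can read a frame directly: the `SUBSCRIBE` frame above is the same request that `c.subscribe('/topic/TD_WESS_SIG_AREA', 'test-td', ack='client-individual')` asks stomp.py to send.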

### What is the STOMP client?
A STOMP client is a user-agent which can act in two (possibly simultaneous) modes:
- as a producer, sending messages to a destination on the server via a SEND frame
- as a consumer, sending a SUBSCRIBE frame for a given destination and receiving messages from the server as MESSAGE frames.

```python
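import stomp
from time import sleep
from stomp.listener import PrintingListener

# Connection settings (with no arguments, stomp.py connects to localhost:61613)
client = stomp.Connection()

# Listening behaviour: a listener object whose callbacks handle incoming frames
client.set_listener('printer', PrintingListener())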
```

```python
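# Establish the connection: CONNECT frame out, CONNECTED (or ERROR) frame back
client.connect(username='username', passcode='password', wait=True)

# Subscribe to a topic; MESSAGE frames are passed to the listener's callbacks
client.subscribe(destination='/topic/feed_topic', id='1')
sleep(10)  # keep the script alive while messages arrive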
```

```python
# Send a message to a destination on the broker (SEND frame)
client.send(destination='/topic/feed_topic', body='hello')

# Disconnect the client from the broker (DISCONNECT frame)
client.disconnect()
```

### What does connecting mean?
A STOMP client opens a TCP connection (the stream) to the server and then initiates the STOMP session by sending a CONNECT frame. If the server accepts the connection attempt, it responds with a CONNECTED frame.

The server can also reject a connection attempt, in which case it responds with an ERROR frame explaining why the connection was rejected and then closes the connection.

```python
# start a connection and authenticate
c.connect(username='username',
          passcode='password',
          wait=True)
```
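The handshake can also be sketched from the client's side: parse the server's reply and check whether it is CONNECTED or ERROR. `decode_frame` and `handshake_result` are hypothetical helpers written for illustration, and the example frames are made up. They ignore header un-escaping, `content-length` and CRLF line endings, all of which a real client such as stomp.py handles.

```python
from typing import Dict, Tuple


def decode_frame(raw: bytes) -> Tuple[str, Dict[str, str], str]:
    """Split a raw STOMP frame into (command, headers, body).

    Simplified sketch: no header un-escaping, no content-length, LF line endings only.
    """
    text = raw.decode('utf-8').rstrip('\x00')
    head, _, body = text.partition('\n\n')
    command, *header_lines = head.split('\n')
    headers: Dict[str, str] = {}
    for line in header_lines:
        key, _, value = line.partition(':')
        headers.setdefault(key, value)  # STOMP 1.2: the first of repeated headers wins
    return command, headers, body


def handshake_result(raw: bytes) -> str:
    """Describe the server's reply to a CONNECT frame."""
    command, headers, body = decode_frame(raw)
    if command == 'CONNECTED':
        # A missing version header means the server speaks STOMP 1.0
        return f"connected (STOMP {headers.get('version', '1.0')})"
    if command == 'ERROR':
        # The 'message' header holds a short description; the body may add details
        return f"rejected: {headers.get('message', body)}"
    return f'unexpected frame: {command}'


ok = b'CONNECTED\nversion:1.2\nheart-beat:5000,10000\n\n\x00'
bad = b'ERROR\nmessage:Invalid credentials\n\nLogin failed\x00'
print(handshake_result(ok))   # connected (STOMP 1.2)
print(handshake_result(bad))  # rejected: Invalid credentials
```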

### What is subscribing?
The `SUBSCRIBE` frame is used to register to listen to a given destination. 

```python
# c is a stomp.Connection that has already called c.connect(...)
c.subscribe('/topic/feed_topic', 'test-td')
```

### What is listening?
To receive messages from the messaging system, you need to set up a listener on your connection and then subscribe to the destination (see STOMP subscribe above). Listeners are simply subclasses of stomp.py's `ConnectionListener` class that implement its callback methods (such as `on_message`). stomp.py provides a few ready-made listeners; the simplest is `PrintingListener`, which just prints all interactions between the client and server.

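```python
from time import sleep

import stomp
from stomp.listener import PrintingListener

c = stomp.Connection(host_and_ports=[('datafeeds.networkrail.co.uk', 61618)],
                     vhost='datafeeds.networkrail.co.uk')

# Pass an instance of the listener, not the class itself
c.set_listener('Printing Listener', PrintingListener())

c.connect(username=NETWORK_RAIL_AUTH['username'],
          passcode=NETWORK_RAIL_AUTH['password'],
          wait=True)

c.subscribe('/topic/feed_topic', 'test-td')
listen_secs = 10  # number of seconds to listen for
sleep(listen_secs)
c.disconnect()
```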

<div class='alert alert-danger'>
    <b>WARNING:</b> stomp.py may behave unexpectedly if you set multiple listener specs on one client. Always create a new connection client and then set a new listener.
</div>

We can also provide our own custom listener. To do so, we define a listener class implementing the callbacks we need (such as `on_message`), instantiate it, and pass the instance to the `c.set_listener()` method.

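```python
import stomp


class ConnectionListener(stomp.ConnectionListener):
    def __init__(self, mq):
        self._mq = mq  # the stomp.Connection used to send ACK frames

    # Older stomp.py releases call on_message(headers, message);
    # newer releases pass a single frame object instead.
    def on_message(self, headers, message):
        # Acknowledge each message (needed with ack='client-individual')
        self._mq.ack(id=headers['message-id'], subscription=headers['subscription'])
```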

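```python
from time import sleep

c = stomp.Connection(host_and_ports=[('datafeeds.networkrail.co.uk', 61618)],
                     vhost='datafeeds.networkrail.co.uk')
# The listener needs the connection so that it can send ACK frames
Listener_obj = ConnectionListener(c)
c.set_listener('Our Custom Listener', Listener_obj)

c.connect(username=NETWORK_RAIL_AUTH['username'],
          passcode=NETWORK_RAIL_AUTH['password'],
          wait=True)

c.subscribe('/topic/feed_topic', 'test-td', ack='client-individual')
listen_secs = 10  # number of seconds to listen for
sleep(listen_secs)
c.disconnect()
```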

### What is heartbeating?
**Heart-beating** can optionally be used to test the healthiness of the underlying TCP connection and to make sure that the remote end is alive and kicking.

The heart-beat header provides enough information so that each party can find out if heart-beats can be used, in which direction, and with which frequency.

```python
c = stomp.Connection(heartbeats=(10000, 5000))
```

The first number represents what the stomp client can send (outgoing heart-beats):
- `heartbeats = (0, n)` means it cannot send heart-beats
- otherwise it is the smallest number of milliseconds between heart-beats that it can guarantee. For instance, `heartbeats = (10000, n)` is 10 seconds.

The second number represents what the client would like to get (incoming heart-beats):
- `heartbeats = (n, 0)` means it does not want to receive heart-beats
- otherwise it is the desired number of milliseconds between heart-beats. For instance, `heartbeats = (n, 5000)` is 5 seconds.
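The values passed to `heartbeats` are only what the client offers; the effective intervals are negotiated with the `heart-beat` header the server returns in its CONNECTED frame. Per the STOMP 1.2 specification, each direction is disabled if either side sends 0, and otherwise uses the larger of the two relevant values. A minimal sketch of that rule, where `negotiate_heartbeats` is a hypothetical helper and the server values are made up:

```python
from typing import Tuple


def negotiate_heartbeats(client: Tuple[int, int], server: Tuple[int, int]) -> Tuple[int, int]:
    """Return (client->server, server->client) intervals in ms; 0 means disabled.

    client = (cx, cy) from the CONNECT frame, server = (sx, sy) from CONNECTED.
    """
    cx, cy = client
    sx, sy = server
    # Client sends every max(cx, sy) ms, unless it cannot (cx == 0) or the server does not want it (sy == 0)
    outgoing = 0 if cx == 0 or sy == 0 else max(cx, sy)
    # Server sends every max(sx, cy) ms, unless it cannot (sx == 0) or the client does not want it (cy == 0)
    incoming = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return outgoing, incoming


print(negotiate_heartbeats((10000, 5000), (5000, 10000)))  # (10000, 5000)
print(negotiate_heartbeats((10000, 5000), (20000, 2000)))  # (10000, 20000)
print(negotiate_heartbeats((0, 5000), (5000, 10000)))      # (0, 5000)
```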

Regarding the heart-beats themselves, any new data received over the network connection is an indication that the remote end is alive. Suppose that, in a given direction, heart-beats are expected every `n` milliseconds:

<div class='alert alert-danger'>If, inside a time window of at least <b>n</b> milliseconds, the receiver did not receive any new data, it MAY consider the connection as dead</div>
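A receiver could apply this rule with a simple timer. The `LivenessMonitor` class below is a hypothetical sketch, not part of stomp.py. Its `grace` factor (how many negotiated intervals of silence to tolerate) is an assumption: the specification only says the window must be at least `n` milliseconds, so `grace` should be 1 or more.

```python
import time
from typing import Optional


class LivenessMonitor:
    """Hypothetical helper: tracks when data last arrived on a connection."""

    def __init__(self, expected_ms: int, grace: float = 2.0) -> None:
        # Tolerate `grace` times the expected interval to absorb network delays
        self.timeout_s = expected_ms * grace / 1000
        self.last_seen = time.monotonic()

    def data_received(self, now: Optional[float] = None) -> None:
        # Any frame or heart-beat byte counts as proof that the remote end is alive
        self.last_seen = time.monotonic() if now is None else now

    def is_dead(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        return now - self.last_seen > self.timeout_s


monitor = LivenessMonitor(expected_ms=5000)  # a heart-beat is expected every 5 s
monitor.data_received(now=100.0)
print(monitor.is_dead(now=108.0))  # False: 8 s of silence is inside the 10 s window
print(monitor.is_dead(now=111.0))  # True: 11 s without any data
```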

### Acknowledgements: what are ACKs and NACKs, and why are they used?
Acknowledgements are a way to tell the message server whether or not a message was consumed. Suppose several clients are listening on the same queue and a message arrives that requires significant processing. One of the clients receives the message, checks its resource usage and decides to send a NACK as a consequence. The message server could then decide to send the message to a failover server for processing (that is one possible use, anyway).

`ACK` is used to acknowledge consumption of a message from a subscription using `client` or `client-individual` acknowledgment. Any messages received from such a subscription will not be considered to have been consumed until the message has been acknowledged via an `ACK`.

`NACK` is the opposite of `ACK`. It is used to tell the server that the client did not consume the message. The server can then either send the message to a different client, discard it, or put it in a dead letter queue. The exact behavior is server specific.

```python
# c is a stomp.Connection that has already called c.connect(...)
c.subscribe('/topic/feed_topic', 'test-td', ack = 'client-individual')
```

The valid values for the **ack header** are `auto`, `client`, or `client-individual`. If the header is not set, it defaults to `auto`.

When the ack mode is `auto`, then the client does not need to send the server `ACK` frames for the messages it receives. The server will assume the client has received the message as soon as it sends it to the client. This acknowledgment mode can cause messages being transmitted to the client to get dropped.

When the ack mode is `client`, then the client MUST send the server `ACK` frames for the messages it processes. 

<div class='alert alert-danger'>
If the connection fails before a client sends an <code>ACK</code> frame for the message the server will assume the message has not been processed and MAY redeliver the message to another client. The <code>ACK</code> frames sent by the client will be treated as a cumulative acknowledgment. This means the acknowledgment operates on the message specified in the <code>ACK</code> frame and all messages sent to the subscription before the ACK'ed message.
</div>

When the ack mode is `client-individual`, the acknowledgment operates just like the client acknowledgment mode except that the `ACK` or `NACK` frames sent by the client are not cumulative. This means that an `ACK` or `NACK` frame for a subsequent message MUST NOT cause a previous message to get acknowledged.

<div class='alert alert-info'>In case the client did not process some messages, it SHOULD send <code>NACK</code> frames to tell the server it did not consume these messages.</div>
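The difference between `client` and `client-individual` acknowledgment can be simulated in a few lines. `PendingMessages` is a hypothetical model of the server's list of unacknowledged messages for one subscription; whatever is still pending when the connection fails is what the server MAY redeliver.

```python
from typing import List


class PendingMessages:
    """Hypothetical server-side view of unacknowledged messages for one subscription."""

    def __init__(self, message_ids: List[str]) -> None:
        self.pending = list(message_ids)  # in delivery order

    def ack_client(self, message_id: str) -> None:
        # ack='client': cumulative, acknowledges this message and every earlier one
        idx = self.pending.index(message_id)
        del self.pending[: idx + 1]

    def ack_client_individual(self, message_id: str) -> None:
        # ack='client-individual': acknowledges only this one message
        self.pending.remove(message_id)


cumulative = PendingMessages(['m1', 'm2', 'm3', 'm4'])
cumulative.ack_client('m3')
print(cumulative.pending)  # ['m4']

individual = PendingMessages(['m1', 'm2', 'm3', 'm4'])
individual.ack_client_individual('m3')
print(individual.pending)  # ['m1', 'm2', 'm4']
```

This is why `client-individual` suits listeners that process messages independently: acknowledging a later message never silently acknowledges an earlier one that may have failed.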

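```python
from time import sleep

c = stomp.Connection(host_and_ports=[('datafeeds.networkrail.co.uk', 61618)],
                     vhost='datafeeds.networkrail.co.uk')
# With ack='client-individual' every message must be ACKed,
# so attach the acknowledging ConnectionListener defined above
c.set_listener('Our Custom Listener', ConnectionListener(c))

c.connect(username=NETWORK_RAIL_AUTH['username'],
          passcode=NETWORK_RAIL_AUTH['password'],
          wait=True)

c.subscribe('/topic/feed_topic', 'test-td', ack='client-individual')
sleep(10)  # listen for 10 seconds
c.disconnect()
```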

### Begin and Disconnect
BEGIN is used to start a transaction. Transactions in this case apply to sending and acknowledging: any messages sent or acknowledged during a transaction are processed atomically when the transaction is committed with COMMIT, or discarded if it is aborted with ABORT.

```python
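# Assumes c is a stomp.Connection that has already connected
txid = c.begin()  # BEGIN frame; returns the transaction id
c.send('/topic/feed_topic', 'first message', transaction=txid)
c.send('/topic/feed_topic', 'second message', transaction=txid)
c.commit(txid)    # COMMIT: both sends are processed together (c.abort(txid) would discard them)
c.disconnect()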
```
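To see what "atomically" means for sends, here is a toy, in-memory model of a broker (hypothetical, no network involved) that holds back every SEND carrying a `transaction` header until the matching COMMIT arrives, and discards the buffered sends on ABORT.

```python
from typing import Dict, List, Tuple

Message = Tuple[str, str]  # (destination, body)


class TransactionalBroker:
    """Hypothetical broker that buffers transactional SENDs until COMMIT."""

    def __init__(self) -> None:
        self.delivered: List[Message] = []
        self._open: Dict[str, List[Message]] = {}  # transaction id -> buffered sends

    def begin(self, txid: str) -> None:
        self._open[txid] = []

    def send(self, destination: str, body: str, transaction: str = '') -> None:
        if transaction:
            self._open[transaction].append((destination, body))  # held back
        else:
            self.delivered.append((destination, body))           # delivered immediately

    def commit(self, txid: str) -> None:
        # All buffered sends become visible together
        self.delivered.extend(self._open.pop(txid))

    def abort(self, txid: str) -> None:
        # None of the buffered sends are delivered
        self._open.pop(txid)


broker = TransactionalBroker()
broker.begin('tx1')
broker.send('/topic/feed_topic', 'a', transaction='tx1')
broker.send('/topic/feed_topic', 'b', transaction='tx1')
print(len(broker.delivered))  # 0: nothing is visible before COMMIT
broker.commit('tx1')
print(len(broker.delivered))  # 2

broker.begin('tx2')
broker.send('/topic/feed_topic', 'c', transaction='tx2')
broker.abort('tx2')
print(len(broker.delivered))  # still 2
```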

A client can disconnect from the server at any time by closing the socket, but there is no guarantee that the previously sent frames have been received by the server. A graceful shutdown is therefore needed, in which the client is assured that all previous frames have been received by the server.

Fortunately, the Python `stomp.py` library supports graceful disconnection through a `receipt` parameter (generated automatically if you don't provide one). The connection is only dropped once the server sends back a response to that receipt.
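At the frame level, this graceful shutdown is a DISCONNECT frame carrying a `receipt` header, answered by a RECEIPT frame whose `receipt-id` matches; only then is it safe to close the socket. The helpers below are a hypothetical sketch of that exchange (stomp.py handles it for you) and skip header escaping for brevity.

```python
import uuid
from typing import Optional


def disconnect_frame(receipt_id: Optional[str] = None) -> bytes:
    """Build a DISCONNECT frame asking the server to confirm with a RECEIPT."""
    receipt_id = receipt_id or str(uuid.uuid4())  # generate an id if none is given
    return f'DISCONNECT\nreceipt:{receipt_id}\n\n'.encode('utf-8') + b'\x00'


def is_receipt_for(raw: bytes, receipt_id: str) -> bool:
    """True if raw is the RECEIPT frame acknowledging receipt_id."""
    head = raw.decode('utf-8').split('\n\n', 1)[0]
    command, *header_lines = head.split('\n')
    headers = dict(line.split(':', 1) for line in header_lines if ':' in line)
    return command == 'RECEIPT' and headers.get('receipt-id') == receipt_id


frame = disconnect_frame('77')
print(frame)  # b'DISCONNECT\nreceipt:77\n\n\x00'
print(is_receipt_for(b'RECEIPT\nreceipt-id:77\n\n\x00', '77'))  # True
print(is_receipt_for(b'RECEIPT\nreceipt-id:12\n\n\x00', '77'))  # False
```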

## Connecting to Network Rail's Live Data Feed

First, import the required libraries.

```python
import pandas as pd
import json
import stomp
from time import sleep
```

### Constructing a listener class

To receive messages from the messaging system, we set up a listener on the connection and then subscribe to the destination. This listener also keeps every message it receives in `msg_list` so that we can parse them later.

```python
class ConnectionListener(stomp.ConnectionListener):
    def __init__(self, mq):
        self._mq = mq        # connection used to send ACK frames
        self.msg_list = []   # raw message bodies received so far

    def on_message(self, headers, message):
        print(message)
        self.msg_list.append(message)
        self._mq.ack(id=headers['message-id'], subscription=headers['subscription'])
```

### Collecting the user credentials for authentication
To establish a connection we will need to provide the script with your credentials.

```python
from getpass import getpass

username = input('What is your account username? ')
password = getpass('What is your account password? ')  # typed input is not echoed
NETWORK_RAIL_AUTH = {'username': username,
                     'password': password}

# Confirm the username only; never print the password
print('Authentication information saved for username:', NETWORK_RAIL_AUTH['username'])
```


<div class="alert alert-danger">
    <b>DANGER</b>: Be careful not to commit this notebook or script to any version control system with your credentials hardcoded. A more secure way of providing your credentials is by constructing an argument parser using Python's <a href = 'https://docs.python.org/3/library/argparse.html'>argparse module</a>.
    </div>

```python
# python
import argparse

ap = argparse.ArgumentParser()
ap.add_argument('-u', '--username', required=True, help='What is your account username?')
ap.add_argument('-p', '--password', required=True, help='What is your account password?')
NETWORK_RAIL_AUTH = vars(ap.parse_args())
```

```python
c = stomp.Connection()
c.connect(username=NETWORK_RAIL_AUTH['username'],
          passcode=NETWORK_RAIL_AUTH['password'],
          wait=True)
```

Once your argument parser is constructed, you can run your programme using the following command.

```bash
# bash
python your_script.py -u admin -p password
```
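Command-line arguments can still end up in your shell history and may be visible in process listings. One alternative, sketched below under the assumption that you export two environment variables named `NR_USERNAME` and `NR_PASSWORD` (hypothetical names; pick your own), is to read the credentials from the environment so they appear in neither the script nor the command:

```python
import os
from typing import Dict


def credentials_from_env(user_var: str = 'NR_USERNAME',
                         pass_var: str = 'NR_PASSWORD') -> Dict[str, str]:
    """Read credentials from (hypothetically named) environment variables."""
    missing = [name for name in (user_var, pass_var) if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Set the environment variable(s): {', '.join(missing)}")
    return {'username': os.environ[user_var], 'password': os.environ[pass_var]}
```

For example, run `export NR_USERNAME=...` and `export NR_PASSWORD=...` in your shell, then use `NETWORK_RAIL_AUTH = credentials_from_env()` in the script; the returned dictionary has the same keys as the one built by the argument parser.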

### Putting it all together
You can create a `listen` function as follows to listen to any datafeed topic.

```python
def listen(secs):
    c = stomp.Connection(host_and_ports=[('datafeeds.networkrail.co.uk', 61618)],
                         keepalive=True,
                         vhost='datafeeds.networkrail.co.uk',
                         heartbeats=(10000, 5000))

    Listener_obj = ConnectionListener(c)
    c.set_listener('', Listener_obj)

    c.connect(username=NETWORK_RAIL_AUTH['username'],
              passcode=NETWORK_RAIL_AUTH['password'],
              wait=True)
    c.subscribe('/topic/TD_WESS_SIG_AREA', 'test-td', ack='client-individual')

    sleep(secs)  # listen for `secs` seconds
    c.disconnect()

    output = Listener_obj.msg_list

    return output
```

Now let's listen to the feed for 20 seconds before disconnecting.

```python
output = listen(20)
```

```
keepalive: unable to detect any implementation, DISABLED!

[{"CT_MSG":{"time":"1589335868000","area_id":"FH","msg_type":"CT","report_time":"0254"}},{"CT_MSG":{"time":"1589335868000","area_id":"BP","msg_type":"CT","report_time":"0217"}},{"CT_MSG":{"time":"1589335869000","area_id":"BS","msg_type":"CT","report_time":"2157"}}]
[{"CA_MSG":{"to":"0510","time":"1589335872000","area_id":"BE","msg_type":"CA","from":"0514","descr":"428I"}},{"CT_MSG":{"time":"1589335874000","area_id":"HT","msg_type":"CT","report_time":"0306"}},{"CT_MSG":{"time":"1589335876000","area_id":"EH","msg_type":"CT","report_time":"0311"}},{"SF_MSG":{"time":"1589335875000","area_id":"EH","address":"3F","msg_type":"SF","data":"00"}}]
[{"CT_MSG":{"time":"1589335876000","area_id":"SU","msg_type":"CT","report_time":"0310"}},{"SF_MSG":{"time":"1589335878000","area_id":"BE","address":"35","msg_type":"SF","data":"AF"}}]
[{"CA_MSG":{"to":"0962","time":"1589335885000","area_id":"SU","msg_type":"CA","from":"0964","descr":"662I"}}]
```


### Parsing the messages received
Messages are received as strings. Looking closely, each message is a JSON array of objects, so we will first parse it with `json.loads` and then load the result into a pandas DataFrame.

To start, we can use a nested for loop. For each message received, we parse it into JSON, then convert each inner JSON object into a DataFrame and transpose it so that its fields become columns, before appending it to a list. Once all messages have been converted and appended, we use the `pd.concat` method to concatenate all the DataFrames into a single DataFrame.

```python
df_list = []
for o in output:
    j = json.loads(o)
    for msg in j:
        df = pd.DataFrame(msg).T
        df_list.append(df)

pd.concat(df_list)
```

|        | area_id | msg_type | report_time | time          | descr | from  | to    | address | data |
|--------|---------|----------|-------------|---------------|-------|-------|-------|---------|------|
| CT_MSG | FH      | CT       | 254.0       | 1589335868000 |       |       |       |         |      |
| CT_MSG | BP      | CT       | 217.0       | 1589335868000 |       |       |       |         |      |
| CT_MSG | BS      | CT       | 2157.0      | 1589335869000 |       |       |       |         |      |
| CA_MSG | BE      | CA       |             | 1589335872000 | 428I  | 514.0 | 510.0 |         |      |
| CT_MSG | HT      | CT       | 306.0       | 1589335874000 |       |       |       |         |      |
| CT_MSG | EH      | CT       | 311.0       | 1589335876000 |       |       |       |         |      |
| SF_MSG | EH      | SF       |             | 1589335875000 |       |       |       | 3F      | 00   |
| CT_MSG | SU      | CT       | 310.0       | 1589335876000 |       |       |       |         |      |
| SF_MSG | BE      | SF       |             | 1589335878000 |       |       |       | 35      | AF   |
| CA_MSG | SU      | CA       |             | 1589335885000 | 662I  | 964.0 | 962.0 |         |      |


Wherever possible, avoid explicit nested loops, as they make code longer and can slow it down.
Looking closely, the above operation can be completed in one line of code using a list comprehension (which still iterates internally, but more concisely).

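```python
pd.concat([pd.DataFrame(msg).T for o in output for msg in json.loads(o)])
```

|        | area_id | msg_type | report_time | time          | descr | from  | to    | address | data |
|--------|---------|----------|-------------|---------------|-------|-------|-------|---------|------|
| CT_MSG | FH      | CT       | 254.0       | 1589335868000 |       |       |       |         |      |
| CT_MSG | BP      | CT       | 217.0       | 1589335868000 |       |       |       |         |      |
| CT_MSG | BS      | CT       | 2157.0      | 1589335869000 |       |       |       |         |      |
| CA_MSG | BE      | CA       |             | 1589335872000 | 428I  | 514.0 | 510.0 |         |      |
| CT_MSG | HT      | CT       | 306.0       | 1589335874000 |       |       |       |         |      |
| CT_MSG | EH      | CT       | 311.0       | 1589335876000 |       |       |       |         |      |
| SF_MSG | EH      | SF       |             | 1589335875000 |       |       |       | 3F      | 00   |
| CT_MSG | SU      | CT       | 310.0       | 1589335876000 |       |       |       |         |      |
| SF_MSG | BE      | SF       |             | 1589335878000 |       |       |       | 35      | AF   |
| CA_MSG | SU      | CA       |             | 1589335885000 | 662I  | 964.0 | 962.0 |         |      |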
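A further option, sketched here with the standard library only: build one plain dictionary per inner message and create the DataFrame once, for example with `pd.DataFrame(records)`, instead of building one small DataFrame per message and concatenating them, which is usually slower for large batches. `flatten_td_messages` and the `msg_name` key are hypothetical names chosen for this sketch.

```python
import json
from typing import Dict, List


def flatten_td_messages(raw_messages: List[str]) -> List[Dict[str, str]]:
    """Turn raw TD message strings into one flat dict per inner message.

    Each raw string is a JSON array of objects such as {"CA_MSG": {...}};
    the outer key is kept under 'msg_name' so no information is lost.
    """
    return [
        {'msg_name': name, **fields}
        for raw in raw_messages
        for msg in json.loads(raw)
        for name, fields in msg.items()
    ]


sample = [
    '[{"CT_MSG":{"time":"1589335868000","area_id":"FH","msg_type":"CT","report_time":"0254"}}]',
    '[{"CA_MSG":{"to":"0510","time":"1589335872000","area_id":"BE","msg_type":"CA","from":"0514","descr":"428I"}}]',
]
records = flatten_td_messages(sample)
print(len(records), records[1]['msg_name'], records[1]['descr'])  # 2 CA_MSG 428I
```

The values stay exactly as the feed sent them (for example `"0254"`), so leading zeros in berth and time fields are preserved until you choose to convert them.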


Let's put all of this parsing logic into a function for later use.

```python
def parser(output):
    """Parse a list of raw TD message strings into a single DataFrame."""
    # Each raw message is a JSON array; each element becomes one transposed row
    return pd.concat([pd.DataFrame(msg).T for o in output for msg in json.loads(o)])
```

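```python
msgs_df = parser(output)
msgs_df
```

|        | area_id | msg_type | report_time | time          | descr | from  | to    | address | data |
|--------|---------|----------|-------------|---------------|-------|-------|-------|---------|------|
| CT_MSG | FH      | CT       | 254.0       | 1589335868000 |       |       |       |         |      |
| CT_MSG | BP      | CT       | 217.0       | 1589335868000 |       |       |       |         |      |
| CT_MSG | BS      | CT       | 2157.0      | 1589335869000 |       |       |       |         |      |
| CA_MSG | BE      | CA       |             | 1589335872000 | 428I  | 514.0 | 510.0 |         |      |
| CT_MSG | HT      | CT       | 306.0       | 1589335874000 |       |       |       |         |      |
| CT_MSG | EH      | CT       | 311.0       | 1589335876000 |       |       |       |         |      |
| SF_MSG | EH      | SF       |             | 1589335875000 |       |       |       | 3F      | 00   |
| CT_MSG | SU      | CT       | 310.0       | 1589335876000 |       |       |       |         |      |
| SF_MSG | BE      | SF       |             | 1589335878000 |       |       |       | 35      | AF   |
| CA_MSG | SU      | CA       |             | 1589335885000 | 662I  | 964.0 | 962.0 |         |      |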


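We can keep only the rows where a given column is not null with a simple boolean indexing operation, `df[df.col.notnull()]`. Here only the CA messages have a `descr` value, so filtering on `descr` selects them (`msgs_df[msgs_df.msg_type == 'CA']` would return the same rows).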

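```python
ca_msgs = msgs_df[msgs_df.descr.notnull()]
ca_msgs
```

|        | area_id | msg_type | report_time | time          | descr | from | to  | address | data |
|--------|---------|----------|-------------|---------------|-------|------|-----|---------|------|
| CA_MSG | BE      | CA       |             | 1589335872000 | 428I  | 514  | 510 |         |      |
| CA_MSG | SU      | CA       |             | 1589335885000 | 662I  | 964  | 962 |         |      |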


# PROJECTS
Projects are designed to solidify your understanding of the content above. Solutions are deliberately not provided, so you are encouraged to submit a pull request contributing your solutions.

## PROJECT 1 - STOMP Command-line application
<div class="alert alert-success">
Write a command line application that you can use to connect to any of the Network Rail feeds and topics using the credentials which you provide via the terminal. <br/>
    
The application should accept any of the following commands via the terminal.    
    <ul>
        <li><b>-u, --username:</b> string - Required - help=account username</li>
        <li><b>-p, --password:</b> string - Required - help=account password</li>
        <li><b>-f, --feed:</b> string - Required - help=live feed to connect to</li>
        <li><b>-t, --topic:</b> string - Required - help=topic to subscribe to</li>
        <li><b>-pa, --parse:</b> bool - optional - help=parse messages into tabular format</li>
        <li><b>-v, --verbose:</b> bool - optional - help=print out messages as they are received</li>
        <li><b>-s, --save:</b> bool - optional - help=save messages to disk</li>
        <li><b>-fp, --filepath:</b> string - optional - help=filepath to save messages to</li>
        <li><b>-h, --help</b> - help=manual for user on how to use this cli</li>
    </ul>
</div>  

<div class="alert alert-success">
You should be able to run your application using the command below:  
</div>  

```bash
# bash
python my_script.py \
--username username \
--password password \
--feed td \
--topic TD_WESS_SIG_AREA \
--verbose \
--parse \
--save \
--filepath data/
```

## PROJECT 2 - Insufficient Memory
<div class="alert alert-success">
Your computer keeps running out of memory as you listen to the Network Rail live data feed for a long period of time. Can you implement an algorithm that would: <br/> 
   <ul>
  <li>save the messages received before the memory runs out</li>
  <li>clear the memory after each save to provide sufficient space for future messages</li>    
  </ul> 
    
For this project, assume that your computer's memory runs out after every 10 messages.
</div>  

```python
# TODO - Your code below
```


## PROJECT 3 - Organised Storage
<div class="alert alert-success">
Your computer is already full of random files in your preferred storage location. If you save the messages without any naming convention, you risk losing your files in the haystack.<br/> 
   <ol>
  <li>Can you save the messages received in separate files, based on the minute of the day in which they were received?</li>
  <li>Use the naming convention: <span style = 'color: red'>datafeed_topic_year_month_day_hour_minute</span></li>    
  </ol> 
    
Here is an example of what your message folder should look like after running the algorithm for 15 minutes:

   <ul>
  <li>nr_td-wessex_24_05_20_13_00.csv</li>
  <li>nr_td-wessex_24_05_20_13_01.csv</li>   
  <li>nr_td-wessex_24_05_20_13_02.csv</li>   
  <li>nr_td-wessex_24_05_20_13_03.csv</li>
    </ul>
   <ul>
  <li>nr_td-wessex_24_05_20_13_13.csv</li>   
  <li>nr_td-wessex_24_05_20_13_14.csv</li>   
  <li>nr_td-wessex_24_05_20_13_15.csv</li>   
  </ul> 
    
</div>  

```python
# TODO - Your code below
```

## PROJECT 4 - Parsing messages from other Network Rail feeds

<div class="alert alert-success">
Messages from a single data feed and topic may not be very interesting to work with. To conduct a good analysis, you will want to listen to and save messages from different feeds and topics.<br/> 
   
    

<div class='alert alert-info'>
    Information about the current Network Rail Feeds can be found 
    <a href='https://wiki.openraildata.com/index.php/About_the_Network_Rail_feeds'><b>here</b></a>
    </div>
    
Each message broker can broadcast several feeds, each feed offers several topics to subscribe to, and each message received carries header and body information.

   <ul>
  <li>Broker</li>
       <ul>
           <li>Feed</li>
            <ul>
                <li>Topic</li>
                <ul>
                    <li>Header</li>
                    <li>Message Body</li>
                </ul>
           </ul>
       </ul>
  </ul> 
    
</div>  

<div class="alert alert-success">
See if you can develop an algorithm that is capable of parsing and saving messages from other data feeds. Is it possible to join these messages for a richer understanding of the data received?
</div>


```python
# TODO - Your code below
```
