# DSLab Homework4 - More trains (PART I & II)

## Hand-in Instructions:
- __Due: 11.05.2021 23:59:59 CET__
- your project must be private
- git push your final version to the master branch of your group's Renku repository before the due date
- check that Dockerfile, environment.yml and requirements.txt are properly written
- add the necessary comments and discussion to make your code readable

## NS Streams
For this homework, you will be working with the real-time streams of the NS, the train company of the Netherlands. You can see an example webpage that uses the same streams to display the train information on a map: https://spoorkaart.mwnn.nl/ . 

To help you and to avoid having too many connections to the NS streaming servers, we have set up a service that collects the streams and pushes them to our Kafka instance. The related topics are: 

`ndovloketnl-arrivals`: For each arrival of a train in a station, describe the previous and next station, time of arrival (planned and actual), track number,...

`ndovloketnl-departures`: For each departure of a train from a station, describe the previous and next station, time of departure (planned and actual), track number,...

`ndovloketnl-gps`: For each train, describe the current location, speed, bearing.

The events are serialized in JSON (actually converted from XML), with properties in their original language. Google translate could help you understand all of them, but we will provide you with some useful mappings.

---

**Parts I & II run in the IPython kernel**

In [3]:
ipython = get_ipython()
print('Current kernel: {}'.format(ipython.kernel.kernel_info['implementation']))

Current kernel: ipython


---

## Create a Kafka client

In [4]:
import os
from pykafka import KafkaClient
from pykafka.common import OffsetType

username = os.environ['JUPYTERHUB_USER']

ZOOKEEPER_QUORUM = 'iccluster040.iccluster.epfl.ch:2181,' \
                   'iccluster064.iccluster.epfl.ch:2181,' \
                   'iccluster065.iccluster.epfl.ch:2181'

client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)

---

## PART I - Live Plot (20 points)

The goal of this part is to obtain an interactive plot of the train positions from the GPS stream.

First, let's write a function to decode the messages from the `ndovloketnl-gps` topic.

In [None]:
import json
from pykafka.common import OffsetType

example_gps = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()

print(json.dumps(json.loads(example_gps.value), indent=2))

#### We can see that the message has the following structure:

```
{
  'tns3:ArrayOfTreinLocation': {
    'tns3:TreinLocation': [
      <train_info_1>,
      <train_info_2>,
      ...
    ]
  }
}
```

Each `<train_info_x>` message contains:
- `tns3:TreinNummer`: the train number. This number is used in passenger information displays.
- `tns3:TreinMaterieelDelen`:
    - `tns3:MaterieelDeelNummer`: the train car number. It identifies the physical train car.
    - `tns3:Materieelvolgnummer`: the car position. 1 is the car in front of the train, 2 the next one, etc.
    - `tns3:GpsDatumTijd`: the datetime given by the GPS.
    - `tns3:Latitude`, `tns3:Longitude`, `tns3:Elevation`: 3D coordinates given by the GPS.
    - `tns3:Snelheid`: speed, most likely given by the GPS.
    - `tns3:Richting`: heading, most likely given by the GPS.
    - `tns3:AantalSatelieten`: number of GPS satellites in view.
    - ...

We also notice that when a train is composed of multiple cars, the position is given in an array, with the position of all individual cars.
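To make the field list above easier to work with, the Dutch keys can be mapped to English column names. The following is a minimal sketch: `FIELD_NAMES` and `translate_car` are hypothetical names introduced here for illustration, and the sample record is copied from a single-car entry of the stream.

```python
# Hypothetical mapping from the Dutch field names to English column names.
FIELD_NAMES = {
    'tns3:GpsDatumTijd': 'timestamp',
    'tns3:MaterieelDeelNummer': 'car_number',
    'tns3:Materieelvolgnummer': 'car_position',
    'tns3:Longitude': 'longitude',
    'tns3:Latitude': 'latitude',
    'tns3:Elevation': 'elevation',
    'tns3:Richting': 'heading',
    'tns3:Snelheid': 'speed',
}


def translate_car(car):
    """Return one car record keyed by English names; missing fields become None."""
    return {english: car.get(dutch) for dutch, english in FIELD_NAMES.items()}


sample_car = {
    'tns3:MaterieelDeelNummer': '2414',
    'tns3:Materieelvolgnummer': '1',
    'tns3:GpsDatumTijd': '2021-04-26T11:19:18Z',
    'tns3:Longitude': '4.4337830827578',
    'tns3:Latitude': '52.126099645235',
    'tns3:Elevation': '0.0',
    'tns3:Snelheid': '0',
    'tns3:Richting': '0.0',
}

translated = translate_car(sample_car)
print(translated)
```

Note that every value stays a string at this stage; numeric conversion is left to a later step.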

### a) Extract data - 5/20

Write a function `extract_gps_data` which takes the message as input and extracts the train number, train car and GPS data from the source messages. Using this function, you should be able to obtain the example table, or something similar:

|            timestamp | train_number | car_number | car_position |       longitude |        latitude | elevation | heading | speed |
|---------------------:|-------------:|-----------:|-------------:|----------------:|----------------:|----------:|--------:|------:|
| 2021-04-26T11:18:38Z | 4651         | 2414       | 1            | 4.4337813744686 | 52.126090732796 | 0.0       | 0.0     | 0     |
| 2021-04-26T11:18:29Z | 646          | 4029       | 1            | 6.13383283333   | 52.788337       | 0.0       | 104.83  | 103.0 |
| 2021-04-26T11:18:29Z | 5747         | 2628       | 1            | 4.8238861121011 | 52.338504198172 | 0.0       | 90.5    | 126.0 |
| 2021-04-26T11:18:19Z | 5747         | 2430       | 2            | 4.8168466316014 | 52.338447739203 | 0.0       | 85.8    | 118.8 |


__Note:__
- The messages are occasionally empty, for example, `tns3:ArrayOfTreinLocation` or `tns3:TreinLocation` can be empty.
- Not every message shares exactly the same structure, for example, `tns3:TreinMaterieelDelen` may be a list but not always.
- You may find the Python dictionary [get(key, default)](https://docs.python.org/3.7/library/stdtypes.html#dict.get) method helpful.
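The notes above can be illustrated with a small sketch. It assumes that an empty element arrives as JSON `null` (Python `None`) and that a single record may arrive as a plain dictionary instead of a list; `as_list` and `train_records` are hypothetical helper names, not part of the assignment.

```python
def as_list(value):
    """Wrap a single record in a list; map None to an empty list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def train_records(msg):
    # `or {}` also covers keys that are present but hold None,
    # which dict.get(key, default) alone does not.
    outer = msg.get('tns3:ArrayOfTreinLocation') or {}
    return as_list(outer.get('tns3:TreinLocation'))


no_key = {}
null_value = {'tns3:ArrayOfTreinLocation': None}
single_train = {'tns3:ArrayOfTreinLocation': {'tns3:TreinLocation': {'tns3:TreinNummer': '646'}}}

print(train_records(no_key))
print(train_records(null_value))
print(train_records(single_train))
```

The point of the `or {}` idiom is that `get(key, {})` only falls back to the default when the key is absent, not when its value is `None`.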

In [5]:
import numpy as np


def _as_list(value):
    """Wrap a single record in a list; map None to an empty list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def extract_gps_data(msg):
    """Return one row per train car:
    [timestamp, train_number, car_number, car_position, longitude, latitude, elevation, heading, speed]
    """
    # `or {}` also covers keys that are present but null (empty messages).
    trains = (msg.get('tns3:ArrayOfTreinLocation') or {}).get('tns3:TreinLocation')

    res = []
    for train in _as_list(trains):
        train_number = train.get('tns3:TreinNummer')

        # A train with a single car holds a dict, a train with several cars a list of dicts.
        # Trains without any car data produce no row.
        for car_data in _as_list(train.get('tns3:TreinMaterieelDelen')):
            res.append([
                car_data.get('tns3:GpsDatumTijd'),
                train_number,
                car_data.get('tns3:MaterieelDeelNummer'),
                car_data.get('tns3:Materieelvolgnummer'),
                car_data.get('tns3:Longitude'),
                car_data.get('tns3:Latitude'),
                car_data.get('tns3:Elevation'),
                car_data.get('tns3:Richting'),
                car_data.get('tns3:Snelheid'),
            ])

    return res

In [None]:
import pandas as pd
df_example = pd.DataFrame(
    data = extract_gps_data(json.loads(example_gps.value)),
    columns = ['timestamp', 'train_number', 'car_number', 'car_position', 
               'longitude', 'latitude', 'elevation', 'heading', 'speed']
)
df_example.head(5)

### b) Trains on the map - 5/20

Each row of `df_example` represents one car of one train in the real world. 

Use `plotly` to visualize the trains in `df_example` on a map. Set `title` to the median timestamp and `hover_name` to the train number.

**Note:**
- We expect the train positions to fall on rail tracks on the map. Showing each train as a circle is good enough. Check [Scatter Plots on Mapbox](https://plotly.com/python/scattermapbox/).
- One train may have many cars. You do not need to show every car on the map, please keep only cars with `car_position` equal to '1'.
- Set an interactive label with the train number (we do not expect train type, as this needs to be recovered from other sources).

In [None]:
df_example['latitude'] = df_example.latitude.astype(float)
df_example['longitude'] = df_example.longitude.astype(float)
df_example['timestamp'] = pd.to_datetime(df_example.timestamp)

In [None]:
import plotly.express as px

In [None]:
static_fig = px.scatter_mapbox(
    data_frame=df_example[df_example.car_position == '1'],
    mapbox_style='open-street-map',
    lat="latitude",
    lon="longitude",
    title=str(df_example.timestamp.median()).split('.')[0],
    hover_name='train_number',
    height=900
)

In [None]:
static_fig

### c) Trains on the move - 10/20

From the static map above, use `plotly` to make a live plot of the train positions consuming the `ndovloketnl-gps` stream.

Upon receiving a new message, you need to:

- Update train locations
- Update hover information
- Update title
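The three updates listed above can be computed from the rows of one message before touching the figure. The sketch below is one possible way to do that, not the required solution: `figure_update` is a hypothetical helper, the sample rows are taken from the example table in part a), and the returned values are meant to be assigned to the `lon`, `lat` and `hovertext` attributes of the trace and to the layout title.

```python
import pandas as pd


def figure_update(rows):
    """Compute positions, hover labels and title from
    (timestamp, train_number, car_position, longitude, latitude) rows."""
    df = pd.DataFrame(rows, columns=['timestamp', 'train_number', 'car_position',
                                     'longitude', 'latitude'])
    # car_position is a string in the stream, so compare against '1'.
    df = df[df['car_position'] == '1']
    if df.empty:
        return None  # nothing to draw, keep the previous frame

    # Parse as UTC, then drop the timezone to get a plain title string.
    timestamps = pd.to_datetime(df['timestamp'], utc=True).dt.tz_localize(None)
    return {
        'lon': df['longitude'].astype(float).tolist(),
        'lat': df['latitude'].astype(float).tolist(),
        'hovertext': df['train_number'].tolist(),
        'title': timestamps.median().strftime('%Y-%m-%d %H:%M:%S'),
    }


sample_rows = [
    ('2021-04-26T11:18:38Z', '4651', '1', '4.4337813744686', '52.126090732796'),
    ('2021-04-26T11:18:29Z', '646', '1', '6.13383283333', '52.788337'),
    ('2021-04-26T11:18:29Z', '5747', '1', '4.8238861121011', '52.338504198172'),
    ('2021-04-26T11:18:19Z', '5747', '2', '4.8168466316014', '52.338447739203'),
]

update = figure_update(sample_rows)
print(update)
```

Returning `None` for a message without leading cars lets the caller skip the update instead of blanking the map.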

You can compare your plot to one of the live services: https://spoorkaart.mwnn.nl/, http://treinenradar.nl/

Create a simple consumer for `ndovloketnl-gps`, which consumes the earliest/latest information from the stream.

In [None]:
consumer = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    #auto_offset_reset=OffsetType.LATEST,
    auto_offset_reset = OffsetType.EARLIEST,
    reset_offset_on_start=True
)

In [None]:
import plotly.graph_objects as go

stream_fig = go.FigureWidget(static_fig)
stream_data = stream_fig.data[0]
stream_fig

Bring the plot to life. You can refer to the exercise for ideas.

In [None]:
import time
try:
    for message in consumer:
        if message is not None:
            # Parse the message into one row per train car
            df = pd.DataFrame(
                data = extract_gps_data(json.loads(message.value)),
                columns = ['timestamp', 'train_number', 'car_number', 'car_position', 
                           'longitude', 'latitude', 'elevation', 'heading', 'speed']
            )
            
            # Keep only the leading car of each train; copy so the column
            # assignments below do not operate on a view of the original frame
            df = df[df.car_position == '1'].copy()
            
            df['latitude'] = df.latitude.astype(float)
            df['longitude'] = df.longitude.astype(float)
            df['timestamp'] = pd.to_datetime(df.timestamp)
            
            # Skip empty messages so the map is not blanked
            if not df.empty:
                with stream_fig.batch_update():
                    stream_data.lon = df.longitude
                    stream_data.lat = df.latitude
                    stream_data.hovertext = df.train_number
                    # Use the median timestamp as title, as in the static plot
                    stream_fig.update_layout(title_text=str(df.timestamp.median()).split('.')[0])
            
        # sleep
        time.sleep(0.1)
except KeyboardInterrupt:
    print("Plot interrupted.")

---

## PART II - Locate Message (10 points)

After you finish this part, you will be able to locate the message for a specific timestamp.

You can find below a helper function to read a message at a specific offset from a Kafka topic.

In [6]:
def fetch_message_at(topic, offset):
    if isinstance(topic, str):
        topic = topic.encode('utf-8')
    t = client.topics[topic]
    consumer = t.get_simple_consumer()
    # Use the first partition of the topic
    p = list(consumer.partitions.values())[0]
    # Resetting to offset - 1 makes the next consumed message the one at `offset`
    consumer.reset_offsets([(p, int(offset) - 1)])
    return consumer.consume()

In [7]:
msg = fetch_message_at(b'ndovloketnl-gps', 5)

In [27]:
OffsetType.LATEST

-1

In [9]:
msg.value

b'{"tns3:ArrayOfTreinLocation": {"@xmlns:tns3": "http://schemas.datacontract.org/2004/07/Cognos.Infrastructure.Models", "tns3:TreinLocation": [{"tns3:TreinNummer": "4651", "tns3:TreinMaterieelDelen": {"tns3:MaterieelDeelNummer": "2414", "tns3:Materieelvolgnummer": "1", "tns3:GpsDatumTijd": "2021-04-26T11:19:18Z", "tns3:Orientatie": "0", "tns3:Bron": "NTT", "tns3:Fix": "1", "tns3:Berichttype": null, "tns3:Longitude": "4.4337830827578", "tns3:Latitude": "52.126099645235", "tns3:Elevation": "0.0", "tns3:Snelheid": "0", "tns3:Richting": "0.0", "tns3:Hdop": "4.72", "tns3:AantalSatelieten": "0"}}, {"tns3:TreinNummer": "646", "tns3:TreinMaterieelDelen": {"tns3:MaterieelDeelNummer": "4029", "tns3:Materieelvolgnummer": "1", "tns3:GpsDatumTijd": "2021-04-26T11:19:19Z", "tns3:Orientatie": "0", "tns3:Bron": "NTT", "tns3:Fix": "1", "tns3:Berichttype": null, "tns3:Longitude": "6.15785933333", "tns3:Latitude": "52.7836878333", "tns3:Elevation": "0.0", "tns3:Snelheid": "131.0", "tns3:Richting": "127.8

### a) Median timestamp - 5/10

Write a function to extract the median timestamp from a message of the `ndovloketnl-gps` topic. You can reuse the `extract_gps_data` function from part I.
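As a small illustration of the median computation itself, independent of Kafka, the sketch below takes the four timestamps from the example table in part I and computes their median with `pandas`. Dropping the timezone after parsing is a choice made here so the result compares cleanly with naive `np.datetime64` values.

```python
import pandas as pd

timestamps = ['2021-04-26T11:18:38Z', '2021-04-26T11:18:29Z',
              '2021-04-26T11:18:29Z', '2021-04-26T11:18:19Z']

# Parse as UTC, then drop the timezone information.
parsed = pd.to_datetime(pd.Series(timestamps), utc=True).dt.tz_localize(None)

# With an even number of values, the median is the midpoint of the two middle ones.
median_ts = parsed.median()
print(median_ts)
```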

In [None]:
example_gps = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()

In [13]:
client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).held_offsets

{0: -2}

In [25]:
fetch_message_at('ndovloketnl-gps', -1).offset

0

In [None]:
import pandas as pd
import numpy as np
def extract_gps_time_approx(msg):
    info_list = extract_gps_data(msg)
    timestamp_series = pd.to_datetime(pd.Series([x[0][:-1] for x in info_list]))
    return np.datetime64(timestamp_series.median())

In [None]:
assert extract_gps_time_approx(json.loads(example_gps.value)) == np.datetime64('2021-04-26T11:18:36.200000000')

### b) Binary search - 5/10

Using `fetch_message_at` and `extract_gps_time_approx`, write a function named `search_gps` to find the "first" offset for a given timestamp in the `ndovloketnl-gps` topic. Your function should use the [Binary Search Algorithm](https://en.wikipedia.org/wiki/Binary_search_algorithm).

More precisely, if we note `offset = search_gps(ts)` where `ts` is a timestamp, then we have:
```
ts <= extract_gps_time_approx(fetch_message_at('ndovloketnl-gps', offset))

extract_gps_time_approx(fetch_message_at('ndovloketnl-gps', offset - 1)) < ts
```
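The two conditions above describe a "lower bound" search. As a sketch of the idea on plain Python data, with no Kafka involved, the hypothetical function `first_offset_at_or_after` below searches a half-open range of offsets, with `key` standing in for "fetch the message and extract its timestamp". It assumes the timestamps are non-decreasing in the offset.

```python
from bisect import bisect_left


def first_offset_at_or_after(target, low, high, key):
    """Smallest offset in [low, high) with key(offset) >= target, or high if there is none."""
    while low < high:
        mid = low + (high - low) // 2
        if key(mid) < target:
            # Everything up to and including mid is too early.
            low = mid + 1
        else:
            # mid is a candidate; keep it inside the range.
            high = mid
    return low


# Stand-in for message timestamps, indexed by offset.
fake_times = [10, 20, 20, 30, 45, 60]
found = first_offset_at_or_after(20, 0, len(fake_times), fake_times.__getitem__)
print(found)
```

For a sorted list this gives the same answer as `bisect_left` from the standard library. The hand-written loop is still useful here because each lookup is a remote fetch rather than a list access.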

In [None]:
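def search_gps(findTime):
    timestamp = np.datetime64(findTime)
    topic = client.topics[b'ndovloketnl-gps']
    # Search the half-open range [first available offset, next offset to be written) of partition 0
    start = topic.earliest_available_offsets()[0].offset[0]
    end = topic.latest_available_offsets()[0].offset[0]
    while start < end:
        midpoint = start + (end - start) // 2
        midpoint_msg = fetch_message_at('ndovloketnl-gps', midpoint)
        midpoint_ts = extract_gps_time_approx(json.loads(midpoint_msg.value))
        if midpoint_ts < timestamp:
            # Everything up to and including midpoint is too early
            start = midpoint + 1
        else:
            end = midpoint
    
    return start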

In [None]:
def test_search_gps(tsStr):
    ts = np.datetime64(tsStr)
    offset = search_gps(ts)
    ts_after_offset = extract_gps_time_approx(json.loads(fetch_message_at('ndovloketnl-gps', offset).value))
    ts_before_offset = extract_gps_time_approx(json.loads(fetch_message_at('ndovloketnl-gps', offset - 1).value))
    assert ts_before_offset < ts <= ts_after_offset

In [None]:
test_search_gps('2021-04-26 12:00:00')

In [None]:
test_search_gps('2021-04-26 15:29:56')

In [None]:
test_search_gps('2021-04-26 17:00:06')