# Homework 4 - More trains (Part I & II)

__Hand-in:__

- __Due: 12.05.2020 23:59:59 CET__
- `git push` your final version to your group's Renku repository before the due date
- check that `Dockerfile`, `environment.yml` and `requirements.txt` are properly written
- add the necessary comments and discussion to make your code readable

For this homework, you will be working with the real-time streams of the NS, the train company of the Netherlands. You can see an example webpage that uses the same streams to display the train information on a map: https://spoorkaart.mwnn.nl/ . 

To help you, and to avoid opening too many connections to the NS streaming servers, we have set up a service that collects the streams and pushes them to our Kafka instance. The related topics are:

`ndovloketnl-arrivals`: For each arrival of a train in a station, describes the previous and next station, time of arrival (planned and actual), track number, etc.

`ndovloketnl-departures`: For each departure of a train from a station, describes the previous and next station, time of departure (planned and actual), track number, etc.

`ndovloketnl-gps`: For each train, describes the current location, speed, and bearing.

The events are serialized in JSON (converted from the original XML), with property names in their original language (Dutch). Google Translate can help you understand them, but we will also provide you with some useful mappings.

---
## Create a Kafka client

In [None]:
import os
from pykafka import KafkaClient
from pykafka.common import OffsetType

username = os.environ['JUPYTERHUB_USER']

ZOOKEEPER_QUORUM = 'iccluster044.iccluster.epfl.ch:2181,'\
                   'iccluster054.iccluster.epfl.ch:2181,'\
                   'iccluster059.iccluster.epfl.ch:2181'

client = KafkaClient(zookeeper_hosts=ZOOKEEPER_QUORUM)

---

## Part I - Live Plot (20 points / 50)

The goal of this part is to build an interactive plot that uses the train positions from the GPS stream. We encourage you to use the examples from last week to achieve the expected result.

First, let's write a function to decode the messages from the `ndovloketnl-gps` topic.

In [None]:
import json
from pykafka.common import OffsetType

example_gps = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()
json.loads(example_gps.value)

We can see that the message has the following structure:

```
{
  'tns3:ArrayOfTreinLocation': {
    'tns3:TreinLocation': [
      <train_info_1>,
      <train_info_2>,
      ...
    ]
  }
}
```

With the `<train_info_x>` messages containing:
- `tns3:TreinNummer`: the train number. This number is used in passenger information displays.
- `tns3:MaterieelDeelNummer`: the train car number. It identifies the physical train car.
- `tns3:Materieelvolgnummer`: the car position. 1 is the car in front of the train, 2 the next one, etc.
- `tns3:GpsDatumTijd`: the datetime given by the GPS.
- `tns3:Latitude`, `tns3:Longitude`, `tns3:Elevation`: 3D coordinates given by the GPS.
- `tns3:Snelheid`: speed, most likely given by the GPS.
- `tns3:Richting`: heading, most likely given by the GPS.
- `tns3:AantalSatelieten`: number of GPS satellites in view.

We also notice that when a train is composed of multiple cars, the position is given in an array, with the position of all individual cars.
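As a small illustration of the mapping above, the sketch below renames the Dutch property names to the English column names used in Question I.a. The `FIELD_NAMES` dictionary and the `rename_fields` helper are hypothetical names introduced here, and the flat `sample` record is made up for illustration. Real messages nest the car-level fields, so this is not a complete decoder.

```python
# Dutch property names (as listed above) mapped to English column names.
FIELD_NAMES = {
    "tns3:TreinNummer": "train_number",
    "tns3:MaterieelDeelNummer": "car_number",
    "tns3:Materieelvolgnummer": "car_position",
    "tns3:GpsDatumTijd": "timestamp",
    "tns3:Latitude": "latitude",
    "tns3:Longitude": "longitude",
    "tns3:Elevation": "elevation",
    "tns3:Snelheid": "speed",
    "tns3:Richting": "heading",
}


def rename_fields(record):
    """Return a new dict with the known Dutch keys renamed; unknown keys are dropped."""
    return {
        english: record[dutch]
        for dutch, english in FIELD_NAMES.items()
        if dutch in record
    }


# Hypothetical flat record, for illustration only (string values are an assumption).
sample = {
    "tns3:TreinNummer": "5451",
    "tns3:MaterieelDeelNummer": "2939",
    "tns3:Materieelvolgnummer": "1",
    "tns3:GpsDatumTijd": "2020-04-17T10:54:48Z",
    "tns3:Snelheid": "39.6",
    "tns3:Richting": "268.0",
    "tns3:AantalSatelieten": "9",
}

renamed = rename_fields(sample)
print(renamed["train_number"], renamed["heading"])
```

Keys that are absent from a record are simply skipped, so the same helper works on partial records.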

**Question I.a. (5/20)** Write a function `extract_gps_data` which takes the message as input and extracts the train number, train car and GPS data from the source messages. Using this function, you should be able to obtain the example table, or something similar:

| timestamp | train_number | car_number | car_position | longitude | latitude | elevation | heading | speed |
|:---------:|:------------:|:----------:|:------------:|:---------:|:--------:|:---------:|:-------:|:-----:|
|    ...    |      ...     |     ...    |      ...     |    ...    |    ...   |    ...    |   ...   |  ...  |

__Hints:__
- The messages are occasionally empty; for example, `tns3:ArrayOfTreinLocation` or `tns3:TreinLocation` can be empty.
- Not every message shares exactly the same structure; for example, `tns3:TreinMaterieelDelen` may be a list, but not always.
- You may find the Python dictionary method [get(key, default)](https://docs.python.org/3.7/library/stdtypes.html#dict.get) helpful.
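The hints above can be illustrated with two small helpers. This is only a sketch of the defensive-access pattern, not a full `extract_gps_data`. The names `as_list` and `iter_train_infos` are hypothetical, and the assumption is that an "empty" level shows up as a missing key, `None`, or an empty string or list.

```python
def as_list(value):
    """Normalize a value that may be empty, a single item, or a list of items."""
    if not value:                 # None, "", [] or {} all count as "nothing here"
        return []
    if isinstance(value, list):
        return value
    return [value]                # a single item becomes a one-element list


def iter_train_infos(msg):
    """Yield each <train_info> dict of a decoded GPS message, tolerating empty levels."""
    # `or {}` also covers a key that is present but holds None or "".
    outer = msg.get("tns3:ArrayOfTreinLocation") or {}
    for info in as_list(outer.get("tns3:TreinLocation")):
        yield info


# Hypothetical messages covering the cases mentioned in the hints.
empty_msg = {"tns3:ArrayOfTreinLocation": None}
single_msg = {"tns3:ArrayOfTreinLocation": {"tns3:TreinLocation": {"tns3:TreinNummer": "2253"}}}
list_msg = {"tns3:ArrayOfTreinLocation": {"tns3:TreinLocation": [
    {"tns3:TreinNummer": "5451"},
    {"tns3:TreinNummer": "5434"},
]}}

for m in (empty_msg, single_msg, list_msg):
    print([info["tns3:TreinNummer"] for info in iter_train_infos(m)])
```

The same `as_list` call can be applied to `info.get("tns3:TreinMaterieelDelen")`, so that single-car and multi-car trains are handled by one loop.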

In [None]:
def extract_gps_data(msg):
    <--TODO-->

In [4]:
# Example results from "extract_gps_data"
import numpy as np
import pandas as pd

pd.DataFrame(
    data=extract_gps_data(json.loads(example_gps.value)),
    columns=['timestamp', 'train_number', 'car_number', 'car_position', 
             'longitude', 'latitude', 'elevation', 'heading', 'speed']
).head(n=20)

|   | timestamp | train_number | car_number | car_position | longitude | latitude | elevation | heading | speed |
|:-:|:---------:|:------------:|:----------:|:------------:|:---------:|:--------:|:---------:|:-------:|:-----:|
| 0 | 2020-04-17T10:54:45Z | 2253 | 8658 | 1 | 4.0380105 | 51.4648525 | 0.0 | 0.0 | 0.0 |
| 1 | 2020-04-17T10:54:48Z | 5451 | 2939 | 1 | 4.5396635994124 | 52.374485813855 | 0.0 | 268.0 | 39.6 |
| 2 | 2020-04-17T10:54:40Z | 5451 | 2985 | 2 | 4.541686919749 | 52.374542255335 | 0.0 | 267.4 | 39.6 |
| 3 | 2020-04-17T10:54:48Z | 5434 | 2980 | 1 | 4.7549210692524 | 52.384401828139 | 0.0 | 95.6 | 82.8 |
| 4 | 2020-04-17T10:54:44Z | 5434 | 2994 | 2 | 4.7528178171193 | 52.384679626375 | 0.0 | 105.8 | 79.2 |
| 5 | 2020-04-17T10:54:39Z | 3347 | 2333 | 1 | 4.8917345 | 52.6786076667 | 0.0 | 90.93 | 95.0 |
| 6 | 2020-04-17T10:54:42Z | 3347 | 2745 | 1 | 4.80994833333 | 52.4445896667 | 0.0 | 160.43 | 70.0 |
| 7 | 2020-04-17T10:54:46Z | 3347 | 2737 | 2 | 4.80987116667 | 52.4447323333 | 0.0 | 160.38 | 69.0 |
| 8 | 2020-04-17T10:54:40Z | 8845 | 4246 | 1 | 4.86512666667 | 52.0707221667 | 0.0 | 94.53 | 54.0 |
| 9 | 2020-04-17T10:54:39Z | 8845 | 4084 | 2 | 4.86335633333 | 52.0708353333 | 0.0 | 96.14 | 53.0 |


**Question I.b (15/20)** Make a live plot of the train positions.

You can do so by using `bokeh`; use last week's lab as an example.

See also: https://docs.bokeh.org/en/latest/docs/user_guide/geo.html#tile-provider-maps

You can compare your plot to one of the live services: https://spoorkaart.mwnn.nl/, http://treinenradar.nl/

__Q (1/15)__ To plot points with GPS location information on bokeh's map, we need a transformer. What is the following transformer capable of? Check `bokeh`'s documentation on [Tile Provider Maps](https://docs.bokeh.org/en/latest/docs/user_guide/geo.html#tile-provider-maps).

In [None]:
from pyproj import Transformer
transformer = Transformer.from_proj("EPSG:4326", "EPSG:3857", always_xy=True)

__Answer:__ Write your answer here.

To perform a transformation, use the method `Transformer.transform`; see the [pyproj documentation](https://pyproj4.github.io/pyproj/stable/api/transformer.html?highlight=transformer#pyproj.transformer.Transformer.transform).
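To get a feel for the coordinates involved, here is a pure-Python sketch of the standard spherical Web Mercator formula, which is the projection that EPSG:3857 denotes. It is not pyproj's implementation, and `lonlat_to_web_mercator` is a hypothetical helper introduced only for illustration. In the notebook you would call `transformer.transform(lon, lat)` instead, and small numerical differences are to be expected.

```python
import math

# WGS 84 semi-major axis in metres, used as the sphere radius by Web Mercator.
EARTH_RADIUS_M = 6378137.0


def lonlat_to_web_mercator(lon_deg, lat_deg):
    """Project longitude/latitude in degrees to Web Mercator x/y in metres."""
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y


# Same argument order as with always_xy=True: longitude first, then latitude.
x, y = lonlat_to_web_mercator(4.9, 52.37)
print(x, y)
```

Note the argument order: longitude is the x coordinate and latitude the y coordinate, which matches the `always_xy=True` setting of the transformer above.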

__Q (14/15)__ Let's make the plot.

**Care should be taken with the following points:**
- We expect the train positions to fall on rail tracks on the map. Showing each train as a circle is good enough. Check [Scatter Markers](https://docs.bokeh.org/en/2.0.2/docs/user_guide/plotting.html?highlight=scatter#scatter-markers).
- One train may have many cars. You do not need to show every car on the map; keep only the car whose `car_position` equals '1'.
- Provide an interactive label with the train number (we do not expect train type, as this needs to be recovered from other sources). Check [Hovertool](https://docs.bokeh.org/en/2.0.1/docs/user_guide/tools.html#hovertool).

**You can get bonus points if your plot does the following:**
- Trains on the map should not appear/disappear when data is absent for a few messages.
- Find a way to show where the train is heading.
- Add any other pieces of information that may be of interest to users.
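One possible way to approach the first two bonus items is sketched below. It keeps the last known front-car row per train in a dictionary, so a train does not vanish when it is missing from a message, and it converts a heading into a direction vector. The helper names and the row layout (dicts keyed by the column names of Question I.a) are assumptions. The heading is assumed to be in degrees clockwise from north, which is consistent with the example rows above, where the front car of train 5451 lies west of its second car and the reported heading is about 268.

```python
import math


def update_latest(latest, rows):
    """Merge new rows into `latest` (keyed by train number), keeping only front cars."""
    for row in rows:
        if str(row["car_position"]) == "1":   # tolerate both 1 and '1'
            latest[row["train_number"]] = row
    return latest


def heading_to_unit_vector(heading_deg):
    """Convert a compass heading (assumed clockwise from north) to (dx, dy) = (east, north)."""
    rad = math.radians(heading_deg)
    return math.sin(rad), math.cos(rad)


# Hypothetical batches of rows extracted from two consecutive messages.
batch1 = [
    {"train_number": "5451", "car_position": "1", "longitude": 4.5397},
    {"train_number": "5451", "car_position": "2", "longitude": 4.5417},
    {"train_number": "3347", "car_position": "1", "longitude": 4.8917},
]
batch2 = [
    {"train_number": "5451", "car_position": "1", "longitude": 4.5380},
]

latest = {}
update_latest(latest, batch1)
update_latest(latest, batch2)   # train 3347 is absent here but stays in `latest`
print(sorted(latest))
print(heading_to_unit_vector(90.0))
```

The values of `latest` can then be turned into the column lists that the `ColumnDataSource` expects. Evicting trains that have not been seen for a long time is left out of this sketch.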

In [None]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, Range1d
from bokeh.tile_providers import get_provider, OSM

import time

output_notebook()

In [None]:
x = [] # longitude
y = [] # latitude
tn = [] # train_number
source = ColumnDataSource(<--TODO-->)

TOOLTIPS = [
    <--TODO-->
]

# create the map
tile_provider = get_provider(OSM)
p = figure(x_axis_type="mercator", y_axis_type="mercator", tooltips=TOOLTIPS)
p.add_tile(tile_provider)

# add circle points
p.circle('x', 'y', source=source, size=5, line_color="navy", fill_color="orange", alpha=0.8)

# make the plot centered at Amsterdam
x_min, y_min = transformer.transform(4.4, 52.2)
x_max, y_max = transformer.transform(5.4, 52.6)
p.x_range = Range1d(x_min, x_max)
p.y_range = Range1d(y_min, y_max)

t=show(p, notebook_handle=True)

Create a simple consumer for `ndovloketnl-gps`, which consumes the latest information from the stream.

In [None]:
consumer = <--TODO-->

Make the plot live. You can refer to the week 9 exercise for ideas.

In [None]:
try:
    for message in consumer:
        if message is not None:
            
            <--TODO-->
            
            push_notebook(handle=t)
            time.sleep(0.1)
except KeyboardInterrupt:
    print("Plot interrupted.")

---

## Part II - Locate Message (10 points / 50)

After you finish this part, you will be able to locate the message that corresponds to a given timestamp.

You can find below a helper function to read a message at a specific offset from a Kafka topic.

In [None]:
def fetch_message_at(topic, offset):
    if isinstance(topic, str):
        topic = topic.encode('utf-8')
    t = client.topics[topic]
    consumer = t.get_simple_consumer()
    p = list(consumer.partitions.values())[0]
    consumer.reset_offsets([(p, int(offset) - 1)])
    return consumer.consume()

In [None]:
msg = fetch_message_at(b'ndovloketnl-gps', 100)

In [None]:
msg.offset

In [None]:
msg.value

**Question II.a (5/10)** Write a function to extract the median timestamp from a message of the `ndovloketnl-gps` topic. You can reuse the `extract_gps_data` function from part I.
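As a reminder of how a median over timestamps can be computed, here is a minimal standard-library sketch that works on a plain list of ISO 8601 strings. The helper name `median_timestamp` is hypothetical, and the sketch returns a `datetime` rather than the `numpy.datetime64` shown in the example result. It assumes every timestamp has the `YYYY-MM-DDTHH:MM:SSZ` form seen in the example table and that the list is not empty.

```python
from datetime import datetime, timezone
from statistics import median


def median_timestamp(iso_strings):
    """Return the median of UTC timestamps given as 'YYYY-MM-DDTHH:MM:SSZ' strings."""
    epochs = [
        datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ")
        .replace(tzinfo=timezone.utc)
        .timestamp()
        for s in iso_strings
    ]
    # For an even number of values, statistics.median averages the two middle ones.
    return datetime.fromtimestamp(median(epochs), tz=timezone.utc)


# Timestamps of the ten example rows shown in Part I.
sample = [
    "2020-04-17T10:54:45Z", "2020-04-17T10:54:48Z", "2020-04-17T10:54:40Z",
    "2020-04-17T10:54:48Z", "2020-04-17T10:54:44Z", "2020-04-17T10:54:39Z",
    "2020-04-17T10:54:42Z", "2020-04-17T10:54:46Z", "2020-04-17T10:54:40Z",
    "2020-04-17T10:54:39Z",
]
print(median_timestamp(sample))  # 2020-04-17 10:54:43+00:00
```

The median is less sensitive than the mean to a few cars reporting a stale GPS time, which is one reason it is a reasonable summary of a message's time.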

In [None]:
example_gps = client.topics[b'ndovloketnl-gps'].get_simple_consumer(
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
).consume()

In [None]:
# Answer II.a
import pandas as pd
import numpy as np
def extract_gps_time_approx(msg):
    <--TODO-->

In [12]:
# Example results from `extract_gps_time_approx`
extract_gps_time_approx(json.loads(example_gps.value))

numpy.datetime64('2020-04-17T10:54:43.000000000')

**Question II.b (5/10)** Using `fetch_message_at` and `extract_gps_time_approx`, write a function named `search_gps` to find the "first" offset for a given timestamp in the `ndovloketnl-gps` topic. Your function should use the [Binary Search Algorithm](https://en.wikipedia.org/wiki/Binary_search_algorithm).

More precisely, if we write `offset = search_gps(ts)` where `ts` is a timestamp, then we have (JSON decoding of the message value omitted for brevity):
```
ts <= extract_gps_time_approx(fetch_message_at('ndovloketnl-gps', offset))

extract_gps_time_approx(fetch_message_at('ndovloketnl-gps', offset - 1)) < ts
```
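The two conditions above describe a "lower bound" search. The sketch below shows that search on a generic index-to-key function, assuming the keys are non-decreasing in the index. The name `lower_bound` and its signature are hypothetical. For the Kafka topic, `key_at` would wrap `fetch_message_at` and `extract_gps_time_approx`, and the search range would come from the topic's earliest and latest offsets, neither of which is shown here.

```python
def lower_bound(key_at, lo, hi, target):
    """Return the smallest i in [lo, hi) with key_at(i) >= target, or hi if there is none.

    Assumes key_at is non-decreasing on [lo, hi).
    """
    while lo < hi:
        mid = (lo + hi) // 2
        if key_at(mid) < target:
            lo = mid + 1      # everything up to and including mid is too early
        else:
            hi = mid          # mid satisfies the condition; look for an earlier one
    return lo


# Toy example on a sorted list standing in for per-offset median timestamps.
times = [10, 20, 20, 30, 40]
print(lower_bound(times.__getitem__, 0, len(times), 20))  # 1
print(lower_bound(times.__getitem__, 0, len(times), 25))  # 3
```

Each iteration halves the search range, so only about log2(n) messages need to be fetched for a topic with n offsets.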

In [None]:
def search_gps(findTimeStr):
    findTime = findTimeStr.to_datetime64()
    <--TODO-->

In [14]:
# Example results from `search_gps`
search_gps(pd.Timestamp('2020-04-23'))

4763

In [15]:
# Verify that the offset returned above yields a timestamp on or after 2020-04-23 (replace <--OFFSET-->)
extract_gps_time_approx(json.loads(fetch_message_at('ndovloketnl-gps', <--OFFSET-->).value))

numpy.datetime64('2020-04-23T00:00:04.000000000')

In [16]:
# Verify that the offset just before the one returned above yields a timestamp before 2020-04-23 (replace <--OFFSET-->)
extract_gps_time_approx(json.loads(fetch_message_at('ndovloketnl-gps', <--OFFSET--> - 1).value))

numpy.datetime64('2020-04-22T23:59:54.000000000')