---
jupyter:
  jupytext:
    text_representation:
      extension: .py
      format_name: percent
      format_version: '1.3'
      jupytext_version: 1.13.7
  kernelspec:
    display_name: Python 3 (ipykernel)
    language: python
    name: python3
---

%% [markdown]
# Homework 2 - Data Wrangling with Hadoop

The goal of this assignment is to put into action the data wrangling techniques from the exercises of week-3 and week-4. We strongly suggest that you finish these two exercises first and then start the homework. In this homework, we are going to reuse the same __sbb__ and __twitter__ datasets as seen before during these two weeks.

## Hand-in Instructions
- __Due: 05.04.2022 23:59 CET__
- Fork this project as a private group project
- Verify that all your team members are listed as group members of the project
- `git push` your final version to your group's Renku repository before the due date
- Verify that `Dockerfile`, `environment.yml` and `requirements.txt` are properly written and that the notebook is functional
- Add necessary comments and discussion to make your queries readable

## Hive Documentation

Hive queries: <https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select>

Hive functions: <https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF>

%% [markdown]
<div style="font-size: 100%" class="alert alert-block alert-warning">
    <b>Get yourself ready:</b>
    <br>
    Before you jump into the questions, please first go through the notebook <a href='./prepare_env.ipynb'>prepare_env.ipynb</a> and make sure that your environment is properly set up.
    <br><br>
    <b>Cluster Usage:</b>
    <br>
    As there are many of you working with the cluster, we encourage you to prototype your queries on small data samples before running them on whole datasets.
    <br><br>
    <b>Try to use as much HiveQL as possible and avoid using pandas operations. Also, whenever possible, try to apply the methods you learned in class to optimize your queries to minimize the use of computing resources.</b>
</div>

%% [markdown]
## Part I: SBB/CFF/FFS Data (40 Points)

Data source: <https://opentransportdata.swiss/en/dataset/istdaten>

In this part, you will leverage Hive to perform exploratory analysis of data published by the [Open Data Platform Swiss Public Transport](https://opentransportdata.swiss).

Format: the dataset is originally presented as a collection of text files with fields separated by ';' (semicolon). For efficiency, the text files have been converted into the Optimized Row Columnar ([_ORC_](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC)) file format.

Location: you can find the data in ORC format on HDFS at the path `/data/sbb/part_orc/istdaten`.

The full description from opentransportdata.swiss can be found at <https://opentransportdata.swiss/de/cookbook/ist-daten/> in four languages. There may be inconsistencies or missing information between the translations. In that case, we suggest you rely on the German version and use an automated translator when necessary. We will clarify anything that remains unclear in class and on Slack. Here is a reminder of the relevant column descriptions:

- `BETRIEBSTAG`: date of the trip
- `FAHRT_BEZEICHNER`: identifies the trip
- `BETREIBER_ABK`, `BETREIBER_NAME`: operator (the name contains the full name, e.g. Schweizerische Bundesbahnen for SBB)
- `PRODUKT_ID`: type of transport, e.g. train, bus
- `LINIEN_ID`: for trains, this is the train number
- `LINIEN_TEXT`,`VERKEHRSMITTEL_TEXT`: for trains, the service type (IC, IR, RE, etc.)
- `ZUSATZFAHRT_TF`: boolean, true if this is an additional trip (not part of the regular schedule)
- `FAELLT_AUS_TF`: boolean, true if this trip failed (cancelled or not completed)
- `HALTESTELLEN_NAME`: name of the stop
- `ANKUNFTSZEIT`: arrival time at the stop according to schedule
- `AN_PROGNOSE`: actual arrival time
- `AN_PROGNOSE_STATUS`: shows how the actual arrival time is calculated
- `ABFAHRTSZEIT`: departure time at the stop according to schedule
- `AB_PROGNOSE`: actual departure time
- `AB_PROGNOSE_STATUS`: shows how the actual departure time is calculated
- `DURCHFAHRT_TF`: boolean, true if the transport does not stop there

Each line of the file represents a stop and contains arrival and departure times. When the stop is the start of a journey, `ANKUNFTSZEIT` is empty; when it is the end of a journey, `ABFAHRTSZEIT` is empty.

In some cases, the actual times were not measured, so `AN_PROGNOSE_STATUS`/`AB_PROGNOSE_STATUS` will be empty or set to `PROGNOSE`, and `AN_PROGNOSE`/`AB_PROGNOSE` will be empty.

%% [markdown]
__Initialization__

%%

import os
import pandas as pd
pd.set_option("display.max_columns", 50)
import matplotlib.pyplot as plt
# %matplotlib inline
import plotly.express as px
import plotly.graph_objects as go
import warnings
warnings.simplefilter(action='ignore', category=UserWarning)

username = os.environ['RENKU_USERNAME']
hiveaddr = os.environ['HIVE_SERVER2']
(hivehost,hiveport) = hiveaddr.split(':')
print("Operating as: {0}".format(username))

%%

from pyhive import hive

# create connection (the port read from the environment is a string, so cast it)
conn = hive.connect(host=hivehost,
                    port=int(hiveport),
                    username=username)
# create cursor
cur = conn.cursor()

%% [markdown]
### a) Prepare the table - 5/40

Complete the code in the cells below, replacing every `TODO`, in order to create a Hive table of the SBB Istdaten.

The table has the following properties:

* The table is in your database, which must have the same name as your gaspar ID
* The table name is `sbb_orc`
* The table must be external
* The table content consists of ORC files in the HDFS folder `/data/sbb/part_orc/istdaten`
* The table is _partitioned_, and the number of partitions should not exceed 50
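
To make the expected shape of the statement concrete, here is a minimal sketch of a helper that assembles a partitioned external ORC table definition as a Python string. The helper names (`external_orc_ddl`, `repair_ddl`), the gaspar ID `jdoe`, the two data columns and the `(year, month)` partition columns are all hypothetical placeholders. Check the real schema of the ORC files and the directory layout under `/data/sbb/part_orc/istdaten` before reusing any of them.

```python
def external_orc_ddl(database, table, columns, partitions, location):
    """Assemble a CREATE EXTERNAL TABLE statement for ORC files stored under `location`.

    `columns` and `partitions` are lists of (name, hive_type) pairs.
    A partition column must not also appear in `columns`.
    """
    col_lines = ",\n    ".join(f"{name} {htype}" for name, htype in columns)
    part_list = ", ".join(f"{name} {htype}" for name, htype in partitions)
    return (
        f"CREATE EXTERNAL TABLE {database}.{table} (\n"
        f"    {col_lines}\n"
        f")\n"
        f"PARTITIONED BY ({part_list})\n"
        f"STORED AS ORC\n"
        f"LOCATION '{location}'"
    )


def repair_ddl(database, table):
    """MSCK REPAIR TABLE asks Hive to register the partition directories found under LOCATION."""
    return f"MSCK REPAIR TABLE {database}.{table}"


# Hypothetical example: two data columns and (year, month) partitions.
ddl = external_orc_ddl(
    "jdoe",  # hypothetical gaspar ID
    "sbb_orc",
    [("betriebstag", "string"), ("produkt_id", "string")],
    [("year", "int"), ("month", "int")],
    "/data/sbb/part_orc/istdaten",
)
print(ddl)
print(repair_ddl("jdoe", "sbb_orc"))
```

`MSCK REPAIR TABLE` only discovers directories named in Hive's `name=value` style (for example `year=2019/month=1`). If the layout differs, `ALTER TABLE ... ADD PARTITION` with an explicit `LOCATION` is the usual alternative.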


%%
# Create your database if it does not exist

query = """
CREATE DATABASE IF NOT EXISTS {0} LOCATION '/user/{0}/hive'
""".format(username)
cur.execute(query)

%%
# Make your database the default

query = """
USE {0}
""".format(username)
cur.execute(query)

%%

# Drop any previous version of the table (dropping an EXTERNAL table leaves the data files on HDFS untouched)
query = """
DROP TABLE IF EXISTS {0}.sbb_orc
""".format(username)
cur.execute(query)

%%

query = """
CREATE EXTERNAL TABLE {0}.sbb_orc
   # TODO
""".format(username)
cur.execute(query)

%%

query = """
   # TODO
""".format(username)
cur.execute(query)

%% [markdown]
**Checkpoint**

Run the cells below and verify that your table satisfies all the required properties.

%%

query = """
DESCRIBE {0}.sbb_orc
""".format(username)
cur.execute(query)
cur.fetchall()

%%

query = """
SHOW PARTITIONS {0}.sbb_orc
""".format(username)
cur.execute(query)
cur.fetchall()

%% [markdown]
### b) Type of transport - 5/40

In the exercise of week-3, you have already explored the stop distribution of different types of transport on a small data set. Now, let's do the same for two full years' worth of data.

- Query `sbb_orc` to get the total number of stops for each type of transport in each month of 2019 and 2020, and order the result by time and type of transport. The result should look like:

  |month_year|ttype|stops|
  |---|---|---|
  |...|...|...|

- Use `plotly` to create a facet bar chart partitioned by the type of transportation.
- Document any patterns or abnormalities you can find.

__Note__:
- In general, one entry in the `sbb_orc` table means one stop.
- You might need to filter out the rows where:
    - `BETRIEBSTAG` is not in the format of `__.__.____`
    - `PRODUKT_ID` is NULL or empty
- Facet plot with plotly: https://plotly.com/python/facet-plots/
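
The two filters and the month key can be prototyped locally before they go into HiveQL. The sketch below assumes that valid `BETRIEBSTAG` values look like `dd.MM.yyyy`. The names `BETRIEBSTAG_PATTERN`, `HIVE_FILTER`, `is_valid_betriebstag` and `month_year` are hypothetical. A `yyyy-MM` key is used because sorting on `MM.yyyy` strings would interleave the two years.

```python
import re

# Accepts 'dd.MM.yyyy' such as '03.03.2019'. The character class [.] avoids
# backslash escaping, so the same pattern can be pasted into a Hive RLIKE.
BETRIEBSTAG_PATTERN = r"^[0-9]{2}[.][0-9]{2}[.][0-9]{4}$"

# Hypothetical WHERE clause built from the same pattern.
HIVE_FILTER = (
    f"betriebstag RLIKE '{BETRIEBSTAG_PATTERN}' "
    "AND produkt_id IS NOT NULL AND produkt_id <> ''"
)


def is_valid_betriebstag(value):
    """True if `value` is a non-null string in the dd.MM.yyyy layout."""
    return value is not None and re.match(BETRIEBSTAG_PATTERN, value) is not None


def month_year(value):
    """Map 'dd.MM.yyyy' to a sortable 'yyyy-MM' key.

    Hive equivalent (substr is 1-based there):
    concat(substr(betriebstag, 7, 4), '-', substr(betriebstag, 4, 2))
    """
    return f"{value[6:10]}-{value[3:5]}"


print(HIVE_FILTER)
print(is_valid_betriebstag("03.03.2019"), month_year("03.03.2019"))
```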

%%
# You may need more than one query, do not hesitate to create as many as you need.
# query = """
#
#     /* TODO */
#
# """
# cur.execute(query)

query = """
    # TODO

""".format(username)
df_ttype = pd.read_sql(query, conn)

%%

fig = px.bar(
    df_ttype,

    # TODO
)

# TODO: make your figure more readable
fig.show()

%% [markdown]
### c) Schedule - 10/40

- Select any typical weekday (not a Saturday, not a Sunday, not a bank holiday) from `sbb_orc`. Query the table for that one day and get the set of IC (`VERKEHRSMITTEL_TEXT`) trains you can take to go (without connections) from Genève to Lausanne on that day.
- Display the train number (`LINIEN_ID`) as well as the schedule (departure and arrival times) of the trains.

|train_number|departure|arrival|
|---|---|---|
|...|...|...|

__Note:__
- The schedule of IC trains from Genève to Lausanne has not changed over the past few years. You can use the advanced search on SBB's website to check your answer.
- Do not hesitate to create intermediary tables or views (see [_CTAS_](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTableAsSelect(CTAS)))
- You might need to add filters on these flags: `ZUSATZFAHRT_TF`, `FAELLT_AUS_TF`, `DURCHFAHRT_TF`
- Functions that could be useful: `unix_timestamp`, `to_utc_timestamp`, `date_format`.
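
One way to reason about "without connections" is as a self-join of the one-day stops on the trip identifier: the same trip must depart from the origin and later arrive at the destination. The pure-Python sketch below illustrates that join condition on hypothetical toy records (stops `A` and `B`, lines `L1` and `L2`), not real SBB data. The lower-case keys only mirror the `sbb_orc` column names. In HiveQL the same idea would roughly be `... FROM day_stops a JOIN day_stops b ON a.fahrt_bezeichner = b.fahrt_bezeichner WHERE a.haltestellen_name = <origin> AND b.haltestellen_name = <destination>`, plus a time comparison. Here `day_stops` is a hypothetical one-day view.

```python
from datetime import datetime


def direct_trips(stops, origin, destination):
    """Return (trip_id, line_id, departure, arrival) for trips serving origin, then destination.

    `stops` is a list of dicts with keys mirroring sbb_orc columns (hypothetical):
    fahrt_bezeichner, linien_id, haltestellen_name, abfahrtszeit, ankunftszeit
    (datetime objects, or None when missing). All stops are assumed to be from one day.
    """
    # Left side of the self-join: departures from the origin, keyed by trip.
    departures = {
        s["fahrt_bezeichner"]: s
        for s in stops
        if s["haltestellen_name"] == origin and s["abfahrtszeit"] is not None
    }
    trips = []
    for s in stops:
        dep = departures.get(s["fahrt_bezeichner"])
        # Join condition: same trip, arriving at the destination after leaving the origin.
        if (
            dep is not None
            and s["haltestellen_name"] == destination
            and s["ankunftszeit"] is not None
            and s["ankunftszeit"] > dep["abfahrtszeit"]
        ):
            trips.append((s["fahrt_bezeichner"], s["linien_id"],
                          dep["abfahrtszeit"], s["ankunftszeit"]))
    return sorted(trips, key=lambda t: t[2])


def stop(trip, line, name, dep=None, arr=None):
    """Build one toy stop record (hypothetical values, not real SBB data)."""
    return {"fahrt_bezeichner": trip, "linien_id": line, "haltestellen_name": name,
            "abfahrtszeit": dep, "ankunftszeit": arr}


day = datetime(2019, 3, 4)  # arbitrary date for the toy records
toy = [
    stop("t1", "L1", "A", dep=day.replace(hour=8)),
    stop("t1", "L1", "B", arr=day.replace(hour=8, minute=40)),
    stop("t2", "L2", "B", dep=day.replace(hour=9)),  # runs B -> A: wrong direction
    stop("t2", "L2", "A", arr=day.replace(hour=9, minute=40)),
]
print(direct_trips(toy, "A", "B"))
```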

%%
# You may need more than one query, do not hesitate to create more

# query = """
#
#     TODO
#
# """
# cur.execute(query)

query = """

    # TODO

""".format(username)
pd.read_sql(query, conn)

%%

%%

%%

%% [markdown]
### d) Delay percentiles - 10/40

- Query `sbb_orc` to compute the 50th and 75th percentiles of __arrival__ delays for IC 702, 704, ..., 728, 730 (15 trains total) at Genève main station.
- Use `plotly` to plot your results in a proper way.
- Which trains are the most disrupted? Can you identify a trend and interpret it?

__Note:__
- Do not hesitate to create intermediary tables.
- When the train is ahead of schedule, count this as a delay of 0.
- Use only stops with `AN_PROGNOSE_STATUS` equal to __REAL__ or __GESCHAETZT__.
- Functions that may be useful: `unix_timestamp`, `percentile_approx`, `if`
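
The delay rule in the notes (keep only measured stops, clamp early arrivals to 0) can be checked locally on a few rows before writing the Hive expression. This sketch **assumes** that scheduled times look like `dd.MM.yyyy HH:mm` and actual times like `dd.MM.yyyy HH:mm:ss`. Verify this on a few rows of `sbb_orc` first. The helper names are hypothetical. The `percentile` helper computes an exact, linearly interpolated percentile, so its values can differ slightly from Hive's `percentile_approx`.

```python
import math
from datetime import datetime

# Assumed string layouts (verify on the real data before reuse).
SCHEDULE_FMT = "%d.%m.%Y %H:%M"
ACTUAL_FMT = "%d.%m.%Y %H:%M:%S"
MEASURED = {"REAL", "GESCHAETZT"}


def arrival_delay_seconds(scheduled, actual, status):
    """Delay in seconds, clamped at 0; None if the stop should be ignored.

    Hive analogue under the same format assumption: if(d > 0, d, 0) with
    d = unix_timestamp(an_prognose, 'dd.MM.yyyy HH:mm:ss') - unix_timestamp(ankunftszeit, 'dd.MM.yyyy HH:mm')
    """
    if status not in MEASURED or not scheduled or not actual:
        return None
    diff = (datetime.strptime(actual, ACTUAL_FMT)
            - datetime.strptime(scheduled, SCHEDULE_FMT)).total_seconds()
    return max(0, int(diff))


def percentile(values, p):
    """Exact p-th percentile with linear interpolation between closest ranks."""
    xs = sorted(values)
    k = (len(xs) - 1) * p / 100
    lo, hi = math.floor(k), math.ceil(k)
    return xs[lo] + (xs[hi] - xs[lo]) * (k - lo)


delays = [
    arrival_delay_seconds("04.03.2019 08:00", "04.03.2019 08:02:30", "REAL"),        # late
    arrival_delay_seconds("04.03.2019 08:00", "04.03.2019 07:59:00", "GESCHAETZT"),  # early
    arrival_delay_seconds("04.03.2019 08:00", "", "PROGNOSE"),                       # not measured
]
print(delays)
print(percentile([0, 60, 120, 300], 50), percentile([0, 60, 120, 300], 75))
```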

%%
# You may need more than one query, do not hesitate to create more

# query = """
#
#     TODO
#
# """
# cur.execute(query)

query = """
    # TODO

""".format(username)
df_delays_ic_gen = pd.read_sql(query, conn)

%%

%%

fig = px.bar(
    df_delays_ic_gen,

    # TODO
)

# TODO: make your figure more readable
fig.show()

%% [markdown]
### e) Delay heatmap - 10/40

- For each week (1 to 52) of each year from 2019 to 2021, query `sbb_orc` to compute the median delay of all trains __departing__ from any train station in the Zürich area during that week.
- Use `plotly` to draw a year x week heatmap of the median delays (one column per year, one row per week).
- In which weeks were the trains delayed the most/least? Can you explain the results?

__Note:__
- Do not hesitate to create intermediary tables.
- When the train is ahead of schedule, count this as a delay of 0 (no negative delays).
- Use only stops with `AB_PROGNOSE_STATUS` equal to __REAL__ or __GESCHAETZT__.
- For simplicity, a stop counts as a train station in the Zürich area if and only if it is a train station and its `HALTESTELLEN_NAME` starts with __Zürich__.
- Heatmap with `plotly`: https://plotly.com/python/heatmaps/
- Functions that may be useful: `unix_timestamp`, `from_unixtime`, `weekofyear`, `percentile_approx`, `if`
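
Two details are easy to get wrong here: week numbering around New Year, and reshaping the query result into the year x week grid the heatmap needs. This sketch uses ISO-8601 weeks from Python's `date.isocalendar()`. That Hive's `weekofyear` follows the same convention is an **assumption**, so check a few dates around New Year on the cluster. ISO years can have 53 weeks (2020 does), so decide how to treat week 53 given the 1 to 52 range in the question. The helper names are hypothetical. The nested list that `pivot_week_by_year` returns could then be passed to `px.imshow` with the years as `x` and the weeks as `y`.

```python
from datetime import date


def week_of_year(d):
    """ISO-8601 week number: weeks start on Monday and week 1 contains the year's first Thursday."""
    return d.isocalendar()[1]


def pivot_week_by_year(medians, years, weeks=range(1, 53)):
    """Turn {(year, week): median_delay} into a grid with rows=weeks and columns=years.

    Missing (year, week) combinations become NaN so they show up as gaps rather than as a fake 0.
    """
    return [[medians.get((year, week), float("nan")) for year in years] for week in weeks]


print(week_of_year(date(2019, 1, 1)), week_of_year(date(2020, 12, 31)))
print(pivot_week_by_year({(2019, 1): 60.0, (2020, 1): 30.0}, [2019, 2020], range(1, 3)))
```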

%%
# You may need more than one query, do not hesitate to create more

# query = """
#
#     TODO
#
# """
# cur.execute(query)

query = """
    # TODO
""".format(username)
df_delays_zurich = pd.read_sql(query, conn)

%%

%%

%%

fig = px.imshow(
    df_delays_zurich,

    # TODO

)

# TODO: make your figure more readable
fig.show()

%% [markdown]
## Part II: Twitter Data (20 Points)

Data source: https://archive.org/details/twitterstream?sort=-publicdate

In this part, you will leverage Hive to extract the hashtags from the source data, and then perform a light exploration of the prepared data.

### Dataset Description

Format: the dataset is presented as a collection of text files containing one JSON document per line. The data is organized in a hierarchy of folders, with one file per minute. The text files have been compressed using bzip2. In this part, we will mainly focus on __2016 twitter data__.

Location: you can find the data on HDFS at the path `/data/twitter/json/year={year}/month={month}/day={day}/{hour}/{minute}.json.bz2`.

Relevant fields:
- `created_at`, `timestamp_ms`: the first is a human-readable string representation of when the tweet was posted; the latter represents the same instant as a timestamp since the UNIX epoch (note the `_ms` suffix).
- `lang`: the language of the tweet content
- `entities`: parsed entities from the tweet, e.g. hashtags, user mentions, URLs.
- In this repository, you can find [a tweet example](../data/tweet-example.json).

Note: pay attention to the time units, and check the Date-Time functions in the Hive [_UDF_](https://cwiki.apache.org/confluence/display/hive/Languagemanual+udf#LanguageManualUDF-DateFunctions) documentation.
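
Here is a small illustration of the time-unit pitfall. `timestamp_ms` holds milliseconds since the UNIX epoch as a string, while epoch-based conversions such as Python's `datetime.fromtimestamp` or Hive's `from_unixtime` expect seconds. The helper name `tweet_datetime` is hypothetical. The Hive expression in the comment is a sketch; `from_unixtime` formats in the server's time zone, which may not be UTC.

```python
from datetime import datetime, timezone


def tweet_datetime(timestamp_ms):
    """Convert a timestamp_ms string (milliseconds since the UNIX epoch) to an aware UTC datetime."""
    # Divide by 1000 first: passing milliseconds where seconds are expected is off by a factor of 1000.
    return datetime.fromtimestamp(int(timestamp_ms) / 1000, tz=timezone.utc)


# A comparable Hive expression (sketch):
#   from_unixtime(CAST(CAST(timestamp_ms AS BIGINT) / 1000 AS BIGINT))
print(tweet_datetime("1467331200000"))
```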

%% [markdown]
<div style="font-size: 100%" class="alert alert-block alert-danger">
    <b>Disclaimer</b>
    <br>
    This dataset contains unfiltered data from Twitter. As such, you may be exposed to tweets/hashtags containing vulgarities, references to sexual acts, drug usage, etc.
    </div>

%% [markdown]
### a) JsonSerDe - 4/20

In the exercise of week 4, you have already seen how to use the [SerDe framework](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RowFormats&SerDe) to extract JSON fields from raw text format.

In this question, please use SerDe to create an <font color="red" size="3px">EXTERNAL</font> table with __one day__ (e.g. 01.07.2016) of twitter data. You only need to extract three columns: `timestamp_ms`, `lang` and `entities` (with the field `hashtags` only), using the following schema (you need to figure out what to fill in place of the `...` placeholders):
```
timestamp_ms string,
lang         string,
entities     struct<hashtags:array<...<text:..., indices:...>>>
```

The table you create should be similar to:

| timestamp_ms | lang | entities |
|---|---|---|
| 1234567890001 | en | {"hashtags":[]} |
| 1234567890002 | fr | {"hashtags":[{"text":"hashtag1","indices":[10]}]} |
| 1234567890002 | jp | {"hashtags":[{"text":"hashtag1","indices":[14,23]}, {"text":"hashtag2","indices":[45]}]} |

__Note:__
   - JsonSerDe: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RowFormats&SerDe
   - Hive data types: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes
   - Hive complex types: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-ComplexTypes
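
To sanity-check what the SerDe table should contain, you can apply the same projection locally to a single JSON line, for example the [tweet example](../data/tweet-example.json). The sketch below is a local approximation, not the SerDe itself. It keeps `timestamp_ms`, `lang` and, inside `entities`, only the `text` and `indices` of each hashtag. It also **assumes** the stream contains records without these fields (such as deletion notices). Those come back as `None` here and would appear as NULL columns in the Hive table, which is one reason the `SELECT` below may need a `WHERE` clause. The name `extract_row` is hypothetical.

```python
import json


def extract_row(line):
    """Keep only timestamp_ms, lang and entities.hashtags (text, indices) from one JSON line."""
    tweet = json.loads(line)
    entities = tweet.get("entities")
    return {
        "timestamp_ms": tweet.get("timestamp_ms"),
        "lang": tweet.get("lang"),
        # Keep only the hashtags field, and only text + indices inside each hashtag.
        "entities": None if entities is None else {
            "hashtags": [{"text": h["text"], "indices": h["indices"]}
                         for h in entities.get("hashtags", [])]
        },
    }


line = ('{"timestamp_ms": "1467331200000", "lang": "en", "text": "hi #a", '
        '"entities": {"hashtags": [{"text": "a", "indices": [3, 5]}], "urls": []}}')
print(extract_row(line))
print(extract_row('{"delete": {"status": {"id": 1}}}'))
```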

%%

query = """
    DROP TABLE IF EXISTS {0}.twitter_hashtags
""".format(username)
cur.execute(query)

query = """
    CREATE EXTERNAL TABLE {0}.twitter_hashtags

    # TODO
""".format(username)
cur.execute(query)

%%

query = """
    # TODO
""".format(username)
cur.execute(query)

%%

query = """
    SELECT
    # TODO

    FROM {0}.twitter_hashtags

    WHERE
    # TODO

    LIMIT 10
""".format(username)
pd.read_sql(query, conn)

%% [markdown]
### b) Explosion - 4/20

In a), you created a table where each row could contain a list of multiple hashtags. Create another table **containing one day of data only** by normalizing the table obtained from the previous step. This means that each row should contain exactly one hashtag. Include `timestamp_ms` and `lang` in the resulting table, as shown below.

| timestamp_ms | lang | hashtag |
|---|---|---|
| 1234567890001 | es | hashtag1 |
| 1234567890001 | es | hashtag2 |
| 1234567890002 | en | hashtag2 |
| 1234567890003 | zh | hashtag3 |

__Note:__
   - `LateralView`: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
   - `explode` function: <https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode>
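
The semantics of `LATERAL VIEW explode(...)` can be illustrated with a short Python loop: each input row yields one output row per array element. A tweet with an empty or NULL hashtag list therefore yields no row at all. `LATERAL VIEW OUTER` would keep such tweets with a NULL hashtag instead. The query in the docstring is a sketch; the aliases `t` and `ht` and the function name are hypothetical, and the rows below are toy values.

```python
def explode_hashtags(rows):
    """Python analogue of

        SELECT timestamp_ms, lang, ht.text AS hashtag
        FROM twitter_hashtags
        LATERAL VIEW explode(entities.hashtags) t AS ht

    Rows whose hashtag list is empty or NULL produce no output.
    """
    out = []
    for row in rows:
        entities = row.get("entities") or {}
        for tag in entities.get("hashtags") or []:
            out.append((row["timestamp_ms"], row["lang"], tag["text"]))
    return out


rows = [
    {"timestamp_ms": "1", "lang": "es",
     "entities": {"hashtags": [{"text": "h1", "indices": [0, 3]}, {"text": "h2", "indices": [4, 7]}]}},
    {"timestamp_ms": "2", "lang": "en", "entities": {"hashtags": []}},
    {"timestamp_ms": "3", "lang": "zh", "entities": None},
]
print(explode_hashtags(rows))
```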

%%

query = """
    DROP TABLE IF EXISTS {0}.twitter_hashtags_norm
""".format(username)
cur.execute(query)

query = """
    CREATE TABLE IF NOT EXISTS {0}.twitter_hashtags_norm
    STORED AS ORC
    AS

        # TODO

""".format(username)
cur.execute(query)

%%

query = """
    SELECT * FROM {0}.twitter_hashtags_norm LIMIT 10
""".format(username)
pd.read_sql(query, conn)

%% [markdown]
### c) Hashtags - 8/20

Query the normalized table you obtained in b). Create a table of the top 20 most mentioned hashtags, with the contribution of each language. For each hashtag, order the languages by their contribution. You should have a table similar to:

|hashtag|lang|lang_count|total_count|
|---|---|---|---|
|hashtag_1|en|2000|3500|
|hashtag_1|fr|1000|3500|
|hashtag_1|jp|500|3500|
|hashtag_2|te|500|500|

Use `plotly` to create a stacked bar chart to show the results.

__Note:__ to properly order the bars, you may need:
```python
fig.update_layout(xaxis_categoryorder = 'total descending')
```
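
Here is a local Python sketch of the expected result shape, useful for checking a query on a handful of rows. Per hashtag it computes a total, keeps the top `n`, and lists each language's count in decreasing order. In HiveQL, `total_count` could come from a separate `GROUP BY hashtag` or from a window such as `SUM(lang_count) OVER (PARTITION BY hashtag)`. The function name and the toy pairs are hypothetical. For the chart, passing `x="hashtag"`, `y="lang_count"` and `color="lang"` to `px.bar` should stack the languages, since plotly express stacks bars that share an x value by default.

```python
from collections import Counter


def top_hashtags_by_lang(pairs, n=20):
    """Rows (hashtag, lang, lang_count, total_count) for the n most frequent hashtags.

    `pairs` is an iterable of (hashtag, lang) tuples, one per row of the normalized table.
    Within each hashtag, languages are ordered by decreasing contribution.
    """
    pairs = list(pairs)
    totals = Counter(tag for tag, _ in pairs)
    per_lang = Counter(pairs)  # counts each (hashtag, lang) combination
    rows = []
    for tag, total in totals.most_common(n):
        langs = sorted(
            ((lang, count) for (t, lang), count in per_lang.items() if t == tag),
            key=lambda lc: (-lc[1], lc[0]),  # biggest contribution first, ties by name
        )
        rows.extend((tag, lang, count, total) for lang, count in langs)
    return rows


pairs = [("h1", "en")] * 3 + [("h1", "fr")] + [("h2", "ja")] * 2 + [("h3", "es")]
print(top_hashtags_by_lang(pairs, n=2))
```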

%%
# You may need more than one query, do not hesitate to create more
# query = """
#
#     TODO
#
# """
# cur.execute(query)

query = """

    # TODO

""".format(username)
cur.execute(query)

%%

query = """

    # TODO

""".format(username)
df_hashtag = pd.read_sql(query, conn)

%%

%%

fig = px.bar(
    df_hashtag,

    # TODO
)

# TODO: make your figure more readable
fig.show()

%% [markdown]
### d) HBase - 4/20

In the lecture and exercise of week-4, you learned what HBase is, how to create an HBase table and how to create an external Hive table on top of the HBase table. Now, let's save the results of question c) into HBase, such that each entry looks like:
```
(b'PIE', {b'cf1:total_count': b'31415926', b'cf2:langs': b'ja,en,ko,fr'})
```
where the key is the hashtag, `total_count` is the total count of the hashtag, and `langs` is a string of unique language abbreviations concatenated with commas.

__Note:__
- To accomplish the task, you need to follow these steps:
    - Create an HBase table called `twitter_hbase`, in **your HBase namespace**, with two column families, `cf1` and `cf2`
    - Create an external Hive table called `twitter_hive_on_hbase` on top of the HBase table.
    - Populate the HBase table with the results of question c).
- You may find the functions `concat_ws` and `collect_list` useful.
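
To verify the final content, it helps to know exactly what a scan should return. The sketch below groups rows shaped like the c) result `(hashtag, lang, lang_count, total_count)` into the bytes-keyed layout shown above, with de-duplicated languages in first-seen order. Keep in mind that Hive's `collect_list`/`collect_set` make no ordering promise. The function name and toy rows are hypothetical. Comparing against the cluster, for example with `dict(hbase_connection.table(...).scan())` in happybase, is left as a suggestion; the exact table name including your namespace is not specified here.

```python
def hbase_cells(rows):
    """Group (hashtag, lang, lang_count, total_count) rows into HBase-style cells.

    Returns {row_key: {b"cf1:total_count": ..., b"cf2:langs": ...}} with bytes keys and values.
    """
    totals, langs = {}, {}
    for hashtag, lang, _lang_count, total in rows:
        totals[hashtag] = total
        seen = langs.setdefault(hashtag, [])
        if lang not in seen:  # keep each language once, in first-seen order
            seen.append(lang)
    return {
        tag.encode("utf-8"): {
            b"cf1:total_count": str(totals[tag]).encode("utf-8"),
            b"cf2:langs": ",".join(langs[tag]).encode("utf-8"),
        }
        for tag in totals
    }


cells = hbase_cells([("PIE", "ja", 3, 5), ("PIE", "en", 2, 5), ("CAKE", "fr", 1, 1)])
print(cells)
```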

%%

import happybase
hbaseaddr = os.environ['HBASE_SERVER']
hbase_connection = happybase.Connection(hbaseaddr, transport='framed',protocol='compact')

%%
# TODO

%%

%%

%%

%%

%% [markdown]
# That's all, folks!