# Homework 3 - Uncovering World Events using Twitter Hashtags

## ... and learning about Spark `DataFrames` along the way

In this notebook, we will use temporal information about Twitter hashtags to discover trending topics and potentially uncover world events as they occurred. 

__Hand-in:__

- __Due: 28.04.2020 23:59:59 CET__
- `git push` your final version to your group's Renku repository before the due date
- check if `Dockerfile`, `environment.yml` and `requirements.txt` are properly written
- add the necessary comments and discussion to make your code readable

### Hashtags

The idea here is that when an event is happening and people are having a conversation about it on Twitter, a set of uniform hashtags that represent the event spontaneously evolves. Twitter users then use those hashtags to communicate with one another. Some hashtags, like `#RT` for "retweet" or just `#retweet`, are used frequently and don't tell us much about what is going on. But the sudden appearance of a hashtag like `#oscars` probably indicates that the Oscars are underway. For a particularly cool example of this type of analysis, check out [this blog post about earthquake detection using Twitter data](https://blog.twitter.com/official/en_us/a/2015/usgs-twitter-data-earthquake-detection.html) (although they search the text and not necessarily hashtags).

In [1]:
%%local
%matplotlib inline
import matplotlib.pylab as plt
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

## Initialize the `SparkSession`

In [2]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
522,application_1583239045420_4300,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f72ed7c51d0>

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
520,application_1583239045420_4298,pyspark,idle,Link,Link,
521,application_1583239045420_4299,pyspark,idle,Link,Link,
522,application_1583239045420_4300,pyspark,idle,Link,Link,✔


In [4]:
%%help

Magic,Example,Explanation
info,%%info,Outputs session information for the current Livy endpoint.
cleanup,%%cleanup -f,"Deletes all sessions for the current Livy endpoint, including this notebook's session. The force flag is mandatory."
delete,%%delete -f -s 0,Deletes a session by number for the current Livy endpoint. Cannot delete this kernel's session.
logs,%%logs,Outputs the current session's Livy logs.
configure,"%%configure -f {""executorMemory"": ""1000M"", ""executorCores"": 4}",Configure the session creation parameters. The force flag is mandatory if a session has already been  created and the session will be dropped and recreated. Look at Livy's POST /sessions Request Body for a list of valid parameters. Parameters must be passed in as a JSON string.
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."
sql,%%sql -o tables -q SHOW TABLES,"Executes a SQL query against the variable sqlContext (Spark v1.x) or spark (Spark v2.x).  Parameters:  -o VAR_NAME: The result of the SQL query will be available in the %%local Python context as a  Pandas dataframe.  -q: The magic will return None instead of the dataframe (no visualization).  -m, -n, -r are the same as the %%spark parameters above."
local,%%local a = 1,All the code in subsequent lines will be executed locally. Code must be valid Python code.
send_to_spark,%%send_to_spark -o variable -t str -n var,"Sends a variable from local output to spark cluster.  Parameters:  -i VAR_NAME: Local Pandas DataFrame(or String) of name VAR_NAME will be available in the %%spark context as a Spark dataframe(or String) with the same name.  -t TYPE: Specifies the type of variable passed as -i. Available options are:  `str` for string and `df` for Pandas DataFrame. Optional, defaults to `str`.  -n NAME: Custom name of variable passed as -i. Optional, defaults to -i variable name.  -m MAXROWS: Maximum amount of Pandas rows that will be sent to Spark. Defaults to 2500."


## PART I: Set up the hashtag `DataFrame` (10 points / 60)

We have prepared the hashtag data spanning the time from May to July 2016. This is a significant time in modern European history, e.g. see [Brexit](https://en.wikipedia.org/wiki/Brexit). Let's see if we can spot any interesting trends about these events in the Twitter data.

In [5]:
import pyspark.sql.functions as functions

### I.a: TODO (1 point/10)

Load the **parquet** data from `/data/twitter/parquet/hashtags` into a Spark dataframe using the appropriate `SparkSession` method. 

Look at the first few rows of the dataset - note the timestamp and its units!

In [6]:
# TODO
df = spark.read.parquet("/data/twitter/parquet/hashtags")

In [7]:
df.printSchema()

root
 |-- timestamp_s: long (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- hashtag: string (nullable = true)

In [8]:
df.show()

+-----------+------------------+----+-----------------+
|timestamp_s|                id|lang|          hashtag|
+-----------+------------------+----+-----------------+
| 1464768480|737918607980670976|  tr|          lovebts|
| 1464768480|737918607980670976|  tr|            방탄소년단|
| 1464768480|737918607963873281|  ja|          tokyofm|
| 1464768480|737918607976452096|  en|          SotoBot|
| 1464768480|737918607976497152|  en|           Fenton|
| 1464768480|737918607976497152|  en|     vintageglass|
| 1464768480|737918607976497152|  en|        glassbell|
| 1464768480|737918607959719941|  en|          vintage|
| 1464768480|737918607959719941|  en|            1970s|
| 1464768480|737918607959719941|  en|              mod|
| 1464768481|737918612158189568|  ar|        ابن_الغيم|
| 1464768481|737918612158156802|  en|creativitybooster|
| 1464768481|737918612158156802|  en|    growthhacking|
| 1464768481|737918612158156802|  en| entrepreneurship|
| 1464768481|737918612158222336|  en|    jewelry

### User-defined functions

A neat trick of Spark dataframes is that you can use something very much like an RDD `map` method without switching to the RDD. If you are familiar with database languages, this works very much like a user-defined function in SQL.

So, for example, if we wanted to make a user-defined Python function that returns the hashtags in lowercase, we could do something like this:

In [9]:
@functions.udf
def lowercase(text):
    """Convert text to lowercase"""
    return text.lower()

`@functions.udf` is a "decorator": handy Python syntactic sugar that in this case is equivalent to:

```python
def lowercase(text):
    return text.lower()
    
lowercase = functions.udf(lowercase)
```

It basically takes our function and adds to its functionality. In this case, it registers our function as a pyspark dataframe user-defined function (UDF).
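The same equivalence can be seen with a plain Python decorator, without Spark. This is only a sketch: `shout` is a made-up decorator standing in for `functions.udf`, and the `greet_*` functions are hypothetical names chosen for illustration.

```python
def shout(func):
    """Hypothetical decorator: wraps func so that its result is upper-cased."""
    def wrapper(text):
        return func(text).upper()
    return wrapper


@shout
def greet_decorated(name):
    return "hello " + name


def greet_plain(name):
    return "hello " + name


# This assignment is what the @shout line does behind the scenes
greet_manual = shout(greet_plain)

print(greet_decorated("spark"))
print(greet_manual("spark"))
```

Both calls print the same string, because the `@` form is just shorthand for passing the function to the decorator and rebinding the name.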

Using these UDFs is very straightforward and analogous to other Spark dataframe operations. For example:

In [10]:
df.select(lowercase(df.hashtag)).show()

+------------------+
|lowercase(hashtag)|
+------------------+
|           lovebts|
|             방탄소년단|
|           tokyofm|
|           sotobot|
|            fenton|
|      vintageglass|
|         glassbell|
|           vintage|
|             1970s|
|               mod|
|         ابن_الغيم|
| creativitybooster|
|     growthhacking|
|  entrepreneurship|
|     jewelryonetsy|
|           jetteam|
|        teenchoice|
|  choicemalehottie|
|        nowplaying|
|              maui|
+------------------+
only showing top 20 rows

Using a framework like Spark is all about understanding the ins and outs of how it functions and knowing what it offers. One of the cool things about the dataframe API is that many functions are already defined for you (turning strings into lowercase being one of them). 

### I.b: TODO (2 points / 10)

Find the [Spark python API documentation](https://spark.apache.org/docs/latest/api/python/index.html). Look for the `sql` section and find the listing of `sql.functions`. Repeat the above (turning hashtags into lowercase) but use the built-in function.

In [11]:
df.select(functions.lower(df.hashtag)).show()

+-----------------+
|   lower(hashtag)|
+-----------------+
|          lovebts|
|            방탄소년단|
|          tokyofm|
|          sotobot|
|           fenton|
|     vintageglass|
|        glassbell|
|          vintage|
|            1970s|
|              mod|
|        ابن_الغيم|
|creativitybooster|
|    growthhacking|
| entrepreneurship|
|    jewelryonetsy|
|          jetteam|
|       teenchoice|
| choicemalehottie|
|       nowplaying|
|             maui|
+-----------------+
only showing top 20 rows

We'll work with a combination of these built-in functions and user-defined functions for the remainder of this homework. 

### I.c: TODO (3 points/10)

Create `english_df` consisting of **lowercase** hashtags from English-language tweets only. In addition, convert the timestamp to a more readable format like the one below and name the new column `date`:

```
2016-05-01 08:30:00
```

Your `english_df` should look something like this:

```
+-----------+----+------------+-------------------+
|timestamp_s|lang|     hashtag|               date|
+-----------+----+------------+-------------------+
| 1464768480|  en|     sotobot|2016-06-01 10:08:00|
| 1464768480|  en|      fenton|2016-06-01 10:08:00|
| 1464768480|  en|vintageglass|2016-06-01 10:08:00|
| 1464768480|  en|   glassbell|2016-06-01 10:08:00|
| 1464768480|  en|     vintage|2016-06-01 10:08:00|
+-----------+----+------------+-------------------+
```
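The sample rows suggest that `timestamp_s` counts seconds since the Unix epoch, and that the expected `date` is rendered two hours ahead of UTC. The following standard-library check illustrates this for the first sample timestamp. The UTC+2 offset is an inference from the expected output, not something the dataset states, so treat it as an assumption about the session time zone.

```python
from datetime import datetime, timedelta, timezone

timestamp_s = 1464768480  # first timestamp in the sample rows

utc = datetime.fromtimestamp(timestamp_s, tz=timezone.utc)
# Assumption: the session renders times at UTC+2 (e.g. Central European Summer Time)
local = utc.astimezone(timezone(timedelta(hours=2)))

print(utc.strftime("%Y-%m-%d %H:%M:%S"))    # time in UTC
print(local.strftime("%Y-%m-%d %H:%M:%S"))  # time at UTC+2
```

If your own `date` column is shifted by a couple of hours relative to the expected view, a different session time zone is a likely explanation.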

In [32]:
english_df = df.select(df.timestamp_s, df.lang, functions.lower(df.hashtag).alias('hashtag'), 
                       functions.to_timestamp(df.timestamp_s).alias('date')).where(df.lang == 'en')
english_df.show()

+-----------+----+------------------+-------------------+
|timestamp_s|lang|           hashtag|               date|
+-----------+----+------------------+-------------------+
| 1464768480|  en|           sotobot|2016-06-01 10:08:00|
| 1464768480|  en|            fenton|2016-06-01 10:08:00|
| 1464768480|  en|      vintageglass|2016-06-01 10:08:00|
| 1464768480|  en|         glassbell|2016-06-01 10:08:00|
| 1464768480|  en|           vintage|2016-06-01 10:08:00|
| 1464768480|  en|             1970s|2016-06-01 10:08:00|
| 1464768480|  en|               mod|2016-06-01 10:08:00|
| 1464768481|  en| creativitybooster|2016-06-01 10:08:01|
| 1464768481|  en|     growthhacking|2016-06-01 10:08:01|
| 1464768481|  en|  entrepreneurship|2016-06-01 10:08:01|
| 1464768481|  en|     jewelryonetsy|2016-06-01 10:08:01|
| 1464768481|  en|           jetteam|2016-06-01 10:08:01|
| 1464768481|  en|        teenchoice|2016-06-01 10:08:01|
| 1464768481|  en|  choicemalehottie|2016-06-01 10:08:01|
| 1464768481| 

### I.d: TODO (4 points/10)

We "claim" the data contains all tweets from May to July 2016; however, that's not true. Check which (date, hour) pairs are present in `english_df` and which are missing, e.g. 2016-05-01, 10h is in the dataframe while 2016-04-01, 10h is not.
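One way to think about "missing" is as a set difference between the hours you expect and the hours you observe. The sketch below shows the idea in plain Python on a tiny made-up set of observed hours; `missing_hours` is a hypothetical helper, and in the notebook the observed pairs would come from the distinct (date, hour) values of `english_df`.

```python
from datetime import datetime, timedelta


def missing_hours(present, start, end):
    """Return every (date, hour) between start and end (inclusive) that is not in `present`."""
    missing = []
    current = start
    while current <= end:
        key = (current.date().isoformat(), current.hour)
        if key not in present:
            missing.append(key)
        current += timedelta(hours=1)
    return missing


# Toy input (hypothetical): hours observed on one morning
present = {("2016-05-01", 8), ("2016-05-01", 9), ("2016-05-01", 11)}
gaps = missing_hours(present, datetime(2016, 5, 1, 7), datetime(2016, 5, 1, 11))
print(gaps)
```

Keeping the hour as an integer rather than as part of a string also means the pairs sort in chronological order.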

In [44]:
# lit: wraps a literal value as a column, e.g. for concatenating strings
# The concatenated column is a string, so orderBy sorts it lexicographically:
# hours appear as 0, 1, 10, 11, ..., 19, 2, 20, 21, 22, 23, 3, 4, 5, ..., 9
english_df.select(functions.concat(functions.to_date(english_df.date),
                                   functions.lit(', '), 
                                   functions.hour(english_df.date)).alias('(date, hour)')).distinct().orderBy('(date, hour)').show(100)

+--------------+
|  (date, hour)|
+--------------+
|2016-05-01, 10|
|2016-05-01, 11|
|2016-05-01, 12|
|2016-05-01, 13|
|2016-05-01, 14|
|2016-05-01, 15|
|2016-05-01, 16|
|2016-05-01, 17|
|2016-05-01, 18|
|2016-05-01, 19|
|2016-05-01, 20|
|2016-05-01, 21|
|2016-05-01, 22|
|2016-05-01, 23|
| 2016-05-01, 8|
| 2016-05-01, 9|
| 2016-05-02, 0|
|2016-05-02, 15|
|2016-05-02, 16|
|2016-05-02, 17|
|2016-05-02, 18|
|2016-05-02, 19|
|2016-05-02, 20|
|2016-05-02, 21|
|2016-05-02, 22|
|2016-05-02, 23|
| 2016-05-02, 5|
| 2016-05-02, 6|
| 2016-05-02, 7|
| 2016-05-03, 0|
| 2016-05-03, 1|
|2016-05-03, 10|
|2016-05-03, 11|
|2016-05-03, 12|
|2016-05-03, 13|
|2016-05-03, 14|
|2016-05-03, 15|
|2016-05-03, 16|
|2016-05-03, 17|
|2016-05-03, 18|
|2016-05-03, 19|
| 2016-05-03, 2|
|2016-05-03, 20|
|2016-05-03, 21|
|2016-05-03, 22|
|2016-05-03, 23|
| 2016-05-03, 3|
| 2016-05-03, 4|
| 2016-05-03, 5|
| 2016-05-03, 6|
| 2016-05-03, 7|
| 2016-05-03, 8|
| 2016-05-03, 9|
| 2016-05-04, 0|
| 2016-05-04, 1|
|2016-05-04, 1

## PART II: Twitter hashtag trends (50 points / 60)

In this section we will try to do a slightly more complicated analysis of the tweets. Our goal is to get an idea of tweet frequency as a function of time for certain hashtags. 

Let's build this up in steps. First, let's see how we can start to organize the tweets by their timestamps.

As a first easy example, let's say we just want to count the number of tweets per minute over the entire span of our data. For this, we first need a "global" minute value, e.g. "minute of the year" or something similar.

Spark provides us with some handy built-in dataframe functions that are made for transforming date and time fields. 

Have a look [here](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) to see the whole list of custom dataframe functions - you will need to use them to complete the next set of TODO items.

Note that the functions can be combined. Consider the following dataframe and its transformation:

In [None]:
from pyspark.sql import Row

# create a sample dataframe with one column "degrees" going from 0 to 180
test_df = spark.createDataFrame(spark.sparkContext.range(180).map(lambda x: Row(degrees=x)), ['degrees'])

# define a function "sin_rad" that first converts degrees to radians and then takes the sine using built-in functions
sin_rad = functions.sin(functions.radians(test_df.degrees))

# show the result
test_df.select(sin_rad).show()

### DataFrames `groupBy`

We used `groupBy` already in the previous notebook, but here we will take more advantage of its features. 

One important thing to note is that unlike other RDD or DataFrame transformations, the `groupBy` does not return another DataFrame, but a `GroupedData` object instead, with its own methods. These methods allow you to do various transformations and aggregations on the data of the grouped rows. 

Conceptually the procedure is a lot like this:

![groupby](https://i.stack.imgur.com/sgCn1.jpg)

The column that is used for the `groupBy` is the `key` - once we have the values of a particular key all together, we can use various aggregation functions on them to generate a transformed dataset. In this example, the aggregation function is a simple `sum`. In the simple procedure below, the `key` will be the hashtag.
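The split, apply, combine procedure described above can be sketched in plain Python. The rows here are made up for illustration, and the sketch only mirrors the concept; it is not how Spark executes a `groupBy`.

```python
from collections import defaultdict

# Toy rows (hypothetical): (key, value) pairs
rows = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]

# Split: collect the values that belong to each key
groups = defaultdict(list)
for key, value in rows:
    groups[key].append(value)

# Apply and combine: aggregate each group with sum
totals = {key: sum(values) for key, values in groups.items()}
print(totals)
```

Swapping `sum` for `len` turns the same pattern into a per-key count, which is the shape of the hashtag counting task.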

### II.a: TODO (1 point / 50)

Calculate the five most common hashtags in the whole English-language dataset.

This should be your result:

```
+-------------+------+
|      hashtag| count|
+-------------+------+
|   mtvhottest|800527|
|veranomtv2016|539028|
|   teenchoice|345208|
|   nowplaying|178561|
|  gameinsight|165237|
+-------------+------+
```

## Daily hashtag trends

Now we will start to complicate the analysis a bit. Remember, our goal is to uncover trending topics on a timescale of a few days. A much needed column then is simply `day`. To convert the date string into day-of-year, you can use the built-in [dayofyear](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.dayofyear) function. 

In the subsequent sections we will then not only see which hashtags are globally most popular, but which ones experience the biggest changes in popularity - those are the "trending" topics. If there is suddenly a substantial increase of a hashtag over a matter of a day or two, it may signify an event taking place. 

### II.b: TODO (2 points / 50)

Create a dataframe called `daily_hashtag` that includes the columns `month`, `week`, `day` and `hashtag`. Use the `english_df` you made above to start, and make sure you find the appropriate spark dataframe functions to make your life easier. Show the result.

Try to match this view:

```
+-----+----+---+------------+
|month|week|day|     hashtag|
+-----+----+---+------------+
|    6|  22|153|     sotobot|
|    6|  22|153|      fenton|
|    6|  22|153|vintageglass|
|    6|  22|153|   glassbell|
|    6|  22|153|     vintage|
+-----+----+---+------------+
```
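The values in this view can be sanity-checked with the standard library. The first sample rows fall on 2016-06-01, and the expected `week` value agrees with ISO 8601 week numbering, which is what this sketch assumes.

```python
from datetime import date

d = date(2016, 6, 1)           # date of the first sample rows

month = d.month
week = d.isocalendar()[1]      # ISO 8601 week number
day = d.timetuple().tm_yday    # day of year, starting at 1

print(month, week, day)
```

A check like this is a quick way to confirm that the date functions you pick use the same conventions as the expected output.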

### II.c: TODO (2 points / 50)

Now we want to calculate the number of times a hashtag is used per day. Sort in descending order of daily counts and show the result. Call the resulting dataframe `day_counts`.

Your output should look like this:

```
+---+----------+----+-----+
|day|   hashtag|week|count|
+---+----------+----+-----+
|204|mtvhottest|  29|66372|
|205|mtvhottest|  29|63495|
|207|mtvhottest|  30|60768|
|208|mtvhottest|  30|59065|
|199|mtvhottest|  28|57956|
+---+----------+----+-----+
```

<div class="alert alert-info">
<p>Make sure you use `cache()` when you create `day_counts` because we will need it in the steps that follow!</p>
</div>

### II.d: TODO (2 points / 50)

To get an idea of which hashtags stay popular for several days, calculate the average number of daily occurrences for each week.

__Hint:__ use the `week` column we created above. Sort in descending order and show the top 20.
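As a rough illustration of the aggregation, here is a plain Python sketch on made-up daily counts. It averages over the days on which a hashtag actually appears, which is one possible reading of "average daily occurrences"; dividing by seven instead would treat absent days as zeros.

```python
from collections import defaultdict
from statistics import mean

# Toy daily counts (hypothetical): (week, day, hashtag, count)
day_counts = [
    (22, 153, "a", 10), (22, 154, "a", 30),
    (22, 153, "b", 50),
    (23, 160, "a", 8),
]

# Group the daily counts by (week, hashtag)
per_week = defaultdict(list)
for week, day, hashtag, count in day_counts:
    per_week[(week, hashtag)].append(count)

# Mean over the days on which the hashtag appears, sorted in descending order
weekly_avg = {key: mean(counts) for key, counts in per_week.items()}
ranked = sorted(weekly_avg.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```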

### Using `Window` functions 

Window functions are another awesome feature of dataframes. They allow users to accomplish complex tasks using very concise and simple code. 

Above we computed just the hashtag that had the most occurrences on *any* day. Now let's say we want to know the top hashtags for *each* day.

This is a non-trivial thing to compute and requires "windowing" our data. I recommend reading this [window functions article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) to get acquainted with the idea. You can think of a window function as a fine-grained and more flexible `groupBy`. 

There are two things we need to define to use window functions:

1. the "window" to use, based on which columns (partitioning) and how the rows should be ordered 
2. the computation to carry out for each windowed group, e.g. a max, an average etc.

Let's see how this works by example. We will define a window specification, `daily_window`, that partitions the data by the `day` column. Within each window, the rows will be ordered by the daily hashtag count that we computed above. Finally, we will use the rank function **over** this window to give us the ranking of top hashtags.

In the end, this is a fairly complicated operation achieved in just a few lines of code! (can you think of how to do this with an RDD??)
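To make the partition, order, rank idea concrete, here is a plain Python sketch on made-up rows. `rank_within_day` is a hypothetical helper; it gives tied counts the same rank and leaves a gap after them, which is how a standard ranking function behaves.

```python
from itertools import groupby

# Toy daily counts (hypothetical): (day, hashtag, count)
rows = [
    (1, "a", 5), (1, "b", 9), (1, "c", 5), (1, "d", 2),
    (2, "a", 7), (2, "b", 1),
]


def rank_within_day(rows):
    """Rank rows by descending count inside each day; ties share a rank and leave a gap."""
    ranked = []
    # Partition by day and order by count (descending) inside each partition
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    for day, group in groupby(ordered, key=lambda r: r[0]):
        previous_count, rank = None, 0
        for position, (_, hashtag, count) in enumerate(group, start=1):
            if count != previous_count:
                rank = position  # a new count value takes its position as rank
            previous_count = count
            ranked.append((day, hashtag, count, rank))
    return ranked


for row in rank_within_day(rows):
    print(row)
```

One consequence of ties is that keeping rows with rank at most five can return more than five rows for a day.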

In [None]:
from pyspark.sql import Window

First, we specify the window function and the ordering:

In [None]:
daily_window = Window.partitionBy('day').orderBy(functions.desc('count'))

The above window function says that we should window the data on the `day` column and order it by count. 

Now we need to define what we want to compute on the windowed data. We will start by just calculating the daily ranking of hashtags, so we can use the helpful built-in `functions.rank()` and sort:

In [None]:
daily_rank = functions.rank() \
                      .over(daily_window) \
                      .alias('rank')

### II.e: TODO (3 points / 50)
Now compute the top five hashtags for each day in our data:

### II.f: TODO - Compute the top five trending tweets per week (20 points / 50)

Use window functions (or other techniques!) to produce lists of top few trending tweets for each week. What's a "trending" tweet? Something that seems to be suddenly growing very rapidly in popularity. You should identify "brexit" in week 25 and other events like the death of The Artist Formerly Known as Prince, the Met gala, Euro 2016, the terrorist attacks in Nice, France etc. Make it as simple or as complicated as you want! The final listing should be clear and concise and the flow of your analysis should be easy to follow. If you make an implementation that is not immediately obvious, make sure you provide comments either in markdown cells or in comments in the code itself. 
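There is no single right definition of "trending". As one possible starting point, the sketch below scores each hashtag by its week-over-week growth relative to the previous week's count. The function name, the `smoothing` constant and the numbers are all made up for illustration; this is not a reference solution.

```python
def trending_scores(weekly_counts, week, smoothing=10):
    """Score each hashtag by its growth from the previous week to `week`."""
    current = weekly_counts.get(week, {})
    previous = weekly_counts.get(week - 1, {})
    scores = {}
    for hashtag, count in current.items():
        before = previous.get(hashtag, 0)
        # Smoothing keeps rare hashtags from getting huge scores on tiny counts
        scores[hashtag] = (count - before) / (before + smoothing)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)


# Toy weekly counts (made-up numbers): week -> {hashtag: count}
weekly_counts = {
    24: {"steady": 1000, "spike": 20},
    25: {"steady": 1050, "spike": 900, "rare": 3},
}

top = trending_scores(weekly_counts, week=25)
print(top)
```

A hashtag that is always popular scores low here, while one that jumps from almost nothing ranks first. The smoothing constant is a knob worth experimenting with on the real data.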


## Plotting the timeseries of hashtag counts and using KMeans clustering

### II.g: TODO (8 points / 50)

The lists we obtained above are nice, but let's actually visualize some data.

1. create a matrix that consists of hashtags as rows and daily counts as columns (hint: use `groupBy` and methods of `GroupedData`). 
2. use the `VectorAssembler` from the Spark ML library to create the feature vector which will consist of daily counts. 

If you extract any of these vectors you will obtain an array that represents the time series of daily counts - plot this time series for a few interesting hashtags you identified above. 

__Hint__: `isin` method of DataFrame columns might be useful. 
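The target shape of the matrix can be sketched in plain Python on made-up counts: one row per hashtag, one column per day, and a zero wherever a hashtag did not appear. After a pivot in Spark, such missing combinations may show up as nulls instead and could need filling before the feature vector is assembled.

```python
# Toy daily counts (hypothetical): (day, hashtag, count)
day_counts = [(1, "a", 4), (3, "a", 6), (2, "b", 5)]

days = sorted({day for day, _, _ in day_counts})
hashtags = sorted({hashtag for _, hashtag, _ in day_counts})
lookup = {(hashtag, day): count for day, hashtag, count in day_counts}

# One row per hashtag, one column per day, 0 where the hashtag did not appear
matrix = {h: [lookup.get((h, d), 0) for d in days] for h in hashtags}

print(days)
print(matrix)
```

Each list in `matrix` is the time series of daily counts for one hashtag, which is what gets plotted.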

### II.h: TODO - Use KMeans to cluster hashtags based on the daily count timeseries (12 points / 50)

Use the DataFrame you created above to cluster the hashtag timeseries. Train the model and calculate the cluster membership for all hashtags. 

Show the cluster that includes "brexit" - does it make sense?

Again, be creative and see if you can get the clustering to give you meaningful hashtag groupings. Make sure you document your process and code and make your final notebook easy to understand even if the result is not optimal or complete. 

__Hint:__ Additional data cleaning or filtering might be necessary to get useful results from the model. 
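One cleaning step that may help is to rescale each time series before clustering, so that hashtags are grouped by the shape of their curve rather than by raw volume. The sketch below shows a simple sum-to-one normalization in plain Python. It is only one of several reasonable choices, and `normalize` is a hypothetical helper name.

```python
def normalize(series):
    """Scale a time series so that its values sum to 1 (all-zero series stay unchanged)."""
    total = sum(series)
    if total == 0:
        return list(series)
    return [value / total for value in series]


# Two made-up hashtags with the same shape but very different volume
big_spike = [0, 0, 9000, 1000]
small_spike = [0, 0, 9, 1]

print(normalize(big_spike))
print(normalize(small_spike))
```

After normalization the two series are identical, so a distance-based method would treat them as the same pattern. Filtering out hashtags with very few total occurrences before this step avoids amplifying noise.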