# DSLab Homework3 - Uncovering World Events using Twitter Hashtags

## ... and learning about Spark `DataFrames` along the way

In this notebook, we will use temporal information about Twitter hashtags to discover trending topics and potentially uncover world events as they occurred. 

## Hand-in Instructions:

- __Due: 27.04.2021 23:59:59 CET__
- your project must be private
- `git push` your final version to the master branch of your group's Renku repository before the due date
- check if `Dockerfile`, `environment.yml` and `requirements.txt` are properly written
- add necessary comments and discussion to make your code readable

## Hashtags

The idea here is that when an event is happening and people are having a conversation about it on Twitter, a set of uniform hashtags that represent the event spontaneously evolves. Twitter users then use those hashtags to communicate with one another. Some hashtags, like `#RT` for "retweet" or just `#retweet`, are used frequently and don't tell us much about what is going on. But a sudden appearance of a hashtag like `#oscars` probably indicates that the Oscars are underway. For a particularly cool example of this type of analysis, check out [this blog post about earthquake detection using Twitter data](https://blog.twitter.com/official/en_us/a/2015/usgs-twitter-data-earthquake-detection.html) (although they search the tweet text rather than hashtags specifically).

## Initialize the environment

In [None]:
%%local
import os
username = os.environ['JUPYTERHUB_USER']

# set the application name as "<your_gaspar_id>-homework3"
get_ipython().run_cell_magic('configure', line="-f", cell='{ "name":"%s-homework3", "executorMemory":"4G", "executorCores":4, "numExecutors":10, "driverMemory": "4G" }' % username)

Send `username` to the Spark kernel, which will first start the Spark application if there is no active session.

In [None]:
%%send_to_spark -i username -t str -n username

In [None]:
print('We are using Spark %s' % spark.version)

In [None]:
%%help

## PART I: Set up (5 points)

The Twitter stream data is downloaded from [Archive Team: The Twitter Stream Grab](https://archive.org/details/twitterstream), which is a collection of a random sample of all tweets. We have parsed the stream data and prepared the Twitter hashtag data of __2020__, a very special and different year in many ways. Let's see whether the events of 2020 show up as trends in the Twitter data. 

<div style="font-size: 100%" class="alert alert-block alert-danger">
<b>Disclaimer</b>
<br>
This dataset contains unfiltered data from Twitter. As such, you may be exposed to tweets/hashtags containing vulgarities, references to sexual acts, drug usage, etc.
</div>

### a) Load data - 1/5

Load the **ORC** data from `/data/twitter/orc/hashtags/` into a Spark dataframe using the appropriate `SparkSession` method. 

Look at the first few rows of the dataset - note the timestamp and its units!
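As a quick sanity check on the units, here is a plain-Python sketch (not Spark) that formats `timestamp_s` as a date string. It assumes that `timestamp_s` counts seconds since the Unix epoch and that the Spark session time zone is CET (UTC+1), which is what the sample output in part c) suggests; `F.from_unixtime` renders timestamps in the session time zone. The fixed UTC+1 offset ignores daylight saving time, so it only matches winter dates, and `to_readable` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the session time zone is CET, modelled here as a fixed UTC+1 offset
# (no daylight saving time), which matches the sample output of part c).
CET = timezone(timedelta(hours=1))

def to_readable(timestamp_s, tz=CET):
    """Format epoch seconds like Spark's default 'yyyy-MM-dd HH:mm:ss' pattern."""
    return datetime.fromtimestamp(timestamp_s, tz=tz).strftime("%Y-%m-%d %H:%M:%S")

print(to_readable(1577862000))                   # 2020-01-01 08:00:00
print(to_readable(1577862000, tz=timezone.utc))  # 2020-01-01 07:00:00
```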

In [None]:
df = spark.read.orc('/data/twitter/orc/hashtags/')

In [None]:
df.printSchema()

In [None]:
df.show(n=5, truncate=False, vertical=False)

<div style="font-size: 100%" class="alert alert-block alert-info">
    <b>Cluster Usage:</b> As there are many of you working with the cluster, we encourage you to
    <ul>
        <li>prototype your queries on small data samples before running them on whole datasets</li>
        <li>save your intermediate results in your own directory at hdfs <b>"/user/&lt;your-gaspar-id&gt;/"</b></li>
    </ul>
</div>

For example:

```python
# create a subset of original dataset
df_sample = df.sample(0.01)

# save as orc
df_sample.write.orc('/user/%s/sample.orc' % username, mode='overwrite')
```

### b) Functions - 2/5

In [None]:
import pyspark.sql.functions as F

__User-defined functions__

A neat trick of Spark DataFrames is that you can use something very much like the RDD `map` method without switching to an RDD. If you are familiar with database languages, this works very much like a user-defined function in SQL. 

So, for example, if we wanted to make a user-defined python function that returns the hashtags in lowercase, we could do something like this:

In [None]:
@F.udf
def lowercase(text):
    """Convert text to lowercase"""
    return text.lower()

The `@F.udf` is a "decorator" -- this is really handy python syntactic sugar and in this case is equivalent to:

```python
def lowercase(text):
    return text.lower()
    
lowercase = F.udf(lowercase)
```

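The decorator wraps our function and replaces it with the wrapped version; here, `F.udf` turns it into a PySpark DataFrame user-defined function (UDF) that can be applied to columns. Since no return type is given, the UDF returns `StringType` by default.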

Using these UDFs is very straightforward and analogous to other Spark dataframe operations. For example:

In [None]:
df.select(lowercase(df.hashtag)).show(n=5)

__Built-in functions__

Using a framework like Spark is all about understanding the ins and outs of how it functions and knowing what it offers. One of the cool things about the dataframe API is that many functions are already defined for you (turning strings into lowercase being one of them). Find the [Spark python API documentation](https://spark.apache.org/docs/2.3.2/api/python/index.html). Look for the `sql` section and find the listing of `sql.functions`. Repeat the above (turning hashtags into lowercase) but use the built-in function.

In [None]:
df.select(F.lower(df.hashtag)).show(n=5)

We'll work with a combination of these built-in functions and user-defined functions for the remainder of this homework. 

Note that the functions can be combined. Consider the following dataframe and its transformation:

In [None]:
from pyspark.sql import Row

# create a sample dataframe with one column "degrees" going from 0 to 179
test_df = spark.createDataFrame(spark.sparkContext.range(180).map(lambda x: Row(degrees=x)), ['degrees'])

# define a column expression "sin_rad" that first converts degrees to radians and then takes the sine, using built-in functions
sin_rad = F.sin(F.radians(test_df.degrees))

# show the result
test_df.select(sin_rad).show()

### c) Tweets in English - 2/5

- Create `english_df` with only English-language tweets. 
- Turn hashtags into lowercase.
- Convert the timestamp to a more readable format and name the new column `date`.
- Sort the table in chronological order. 

Your `english_df` should look something like this:

```
+-----------+----+-----------+-------------------+
|timestamp_s|lang|    hashtag|               date|
+-----------+----+-----------+-------------------+
| 1577862000|  en| spurfamily|2020-01-01 08:00:00|
| 1577862000|  en|newyear2020|2020-01-01 08:00:00|
| 1577862000|  en|     master|2020-01-01 08:00:00|
| 1577862000|  en|  spurrific|2020-01-01 08:00:00|
| 1577862000|  en|     master|2020-01-01 08:00:00|
+-----------+----+-----------+-------------------+
```

__Note:__ 
- The hashtags may not be in English.
- [pyspark.sql.functions](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#module-pyspark.sql.functions)

In [None]:
english_df = df.where(df.lang == 'en') \
               .select(df.timestamp_s, df.lang, F.lower(df.hashtag).alias('hashtag'), F.from_unixtime(df.timestamp_s).alias('date'))\
               .orderBy(df.timestamp_s)

In [None]:
english_df.show(n=5)

## PART II: Twitter hashtag trends (30 points)

In this section we will try to do a slightly more complicated analysis of the tweets. Our goal is to get an idea of tweet frequency as a function of time for certain hashtags. 

Have a look [here](http://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#module-pyspark.sql.functions) to see the whole list of built-in DataFrame functions; you will need them to complete the next set of TODO items.

### a) Top hashtags - 1/30

We used `groupBy` already in the previous notebooks, but here we will take more advantage of its features. 

One important thing to note is that, unlike most DataFrame transformations, `groupBy` does not return another DataFrame but a `GroupedData` object with its own methods. These methods allow you to do various transformations and aggregations on the data of the grouped rows. 

Conceptually the procedure is a lot like this:

![groupby](https://i.stack.imgur.com/sgCn1.jpg)

The column that is used for the `groupBy` is the `key` - once we have the values of a particular key all together, we can use various aggregation functions on them to generate a transformed dataset. In this example, the aggregation function is a simple `sum`. In the simple procedure below, the `key` will be the hashtag.
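The same group-then-aggregate idea can be sketched in plain Python with a dictionary keyed by the grouping column. This only illustrates the semantics, not how Spark executes it (Spark distributes the work and shuffles the data by key); `group_and_sum` and `top_n_counts` are hypothetical helper names.

```python
from collections import defaultdict

def group_and_sum(rows):
    """Group (key, value) pairs by key and sum the values of each group."""
    totals = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return dict(totals)

def top_n_counts(keys, n):
    """Count the rows per key and return the n most frequent keys with their counts."""
    counts = defaultdict(int)
    for key in keys:
        counts[key] += 1
    # sort by count descending; ties are broken by key to get a deterministic order
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]

print(group_and_sum([("a", 1), ("b", 2), ("a", 3)]))  # {'a': 4, 'b': 2}
print(top_n_counts(["bts", "covid19", "bts", "endsars", "bts", "covid19"], 2))
# [('bts', 3), ('covid19', 2)]
```

`top_n_counts` plays the role of `groupBy('hashtag').count().orderBy(F.col('count').desc())`; unlike Spark, which makes no promise about the order of tied rows, it breaks ties by hashtag.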


Using `groupBy`, calculate the top 5 most common hashtags in the whole English-language dataset.

This should be your result:

```
+-----------------+-------+
|          hashtag|  count|
+-----------------+-------+
|              bts|1200196|
|          endsars|1019280|
|          covid19| 717238|
|            방탄소년단| 488160|
|sarkaruvaaripaata| 480124|
+-----------------+-------+
```

In [None]:
english_df.groupBy(english_df.hashtag).count().orderBy(F.col('count').desc()).show(n=5)

### b) Daily hashtags - 2/30

Now, let's see how we can start to organize the tweets by their timestamps. Remember, our goal is to uncover trending topics on a timescale of a few days. A much needed column then is simply `day`. Spark provides us with some handy built-in dataframe functions that are made for transforming date and time fields.

- Create a dataframe called `daily_hashtag` that includes the columns `month`, `week`, `day` and `hashtag`. 
- Use the `english_df` you made above to start, and make sure you find the appropriate spark dataframe functions to make your life easier. For example, to convert the date string into day-of-year, you can use the built-in [dayofyear](http://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.functions.dayofyear) function. 
- To simplify the following analysis, keep only tweets from 2020.
- Show the result.
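To check what the date functions return, here is a plain-Python sketch using `datetime`. According to the Spark documentation, `weekofyear` follows ISO 8601 (weeks start on Monday, and week 1 is the first week with more than 3 days), which is also what `date.isocalendar()` returns. `month_week_day` is a hypothetical helper name.

```python
from datetime import date

def month_week_day(d):
    """Return (month, ISO week, day of year), like F.month / F.weekofyear / F.dayofyear."""
    iso_week = d.isocalendar()[1]        # ISO 8601 week number
    day_of_year = d.timetuple().tm_yday  # 1 for January 1st
    return d.month, iso_week, day_of_year

print(month_week_day(date(2020, 1, 1)))    # (1, 1, 1)
print(month_week_day(date(2020, 8, 16)))   # (8, 33, 229)
print(month_week_day(date(2020, 12, 31)))  # (12, 53, 366)
```

2020 has 53 ISO weeks, so the last days of December (days 363 to 366) fall in week 53; in other years, the first days of January can belong to the last week of the previous year.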

Try to match this view:

```
+-----+----+---+-----------+
|month|week|day|    hashtag|
+-----+----+---+-----------+
|    1|   1|  1| spurfamily|
|    1|   1|  1|newyear2020|
|    1|   1|  1|     master|
|    1|   1|  1|  spurrific|
|    1|   1|  1|     master|
+-----+----+---+-----------+
```

In [None]:
daily_hashtag = english_df.where(F.year(english_df.date) == 2020)\
               .select(F.month(english_df.date).alias('month'), F.weekofyear(english_df.date).alias('week'), F.dayofyear(english_df.date).alias('day'), english_df.hashtag)

In [None]:
daily_hashtag.show(n=5)

### c) Daily counts - 2/30

Now we want to calculate the number of times a hashtag is used per day based on the dataframe `daily_hashtag`. Sort in descending order of daily counts and show the result. Call the resulting dataframe `day_counts`.

Your output should look like this:

```
+---+----------------------+----+------+
|day|hashtag               |week|count |
+---+----------------------+----+------+
|229|pawankalyanbirthdaycdp|33  |202241|
|222|hbdmaheshbabu         |32  |195718|
|228|pawankalyanbirthdaycdp|33  |152037|
|357|100freeiphone12       |52  |122068|
|221|hbdmaheshbabu         |32  |120401|
+---+----------------------+----+------+
```

<div class="alert alert-info">
<p>Make sure you use <b>cache()</b> when you create <b>day_counts</b> because we will need it in the steps that follow!</p>
</div>

In [None]:
# small sample for prototyping queries on the cluster (not used below)
test = daily_hashtag.sample(0.001)

In [None]:
# count rows per (day, hashtag); week is determined by day, so max() just carries it along
day_counts = daily_hashtag.groupBy(daily_hashtag.day, daily_hashtag.hashtag)\
            .agg(F.max(daily_hashtag.week).alias('week'), F.count(daily_hashtag.day).alias('count'))\
            .orderBy(F.col('count').desc()).cache()

In [None]:
day_counts.show(n=5, truncate=False)

### d) Weekly average - 2/30

To get an idea of which hashtags stay popular for several days, calculate the average number of daily occurrences for each week. Sort in descending order and show the top 20.

__Note:__
- Use the `week` column we created above.
- Calculate the weekly average using `F.mean(...)`.
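Because `day_counts` only has a row for the days on which a hashtag was actually used, `F.mean('count')` divides by the number of days the hashtag appeared in that week, not by 7. The plain-Python sketch below reproduces that behaviour (`weekly_mean` is a hypothetical name); if a calendar-day average is preferred, divide the weekly sum by the number of days in the week instead.

```python
from collections import defaultdict

def weekly_mean(day_counts):
    """Average daily counts per (week, hashtag), over the days that have a row.

    day_counts: iterable of (day, week, hashtag, count), one row per day and hashtag,
    like the `day_counts` DataFrame.
    """
    sums = defaultdict(int)
    n_days = defaultdict(int)
    for _day, week, hashtag, count in day_counts:
        sums[(week, hashtag)] += count
        n_days[(week, hashtag)] += 1
    return {key: sums[key] / n_days[key] for key in sums}

rows = [(1, 1, "a", 10), (2, 1, "a", 20), (8, 2, "a", 6)]
print(weekly_mean(rows))  # {(1, 'a'): 15.0, (2, 'a'): 6.0}
```

In week 2 the hashtag appears on a single day, so its weekly "average" equals that day's count.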

In [None]:
day_counts.groupBy(day_counts.week, day_counts.hashtag) \
          .agg(F.mean('count').alias('mean')) \
          .orderBy(F.col('mean').desc()) \
          .show(n=20, truncate=False)

### e) Ranking - 3/30

Window functions are another awesome feature of dataframes. They allow users to accomplish complex tasks using very concise and simple code. 

Above we computed just the hashtag that had the most occurrences on *any* day. Now let's say we want to know the top hashtags for *each* day.  

This is a non-trivial thing to compute and requires "windowing" our data. I recommend reading this [window functions article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) to get acquainted with the idea. You can think of a window function as a fine-grained and more flexible `groupBy`. 

There are two things we need to define to use window functions:

1. the "window" to use, based on which columns (partitioning) and how the rows should be ordered 
2. the computation to carry out for each windowed group, e.g. a max, an average etc.

Let's see how this works by example. We will define a window specification, `daily_window`, that partitions the data based on the `day` column. Within each window, the rows will be ordered by the daily hashtag count that we computed above. Finally, we will use the rank function **over** this window to give us the ranking of top hashtags. 

In the end, this is a fairly complicated operation achieved in just a few lines of code! (can you think of how to do this with an RDD??)
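For comparison, here is roughly what the partition, order and rank steps amount to in plain Python (a sketch of the semantics only; `rank_within_partitions` is a hypothetical name). Like `F.rank()`, it gives tied rows the same rank and leaves a gap after the tie.

```python
from collections import defaultdict

def rank_within_partitions(rows):
    """Mimic F.rank().over(Window.partitionBy('day').orderBy(F.desc('count'))).

    rows: list of (day, hashtag, count). Returns (day, hashtag, count, rank) tuples.
    """
    partitions = defaultdict(list)
    for day, hashtag, count in rows:
        partitions[day].append((hashtag, count))

    ranked = []
    for day in sorted(partitions):
        # order each partition by count, largest first
        ordered = sorted(partitions[day], key=lambda hc: hc[1], reverse=True)
        for position, (hashtag, count) in enumerate(ordered, start=1):
            if position > 1 and count == ordered[position - 2][1]:
                rank = ranked[-1][3]  # tie with the previous row: reuse its rank
            else:
                rank = position       # otherwise the rank jumps to the row position
            ranked.append((day, hashtag, count, rank))
    return ranked

rows = [(1, "a", 5), (1, "b", 7), (1, "c", 5), (1, "d", 1), (2, "x", 3)]
for row in rank_within_partitions(rows):
    print(row)
```

If gaps after ties are not wanted, `F.dense_rank()` numbers ties consecutively, and `F.row_number()` gives every row a distinct number, breaking ties arbitrarily.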

In [None]:
from pyspark.sql import Window

First, we specify the window, i.e. the partitioning and the ordering:

In [None]:
daily_window = Window.partitionBy('day').orderBy(F.desc('count'))

The above window specification says that we should window the data on the `day` column and order it by count, in descending order. 

Now we need to define what we want to compute on the windowed data. We will start by just calculating the daily ranking of hashtags, so we can use the helpful built-in `F.rank()` and sort:

In [None]:
daily_rank = F.rank() \
              .over(daily_window) \
              .alias('rank')

Now compute the top five hashtags for each day in our data:

In [None]:
day_counts.withColumn('rank', daily_rank).where(F.col('rank') <= 5).orderBy(day_counts.day, F.col('rank')).show(n=20)

### f) Rolling sum - 5/30

With window functions, you can also calculate the statistics of a rolling window. 

In this question, construct a 7-day rolling window (including the day and 6 days before) to calculate the rolling sum of the daily occurrences for each hashtag.

Your results should look like this:
- For the hashtag `covid19`:

```
+---+----+-----+-------+-----------+
|day|week|count|hashtag|rolling_sum|
+---+----+-----+-------+-----------+
| 42|   7|   85|covid19|         85|
| 43|   7|   94|covid19|        179|
| 45|   7|  192|covid19|        371|
| 46|   7|   97|covid19|        468|
| 47|   7|  168|covid19|        636|
| 48|   8|  317|covid19|        953|
| 49|   8|  116|covid19|        984|
| 51|   8|  234|covid19|       1124|
| 52|   8|  197|covid19|       1129|
| 53|   8|  369|covid19|       1401|
+---+----+-----+-------+-----------+
```

- For the hashtag `bts`:

```
+---+----+-----+-------+-----------+
|day|week|count|hashtag|rolling_sum|
+---+----+-----+-------+-----------+
|  1|   1| 2522|    bts|       2522|
|  2|   1| 1341|    bts|       3863|
|  3|   1|  471|    bts|       4334|
|  4|   1|  763|    bts|       5097|
|  5|   1| 2144|    bts|       7241|
|  6|   2| 1394|    bts|       8635|
|  7|   2| 1673|    bts|      10308|
|  8|   2| 5694|    bts|      13480|
|  9|   2| 5942|    bts|      18081|
| 10|   2| 5392|    bts|      23002|
+---+----+-----+-------+-----------+
```
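The plain-Python sketch below mirrors what a `rangeBetween(-6, Window.currentRow)` frame computes for a single hashtag, and reproduces the `covid19` numbers above. Because the frame is defined on the values of `day` rather than on row positions, days on which the hashtag was not used simply contribute nothing. `rolling_sum_by_range` and `rolling_sum_by_rows` are hypothetical names, and the quadratic loop is only meant for illustration.

```python
def rolling_sum_by_range(days_counts, preceding_days=6):
    """For each row, sum the counts whose day lies in [day - preceding_days, day].

    days_counts: list of (day, count) for one hashtag, sorted by day.
    """
    return [
        (day, sum(c for d, c in days_counts if day - preceding_days <= d <= day))
        for day, _ in days_counts
    ]

def rolling_sum_by_rows(days_counts, preceding_rows=6):
    """For contrast: sum the current row and the previous rows, ignoring gaps between days."""
    counts = [c for _, c in days_counts]
    return [
        (day, sum(counts[max(0, i - preceding_rows): i + 1]))
        for i, (day, _) in enumerate(days_counts)
    ]

covid19 = [(42, 85), (43, 94), (45, 192), (46, 97), (47, 168),
           (48, 317), (49, 116), (51, 234), (52, 197), (53, 369)]
print([total for _, total in rolling_sum_by_range(covid19)])
# [85, 179, 371, 468, 636, 953, 984, 1124, 1129, 1401]
```

A row-based frame (`rowsBetween(-6, Window.currentRow)`) would instead sum the last 7 rows: at day 49 it would still include day 42, because day 44 has no row, and give 1069 instead of 984.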

In [None]:
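# 7-day frame: the current day plus the 6 preceding days (offsets are in days, since `day` is an integer)
hashtag_window = Window.partitionBy('hashtag').orderBy(F.col('day')).rangeBetween(-6, Window.currentRow)
rolling_sum = F.sum('count') \
              .over(hashtag_window)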

In [None]:
rs_counts = day_counts.withColumn('rolling_sum', rolling_sum)

In [None]:
rs_counts.filter('hashtag == "covid19"').show(n=10)

In [None]:
rs_counts.filter('hashtag == "bts"').show(n=10)

### g) DIY - 15/30

Use window functions (or other techniques!) to produce lists of the top few trending hashtags for each week. What's a __"trending"__ hashtag? Something that seems to be __suddenly growing very rapidly in popularity__. 

You should be able to identify, for example, Oscars-related hashtags in week 7 when [the 92nd Academy Awards ceremony took place](https://www.oscars.org/oscars/ceremonies/2020), COVID-related hashtags in week 11 when [WHO declared COVID-19 a pandemic](https://www.who.int/director-general/speeches/detail/who-director-general-s-opening-remarks-at-the-media-briefing-on-covid-19---11-march-2020), and other events like the Black Lives Matter movement in late May, the United States presidential election, the 2020 American Music Awards, etc.

The final listing should be clear and concise and the flow of your analysis should be easy to follow. If you make an implementation that is not immediately obvious, make sure you provide comments either in markdown cells or in comments in the code itself.

### Solution:

We propose a simple method to detect the top trending hashtags of each week.
For each hashtag and day, we compute the difference between that day's count and the count on the previous day on which the hashtag appeared. For a hashtag's first day in the dataframe (i.e., it has no prior day), the difference is set to 0.
Over each week, we keep the maximum daily difference of each hashtag. A trending hashtag (i.e., one whose count grows suddenly and rapidly) should have a large daily difference. For each week, we rank the hashtags by their maximum difference and keep the top 5.
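A plain-Python sketch of the same steps on toy data may help in following the Spark code below (`weekly_trending` is a hypothetical name). As with `F.lag` over a window ordered by `day`, the difference is taken with respect to the previous row of the same hashtag, which may be several days earlier or in the previous week; as with `F.rank() <= 5`, hashtags tied at the cut-off are all kept.

```python
from collections import defaultdict

def weekly_trending(rows, top_n=5):
    """rows: iterable of (day, week, hashtag, count). Returns {week: [trending hashtags]}."""
    by_hashtag = defaultdict(list)
    for day, week, hashtag, count in rows:
        by_hashtag[hashtag].append((day, week, count))

    # max day-over-day increase per (week, hashtag); a hashtag's first row has diff 0
    max_diff = {}
    for hashtag, series in by_hashtag.items():
        series.sort()  # order by day, like orderBy('day') in the window
        prev = None
        for _day, week, count in series:
            diff = 0 if prev is None else count - prev
            key = (week, hashtag)
            max_diff[key] = max(max_diff.get(key, diff), diff)
            prev = count

    per_week = defaultdict(list)
    for (week, hashtag), diff in max_diff.items():
        per_week[week].append((diff, hashtag))

    trending = {}
    for week in sorted(per_week):
        items = sorted(per_week[week], key=lambda dh: (-dh[0], dh[1]))
        diffs = [d for d, _ in items]
        # rank() semantics: 1 + number of strictly larger values, so ties share a rank
        trending[week] = [h for d, h in items if 1 + sum(x > d for x in diffs) <= top_n]
    return trending

rows = [(1, 1, "a", 10), (2, 1, "a", 50), (1, 1, "b", 100), (2, 1, "b", 90),
        (3, 1, "c", 5), (8, 2, "a", 20), (8, 2, "d", 1), (9, 2, "d", 30)]
print(weekly_trending(rows, top_n=1))  # {1: ['a'], 2: ['d']}
```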

In [None]:
hashtag_window = Window.partitionBy('hashtag').orderBy(F.col('day'))

trending = rs_counts.withColumn("prev_value", F.lag(F.col('count')).over(hashtag_window))
trending = trending.withColumn("diff", F.when(F.isnull(F.col('count') - F.col('prev_value')), 0)
                              .otherwise(F.col('count') - F.col('prev_value')))

In [None]:
trending = trending.groupBy(trending.week, trending.hashtag).agg(F.max('diff').alias('max_diff')).orderBy(F.col('max_diff').desc())

In [None]:
week_window = Window.partitionBy('week').orderBy(F.col('max_diff').desc())
weekly_rank = F.rank() \
              .over(week_window) \
              .alias('rank')

In [None]:
# keep the top 5 (ties included) per week; collect_list does not guarantee the rank order within each list
trending = trending.withColumn('rank', weekly_rank) \
                   .where(F.col('rank') <= 5) \
                   .groupBy('week') \
                   .agg(F.collect_list("hashtag").alias("trending_hashtags")) \
                   .orderBy(F.col('week'))

In [None]:
trending.show(n=55, truncate=False)

__Our approach seems to find trending events like Oscars in week 7, covid19 in week 11, Black Lives Matter in weeks 22, 23.__

## PART III: Hashtag clustering (25 points)

### a) Feature vector - 3/25

- Create a dataframe `daily_hashtag_matrix` that consists of hashtags as rows and daily counts as columns (hint: use `groupBy` and methods of `GroupedData`). Each row of the matrix represents the time series of daily counts of one hashtag. Cache the result.

- Create the feature vector which consists of daily counts using the [`VectorAssembler`](https://spark.apache.org/docs/2.3.2/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler) from the Spark ML library. Cache the result.
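On a small scale, the pivot amounts to the plain-Python sketch below (`pivot_daily_counts` is a hypothetical name): one list of daily counts per hashtag, with 0 for the days on which the hashtag was not used. `VectorAssembler` then packs the day columns, in the order given by `inputCols`, into a single `features` vector per hashtag. When `pivot` is called without an explicit list of values, Spark first computes the distinct days in an extra job (and, as far as we know, sorts them); passing the values as the second argument of `pivot` skips that step.

```python
def pivot_daily_counts(day_counts, days):
    """Build {hashtag: [count on each day in `days`]}, with 0 for missing days.

    day_counts: iterable of (day, hashtag, count); days: ordered list of day columns.
    """
    column = {day: i for i, day in enumerate(days)}
    matrix = {}
    for day, hashtag, count in day_counts:
        row = matrix.setdefault(hashtag, [0] * len(days))  # zeros play the role of fillna(0)
        row[column[day]] += count                           # like sum('count')
    return matrix

rows = [(1, "bts", 2522), (3, "bts", 471), (2, "covid19", 10)]
print(pivot_daily_counts(rows, days=[1, 2, 3]))
# {'bts': [2522, 0, 471], 'covid19': [0, 10, 0]}
```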

In [None]:
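# one row per hashtag and one column per day (pivot sorts the distinct days); missing days become 0
daily_hashtag_matrix = day_counts.select("day", "hashtag", "count") \
                                 .groupBy("hashtag") \
                                 .pivot("day") \
                                 .sum("count") \
                                 .fillna(0) \
                                 .cache()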

In [None]:
# every column except the first one ("hashtag") holds the counts of one day
cols = daily_hashtag_matrix.schema.names[1:]

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=cols, outputCol="features")

In [None]:
df_vector = assembler.transform(daily_hashtag_matrix).select("hashtag", "features").cache()

### b) Visualization - 2/25

Visualize the time series you just created. 

- Select a few interesting hashtags you identified above. The `isin` method of DataFrame columns might be useful.
- Convert the subset to a pandas DataFrame with the `toPandas` method.
- Plot the time series for the chosen hashtags with matplotlib. To visualize in this PySpark kernel, you need the magic `%matplot plt`. For details, please check [here](https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/screenshots/matplotlib.png).

In [None]:
import matplotlib
matplotlib.use('agg')
import matplotlib.pylab as plt

plt.clf()
plt.rcParams['figure.figsize'] = (30,8)
plt.rcParams['font.size'] = 12
plt.style.use('fivethirtyeight')

In [None]:
interesting_hashtags = ['covid19', 'blacklivesmatter', 'ripkobe', 'australia']

In [None]:
time_series = df_vector.filter(df_vector['hashtag'].isin(interesting_hashtags)).toPandas()

In [None]:
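for ht in interesting_hashtags:
    # `features` holds a Spark ML vector (dense or sparse); convert it to a NumPy array for plotting
    plt.plot(cols, time_series[time_series["hashtag"] == ht]['features'].values[0].toArray())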

plt.xticks([])
plt.legend(interesting_hashtags, loc="upper right")

In [None]:
%matplot plt

### c) KMeans clustering - 20/25

Use KMeans to cluster hashtags based on the daily count timeseries you created above. Train the model and calculate the cluster membership for all hashtags. Again, be creative and see if you can get meaningful hashtag groupings. 

Validate your results by showing certain clusters, for example, those including some interesting hashtags you identified above. Do they make sense?

Make sure you document each step of your process well so that your final notebook is easy to understand even if the result is not optimal or complete. 

__Note:__ 
- Additional data cleaning, feature engineering, dimension reduction, etc. might be necessary to get meaningful results from the model. 
- For available methods, check [pyspark.sql.functions documentation](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#module-pyspark.sql.functions), [Spark MLlib Guide](https://spark.apache.org/docs/2.3.2/ml-guide.html) and [pyspark.ml documentation](https://spark.apache.org/docs/2.3.2/api/python/pyspark.ml.html).
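One possible feature-engineering step (a suggestion, not what the solution below does): raw daily counts span several orders of magnitude, and with Euclidean distance KMeans tends to group hashtags by volume rather than by the shape of their time series. Scaling each series to unit L2 norm removes the volume, as in the plain-Python sketch below; in Spark ML, `pyspark.ml.feature.Normalizer` (with `p=2.0`) performs this per-row normalization on a vector column. `l2_normalize` and `squared_distance` are hypothetical helper names.

```python
import math

def l2_normalize(series):
    """Scale a daily-count series to unit L2 norm so that only its shape remains."""
    norm = math.sqrt(sum(x * x for x in series))
    if norm == 0:
        return list(series)  # an all-zero series cannot be rescaled
    return [x / norm for x in series]

def squared_distance(u, v):
    return sum((a - b) ** 2 for a, b in zip(u, v))

# same spike shape, 100x different volume
big = [0, 1000, 3000, 1000, 0]
small = [0, 10, 30, 10, 0]
print(squared_distance(big, small))                              # 10781100
print(squared_distance(l2_normalize(big), l2_normalize(small)))  # (close to) 0
```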

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np

In [None]:
max_k = 30
k_list = list(range(2, max_k + 1))  # candidate numbers of clusters: 2, 3, ..., 30
silhouettes = []

In [None]:
k_list

In [None]:
evaluator = ClusteringEvaluator()

for k in k_list:
    kmeans = KMeans().setK(k)

    model = kmeans.fit(df_vector)
    predictions = model.transform(df_vector)

    # Evaluate clustering by computing Silhouette score
    silhouettes.append(evaluator.evaluate(predictions))

In [None]:
plt.clf()
plt.rcParams['figure.figsize'] = (30,8)
plt.rcParams['font.size'] = 12
plt.style.use('fivethirtyeight')

In [None]:
plt.plot(k_list, silhouettes)
plt.xlabel('k (number of clusters)')
plt.ylabel('silhouette score')

In [None]:
%matplot plt

Based on the silhouette scores, the best k seems to be 11.
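To interpret the curve: the silhouette of a point compares the mean distance `a` to the other points of its own cluster with the mean distance `b` to the points of the nearest other cluster, `s = (b - a) / max(a, b)`, and the score is the mean over all points, between -1 and 1 (higher is better). `ClusteringEvaluator` uses the squared Euclidean distance by default. The plain-Python sketch below computes the same quantity directly in O(n²), so it is only practical on a small sample; `silhouette` is a hypothetical helper, and scoring singleton clusters as 0 is an assumed (though common) convention.

```python
def silhouette(points, labels):
    """Mean silhouette score with squared Euclidean distance (needs at least 2 clusters)."""
    def sqdist(u, v):
        return sum((a - b) ** 2 for a, b in zip(u, v))

    clusters = set(labels)
    scores = []
    for i, (p, own) in enumerate(zip(points, labels)):
        # mean distance from p to each cluster, leaving p itself out of its own cluster
        mean_dist = {}
        for c in clusters:
            others = [q for j, (q, lab) in enumerate(zip(points, labels)) if lab == c and j != i]
            mean_dist[c] = sum(sqdist(p, q) for q in others) / len(others) if others else None
        if mean_dist[own] is None:
            scores.append(0.0)  # singleton cluster
            continue
        a = mean_dist[own]
        b = min(d for c, d in mean_dist.items() if c != own)
        scores.append(0.0 if max(a, b) == 0 else (b - a) / max(a, b))
    return sum(scores) / len(scores)

points = [(0, 0), (0, 1), (10, 10), (10, 11)]
print(silhouette(points, [0, 0, 1, 1]))  # close to 1: compact, well-separated clusters
print(silhouette(points, [0, 1, 0, 1]))  # negative: points sit closer to the other cluster
```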

# That's all, folks!