# Scaling up

Originally released as part of an ADA2019 homework assignment.

## Description

[Reddit](https://www.reddit.com/) aka *'the front page of the internet'* is a network of over a million *communities* aka *'subreddits'*, each of which covers a different topic based on people's interests. In other words, it is a *massive* collection of forums (corresponding to the aforementioned communities), where people can share content specific to a given topic or comment on other people’s posts.   

You are Reddit's community manager and want to *appoint new moderators*. Because moderating a specific subreddit isn't a full-time job, you want each chosen moderator to moderate multiple subreddits at the same time. To make this choice effective, moderators shouldn't have to spend too much time getting to know a community and its prevalent communication style, so it makes sense to let them moderate subreddits that are similar in communication style and language.

At the same time, it also makes sense to let them moderate subreddits that are similar with respect to their participating users, because this allows moderators to track the behavior of individual users across multiple subreddits. For example, some users might post offensive content only once a month on a given subreddit, and therefore fly under the radar of someone moderating only that subreddit. Considering all the subreddits these users post to, however, they might post something offensive every day, each time on a different subreddit. A moderator in charge of all these subreddits would be able to ban such users much more effectively. In light of the above, your task is to find ways of choosing moderators that consider both the textual content and the users of a subreddit.

### Dataset:
The dataset provided to you includes all the posts written in the 15 largest subreddits as of May 2015.

Reddit posts (provided to you via [Google Drive](https://drive.google.com/a/epfl.ch/file/d/19SVHKbUTUPtC9HMmADJcAAIY1Xjq6WFv/view?usp=sharing))
```
reddit_posts
 |-- id: id of the post 
 |-- author: user name of the author 
 |-- body: text of the message
 |-- subreddit: name of the subreddit
```

Reddit scores (provided to you via [Google Drive](https://drive.google.com/a/epfl.ch/file/d/1vr4PolJzTXr6ODSe3ucib5EAyp3rjxec/view?usp=sharing))
```
reddit_scores
 |-- id: id of the post 
 |-- score: score computed as sum of UP/DOWN votes
```

*Note: The Jaccard similarity between subreddits, represented either by the set of their top-1000 words or by the set of their users, can be computed locally (on the driver); all the other tasks, however, have to be implemented in Spark.*

## B1. Getting a sense of the data

Start a PySpark instance...

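```python
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
import numpy as np
import pandas as pd

# Session configuration: adjust the memory settings to your own machine.
# NOTE: these settings take effect only when the Spark context is first created
# (restart the kernel to change them). In local mode everything runs inside the
# driver JVM, so spark.driver.memory is the limit that actually matters.
conf = pyspark.SparkConf().setMaster("local[*]").setAll([
    ('spark.executor.memory', '12g'),
    ('spark.driver.memory', '4g'),
    ('spark.driver.maxResultSize', '2G')
])

# Create the session (or reuse the running one)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Create the context
sc = spark.sparkContext

# FIX for Spark 2.x: set the default JVM locale to en-US
locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

spark
```
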
... and load the data into Spark DataFrames.

```python
reddit_messages = spark.read.json("messages.json.gz")
reddit_scores = spark.read.json("score.json.gz")
```

### B1.1. Identify the most active subreddit

Print the list of subreddits along with the following information:
1. The total number of posts
2. The number of users with at least 1 message
3. The mean message length

*Note: Keep everything in one single dataframe and print the list sorted by number of posts in descending order.*

```python
# Print structure of the data
reddit_messages.printSchema()
```

```
root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- id: string (nullable = true)
 |-- subreddit: string (nullable = true)
```



### 1.

```python
# Get the number of messages
print(f"There are {reddit_messages.count()} messages.")
```

```
There are 7984080 messages.
```


### 2.

```python
# Get users with at least 1 message
users = reddit_messages.select("author").distinct()
print(f"There are {users.count()} users.")
```

```
There are 828424 users.
```


### 3.

```python
# Small usage example: a few message bodies together with their lengths
reddit_messages.select(
    "body",
    length("body").alias("length")
).show(5)

# Length of every message
lengths = reddit_messages.select(
    length("body").alias("length")
)
average_length_df = lengths.agg({"length": "avg"})

# NOTE: the aggregate is still a DataFrame; printing its size shows this
print(f"'average_length' has {average_length_df.count()} rows and {len(average_length_df.columns)} columns.")

# NOTE: the average is therefore the element at position (0, 0)
average_length = average_length_df.collect()[0][0]

print(f"The average length of a message is {average_length:.2f}.")
```

```
+--------------------+------+
|                body|length|
+--------------------+------+
|gg this one's ove...|    54|
|Nihilum and LG ar...|   212|
|Me too. Same hamm...|    33|
|well i think new ...|   446|
|That's something ...|  1036|
+--------------------+------+
only showing top 5 rows

'average_length' has 1 rows and 1 columns.
The average length of a message is 148.42.
```


### B1.2. Identify the largest subreddit

Print *two* different lists of subreddits: ordered by (1) the number of posts, and (2) the number of users. For each subreddit, print the name and the corresponding counts.

Additionally, (3) plot the mean message length of each subreddit in descending order.

```python
# Subreddits ordered by number of messages
subreddits = reddit_messages.groupBy("subreddit").count().orderBy(desc("count"))
subreddits.show()

# Subreddits ordered by number of users
subreddits = reddit_messages.groupBy("subreddit").agg(countDistinct("author").alias("users")).orderBy(desc("users"))
subreddits.show()
```

```
+---------------+-------+
|      subreddit|  count|
+---------------+-------+
|leagueoflegends|1151287|
|            nba| 704862|
|          funny| 691139|
|           pics| 564502|
|            nfl| 534345|
|         videos| 511492|
|           news| 477658|
| DestinyTheGame| 471160|
|         soccer| 455215|
|          DotA2| 445154|
|      worldnews| 439417|
|  AdviceAnimals| 411902|
|         hockey| 389329|
|GlobalOffensive| 382017|
|         movies| 354601|
+---------------+-------+

+---------------+------+
|      subreddit| users|
+---------------+------+
|          funny|224077|
|           pics|205305|
|         videos|157628|
|leagueoflegends|119321|
|  AdviceAnimals|115815|
|      worldnews| 99261|
|           news| 98736|
|         movies| 92484|
|GlobalOffensive| 46686|
|            nba| 45034|
|         soccer| 41648|
|            nfl| 41593|
|          DotA2| 41466|
| DestinyTheGame| 37008|
|         hockey| 25568|
+---------------+------+
```



### B1.3. Identify the subreddit with the highest average score

Print the list of subreddits sorted by their average content scores.

```python
# Structure of the score dataframe
reddit_scores.printSchema()

# Merging scores with messages
reddit_messages_scores = reddit_messages.join(other=reddit_scores, on="id")

# Grouping messages by subreddit
(
    reddit_messages_scores
    .groupBy("subreddit") # grouping by subreddits
    .agg({"score": "avg"}) # computing the average score (thus for each subreddit)
    .orderBy(desc("avg(score)")) # (visual) ordering by average score
    .show()
)
```

```
root
 |-- id: string (nullable = true)
 |-- score: long (nullable = true)

+---------------+------------------+
|      subreddit|        avg(score)|
+---------------+------------------+
|         videos|12.649445152612358|
|           pics|12.216559020162904|
|          funny|12.041505399058655|
|  AdviceAnimals|11.251695791717447|
|         soccer|10.634627593554693|
|         movies|  9.82014997137628|
|            nfl| 9.048348913155358|
|            nba| 9.032795071943161|
|           news| 8.673421150697779|
|      worldnews|  7.86683719564787|
|         hockey| 6.520120515039979|
|leagueoflegends| 5.983557531701479|
|          DotA2| 4.880537971129092|
|GlobalOffensive| 4.351442475073099|
| DestinyTheGame|3.0288819084811953|
+---------------+------------------+
```



## B2. Moderator assignment based on Subreddit Textual Content

Different subreddits follow different communication styles, inherent in their topic and community. The goal here is to discover similar subreddits by looking only at the *words* present in the posted messages. Once such a list of similar subreddits is identified, an appropriately chosen moderator can be assigned to all of them.

Specifically, the task boils down to computing a similarity score between two subreddits based on the *words* present in their textual content. Your first idea is to use the *Jaccard similarity*, which is defined as the size of the intersection of two sets divided by the size of their union.

$Jaccard(A,B) = \frac{|A \cap B|}{|A \cup B|}$
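As a quick worked example, take two hypothetical word sets $A$ and $B$:

$$
A = \{\text{game}, \text{team}, \text{season}\}, \qquad B = \{\text{game}, \text{team}, \text{goal}, \text{match}\}
$$

$$
Jaccard(A,B) = \frac{|\{\text{game}, \text{team}\}|}{|\{\text{game}, \text{team}, \text{season}, \text{goal}, \text{match}\}|} = \frac{2}{5} = 0.4
$$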

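```python
def jaccard_similarity(list1, list2):
    """Jaccard similarity between the sets of elements of two iterables."""
    s1 = set(list1)
    s2 = set(list2)
    union = s1 | s2
    # Two empty inputs: return 0.0 instead of dividing by zero
    return len(s1 & s2) / len(union) if union else 0.0
```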

### B2.1.
The first step requires constructing a set representation of each subreddit. The goal is to represent each subreddit as a *set of words* existing in the messages posted on that subreddit. Compute the 50,000 most frequent words across all the provided subreddits. Construct a representation for each subreddit by retaining only the words found in the previously identified set of 50,000 frequent words.

Some rules:
 * Words are the tokens obtained by splitting the messages on the regular expression `\W` (i.e. every non-word character acts as a separator)
 * Remove all the stop-words (English language)

*Note: You might find the [RegexTokenizer](https://spark.apache.org/docs/2.2.0/ml-features.html#tokenizer) and the [StopWordsRemover](https://spark.apache.org/docs/2.2.0/ml-features.html#stopwordsremover) utilities from the package pyspark.ml useful for this task, as they help you tokenize the messages and remove stop-words.*
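To get a feel for what these rules produce, here is a small pure-Python approximation of `RegexTokenizer(pattern="\\W")` followed by `StopWordsRemover`, meant only for experimenting on toy strings (the real pipeline has to run in Spark). It assumes the tokenizer's defaults (lowercasing, treating the pattern as a separator, dropping empty tokens) and uses a tiny, hypothetical stop-word list in place of Spark's English list, which can be inspected with `StopWordsRemover.loadDefaultStopWords("english")`. The `re.ASCII` flag is used because Java's `\W`, unlike Python's default, treats non-ASCII letters as separators.

```python
import re

# Hypothetical, tiny stop-word list for illustration only (NOT Spark's list)
STOP_WORDS = {"this", "is", "and", "the", "a", "i", "me", "too", "that", "s"}

def tokenize(text, stop_words=STOP_WORDS):
    """Rough local approximation of RegexTokenizer + StopWordsRemover.

    1. lowercase the text (RegexTokenizer's default toLowercase=True)
    2. split on every single non-word character (pattern \\W used as a gap)
    3. drop empty tokens (default minTokenLength=1)
    4. drop stop words
    """
    # re.ASCII makes \W match [^a-zA-Z0-9_], like Java's default \W
    pieces = re.split(r"\W", text.lower(), flags=re.ASCII)
    tokens = [t for t in pieces if len(t) >= 1]
    return [t for t in tokens if t not in stop_words]

print(tokenize("That's something I'd like to see!"))
# ['something', 'd', 'like', 'to', 'see']
```

Contractions such as *I'd* leave single-letter fragments like `d`; whether they survive depends on the stop-word list in use.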

```python
# Importing modules
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover

# Start by tokenizing the messages
# NOTE: inputCol is the name of an existing column, while outputCol is the name
# of the column that the tokenizer creates (adds to the DataFrame)
tokenized_df = RegexTokenizer(inputCol="body", outputCol="words", pattern="\\W").transform(reddit_messages)
tokenized_df.show(5)

# Removing stop-words
# NOTE: same as before for inputCol and outputCol
filtered_df = StopWordsRemover(inputCol="words", outputCol="tokens").transform(tokenized_df)
filtered_df.show(5)

# Making a list of the 50,000 most common words
most_common_words = (
    filtered_df
    .select(explode("tokens").alias("word"))
    .groupBy("word")
    .count()
    .orderBy(desc("count"))
    .limit(50000)
)
most_common_words.show(5)
```

```
+-----------------+--------------------+-------+---------------+--------------------+
|           author|                body|     id|      subreddit|               words|
+-----------------+--------------------+-------+---------------+--------------------+
|        WyaOfWade|gg this one's ove...|cqug90h|            nba|[gg, this, one, s...|
|        BEE_REAL_|Nihilum and LG ar...|cqug90p|GlobalOffensive|[nihilum, and, lg...|
|        SlowRolla|Me too. Same hamm...|cqug916|           pics|[me, too, same, h...|
|   SenpaiOniichan|well i think new ...|cqug919|leagueoflegends|[well, i, think, ...|
|backwoodsofcanada|That's something ...|cqug91n|  AdviceAnimals|[that, s, somethi...|
+-----------------+--------------------+-------+---------------+--------------------+
only showing top 5 rows
```


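```python
# Getting the words used in each subreddit
words_for_each_subreddit = filtered_df.select("subreddit", explode("tokens").alias("word"))
words_for_each_subreddit.show()

# Set representation of each subreddit, restricted to the 50,000 most common words.
# NOTE: grouping by subreddit is required; aggregating without groupBy collects
# every word of every message into a single row.
word_sets_per_subreddit = (
    words_for_each_subreddit
    .distinct()                                         # one row per (subreddit, word)
    .join(most_common_words.select("word"), on="word")  # keep only frequent words
    .groupBy("subreddit")
    .agg(collect_set("word").alias("words"))            # one set of words per subreddit
)
word_sets_per_subreddit.show()
```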

### B2.2.
* Compute the Jaccard similarity between all the subreddits using the set representation obtained in step **B2.1.**, and plot the similarity values of all pairs of subreddits as a heatmap.
* Analyze this plot and discuss your observations. Do you observe that subreddits corresponding to similar topics possess higher Jaccard similarity?
* Provide detailed interpretations of the obtained results. Specifically,
    - Explain the limitations of your conclusions, and discuss the potential reasons.
    - Explain the potential problems with the Jaccard similarity function.
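Since the note at the top allows Jaccard similarities to be computed on the driver, one option is to collect the 15 word sets into a Python dict (for example with a hypothetical `word_sets = {row["subreddit"]: set(row["words"]) for row in word_sets_df.collect()}`) and fill the symmetric similarity matrix there. The sketch below assumes that dict shape and uses toy data; all names are illustrative.

```python
from itertools import combinations_with_replacement

def jaccard(s1, s2):
    """Jaccard similarity of two sets (0.0 when both are empty)."""
    union = s1 | s2
    return len(s1 & s2) / len(union) if union else 0.0

def jaccard_matrix(sets_by_name):
    """Return (names, matrix) where matrix[i][j] = Jaccard(set of names[i], set of names[j])."""
    names = sorted(sets_by_name)
    n = len(names)
    matrix = [[0.0] * n for _ in range(n)]
    # Jaccard is symmetric, so each unordered pair (including i == j) is computed once
    for i, j in combinations_with_replacement(range(n), 2):
        sim = jaccard(sets_by_name[names[i]], sets_by_name[names[j]])
        matrix[i][j] = matrix[j][i] = sim
    return names, matrix

# Toy data standing in for the collected word sets
word_sets = {
    "nba": {"game", "team", "season", "playoffs"},
    "nfl": {"game", "team", "season", "touchdown"},
    "pics": {"photo", "cute", "game"},
}
names, matrix = jaccard_matrix(word_sets)
for name, row in zip(names, matrix):
    print(name, [round(v, 2) for v in row])
```

The resulting `matrix`, with `names` as tick labels, can then be passed to a heatmap function such as matplotlib's `imshow` or seaborn's `heatmap`.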

### B2.3.

* Alternatively, compute the 1000 most frequent words for each subreddit, construct its representation as the set of its top-1000 words, and plot a heatmap of the Jaccard similarities as in step **B2.2.**.
* Explain your observations in detail: how and why is this new result different from the one obtained in **B2.2.**?

*Note: Use the same rules specified in B2.1: words obtained by splitting on the regex `\W`, with stop-words removed.*
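In Spark, one common way to keep the top 1000 words per subreddit is to count `(subreddit, word)` pairs and rank them within each subreddit using a window, e.g. `Window.partitionBy("subreddit").orderBy(desc("count"))` together with `row_number()`. The pure-Python sketch below only illustrates the expected result on toy, already tokenized data, which can be handy for checking the Spark version; `top_k_words` is a hypothetical helper.

```python
from collections import Counter

def top_k_words(tokens_by_subreddit, k=1000):
    """Map each subreddit to the set of its k most frequent tokens."""
    return {
        subreddit: {word for word, _ in Counter(tokens).most_common(k)}
        for subreddit, tokens in tokens_by_subreddit.items()
    }

# Toy messages, already tokenized and stop-word-filtered
tokens_by_subreddit = {
    "nba": ["game", "refs", "game", "team", "game", "team"],
    "nfl": ["td", "game", "td", "td"],
}
print(top_k_words(tokens_by_subreddit, k=2))
```

Words tied at the cutoff are broken in an arbitrary order (`Counter.most_common` keeps first-encountered order), so the exact set may differ slightly from what Spark returns.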

## B3. Moderator assignment based on Subreddit Users

Subreddits can be seen as communities of people interacting about a common topic. As an alternative to the *textual-content*-based similarity of **B2**, your task here is to check whether the similarity between two subreddits can be measured based on their participating users.

Of course, users are not monothematic: they interact with multiple subreddits. In this task, we are specifically interested in observing the amount of overlap across different subreddits based on their participating users. As in **B2**, the overlap is measured using the *Jaccard similarity*.


### B3.1.
Construct a set representation of each subreddit as the set of users who posted at least once in that subreddit.

Some users are very talkative and active across different topics. Print the username of the person who posted in the largest number of subreddits. *Note that users who posted at least once in a subreddit are considered participants of that subreddit.*
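In Spark, finding the most widespread user amounts to grouping by `author` and counting distinct subreddits (e.g. with `countDistinct("subreddit")`). The pure-Python sketch below shows both set constructions of B3.1 on a toy list of `(author, subreddit)` pairs. It assumes that deleted accounts appear under the placeholder author name `[deleted]`, as is common in Reddit dumps, and skips it, since such a placeholder pools many different people; check whether this dataset actually uses it.

```python
from collections import defaultdict

def users_per_subreddit(posts):
    """posts: iterable of (author, subreddit) -> {subreddit: set of authors}."""
    users = defaultdict(set)
    for author, subreddit in posts:
        users[subreddit].add(author)
    return dict(users)

def most_widespread_author(posts, ignore=("[deleted]",)):
    """Return (author, number of distinct subreddits) for the most widespread author.

    Authors listed in `ignore` (assumed placeholders for deleted accounts) are skipped.
    """
    subreddits_by_author = defaultdict(set)
    for author, subreddit in posts:
        if author not in ignore:
            subreddits_by_author[author].add(subreddit)
    return max(
        ((author, len(subs)) for author, subs in subreddits_by_author.items()),
        key=lambda pair: pair[1],
    )

# Toy posts: alice posts twice in nba, so she counts as active in 2 subreddits
posts = [
    ("alice", "nba"), ("alice", "nfl"), ("alice", "nba"),
    ("bob", "pics"),
    ("[deleted]", "nba"), ("[deleted]", "nfl"), ("[deleted]", "pics"),
]
print(most_widespread_author(posts))
# ('alice', 2)
```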

### B3.2.

* Compute the Jaccard similarity between all the subreddits using the set representation obtained in step **B3.1.**, and visualize it as in **B2**.
* Analyze this plot, identify highly similar pairs of subreddits, and clearly describe your observations.
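To identify highly similar pairs, it can help to rank all unordered pairs by their Jaccard value rather than reading them off the heatmap. A minimal driver-side sketch, assuming the user sets have been collected into a dict `{subreddit: set of authors}` (toy data below, hypothetical helper names):

```python
from itertools import combinations

def jaccard(a, b):
    """Jaccard similarity of two sets (0.0 when both are empty)."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0

def most_similar_pairs(sets_by_name, top=3):
    """Rank all unordered pairs of names by the Jaccard similarity of their sets."""
    pairs = [
        (x, y, jaccard(sets_by_name[x], sets_by_name[y]))
        for x, y in combinations(sorted(sets_by_name), 2)
    ]
    pairs.sort(key=lambda p: p[2], reverse=True)  # most similar first
    return pairs[:top]

# Toy data standing in for the collected user sets
user_sets = {
    "nba": {"u1", "u2", "u3"},
    "nfl": {"u2", "u3", "u4"},
    "pics": {"u1", "u5"},
}
print(most_similar_pairs(user_sets, top=2))
# [('nba', 'nfl', 0.5), ('nba', 'pics', 0.25)]
```

When reading such a ranking, keep in mind that the Jaccard similarity of a set $A$ with a larger set $B$ is at most $|A| / |B|$, since $|A \cap B| \le |A|$ and $|A \cup B| \ge |B|$: a small subreddit cannot score high against a much larger one, even if all of its users also post there.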

## B4. Language vs. Users similarity
    
* Visualize the similarity scores based on words (**B2.3.**) and users (**B3**) on the x and y axes, respectively, for the subreddit `nba` compared to all the other subreddits. Do semantically meaningful groups emerge? Provide clear explanations of your observations.
* Furthermore, do you observe differences in similarities between various sports-related subreddits in the dataset? Please provide explanations of the reasons behind these differences, if any!
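One way to prepare this scatter plot is to extract, for `nba`, the pair (word-based similarity, user-based similarity) against every other subreddit. The sketch assumes both similarity tables are plain dicts keyed by `(subreddit, subreddit)` tuples stored in either order; the helper names and toy values are hypothetical.

```python
def lookup(sim, a, b):
    """Symmetric lookup in a dict keyed by (subreddit, subreddit) tuples."""
    return sim[(a, b)] if (a, b) in sim else sim[(b, a)]

def xy_points(word_sim, user_sim, names, target="nba"):
    """Map every other subreddit to (word-based, user-based) similarity with `target`."""
    return {
        other: (lookup(word_sim, target, other), lookup(user_sim, target, other))
        for other in names
        if other != target
    }

# Hypothetical toy similarity values
names = ["nba", "nfl", "pics"]
word_sim = {("nba", "nfl"): 0.6, ("nba", "pics"): 0.2}
user_sim = {("nfl", "nba"): 0.1, ("pics", "nba"): 0.3}
print(xy_points(word_sim, user_sim, names))
# {'nfl': (0.6, 0.1), 'pics': (0.2, 0.3)}
```

The points can then be drawn with matplotlib's `scatter`, labeling each point with its subreddit name via `annotate`.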