# Big Data Analytics - Practical Exam 2024


# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline shall be set for the 10th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set. You are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, and secondly by the logic used in trying to approach each step of the problem set;
5. Consider skipping questions that you are stuck on, and get back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected you have developed skills beyond writing SQL queries. Any question where you directly write a SQL query (for example, creating a temporary table and using spark.sql to pass the query) will receive a 25% penalty. Using the Spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty. Comment your code in a reasonable fashion.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all necessary code so that, when the notebook is run in Databricks, it replicates these results. This means all data downloading and processing should be done in this notebook. It is also important that you clearly indicate where your final answer to each question is when you are using multiple cells (for example, print "my final answer is" before your answer, or use cell comments). Please name your file in the following way: *[student_number1]_[student_number2]_submission.ipynb*. As an example: *19740001_197400010_submission.ipynb*
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is on the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisting your RDDs or Dataframes, this may speed up performance.
    6. If you have trouble with graphframes in Databricks (specifically the import statement), make sure the graphframes package is installed on the cluster you are running. Click Home on the left and then click the graphframes library; from there you can install the package on your cluster (check the graphframes checkbox and click Install). Another installation option is using the JAR available on Moodle with the graphframes library.
10. **Note**: Including the name and student number of each group member in the submission notebook will be considered a declaration of authorship.

#### Data Sources and Description
We will use data from IMDB. You can download the raw data files
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv). You can find a
description of each data file at https://www.imdb.com/interfaces/

## Questions
### Data loading and preparation
Review the file descriptions and load the necessary data onto your Databricks cluster and into Spark dataframes. You will need to use shell commands to download and unzip the data, and then load it into Spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .gz files. The data files will then be tab separated (.tsv), which you can load into a dataframe using the tab-separated option instead of the comma-separated option we have typically used in class: `.option("sep", "\t")`
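IMDb files also mark missing values with the literal string `\N`, which is why several columns come back as strings and need casting. The sketch below is a plain-Python illustration (not the notebook's Spark code) of parsing a tiny, hypothetical inline sample in the `title.basics` layout and mapping `\N` to `None`; Spark's CSV reader has a `nullValue` option that could serve the same purpose.

```python
import csv
import io

# Hypothetical inline sample in the title.basics layout (tab-separated, header row,
# "\N" marking a missing value). The real files are far larger and gzip-compressed.
SAMPLE = (
    "tconst\ttitleType\tprimaryTitle\tstartYear\tendYear\tgenres\n"
    "tt0000001\tshort\tCarmencita\t1894\t\\N\tDocumentary,Short\n"
)

def parse_imdb_tsv(text):
    """Parse IMDb-style TSV text into dicts, mapping the "\\N" marker to None."""
    # QUOTE_NONE: treat '"' as an ordinary character, since titles may contain quotes
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    return [{k: (None if v == "\\N" else v) for k, v in row.items()} for row in reader]

rows = parse_imdb_tsv(SAMPLE)

# cast numeric columns, keeping None for missing values (like .cast("integer") in Spark)
for row in rows:
    for col in ("startYear", "endYear"):
        row[col] = int(row[col]) if row[col] is not None else None

print(rows[0]["endYear"], rows[0]["startYear"])  # None 1894
```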

> Initial Imports

In [None]:
from graphframes import GraphFrame

from pyspark.sql import functions as F

> Load IMDB Data

In [None]:
%sh
wget -P /tmp https://datasets.imdbws.com/name.basics.tsv.gz
wget -P /tmp https://datasets.imdbws.com/title.basics.tsv.gz
wget -P /tmp https://datasets.imdbws.com/title.principals.tsv.gz
wget -P /tmp https://datasets.imdbws.com/title.ratings.tsv.gz

> Copy loaded data to DBFS

In [None]:
dbutils.fs.cp('file:/tmp/name.basics.tsv.gz', 'dbfs:/FileStore/tables/')
dbutils.fs.cp('file:/tmp/title.basics.tsv.gz', 'dbfs:/FileStore/tables/')
dbutils.fs.cp('file:/tmp/title.principals.tsv.gz', 'dbfs:/FileStore/tables/')
dbutils.fs.cp('file:/tmp/title.ratings.tsv.gz', 'dbfs:/FileStore/tables/')

True

> Load the data in dataframes

In [None]:
title_basics_df = spark.read \
  .option("header", "true") \
  .option("sep", "\t") \
  .option("inferSchema", "true") \
  .csv("/FileStore/tables/title.basics.tsv.gz")

title_principals_df = spark.read \
  .option("header", "true") \
  .option("sep", "\t") \
  .option("inferSchema", "true") \
  .csv("/FileStore/tables/title.principals.tsv.gz")

title_ratings_df = spark.read \
  .option("header", "true") \
  .option("sep", "\t") \
  .option("inferSchema", "true") \
  .csv("/FileStore/tables/title.ratings.tsv.gz")

name_basics_df = spark.read \
  .option("header", "true") \
  .option("sep", "\t") \
  .option("inferSchema", "true") \
  .csv("/FileStore/tables/name.basics.tsv.gz")


> Data Exploration on dataframes

In [None]:
title_basics_df.show(1)
title_basics_df.printSchema()

+---------+---------+------------+-------------+-------+---------+-------+--------------+-----------------+
|   tconst|titleType|primaryTitle|originalTitle|isAdult|startYear|endYear|runtimeMinutes|           genres|
+---------+---------+------------+-------------+-------+---------+-------+--------------+-----------------+
|tt0000001|    short|  Carmencita|   Carmencita|      0|     1894|     \N|             1|Documentary,Short|
+---------+---------+------------+-------------+-------+---------+-------+--------------+-----------------+
only showing top 1 row

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
title_basics_df = title_basics_df \
  .withColumn("isAdult", F.col("isAdult").cast("integer")) \
  .withColumn("startYear", F.col("startYear").cast("integer")) \
  .withColumn("endYear", F.col("endYear").cast("integer")) \
  .withColumn("runtimeMinutes", F.col("runtimeMinutes").cast("integer"))

In [None]:
title_principals_df.show(1)
title_principals_df.printSchema()

+---------+--------+---------+--------+---+----------+
|   tconst|ordering|   nconst|category|job|characters|
+---------+--------+---------+--------+---+----------+
|tt0000001|       1|nm1588970|    self| \N|  ["Self"]|
+---------+--------+---------+--------+---+----------+
only showing top 1 row

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [None]:
title_ratings_df.show(1)
title_ratings_df.printSchema()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    2059|
+---------+-------------+--------+
only showing top 1 row

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [None]:
name_basics_df.show(1)
name_basics_df.printSchema()

+---------+------------+---------+---------+--------------------+--------------------+
|   nconst| primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+------------+---------+---------+--------------------+--------------------+
|nm0000001|Fred Astaire|     1899|     1987|actor,miscellaneo...|tt0072308,tt00504...|
+---------+------------+---------+---------+--------------------+--------------------+
only showing top 1 row

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



In [None]:
name_basics_df = name_basics_df \
  .withColumn("birthYear", F.col("birthYear").cast("integer")) \
  .withColumn("deathYear", F.col("deathYear").cast("integer"))

> Save dataframes as parquet files

In [None]:
# overwrite mode lets the notebook be re-run without failing on already existing output paths
title_basics_df.write.mode("overwrite").parquet("/FileStore/tables/parquet/title_basics_df.parquet")

title_principals_df.write.mode("overwrite").parquet("/FileStore/tables/parquet/title_principals_df.parquet")

title_ratings_df.write.mode("overwrite").parquet("/FileStore/tables/parquet/title_ratings_df.parquet")

name_basics_df.write.mode("overwrite").parquet("/FileStore/tables/parquet/name_basics.parquet")

>> Reload the saved dataframes

In [None]:
title_basics_df = spark.read.parquet("/FileStore/tables/parquet/title_basics_df.parquet")

title_principals_df = spark.read.parquet("/FileStore/tables/parquet/title_principals_df.parquet")

title_ratings_df = spark.read.parquet("/FileStore/tables/parquet/title_ratings_df.parquet")

name_basics_df = spark.read.parquet("/FileStore/tables/parquet/name_basics.parquet")

### Network Inference, Let’s build a network
In the following questions you will summarise the data and build a network. We want to examine a network that abstracts how actors and actresses are related through their co-participation in movies. To that end, perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** on the participants in those movies (i.e., actors, directors, etc … ); make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?

In [None]:
# We combine title_basics_df + title_principals_df + name_basics_df + title_ratings_df (ratings for later use)
# [no akas, no episode, no crew] as we will not be using them later
# example use:
# -for Q2: title_principals, title_basics, name_basics
# -for Q9: title_ratings

# left joins starting from title_basics_df, so every title is kept even without principals or ratings
network_df = title_basics_df \
  .join(title_principals_df, "tconst", "left") \
  .join(name_basics_df, "nconst", "left") \
  .join(title_ratings_df, "tconst", "left")

network_df.cache()

DataFrame[tconst: string, nconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: int, startYear: int, endYear: int, runtimeMinutes: int, genres: string, ordering: int, category: string, job: string, characters: string, primaryName: string, birthYear: int, deathYear: int, primaryProfession: string, knownForTitles: string, averageRating: double, numVotes: int]

In [None]:
network_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- deathYear: integer (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [None]:
network_df.count()

87295049

#### Q1 ANSWER: the created dataframe has *87295049* rows.

**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?
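One subtlety of the filter chain used below: in Spark, `F.col("birthYear") >= 1920` evaluates to null for a null `birthYear`, and `filter` drops rows whose condition is null, so "no date of birth listed" is removed by the same condition. The plain-Python sketch below mirrors the five Q2 conditions on hypothetical records, making the null case explicit.

```python
# Hypothetical records shaped like rows of network_df (only the columns Q2 uses).
records = [
    {"nconst": "nm1", "category": "actor",    "isAdult": 0, "deathYear": None, "birthYear": 1960, "titleType": "movie"},
    {"nconst": "nm2", "category": "actress",  "isAdult": 0, "deathYear": None, "birthYear": None, "titleType": "movie"},
    {"nconst": "nm3", "category": "director", "isAdult": 0, "deathYear": None, "birthYear": 1970, "titleType": "movie"},
    {"nconst": "nm4", "category": "actor",    "isAdult": 0, "deathYear": 2001, "birthYear": 1930, "titleType": "movie"},
    {"nconst": "nm5", "category": "actress",  "isAdult": 0, "deathYear": None, "birthYear": 1985, "titleType": "tvSeries"},
]

def keep(r):
    """Mirror of the Q2 conditions; a missing birthYear is rejected explicitly,
    matching how a null comparison fails a Spark filter."""
    return (
        r["category"] in ("actor", "actress")
        and r["isAdult"] == 0
        and r["deathYear"] is None
        and r["birthYear"] is not None and r["birthYear"] >= 1920
        and r["titleType"] == "movie"
    )

kept = [r["nconst"] for r in records if keep(r)]
print(kept)  # ['nm1']
```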

In [None]:
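# keep living actors/actresses born in 1920 or later, in non-adult titles of type 'movie';
# a null birthYear makes the >= comparison null, so filter() also drops those rows
network_filtered_df = network_df \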
  .filter((F.col("category") == "actor") | (F.col("category") == "actress")) \
  .filter(F.col("isAdult") == 0) \
  .filter(F.col("deathYear").isNull()) \
  .filter(F.col("birthYear") >= 1920) \
  .filter(F.col("titleType") == "movie")

network_filtered_df.cache()

930743


In [None]:
print(network_filtered_df.count())

930743


#### Q2 ANSWER: the created dataframe has *930743* rows.

**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name, these could be different people.
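The map/reduce pattern asked for here can be sketched in plain Python: each row becomes an `(nconst, 1)` pair, and a `reduceByKey`-style fold sums the values per key. Keying on the unique ID keeps two people called the same name apart. The rows and the `reduce_by_key` helper are hypothetical stand-ins for the Spark RDD and its `reduceByKey` method.

```python
# Hypothetical (tconst, nconst, primaryName) rows; nm1 and nm2 are different people
# who share the name "Alex Smith".
rows = [
    ("tt1", "nm1", "Alex Smith"),
    ("tt2", "nm1", "Alex Smith"),
    ("tt3", "nm2", "Alex Smith"),
    ("tt1", "nm3", "Jo Lee"),
]

# map: one (nconst, 1) pair per appearance, keyed on the unique ID, not the name
pairs = [(nconst, 1) for _, nconst, _ in rows]

def reduce_by_key(kv_pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func."""
    result = {}
    for key, value in kv_pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

movie_counts = reduce_by_key(pairs, lambda a, b: a + b)

# attach names only after counting, then sort by count (highest first)
names = {nconst: name for _, nconst, name in rows}
top = sorted(movie_counts.items(), key=lambda kv: kv[1], reverse=True)
print([(names[k], v) for k, v in top])
# [('Alex Smith', 2), ('Alex Smith', 1), ('Jo Lee', 1)]
```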

In [None]:
# convert DataFrame to RDD
network_filtered_rdd = network_filtered_df.rdd

# key on 'nconst' rather than 'primaryName', so different people who share a name are counted separately
# paired RDD: map each (actor, movie) row to (nconst, 1), then sum the 1s per actor
movies_count_df = network_filtered_rdd \
  .map(lambda r: (r["nconst"], 1)) \
  .reduceByKey(lambda a, b: a + b) \
  .toDF(["nconst", "movie_count"])
# attach names and sort by movie count, highest first
actors_raking_df = movies_count_df \
  .join(name_basics_df, "nconst") \
  .orderBy(F.col("movie_count").desc())

In [None]:
# Display the names of the top 10 actors/actresses based on movie count
actors_raking_df \
  .limit(10) \
  .select("primaryName", "movie_count") \
  .show()

+-----------------+-----------+
|      primaryName|movie_count|
+-----------------+-----------+
|     Brahmanandam|       1130|
|Jagathy Sreekumar|        659|
|    Shakti Kapoor|        600|
|     Eric Roberts|        492|
|      Aruna Irani|        467|
|           Nassar|        440|
|        Mammootty|        437|
|            Helen|        433|
|Tanikella Bharani|        412|
|      Anupam Kher|        409|
+-----------------+-----------+



#### Q3 ANSWER: top 10 actors/actresses (according to the number of movies in which they appeared):

|      primaryName|movie_count|
|-----------------|-----------|
|     Brahmanandam|       1130|
|Jagathy Sreekumar|        659|
|    Shakti Kapoor|        600|
|     Eric Roberts|        492|
|      Aruna Irani|        467|
|           Nassar|        440|
|        Mammootty|        437|
|            Helen|        433|
|Tanikella Bharani|        412|
|      Anupam Kher|        409|

**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique IDs). Whenever a pair of actors appeared together in a movie, the link must be listed in each direction (A -> B and B -> A). However, links should be distinct: we do not need duplicates when two actors worked together in several movies.

Display a DataFrame with the first 10 edges.
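The edge construction can be sketched in plain Python on hypothetical data: group actors by movie, emit every ordered pair of distinct co-stars, and let a set remove duplicates from pairs who shared several movies. The notebook's Spark code reaches the same result with a self-join filtered on `a.nconst < b.nconst`, a union with the swapped pair, and `.distinct()`; `itertools.permutations` here is a swapped-in equivalent, not the notebook's method.

```python
from itertools import permutations

# Hypothetical (tconst, nconst) rows; nm1 and nm2 co-star in two movies.
cast = [("tt1", "nm1"), ("tt1", "nm2"), ("tt2", "nm1"), ("tt2", "nm2"), ("tt2", "nm3")]

def build_edges(cast_rows):
    """Return the distinct directed co-star edges (A -> B and B -> A)."""
    by_movie = {}
    for tconst, nconst in cast_rows:
        by_movie.setdefault(tconst, set()).add(nconst)
    edges = set()  # the set plays the role of .distinct()
    for actors in by_movie.values():
        # ordered pairs of length 2: yields both (a, b) and (b, a), never (a, a)
        edges.update(permutations(sorted(actors), 2))
    return edges

edges = build_edges(cast)
print(sorted(edges))
```

The repeated nm1/nm2 collaboration contributes only one edge per direction, which is what the question asks for.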

In [None]:
# select columns of title-id and name-id 
work_links_df = network_filtered_df.select("tconst", "nconst")

# perform a self-join to find links (=pairs of actors/actresses who participated in the same movie)
pairs_df = work_links_df \
  .alias("a") \
  .join(work_links_df.alias("b"), "tconst") \
  .filter(F.col("a.nconst") < F.col("b.nconst"))

# create a union and drop the duplicates
bd_pairs_df = pairs_df \
  .select(F.col("a.nconst").alias("actor1"), F.col("b.nconst").alias("actor2")) \
  .union(pairs_df.select(F.col("b.nconst").alias("actor1"), F.col("a.nconst").alias("actor2"))) \
  .distinct()

In [None]:
# display first 10 pairs
bd_pairs_df.show(10)

+---------+---------+
|   actor1|   actor2|
+---------+---------+
|nm0344892|nm1625681|
|nm0241222|nm0991758|
|nm0045119|nm0222426|
|nm0000420|nm1249052|
|nm0001857|nm3571592|
|nm0578935|nm0723172|
|nm0041517|nm0350208|
|nm0028846|nm1940584|
|nm0059077|nm0109386|
|nm0972598|nm5000434|
+---------+---------+
only showing top 10 rows



#### Q4 ANSWER: The first 10 pairs of actors/actresses who participated in the same movie, from the dataframe 'bd_pairs_df':


| actor1     | actor2     |
|------------|------------|
|nm0344892|nm1625681|
|nm0241222|nm0991758|
|nm0045119|nm0222426|
|nm0000420|nm1249052|
|nm0001857|nm3571592|
|nm0578935|nm0723172|
|nm0041517|nm0350208|
|nm0028846|nm1940584|
|nm0059077|nm0109386|
|nm0972598|nm5000434|

**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probability = 0.1.

List the top 10 actors / actresses by pagerank.

*Install graphframes library: clusters -> click on working cluster -> "Libraries" tab -> "Install New" -> "Maven" -> in coordinates enter <br> '**graphframes:graphframes:0.8.1-spark3.0-s_2.12**' -> install*
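For readers following the RDD route, the classic iterative PageRank can be sketched in plain Python: each iteration sets PR(v) = r + (1 - r) · Σ PR(u)/outdeg(u) over the edges u -> v, starting every rank at 1.0, with r = 0.1 and 5 iterations as the question requires. This is a sketch under those assumptions; GraphFrames' internal handling of sink vertices and normalisation may differ, so its numbers need not match exactly.

```python
def pagerank(edges, reset_probability=0.1, max_iter=5):
    """Static PageRank on a directed edge list.

    Each iteration applies PR(v) = r + (1 - r) * sum(PR(u) / outdeg(u)) over edges u -> v,
    starting from PR = 1.0 for every vertex that appears in an edge.
    """
    vertices = {v for edge in edges for v in edge}
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1
    ranks = {v: 1.0 for v in vertices}
    for _ in range(max_iter):
        # each vertex spreads its current rank evenly over its outgoing edges
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]
        ranks = {v: reset_probability + (1 - reset_probability) * incoming[v] for v in vertices}
    return ranks


# usage: a hypothetical star where 'c' co-starred with three others (both directions, as in Q4)
star = [("c", "l1"), ("l1", "c"), ("c", "l2"), ("l2", "c"), ("c", "l3"), ("l3", "c")]
ranks = pagerank(star)
print(max(ranks, key=ranks.get))  # c
```

Because every vertex in this formulation has outgoing edges, the ranks keep summing to the number of vertices, which is why scores such as 63 for a hub are plausible on this scale.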

In [None]:
# using GraphFrames #

# parameters:
# reset_probability = 0.1
# iterations = 5
# using dataframe from Q4

# create vertices df with distinct actors/actresses
vertices_df = network_filtered_df \
  .select("nconst", "primaryName") \
  .distinct() \
  .withColumnRenamed("nconst", "id") # required by GraphFrame library
# create edges df with links between actors/actresses
edges_df = bd_pairs_df.select(
  F.col("actor1").alias("src"),
  F.col("actor2").alias("dst")
)

In [None]:
# graphframe
graph = GraphFrame(vertices_df, edges_df)
#compute pagerank
pagerank_results = graph.pageRank(resetProbability=0.1, maxIter=5)



In [None]:
# getting the top 10 actors/actresses by PageRank
top_actors = pagerank_results.vertices \
  .orderBy(F.col("pagerank").desc()) \
  .limit(10)

In [None]:
# display
top_actors.select("id", "primaryName", "pagerank").show()

+---------+----------------+------------------+
|       id|     primaryName|          pagerank|
+---------+----------------+------------------+
|nm0000616|    Eric Roberts| 63.18792464931231|
|nm0000514|  Michael Madsen| 34.07127419751417|
|nm0001803|     Danny Trejo| 26.68548298873733|
|nm0202966|     Keith David|24.948370778119802|
|nm0001595|    Michael Paré| 24.44215091221613|
|nm0261724|     Joe Estevez|23.999657624728556|
|nm0726223|  Richard Riehle|23.060298350193882|
|nm0000532|Malcolm McDowell| 22.96604219239015|
|nm0442207|   Lloyd Kaufman|22.866559524956568|
|nm0000448| Lance Henriksen|22.373924174702854|
+---------+----------------+------------------+



#### Q5 ANSWER: The 10 most influential actors/actresses within the IMDb network (by PageRank):


|id          |primaryName |pagerank    |
|------------|------------|------------|
|nm0000616|    Eric Roberts| 63.18792464931231|
|nm0000514|  Michael Madsen| 34.07127419751417|
|nm0001803|     Danny Trejo| 26.68548298873733|
|nm0202966|     Keith David|24.948370778119802|
|nm0001595|    Michael Paré| 24.44215091221613|
|nm0261724|     Joe Estevez|23.999657624728556|
|nm0726223|  Richard Riehle|23.060298350193882|
|nm0000532|Malcolm McDowell| 22.96604219239015|
|nm0442207|   Lloyd Kaufman|22.866559524956568|
|nm0000448| Lance Henriksen|22.373924174702854|

**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.
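Because the Q4 edges are distinct per ordered pair, an actor's outDegree is the number of distinct co-stars, not the number of movies. The plain-Python sketch below counts outgoing edges on a hypothetical edge list, the same quantity GraphFrames exposes as `outDegrees`.

```python
from collections import Counter

# Hypothetical distinct directed edges, as produced for Q4 (both directions present).
edges = [("nm1", "nm2"), ("nm2", "nm1"), ("nm1", "nm3"), ("nm3", "nm1")]

# outDegree = number of edges leaving each vertex, i.e. distinct co-stars here
out_degree = Counter(src for src, _ in edges)

# sort by outDegree (highest first), breaking ties by id for a stable order
top = sorted(out_degree.items(), key=lambda kv: (-kv[1], kv[0]))
print(top)  # [('nm1', 2), ('nm2', 1), ('nm3', 1)]
```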

In [None]:
# from graphframe of Q5
out_degrees = graph.outDegrees
# sorting actors/actresses by outDegree
out_degrees_rdd = out_degrees.rdd \
  .map(lambda r: (r["id"], r["outDegree"])) \
  .sortBy(lambda x: x[1], ascending=False)

In [None]:
print(out_degrees_rdd.take(10))

[('nm0000616', 1337), ('nm0000514', 841), ('nm0451600', 760), ('nm0202966', 707), ('nm0410902', 703), ('nm0621937', 688), ('nm0000367', 677), ('nm0001803', 663), ('nm0256628', 658), ('nm0695177', 648)]


#### Q6 ANSWER: Top 10 actors/actresses by outDegree (the number of distinct co-stars each one is linked to):

|       id|outDegree|
|---------|---------|
|nm0000616|     1337|
|nm0000514|      841|
|nm0451600|      760|
|nm0202966|      707|
|nm0410902|      703|
|nm0621937|      688|
|nm0000367|      677|
|nm0001803|      663|
|nm0256628|      658|
|nm0695177|      648|

### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon, there are two actors named ‘Kevin Bacon’, we will use the one with the highest degree, that is, the one that participated in most titles;
1. Estimate the shortest path between every actor in the database and Kevin Bacon; keep a dataframe with this information as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degrees from Kevin Bacon (you will need to deal with actors unconnected to Kevin Bacon: if not connected to Kevin Bacon, give these actors / actresses a score/degree of 20).
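On an unweighted graph with a single landmark, the hop counts that GraphFrames' `shortestPaths` computes correspond to a breadth-first search from that landmark (the Q4 edges are symmetric, so direction does not matter here). The plain-Python sketch below runs BFS on a hypothetical co-star graph, assigns 20 to unreachable actors as the question specifies, and then summarises the counts per degree.

```python
from collections import Counter, deque

UNREACHABLE = 20  # score the question assigns to actors not connected to Kevin Bacon

def degrees_from(landmark, vertices, edges):
    """Breadth-first search from `landmark` over an unweighted co-star graph."""
    neighbours = {v: [] for v in vertices}
    for src, dst in edges:
        neighbours[src].append(dst)
    distance = {landmark: 0}
    queue = deque([landmark])
    while queue:
        current = queue.popleft()
        for nxt in neighbours[current]:
            if nxt not in distance:  # first visit in BFS order is the shortest hop count
                distance[nxt] = distance[current] + 1
                queue.append(nxt)
    return {v: distance.get(v, UNREACHABLE) for v in vertices}


# usage: hypothetical graph kb - a - b - c, plus an isolated actor
vertices = ["kb", "a", "b", "c", "loner"]
edges = [("kb", "a"), ("a", "kb"), ("a", "b"), ("b", "a"), ("b", "c"), ("c", "b")]
degrees = degrees_from("kb", vertices, edges)
summary = sorted(Counter(degrees.values()).items())
print(summary)  # [(0, 1), (1, 1), (2, 1), (3, 1), (20, 1)]
```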

In [None]:
#### 1 ####
# Kevin Bacon's id: among vertices named 'Kevin Bacon', take the one with the highest outDegree
# (joining on the column name keeps a single 'id' column in the result)
kevin_bacon_id = vertices_df \
  .filter(F.col("primaryName") == "Kevin Bacon") \
  .join(out_degrees, "id") \
  .orderBy(F.col("outDegree").desc()) \
  .first()["id"]

In [None]:
#### 2 ####
# shortestPaths returns, for each vertex, a map {landmark: hop count};
# vertices that cannot reach Kevin Bacon have no entry for him
shortest_paths = graph.shortestPaths(landmarks=[kevin_bacon_id])

# explode the map into (landmark, distance) rows and keep Kevin Bacon's entry
distances_df = shortest_paths \
  .select("id", F.explode("distances").alias("landmark", "distance")) \
  .filter(F.col("landmark") == kevin_bacon_id)



In [None]:
#### 3 ####
distances_df = distances_df \
  .withColumn("distance", F.col("distance").cast("int"))

# every actor/actress in the graph, including those unconnected to Kevin Bacon
all_actors_df = vertices_df.select(F.col("id").alias("actor_id"))

# joining id and distance of actor/actress
distances_df = all_actors_df \
  .join(distances_df, all_actors_df.actor_id == distances_df.id, "left_outer") \
  .drop("id")

# impute value 20 for actors with no connection to Kevin Bacon
distances_df = distances_df.na.fill({"distance": 20})

# number of actors with each distance, sorted by distance
summary_df = distances_df \
  .groupBy("distance") \
  .count() \
  .orderBy("distance")

In [None]:
summary_df.show()

+--------+-----+
|distance|count|
+--------+-----+
|       0|    1|
|       1|  354|
|       2|14170|
|       3|58562|
|       4|42460|
|       5| 4842|
|       6|  510|
|       7|   56|
|       8|   20|
|       9|    3|
|      20|15329|
+--------+-----+



#### Q7 ANSWER: 
1) Stored in 'kevin_bacon_id'

2) Kept in 'distances_df' (built from the GraphFrames result 'shortest_paths')

3) 
|distance|count|
|---------|-----|
|       0|    1|
|       1|  354|
|       2|14170|
|       3|58562|
|       4|42460|
|       5| 4842|
|       6|  510|
|       7|   56|
|       8|   20|
|       9|    3|
|      20|15329|

### Exploring the data with RDDs

Using RDDs (not dataframes), answer the following questions (if you loaded your data into Spark in a dataframe, you can easily convert it to an RDD of rows using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie', what is the combination of genres that is the most popular (as measured by number of reviews)? Hint: paired RDDs will be useful.
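The key idea in Q8 is a canonical key per genre combination: sorting the genres means the same set listed in a different order lands in the same bucket before votes are summed per key. The plain-Python sketch below uses hypothetical rows and also skips IMDb's `\N` placeholder for missing genres.

```python
def genre_combination_key(genres):
    """Canonical key for a comma-separated genre list; None for missing genres."""
    if genres is None or genres == "\\N":
        return None
    return ", ".join(sorted(genres.split(",")))

# Hypothetical (genres, numVotes) rows for movies
movies = [("Sci-Fi,Action", 100), ("Action,Sci-Fi", 50), ("Drama", 70), ("\\N", 999)]

votes = {}
for genres, num_votes in movies:
    key = genre_combination_key(genres)
    if key is not None and num_votes is not None:
        votes[key] = votes.get(key, 0) + num_votes  # reduceByKey(lambda a, b: a + b)

print(max(votes.items(), key=lambda kv: kv[1]))  # ('Action, Sci-Fi', 150)
```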

In [None]:
def create_genres_votes_pairs(r):
  """
  Creates a (genre combination, number of votes) pair for one movie.

  Args:
    r (Row): A row with 'genres' and 'numVotes' fields.

  Returns:
    list: A one-element list with a (sorted genre combination, numVotes) tuple,
          or an empty list if genres are missing (null or IMDb's '\\N') or numVotes is None.
  """

  if r["genres"] and r["genres"] != "\\N" and r["numVotes"] is not None:
    # sorting the list will apply determinism in the combinations
    genres_sorted_list = sorted(r["genres"].split(','))
    genre_combination = ', '.join(genres_sorted_list)
    return [(genre_combination, r["numVotes"])]

  return []

In [None]:
# getting the rating of each movie by 'tconst'
movies_ratings_df = title_basics_df \
  .filter(F.col("titleType") == "movie") \
  .join(title_ratings_df, "tconst", "left")
# convert to RDD
movies_ratings_rdd = movies_ratings_df.rdd
# sort combinations of genres by popularity (= number of votes)
most_popular_genres_combinations = movies_ratings_rdd \
  .flatMap(create_genres_votes_pairs) \
  .reduceByKey(lambda a, b: a + b) \
  .sortBy(lambda x: x[1], ascending=False) \
  .take(1)


In [None]:
print(most_popular_genres_combinations)

[('Action, Adventure, Sci-Fi', 54358980)]


#### Q8 ANSWER: Most popular combination of genres, as measured by number of votes: 'Action, Adventure, Sci-Fi'

**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination). Hint: paired RDD's will be useful.
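Averaging with `reduceByKey` needs an associative combine step, so the notebook carries `(sum, count)` pairs and divides only at the end; averaging pairwise averages would weight movies unequally. The plain-Python sketch below shows the same pattern on hypothetical rows, including the "more than 400 ratings" filter.

```python
# Hypothetical (genre combination, averageRating, numVotes) rows for movies
movies = [
    ("Drama", 8.0, 1000),
    ("Drama", 6.0, 500),
    ("Comedy", 9.0, 100),   # dropped: not more than 400 votes
    ("Comedy", 6.5, 450),
]

# map each kept movie to (key, (rating, 1)); reduce by summing both components
totals = {}
for key, rating, num_votes in movies:
    if num_votes > 400:
        s, c = totals.get(key, (0.0, 0))
        totals[key] = (s + rating, c + 1)

# mapValues: turn (sum, count) into the mean movie rating per combination
averages = {key: s / c for key, (s, c) in totals.items()}
best = max(averages.items(), key=lambda kv: kv[1])
print(best)  # ('Drama', 7.0)
```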

In [None]:
def create_genre_reviews_pairs(r):
  """
  Creates a (genre combination, (averageRating, 1)) pair for one movie, so that
  ratings can be summed and counted per combination with reduceByKey.

  Args:
    r (Row): A row with 'genres' and 'averageRating' fields.

  Returns:
    list: A one-element list with a (sorted genre combination, (averageRating, 1)) tuple,
          or an empty list if genres are missing (null or IMDb's '\\N') or averageRating is None.
  """

  if r['genres'] and r['genres'] != '\\N' and r['averageRating'] is not None:
    genres_sorted_list = sorted(r['genres'].split(','))
    genre_combination = ', '.join(genres_sorted_list)
    return [(genre_combination, (r['averageRating'], 1))]

  return []

In [None]:
# Filter movies with more than 400 votes, join title basics with ratings data
movies_ratings_df = title_basics_df \
  .join(title_ratings_df, "tconst", "inner") \
  .filter((F.col("titleType") == "movie") & (F.col("numVotes") > 400))
# convert to RDD
movies_ratings_rdd = movies_ratings_df.rdd
# sort combinations of genres by highest average rating
highest_combination = movies_ratings_rdd \
  .flatMap(create_genre_reviews_pairs) \
  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
  .mapValues(lambda x: x[0] / x[1]) \
  .sortBy(lambda x: x[1], ascending=False) \
  .take(1)


In [None]:
print(highest_combination)

[('Action, Documentary, Mystery', 8.3)]


#### Q9 ANSWER: Combination of genres with the highest average movie rating (movies with more than 400 votes): 'Action, Documentary, Mystery'

**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular as measured by number of votes? Votes for multiple genres count towards each genre listed. Hint: flatMap and paired RDDs will be useful here.
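Here `flatMap` turns one movie into one `(genre, votes)` pair per listed genre, so a movie's votes count towards each of its genres before they are summed per genre. The plain-Python sketch below uses hypothetical rows; `explode_genres` stands in for the function passed to `flatMap`.

```python
from collections import Counter

# Hypothetical (genres, numVotes) rows for movies
movies = [("Action,Drama", 100), ("Drama", 50), ("Comedy,Romance", 120), ("\\N", 999)]

def explode_genres(genres, num_votes):
    """flatMap step: one (genre, votes) pair per listed genre; nothing for missing data."""
    if genres is None or genres == "\\N" or num_votes is None:
        return []
    return [(genre, num_votes) for genre in genres.split(",")]

genre_votes = Counter()
for genres, num_votes in movies:
    for genre, votes in explode_genres(genres, num_votes):
        genre_votes[genre] += votes  # reduceByKey(lambda a, b: a + b)

print(genre_votes.most_common(1))  # [('Drama', 150)]
```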

In [None]:
def create_genre_votes_pairs(r):
  """
  Creates (genre, number of votes) pairs, one for each genre listed for a movie.

  Args:
    r (Row): A row with 'genres' and 'numVotes' fields.

  Returns:
    list: A list of (genre, numVotes) tuples.
          Returns an empty list if genres are missing (null or IMDb's '\\N') or numVotes is None.
  """

  if r['genres'] and r['genres'] != '\\N' and r['numVotes'] is not None:
    return [(genre, r['numVotes']) for genre in r['genres'].split(',')]
  
  return []

In [None]:
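# note: movies_ratings_df is the Q9 dataframe (inner join with ratings, numVotes > 400),
# so only movies with more than 400 votes are counted here; the movie filter is kept for clarity
movies_ratings_filtered_df = movies_ratings_df \
  .filter(F.col("titleType") == "movie")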

# convert to RDD 
movies_ratings_rdd = movies_ratings_filtered_df.rdd
# sort genres (individually) by number of votes
most_popular_genre = movies_ratings_rdd \
  .flatMap(create_genre_votes_pairs) \
  .reduceByKey(lambda a, b: a + b) \
  .sortBy(lambda x: x[1], ascending=False)

In [None]:
print(most_popular_genre.take(1))

[('Drama', 564014416)]


#### Q10 ANSWER: Most popular (individual) genre, as measured by number of votes: 'Drama'

## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The average value for the cast of degrees of Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leave 30% of the data for testing, when splitting your data use the option seed=0.
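Each movie-level feature is an average over its cast. Spark's `F.avg` skips nulls and returns null only when every value is null, which is why a movie whose cast has no co-star edges can end up with a NULL `averageOutDegree`. The plain-Python sketch below mirrors that aggregation on hypothetical actor features.

```python
# Hypothetical per-actor feature values (None = missing, e.g. an actor with no co-stars)
actor_features = {
    "nm1": {"pagerank": 2.0, "outDegree": 10},
    "nm2": {"pagerank": 1.0, "outDegree": None},
}
cast = {"tt1": ["nm1", "nm2"], "tt2": ["nm2"]}

def avg_ignoring_nulls(values):
    """Like Spark's F.avg: skip nulls; return None when every value is null."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

# groupBy(tconst).agg(avg(...)) for each feature
movie_features = {
    tconst: {
        name: avg_ignoring_nulls([actor_features[a][name] for a in actors])
        for name in ("pagerank", "outDegree")
    }
    for tconst, actors in cast.items()
}
print(movie_features)
# {'tt1': {'pagerank': 1.5, 'outDegree': 10.0}, 'tt2': {'pagerank': 1.0, 'outDegree': None}}
```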

**Q11** Provide the coefficients of the regression and the error of your model on the test dataset, measured by RMSE.
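RMSE is an error measure rather than an accuracy: it is the square root of the mean squared difference between true and predicted ratings, in the same units as the rating, and lower is better. It is the quantity `RegressionEvaluator(metricName="rmse")` reports; a minimal plain-Python version of the formula:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

# hypothetical ratings: squared errors are 1, 0 and 4, so RMSE = sqrt(5 / 3)
print(rmse([7.0, 5.0, 6.0], [6.0, 5.0, 8.0]))
```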

In [None]:
# Q3-> actors_raking_df
# Q5-> pagerank_results
# Q6-> out_degrees
# Q7-> distances_df
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import MinMaxScaler

In [None]:
def evaluate_model(model, predictions):
  """
  It runs the regression evaluator using RMSE as the metric,
  then shows the RMSE, and the coefficients.

  Args:
    model (object): The model to be evaluated
    predictions (object): The predictions
  """

  evaluator = RegressionEvaluator(
    labelCol="averageRating",
    predictionCol="prediction",
    metricName="rmse"
  )

  results = {
    "rmse": evaluator.evaluate(predictions),
  }
  
  if hasattr(model.stages[-1], 'coefficients'):
    results.update({
      "coefficients": model.stages[-1].coefficients,
      "intercept": model.stages[-1].intercept
    })

  display(results)

In [None]:
def get_pipeline(features, model):
  """
  It buils a pipeline with an imputer, assembler, and scaler

  Args:
    features (list): The features
    model (object): The model to be used

  Returns:
    pipeline: a new pipeline
  """

  # impute missing values with 0 yields better results than mode, median or mean
  imputer = Imputer(missingValue=0, inputCols=features, outputCols=features)
  assembler = VectorAssembler(inputCols=features, outputCol="assembled_features")
  scaler = MinMaxScaler(inputCol="assembled_features", outputCol="features")

  return Pipeline(stages=[imputer, assembler, scaler, model])

In [None]:
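# feature 1: number of movies per actor (Q3)
movies_count_df = actors_raking_df \
  .select("nconst", "movie_count")
# feature 2: PageRank score per actor (Q5)
pagerank_df = pagerank_results.vertices \
  .select("id", "pagerank") \
  .withColumnRenamed("id", "nconst")
# feature 3: out-degree per actor (Q6)
out_degrees_df = out_degrees \
  .select("id", "outDegree") \
  .withColumnRenamed("id", "nconst")
# feature 4: distance from Kevin Bacon per actor (Q7)
distance_from_kevin_df = distances_df \
  .select("actor_id", "distance") \
  .withColumnRenamed("actor_id", "nconst")
# left-join the actor features onto every (movie, actor) row, then average them per movie;
# F.avg ignores NULLs, so actors without a value are not counted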
features_df = network_filtered_df \
  .join(movies_count_df, "nconst", "left") \
  .join(pagerank_df, "nconst", "left") \
  .join(out_degrees_df, "nconst", "left") \
  .join(distance_from_kevin_df, "nconst", "left") \
  .groupBy("tconst") \
  .agg(
    F.avg("movie_count").alias("averageMovieCount"),
    F.avg("pagerank").alias("averagePagerank"),
    F.avg("outDegree").alias("averageOutDegree"),
    F.avg("distance").alias("averageDistance")
  )
# names of the feature columns
features = ["averageMovieCount", "averagePagerank", "averageOutDegree", "averageDistance"]
# select features and label
features_label_df = features_df \
  .join(title_ratings_df, "tconst") \
  .select(*features, "averageRating")
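`features_df` is built in two steps. A left join attaches each actor's features to every (movie, actor) row, and `groupBy("tconst").agg(F.avg(...))` then averages them per movie. `F.avg` skips NULLs, and a movie whose cast has no value at all gets NULL. That is presumably why `averageOutDegree` is NULL in the first training row shown below. The following plain-Python sketch shows this behaviour; the identifiers and values are made up.

```python
from collections import defaultdict


def average_by_movie(cast, feature):
    """Mimic left join + groupBy("tconst").agg(F.avg(...)):
    missing feature values are skipped, and a movie with no values gets None."""
    values = defaultdict(list)
    for tconst, nconst in cast:
        bucket = values[tconst]          # register the movie even if nothing is appended
        value = feature.get(nconst)      # None plays the role of NULL after the left join
        if value is not None:
            bucket.append(value)
    return {t: (sum(vs) / len(vs) if vs else None) for t, vs in values.items()}


# hypothetical (tconst, nconst) cast rows and a per-actor feature
cast = [("tt01", "nm01"), ("tt01", "nm02"), ("tt02", "nm03")]
out_degree = {"nm01": 10.0, "nm02": 20.0}  # nm03 has no out-degree row

print(average_by_movie(cast, out_degree))  # {'tt01': 15.0, 'tt02': None}
```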

In [None]:
# data split
(train_df, test_df) = features_label_df.randomSplit([0.7, 0.3], seed=0)
train_df.show(1)

+-----------------+-------------------+----------------+---------------+-------------+
|averageMovieCount|    averagePagerank|averageOutDegree|averageDistance|averageRating|
+-----------------+-------------------+----------------+---------------+-------------+
|              1.0|0.10761500911483049|            NULL|           20.0|          2.4|
+-----------------+-------------------+----------------+---------------+-------------+
only showing top 1 row



In [None]:
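# LinearRegression reads the unscaled "assembled_features" vector (the scaled "features" column is unused),
# so the coefficients are in the original feature units
pipeline = get_pipeline(features, LinearRegression(featuresCol="assembled_features", labelCol="averageRating"))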

In [None]:
# fitting the model
model = pipeline.fit(train_df)

In [None]:
predictions = model.transform(test_df) # making the predictions
evaluate_model(model, predictions) #evaluating the model

{'rmse': 1.3164892185154347,
 'coefficients': DenseVector([-0.0002, -0.1115, 0.0032, 0.0106]),
 'intercept': 5.970414512920328}

#### Q11 ANSWER:
- features: ["averageMovieCount", "averagePagerank", "averageOutDegree", "averageDistance"]
- intercept: 5.970414512920328
- coefficients (same order as the features, in unscaled feature units): DenseVector([-0.0002, -0.1115, 0.0032, 0.0106])
- RMSE on the test set (an error in rating points; lower is better): 1.3164892185154347

**Q12** What score would your model predict for the 1997 movie Titanic?

In [None]:
# filter the dataframe to include only the 1997 movie Titanic
titanic_df = title_basics_df \
  .filter(F.col("primaryTitle") == "Titanic") \
  .filter(F.col("startYear") == 1997) \
  .filter(F.col("titleType") == "movie")
# keep only Titanic's cast with an inner join on tconst (a filter cannot reliably
# reference a column that belongs to another dataframe)
titanic_cast = network_filtered_df \
  .join(titanic_df.select("tconst"), "tconst") \
  .select("nconst")
# attach the actor features to Titanic's cast and average them, as for features_df
titanic_features_df = titanic_cast \
  .join(movies_count_df, "nconst", "left") \
  .join(pagerank_df, "nconst", "left") \
  .join(out_degrees_df, "nconst", "left") \
  .join(distance_from_kevin_df, "nconst", "left") \
  .agg(
    F.avg("movie_count").alias("averageMovieCount"),
    F.avg("pagerank").alias("averagePagerank"),
    F.avg("outDegree").alias("averageOutDegree"),
    F.avg("distance").alias("averageDistance")
  )
# display first row
titanic_features_df.show(1)

+------------------+------------------+-----------------+------------------+
| averageMovieCount|   averagePagerank| averageOutDegree|   averageDistance|
+------------------+------------------+-----------------+------------------+
|39.127560454389666|3.0995923148847213|95.95603762529855|3.3580300899388984|
+------------------+------------------+-----------------+------------------+



In [None]:
# actual average rating of Titanic (1997), for comparison with the predictions
titanic_df.join(movies_ratings_df, on="tconst", how="left").select("averageRating").show()

+-------------+
|averageRating|
+-------------+
|          7.9|
+-------------+



In [None]:
# modeling & prediction
titanic_prediction = model \
  .transform(titanic_features_df) \
  .select("prediction") \
  .first()

print(titanic_prediction["prediction"])

5.9564151871319995


#### Q12 ANSWER: Predicted average rating for the 1997 movie Titanic: 5.9564151871319995 (actual average rating: 7.9)
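The linear regression reads the unscaled `assembled_features` vector, and Titanic's averaged features contain no NULLs or zeros for the Imputer to change. The prediction can therefore be checked by hand as intercept + Σ coefficient × feature, using the values printed above. `DenseVector` displays the coefficients rounded to four decimals, so this check reproduces the model's 5.956 only approximately.

```python
# values copied from the notebook output (coefficients are the rounded ones shown by DenseVector)
intercept = 5.970414512920328
coefficients = {
    "averageMovieCount": -0.0002,
    "averagePagerank": -0.1115,
    "averageOutDegree": 0.0032,
    "averageDistance": 0.0106,
}
titanic = {
    "averageMovieCount": 39.127560454389666,
    "averagePagerank": 3.0995923148847213,
    "averageOutDegree": 95.95603762529855,
    "averageDistance": 3.3580300899388984,
}

# linear model: intercept plus the weighted sum of the features
prediction = intercept + sum(coefficients[name] * titanic[name] for name in coefficients)
print(round(prediction, 3))  # about 5.96, vs 5.956 from model.transform
```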

**Q13** Create dummy variables for each of the top 10 movie genres from Q10. These variables should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example, the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variables to the regression improve your results? What is the new RMSE and predicted rating for the 1997 movie Titanic?
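The dummy encoding can be sketched in plain Python. This version splits the comma-separated `genres` string and tests for exact membership. The notebook's `contains()` is a substring match instead. That appears harmless for these ten genres, but "Music" would also match "Musical". In Spark, the exact-match variant could be written with `F.array_contains(F.split("genres", ","), genre)`. The genre string `"Drama,Romance"` for Titanic is an assumption, consistent with the dummy values shown further down.

```python
# top ten genres, in the column order shown in the notebook output
TOP_GENRES = ["Drama", "Action", "Comedy", "Adventure", "Crime",
              "Thriller", "Sci-Fi", "Romance", "Mystery", "Horror"]


def genre_dummies(genres, top_genres=TOP_GENRES):
    """Return {genre: 0/1} using exact matches on a comma-separated genres string."""
    tokens = set(genres.split(",")) if genres else set()
    return {genre: int(genre in tokens) for genre in top_genres}


titanic = genre_dummies("Drama,Romance")  # assumed genres string for Titanic (1997)
print([g for g, flag in titanic.items() if flag == 1])  # ['Drama', 'Romance']

# substring matching (like Column.contains) vs exact matching
print("Music" in "Musical", genre_dummies("Musical", ["Music"]))  # True {'Music': 0}
```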

In [None]:
## Create dummy variables
# top 10 genres from Q10 (the first field of each most_popular_genre entry is the genre name)
top_ten_genres = [genre for genre, _ in most_popular_genre.take(10)]
# one 0/1 column per top genre; contains() is a substring match on the comma-separated genres string
for genre in top_ten_genres:
  movies_ratings_filtered_df = movies_ratings_filtered_df \
    .withColumn(genre, F.when(F.col("genres").contains(genre), 1).otherwise(0))
# add the dummies to the movie-level features; this is an inner join, so only movies
# present in movies_ratings_filtered_df are kept
features_with_dummies_df = features_df \
  .join(movies_ratings_filtered_df.select("tconst", *top_ten_genres), "tconst")
# original feature names plus the dummy column names
features_with_dummies = features + top_ten_genres
# add the label (averageRating) and keep only features and label
features_with_dummies_label_df = features_with_dummies_df \
  .join(title_ratings_df, "tconst") \
  .select(*features_with_dummies, "averageRating")

In [None]:
# split the data
(train_df, test_df) = features_with_dummies_label_df.randomSplit([0.7, 0.3], seed=0)
train_df.show(10)

+-----------------+-------------------+----------------+---------------+-----+------+------+---------+-----+--------+------+-------+-------+------+-------------+
|averageMovieCount|    averagePagerank|averageOutDegree|averageDistance|Drama|Action|Comedy|Adventure|Crime|Thriller|Sci-Fi|Romance|Mystery|Horror|averageRating|
+-----------------+-------------------+----------------+---------------+-----+------+------+---------+-----+--------+------+-------+-------+------+-------------+
|              1.0|0.10761500911483049|            NULL|           20.0|    0|     0|     0|        0|    0|       0|     0|      0|      0|     0|          7.3|
|              1.0|0.10761500911483049|            NULL|           20.0|    0|     0|     0|        0|    0|       0|     0|      0|      0|     0|          7.5|
|              1.0|0.10761500911483049|            NULL|           20.0|    0|     0|     0|        0|    0|       0|     0|      0|      0|     0|          7.7|
...

In [None]:
## Regression with dummies
pipeline = get_pipeline(features_with_dummies, LinearRegression(featuresCol="assembled_features", labelCol="averageRating"))

In [None]:
# fit the model
model_with_dummies = pipeline.fit(train_df)

In [None]:
# new RMSE
predictions = model_with_dummies.transform(test_df)
evaluate_model(model_with_dummies, predictions)

{'rmse': 1.2066880758555962,
 'coefficients': DenseVector([-0.0, -0.1842, 0.0064, -0.0237, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
 'intercept': 6.003869947291806}

In [None]:
## new predicted rating for the 1997 movie Titanic
for genre in top_ten_genres:
  titanic_df = titanic_df \
    .withColumn(genre, F.when(titanic_df.genres.contains(genre), 1).otherwise(0))
# append Titanic's genre dummies to its network features
# (titanic_df has a single row, so crossJoin adds one set of columns)
titanic_features_df = titanic_features_df \
  .crossJoin(titanic_df.select(*top_ten_genres))

titanic_features_df.show()

+------------------+-----------------+-----------------+------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+
| averageMovieCount|  averagePagerank| averageOutDegree|   averageDistance|Drama|Action|Comedy|Adventure|Crime|Thriller|Sci-Fi|Romance|Mystery|Horror|
+------------------+-----------------+-----------------+------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+
|39.127560454389666|3.099592314884722|95.95603762529855|3.3580300899388984|    1|     0|     0|        0|    0|       0|     0|      1|      0|     0|
+------------------+-----------------+-----------------+------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+



In [None]:
titanic_prediction = model_with_dummies \
  .transform(titanic_features_df) \
  .select("prediction") \
  .first()

print(titanic_prediction["prediction"])

5.967223129539539


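#### Q13 Answer:
Yes, but only a _little_. The new *RMSE* is 1.21 (was 1.32), and the new *predicted* rating for the 1997 Titanic movie is 5.97 (was 5.96).

However, all ten genre coefficients are 0.0. `get_pipeline` uses `Imputer(missingValue=0)`, which replaces every 0 in the dummy columns with the column mean (1). Each dummy therefore becomes a constant column that the regression cannot use. The lower RMSE most likely comes from the different set of movies kept by the inner join with `movies_ratings_filtered_df`, not from the genres themselves.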

**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, and changing the included features. What is the RMSE of your final model, and what rating does it predict for the 1997 movie Titanic?

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#### Gradient Boosted Tree Regressor

In [None]:
pipeline = get_pipeline(features_with_dummies, GBTRegressor(featuresCol="features", labelCol="averageRating", maxDepth=20, seed=42))

In [None]:
gbt_model = pipeline.fit(train_df)

In [None]:
predictions = gbt_model.transform(test_df)
evaluate_model(gbt_model, predictions)

{'rmse': 1.0274561282609307}

In [None]:
titanic_prediction = gbt_model \
  .transform(titanic_features_df) \
  .select("prediction") \
  .first()

print(titanic_prediction["prediction"])

7.544688111083342


#### Random Forest Regressor

In [None]:
pipeline = get_pipeline(features_with_dummies, RandomForestRegressor(featuresCol="features", labelCol="averageRating", numTrees=5, maxDepth=5))

In [None]:
rfr_model = pipeline.fit(train_df)

In [None]:
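# evaluate the random forest itself (rfr_model); the RMSE printed below came from a run that
# called `model` (the Q11 linear regression) in this cell, so it is not the random forest's score
predictions = rfr_model.transform(test_df)
evaluate_model(rfr_model, predictions)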

{'rmse': 1.2160554472690757}

In [None]:
titanic_prediction = rfr_model \
  .transform(titanic_features_df) \
  .select("prediction") \
  .first()

print(titanic_prediction["prediction"])

6.317666605896088


#### Random Forest Regressor with Hyperparameter Tuning

In [None]:
rfr = RandomForestRegressor(featuresCol="features", labelCol="averageRating")
pipeline = get_pipeline(features_with_dummies, rfr)

In [None]:
paramGrid = (ParamGridBuilder()
  .addGrid(rfr.numTrees, [10, 20, 30])
  .addGrid(rfr.maxDepth, [10, 20, 30])
  .build())

In [None]:
evaluator = RegressionEvaluator(labelCol="averageRating", predictionCol="prediction", metricName="rmse")

cross_validator = CrossValidator(
  estimator=pipeline,
  estimatorParamMaps=paramGrid,
  evaluator=evaluator,
  numFolds=3
)
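The grid above is a Cartesian product, so it helps to estimate the cost before calling `fit`. Three values of `numTrees` times three values of `maxDepth` give 9 parameter maps. Each map is fitted once per fold, and `CrossValidator` then refits the best map on the whole training set. The count below is plain Python with no Spark calls.

```python
from itertools import product

num_trees = [10, 20, 30]
max_depth = [10, 20, 30]
num_folds = 3

# every combination, as ParamGridBuilder produces
param_maps = [{"numTrees": n, "maxDepth": d} for n, d in product(num_trees, max_depth)]

fits_during_cv = len(param_maps) * num_folds  # each map is trained on each fold's training part
total_fits = fits_during_cv + 1               # plus the final refit with the best map

print(len(param_maps), fits_during_cv, total_fits)  # 9 27 28
```

Deep forests (maxDepth up to 30) make each of these fits expensive. `CrossValidator`'s `parallelism` parameter can train several models concurrently.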

In [None]:
cv_model = cross_validator.fit(train_df)

In [None]:
predictions = cv_model.transform(test_df)
print(evaluator.evaluate(predictions))

1.1546649159103306


In [None]:
titanic_prediction = cv_model \
  .transform(titanic_features_df) \
  .select("prediction") \
  .first()

print(titanic_prediction["prediction"])

6.37429394448341


#### Changing features

In [None]:
features_with_dummies_numvotes = features_with_dummies + ["numVotes"]

In [None]:
features_with_dummies_label_df = features_with_dummies_df \
  .join(title_ratings_df, "tconst") \
  .select(*features_with_dummies_numvotes, "averageRating")

In [None]:
pipeline = get_pipeline(features_with_dummies_numvotes, RandomForestRegressor(featuresCol="features", labelCol="averageRating", numTrees=5, maxDepth=5))

In [None]:
(train_df, test_df) = features_with_dummies_label_df.randomSplit([0.7, 0.3], seed=0)

In [None]:
rfr_model = pipeline.fit(train_df)

In [None]:
predictions = rfr_model.transform(test_df)
print(evaluator.evaluate(predictions))

1.1512289822609025


In [None]:
titanic_features_df = titanic_features_df \
  .crossJoin(titanic_df.join(movies_ratings_df, on="tconst", how="left").select("numVotes"))

titanic_features_df.show()

+------------------+------------------+-----------------+------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+--------+
| averageMovieCount|   averagePagerank| averageOutDegree|   averageDistance|Drama|Action|Comedy|Adventure|Crime|Thriller|Sci-Fi|Romance|Mystery|Horror|numVotes|
+------------------+------------------+-----------------+------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+--------+
|39.127560454389666|3.0995923148847213|95.95603762529855|3.3580300899388984|    1|     0|     0|        0|    0|       0|     0|      1|      0|     0| 1286967|
+------------------+------------------+-----------------+------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+--------+



In [None]:
titanic_prediction = rfr_model \
  .transform(titanic_features_df) \
  .select("prediction") \
  .first()

print(titanic_prediction["prediction"])

7.081851678361451


#### Q14 Answer:

Our best model is the **GBTRegressor** (maxDepth=20, network features plus genre dummies). It reaches a test *RMSE* of 1.03 and *predicts* 7.54 for the 1997 Titanic movie, against an actual rating of 7.9.