# Practical Exam by:

* Adriana Costinha, 20230567
* Ana Filipa Silva, 20230577

# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline shall be set for the 10th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set. You are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, secondly by the logic used in trying to approach each step of the problem set;
5. Consider skipping questions that you are stuck in, and get back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected you have developed skills beyond writing SQL queries. Any question where you directly write a SQL query (then for example create a temporary table and use spark.sql to pass the query) will receive a 25% penalty. Using the Spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty. Comment your code in a reasonable fashion.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all necessary code so that when the notebook is run in databricks it should also replicate these results. This means that all data downloading and processing should be done in this notebook. It is also important you clearly indicate where your final answer to each question is when you are using multiple cells (for example you print "my final answer is" before your answer or use cell comments). Please make sure to name your file in the following way: *[student_number1]_[student_number2]_submission.ipynb*. As an example: *19740001_197400010_submission.ipynb*
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is present in the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisting your RDDs or Dataframes, this may speed up performance.
    6. If you have trouble with graphframes in databricks (specifically the import statement) you need to make sure the graphframes package is installed on the cluster you are running. If you click home on the left, then click on the graphframes library, from where you can install the package on your cluster (check the graphframes checkbox and click install). Another installation option is using the JAR available on Moodle with the graphframes library.
10. **Note**: Including the name and student number of each group member in the submission notebook will be considered a declaration of authorship.

#### Data Sources and Description
We will use data from IMDB. You can download raw datafiles
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv). You can find a
description of each datafile at https://www.imdb.com/interfaces/

## Questions
### Data loading and preparation
Review the file descriptions and load the necessary data onto your databricks cluster and into spark dataframes. You will need to use shell commands to download the data, unzip the data, load the data into spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .gz files. The data files will then be tab separated (.tsv), which you can load into a dataframe using the tab separated option instead of the comma separated option we have typically used in class: `.option("sep","\t")`

In [0]:
%sh
wget -O name.basics.tsv.gz --timeout=15 https://datasets.imdbws.com/name.basics.tsv.gz
wget -O title.basics.tsv.gz --timeout=15 https://datasets.imdbws.com/title.basics.tsv.gz
wget -O title.principals.tsv.gz --timeout=15 https://datasets.imdbws.com/title.principals.tsv.gz
wget -O title.ratings.tsv.gz --timeout=15 https://datasets.imdbws.com/title.ratings.tsv.gz 

--2024-06-09 19:29:52--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.56, 99.84.66.78, 99.84.66.98, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 267637948 (255M) [binary/octet-stream]
Saving to: ‘name.basics.tsv.gz’



In [0]:
%sh
gunzip /databricks/driver/name.basics.tsv.gz
gunzip /databricks/driver/title.basics.tsv.gz
gunzip /databricks/driver/title.principals.tsv.gz
gunzip /databricks/driver/title.ratings.tsv.gz

gzip: /databricks/driver/name.basics.tsv already exists;	not overwritten
gzip: /databricks/driver/title.basics.tsv already exists;	not overwritten
gzip: /databricks/driver/title.principals.tsv already exists;	not overwritten
gzip: /databricks/driver/title.ratings.tsv already exists;	not overwritten


In [0]:
name_basics_path = "file:/databricks/driver/name.basics.tsv"
title_basics_path = "file:/databricks/driver/title.basics.tsv"
title_principals_path = "file:/databricks/driver/title.principals.tsv"
title_ratings_path = "file:/databricks/driver/title.ratings.tsv"

We load the tab-separated files into Spark dataframes.

In [0]:
title_basics_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", "\t") \
    .load(title_basics_path)

title_principals_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", "\t") \
    .load(title_principals_path)

title_ratings_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", "\t") \
    .load(title_ratings_path)

name_basics_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", "\t") \
    .load(name_basics_path)

# Preprocessing

Since several columns were loaded as strings, we convert each column to a more appropriate type; for example, ``startYear`` becomes an integer instead of a string. We also split some strings into arrays of strings, using a comma as the delimiter, as with the column ``primaryProfession``.

In [0]:
from pyspark.sql.functions import col, split

name_basics_df = name_basics_df.withColumn("birthYear", col("birthYear").cast("integer")) \
    .withColumn("deathYear", col("deathYear").cast("integer")) \
    .withColumn("primaryProfession", split(col("primaryProfession"), ",")) \
    .withColumn("knownForTitles", split(col("knownForTitles"), ","))

title_basics_df = title_basics_df.withColumn("isAdult", col("isAdult").cast("integer")) \
    .withColumn("startYear", col("startYear").cast("integer")) \
    .withColumn("endYear", col("endYear").cast("integer")) \
    .withColumn("runtimeMinutes", col("runtimeMinutes").cast("integer")) \
    .withColumn("genres", split(col("genres"), ","))

title_principals_df = title_principals_df.withColumn("ordering", col("ordering").cast("integer"))

title_ratings_df = title_ratings_df.withColumn("averageRating", col("averageRating").cast("double")) \
    .withColumn("numVotes", col("numVotes").cast("integer"))
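The effect of these casts and splits on a single record can be sketched in plain Python, with no Spark session needed. The helper names `to_int` and `to_list` are hypothetical; only the `\N` missing-value marker and the column names come from the IMDb files. The sketch assumes Spark's default (non-ANSI) behaviour, where casting a non-numeric string to integer gives null. Unlike Spark's `split`, which keeps `\N` as a one-element array, `to_list` returns an empty list for a missing field.

```python
def to_int(value):
    """Return an int, or None when the field is missing or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def to_list(value):
    """Split a comma-separated field; a missing field becomes an empty list."""
    return [] if value in (None, "\\N", "") else value.split(",")


# One illustrative record, shaped like a row of name.basics.tsv
raw = {
    "birthYear": "1934",
    "deathYear": "\\N",
    "primaryProfession": "actress,music_department",
}

parsed = {
    "birthYear": to_int(raw["birthYear"]),
    "deathYear": to_int(raw["deathYear"]),
    "primaryProfession": to_list(raw["primaryProfession"]),
}
print(parsed)
```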

In [0]:
name_basics_df.show()

+---------+-------------------+---------+---------+--------------------+--------------------+
|   nconst|        primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+-------------------+---------+---------+--------------------+--------------------+
|nm0000001|       Fred Astaire|     1899|     1987|actor,miscellaneo...|tt0072308,tt00504...|
|nm0000002|      Lauren Bacall|     1924|     2014|actress,soundtrac...|tt0037382,tt00752...|
|nm0000003|    Brigitte Bardot|     1934|       \N|actress,music_dep...|tt0057345,tt00491...|
|nm0000004|       John Belushi|     1949|     1982|actor,writer,musi...|tt0072562,tt00779...|
|nm0000005|     Ingmar Bergman|     1918|     2007|writer,director,a...|tt0050986,tt00839...|
|nm0000006|     Ingrid Bergman|     1915|     1982|actress,producer,...|tt0034583,tt00368...|
|nm0000007|    Humphrey Bogart|     1899|     1957|actor,producer,mi...|tt0034583,tt00425...|
|nm0000008|      Marlon Brando|     1924|     2004|actor,dir

Throughout this notebook, we create 'checkpoints' by writing our dataframes to Parquet files, which helps us save the results we're getting.

In [0]:
dbutils.fs.mkdirs("/pratical_exam")

Out[63]: True

In [0]:
output_path1 = "/pratical_exam/title_ratings_df.parquet"
title_ratings_df.write.mode("overwrite").parquet(output_path1)
output_path2 = "/pratical_exam/title_basics_df.parquet"
title_basics_df.write.mode("overwrite").parquet(output_path2)
output_path3 = "/pratical_exam/title_principals_df.parquet"
title_principals_df.write.mode("overwrite").parquet(output_path3)
output_path4 = "/pratical_exam/name_basics_df.parquet"
name_basics_df.write.mode("overwrite").parquet(output_path4)


### Network Inference, Let’s build a network
In the following questions you will look to summarise the data and build a network. We want to examine a network that abstracts how actors and actresses are related through their co-participation in movies. To that end perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** on the participants in those movies (i.e., actors, directors, etc … ), make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?

Let's analyse each dataframe individually first, to plan how we'll merge them.

Since we know from the next questions that we won't be working with tv-shows or with crew members, we decided not to merge 'title_episode_df' and 'title_crew_df' into the main dataframe. Additionally, 'title_akas_df' doesn't add any new information, so we won't be merging it either.

In [0]:
title_df = title_basics_df.join(title_ratings_df, "tconst") #tconst: unique identifier of the title
title_principals_df = title_principals_df.join(title_df, "tconst")
merged_df = title_principals_df.join(name_basics_df, "nconst") #nconst: unique identifier of the name/person

Our final answer is:

In [0]:
print(f"Total number of rows in the dataframe is {merged_df.count()}.")

Total number of rows in the dataframe is 20097107.


In [0]:
merged_df.write.mode('overwrite').parquet("/pratical_exam/merged_df.parquet")

**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?

In [0]:
merged_df = spark.read.parquet("/pratical_exam/merged_df.parquet")

In [0]:
#Imports:
from pyspark.sql.functions import col, isnull

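1st: we create a copy of the merged dataframe.

2nd: to remove all participants that aren't an actor/actress, we keep only the rows whose 'category' is 'actor' or 'actress'.

3rd: remove all adult movies (rows with 'isAdult' equal to 0 are kept).

4th: remove all dead actors/actresses (only rows with no 'deathYear' are kept).

5th: remove all actors/actresses born before 1920 or with no date of birth listed (the comparison 'birthYear >= 1920' is never true for a missing value, so those rows are dropped as well).

6th: remove all title types that aren't movies.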


In [0]:
final_df = merged_df.select("*") #1

final_df = final_df.filter(
    final_df.category.isin(["actor", "actress"]) & #2
    (final_df.isAdult == 0) & #3
    isnull(final_df.deathYear) & #4
    (final_df.birthYear >= 1920) & #5
    (final_df.titleType == 'movie') #6
)
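As a rough cross-check of the filter above, the same five conditions can be written as a plain-Python predicate over dictionaries. The records and their short ids are made up for illustration. In Python the missing birth year must be tested explicitly, whereas in Spark a comparison against null is never true, so the filter drops such rows by itself.

```python
def keep(row):
    """Apply the five Q2 conditions to one record (a plain dict here)."""
    return (
        row["category"] in ("actor", "actress")
        and row["isAdult"] == 0
        and row["deathYear"] is None
        and row["birthYear"] is not None
        and row["birthYear"] >= 1920
        and row["titleType"] == "movie"
    )


# Illustrative records only; the ids "a".."d" are hypothetical
rows = [
    {"nconst": "a", "category": "actress", "isAdult": 0, "deathYear": None, "birthYear": 1975, "titleType": "movie"},
    {"nconst": "b", "category": "director", "isAdult": 0, "deathYear": None, "birthYear": 1960, "titleType": "movie"},
    {"nconst": "c", "category": "actor", "isAdult": 0, "deathYear": None, "birthYear": None, "titleType": "movie"},
    {"nconst": "d", "category": "actor", "isAdult": 0, "deathYear": 1999, "birthYear": 1930, "titleType": "movie"},
]

kept = [r["nconst"] for r in rows if keep(r)]
print(kept)
```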

We drop the now irrelevant columns:

In [0]:
columns_to_remove = ["category", "isAdult", "deathYear", "birthYear", "titleType"]
final_df = final_df.drop(*columns_to_remove)

Our final answer:

In [0]:
print(f"Total number of rows in the dataframe is {final_df.count()}.")

Total number of rows in the dataframe is 749189.


**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name, these could be different people.

We start by converting the dataframe ``final_df`` to an RDD.

In [0]:
RDD = final_df.rdd

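Then, we create a paired RDD, where the value is one and the key is the pair (actor ID, actor name). Keeping the ID in the key ensures that different people who share a name are counted separately.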

In [0]:
names_rdd = RDD.map(lambda row: ((row.nconst, row.primaryName), 1))

We count how many movies each actor appears in, and convert the counts back to a dataframe, ``total_movie_df``. We sort the dataframe in descending order and show the top 10.

Our final answer is the table below:

In [0]:
total_movie_rdd = names_rdd.reduceByKey(lambda x, y: x+y)
total_movie_df = total_movie_rdd.toDF(["Actor/Actress", "Movie Count"])

top_actors = total_movie_df.sort("Movie Count", ascending=False).limit(10)
top_actors.show()

+--------------------+-----------+
|       Actor/Actress|Movie Count|
+--------------------+-----------+
|{nm0103977, Brahm...|        798|
|{nm0415549, Jagat...|        514|
|{nm0007106, Shakt...|        476|
|{nm0007123, Mammo...|        411|
|{nm0482320, Mohan...|        378|
|{nm0149822, Mithu...|        370|
|{nm0045119, Aruna...|        368|
|{nm0080238, Tanik...|        365|
|{nm0451600, Anupa...|        365|
| {nm0621937, Nassar}|        365|
+--------------------+-----------+
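The map and reduce steps used above can be imitated in plain Python to show why the key carries the ID as well as the name. This is only a sketch on made-up data: the ids and names below are hypothetical, and the loop stands in for `reduceByKey`.

```python
from collections import Counter

# One entry per (actor id, actor name) appearance in a movie; made-up data
appearances = [
    ("nm1", "Alex Kim"),
    ("nm2", "Alex Kim"),  # a different person with the same name
    ("nm1", "Alex Kim"),
    ("nm3", "Sam Lee"),
]

# "map": emit (key, 1) for every appearance
pairs = [(key, 1) for key in appearances]

# "reduceByKey": add up the ones that share a key
counts = Counter()
for key, one in pairs:
    counts[key] += one

# Sort by movie count, largest first
top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(top)
```

Had the key been the name alone, the two people called "Alex Kim" would have been merged into a single count of three.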



**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique ID's). Whenever a pair of actors were together in a movie, we need a link in each direction (A -> B and B -> A). However, links should be distinct: we do not need duplicates when two actors worked together in several movies. 

Display a DataFrame with the first 10 edges.

In [0]:
#Imports:
from pyspark.sql.functions import col

We generate ``network_links_df`` from the ``final_df`` obtained in Q2 by joining it with itself on 'tconst', which gives every pair of actors who performed in the same movie. We then filter out the pairs where both IDs are the same and select the ID columns, renaming them to ID1 and ID2. Finally, we remove the duplicates and show the first 10 edges.

The pairs we filter out represent the same person "working with themselves", which doesn't make sense for the question asked. Because the self-join yields each pair in both orders, both directions (A -> B and B -> A) are kept.

Our final answer is the table below:

In [0]:
network_links_df = final_df.alias("df1").join(final_df.alias("df2"), col("df1.tconst") == col("df2.tconst"))

network_links_df = network_links_df.filter(col("df1.nconst") != col("df2.nconst")).select(col("df1.nconst").alias("ID1"), col("df2.nconst").alias("ID2"))

network_links_df = network_links_df.distinct()

network_links_df.show(10)

+---------+---------+
|      ID1|      ID2|
+---------+---------+
|nm0566478|nm0388155|
|nm0001989|nm0001012|
|nm0145061|nm0760709|
|nm0723771|nm0555533|
|nm0405673|nm0107012|
|nm0161850|nm0065572|
|nm0000728|nm0265276|
|nm0846357|nm0171993|
|nm0513419|nm0000476|
|nm0841700|nm0041332|
+---------+---------+
only showing top 10 rows
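The self-join logic used for the edges can be sketched in plain Python on a toy cast list. The movie and actor ids are made up; `permutations(..., 2)` plays the role of the self-join followed by the "different IDs" filter, and the set plays the role of `distinct()`.

```python
from collections import defaultdict
from itertools import permutations

# (movie id, actor id) rows; made-up data
cast_rows = [("tt1", "A"), ("tt1", "B"), ("tt1", "C"), ("tt2", "A"), ("tt2", "B")]

# Group the cast by movie
cast_by_movie = defaultdict(set)
for movie, actor in cast_rows:
    cast_by_movie[movie].add(actor)

# Ordered pairs of distinct actors per movie, deduplicated across movies
links = set()
for cast in cast_by_movie.values():
    links.update(permutations(sorted(cast), 2))

print(sorted(links))
```

A and B share two movies, yet the pair appears only once in each direction.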



**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probability = 0.1.

List the top 10 actors / actresses by pagerank.

In [0]:
#Imports:
from graphframes import *

We start by creating two dataframes, ``vertices`` and ``edges``, which hold the vertices and edges of a graph, ``gf``. The edges represent the links between actors that participated in the same movie, while the vertices represent the actors themselves.

Side note: GraphFrames expects the vertex dataframe to have a column called 'id', and the edge dataframe to have the source and destination vertex ids in columns called 'src' and 'dst', respectively. The 'name' column is an extra attribute we carry along for readability.

In [0]:
vertices = final_df.select(col("nconst").alias("id"), col("primaryName").alias("name")).distinct()

edges = network_links_df.withColumnRenamed("ID1", "src").withColumnRenamed("ID2", "dst")

gf = GraphFrame(vertices, edges)



After generating the graph, we compute the PageRank, sort it in descending order and show the top 10 actors.

Our final answer is the table below:

In [0]:
pagerank = gf.pageRank(resetProbability=0.1, maxIter=5)
pagerank.vertices.orderBy("pagerank", ascending=False).show(10)



+---------+----------------+------------------+
|       id|            name|          pagerank|
+---------+----------------+------------------+
|nm0000616|    Eric Roberts| 51.50617837766773|
|nm0000514|  Michael Madsen| 31.78200213665834|
|nm0001803|     Danny Trejo| 23.46116322012824|
|nm0202966|     Keith David|22.650591633177818|
|nm0261724|     Joe Estevez|21.939555266738925|
|nm0000448| Lance Henriksen|21.811845509555827|
|nm0000532|Malcolm McDowell|21.317152766292033|
|nm0001595|    Michael Paré|21.074833945156218|
|nm0726223|  Richard Riehle|20.165281108827507|
|nm0000367|Gérard Depardieu|19.713805782406496|
+---------+----------------+------------------+
only showing top 10 rows
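For intuition, the iterative alternative mentioned in the question can be sketched in plain Python with the same parameters (reset probability 0.1, 5 iterations). This is a textbook formulation on a made-up star graph, not the GraphFrames implementation. It assumes every node has at least one outgoing link, and it normalises scores to sum to 1, whereas the scores in the table above are on a different scale, so only the ordering is comparable.

```python
def pagerank(edges, reset_probability=0.1, max_iter=5):
    """Basic iterative PageRank; assumes every node has an outgoing link."""
    nodes = sorted({n for edge in edges for n in edge})
    out_links = {n: [] for n in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    rank = {n: 1.0 / len(nodes) for n in nodes}
    for _ in range(max_iter):
        # Each node shares its current rank equally among its out-links
        incoming = {n: 0.0 for n in nodes}
        for src, targets in out_links.items():
            for dst in targets:
                incoming[dst] += rank[src] / len(targets)
        rank = {
            n: reset_probability / len(nodes) + (1 - reset_probability) * incoming[n]
            for n in nodes
        }
    return rank


# Toy graph: "hub" acted with a, b and c; links go in both directions
edges = [("hub", "a"), ("a", "hub"), ("hub", "b"), ("b", "hub"), ("hub", "c"), ("c", "hub")]
rank = pagerank(edges)
print(max(rank, key=rank.get))
```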



**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.

We create ``od_rdd`` and sort it in descending order, returning the top 10 by outdegree.

Side note: Since we're working with an RDD, we cannot use the command ``.show()``, so we used a for loop.

The result below is our final answer:

In [0]:
od_rdd = gf.outDegrees.rdd.map(lambda x: (x.id, x.outDegree))

top10_actors_od = od_rdd.sortBy(lambda x: x[1], ascending=False)

top10_actors = top10_actors_od.take(10)

for actor in top10_actors:
    print(actor)



('nm0000616', 1118)
('nm0000514', 790)
('nm0451600', 722)
('nm0000367', 657)
('nm0202966', 656)
('nm0621937', 641)
('nm0410902', 623)
('nm0256628', 616)
('nm0695177', 606)
('nm0000168', 604)
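Because the links are distinct and stored in both directions, an actor's outdegree equals their number of distinct co-stars. A plain-Python sketch on made-up links shows the count:

```python
from collections import Counter

# Directed, distinct links between actor ids; made-up data
links = [("A", "B"), ("B", "A"), ("A", "C"), ("C", "A")]

# Outdegree = number of links leaving each source
out_degree = Counter(src for src, _ in links)
top = out_degree.most_common(1)
print(top)
```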


### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon, there are two actors named ‘Kevin Bacon’, we will use the one with the highest degree, that is, the one that participated in most titles;
1. Estimate the shortest path between every actor in the database and Kevin Bacon, keep a dataframe with this information as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degrees from Kevin Bacon (you will need to deal with actors unconnected to Kevin Bacon: if not connected to Kevin Bacon, give these actors / actresses a score/degree of 20).

In [0]:
#Imports:
from graphframes import GraphFrame
from pyspark.sql.functions import col, lit, when, count

We start by converting the ``od_rdd`` back to a dataframe, because it's easier to join with the vertices. 

Then, we filter the vertices named Kevin Bacon and join them with ``od_df``. Ordering by outdegree and taking the first row gives the ID with the highest degree, which we use as a proxy for the Kevin Bacon who participated in the most titles.

In [0]:
od_df = od_rdd.toDF(["id", "outDegree"])

kb_ids = gf.vertices.filter("name = 'Kevin Bacon'") 

kb_highest_id = kb_ids.join(od_df, kb_ids.id == od_df.id) \
                                            .orderBy(col("outDegree").desc()) \
                                            .select(kb_ids.id) \
                                            .first()[0]
                                            
print(kb_highest_id)

nm0000102


Next, we calculate the shortest path from every actor to Kevin Bacon, ``shortest_paths``.

We then extract the distance to Kevin Bacon from the resulting 'distances' column: ``getItem`` looks up the entry for Kevin Bacon's ID, and we store it in a new column called ``distance``.

In [0]:

shortest_paths = gf.shortestPaths(landmarks=[kb_highest_id])
kb_dist = shortest_paths.withColumn("distance", col("distances").getItem(kb_highest_id))



Finally, as asked, we fill the distance with 20 for those who aren't connected to Kevin Bacon.

The table below is our final answer:

In [0]:
kb_dist = kb_dist.fillna({'distance': 20})
numb_count = kb_dist.groupBy('distance').count().orderBy("distance")
numb_count.show()

+--------+-----+
|distance|count|
+--------+-----+
|       0|    1|
|       1|  334|
|       2|13313|
|       3|54196|
|       4|38619|
|       5| 4052|
|       6|  319|
|       7|   32|
|       8|    3|
|      20|10284|
+--------+-----+
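The same summary can be reproduced on a toy network with a breadth-first search in plain Python. Since the links go in both directions, searching outward from Kevin Bacon gives the same distances as searching from each actor towards him. The function name `degrees_from` and the tiny graph are hypothetical; the score of 20 for unconnected actors follows the question.

```python
from collections import Counter, deque


def degrees_from(source, neighbours, unreachable=20):
    """Breadth-first search; actors never reached get the fallback score."""
    distance = {source: 0}
    queue = deque([source])
    while queue:
        current = queue.popleft()
        for nxt in neighbours.get(current, ()):
            if nxt not in distance:
                distance[nxt] = distance[current] + 1
                queue.append(nxt)
    return {node: distance.get(node, unreachable) for node in neighbours}


# Made-up adjacency list: "loner" never acted with anyone in the network
neighbours = {"kevin": ["a"], "a": ["kevin", "b"], "b": ["a"], "loner": []}

result = degrees_from("kevin", neighbours)
summary = Counter(result.values())
print(sorted(summary.items()))
```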



### Exploring the data with RDD's

Using RDDs (and not dataframes) answer the following questions (if you loaded your data into spark in a dataframe you can convert to an RDD of rows easily using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie' what is the combination of genres that is the most popular (as measured by number of reviews). Hint: paired RDD's will be useful.

In [0]:
ratings_path ='/pratical_exam/title_ratings_df.parquet'
ratings_df = spark.read.parquet(ratings_path)

titles_path ='/pratical_exam/title_basics_df.parquet'
titles_df = spark.read.parquet(titles_path)

We transform the dataframes into RDDs and then filter the titles to keep only the type 'movie'.

In [0]:
ratings_rdd = ratings_df.rdd
titles_rdd = titles_df.rdd
movies_rdd = titles_rdd.filter(lambda row: row['titleType'] == 'movie')

Next, we map ``movies_rdd`` to create key-value pairs where the key is 'tconst' and the value is the entire row. Similarly, we map ``ratings_rdd`` to create key-value pairs, where the key is also 'tconst' and the value is a tuple containing ``averageRating`` and ``numVotes``. Finally, we join both rdds by 'tconst' as we can see below:

In [0]:
movie_ratings_rdd = movies_rdd.map(lambda row: (row['tconst'], row)) \
                              .join(ratings_rdd.map(lambda row: (row.tconst, (row.averageRating, row.numVotes)))) \
                              .map(lambda x: (x[1][0]['genres'], x[1][1]))

for row in movie_ratings_rdd.take(2):
  print(row)

(['Action', 'Adventure', 'Biography'], (6.0, 906))
(['Drama'], (4.3, 25))


After the join, ``movie_ratings_rdd`` holds pairs of the form (genres, (averageRating, numVotes)). We now map it so that each element becomes a tuple whose first element is the genre combination (converted from a list to a tuple, so that it can be used as a key) and whose second element is the number of votes, as we can see below:

In [0]:
genre_votes_rdd = movie_ratings_rdd.map(lambda x: (tuple(x[0]), x[1][1]))
for row in genre_votes_rdd.take(2):
  print(row)

(('Action', 'Adventure', 'Biography'), 906)
(('Drama',), 25)


Finally, we sum the number of votes for each combination of genres and find the combination with the highest number of votes.

In [0]:
genre_votes_sum_rdd = genre_votes_rdd.reduceByKey(lambda x, y: x + y)
for row in genre_votes_sum_rdd.take(2):
  print(row)

(('\\N',), 235813)
(('War',), 44332)


Our final answer:

In [0]:
most_popular_genre = genre_votes_sum_rdd.max(key=lambda x: x[1])

print("Most popular combination of genres, by number of reviews:", most_popular_genre)

Most popular combination of genres, by number of reviews: (('Action', 'Adventure', 'Sci-Fi'), 54358980)
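The aggregation by genre combination can be sketched in plain Python on made-up vote counts. The list of genres is turned into a tuple because a list cannot be used as a dictionary key, which mirrors the `tuple(x[0])` step above.

```python
from collections import defaultdict

# (genres, numVotes) per movie; made-up data
movies = [
    (["Action", "Sci-Fi"], 100),
    (["Drama"], 250),
    (["Action", "Sci-Fi"], 200),
]

# Sum the votes of every movie that shares the same genre combination
votes_by_combo = defaultdict(int)
for genres, num_votes in movies:
    votes_by_combo[tuple(genres)] += num_votes

best = max(votes_by_combo.items(), key=lambda kv: kv[1])
print(best)
```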


**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination). Hint: paired RDD's will be useful.

We start by filtering movies with more than 400 ratings, since we already only have the type 'movie' filtered.

In [0]:
min_movies_rdd = movie_ratings_rdd.filter(lambda x: x[1][1] > 400)  # movie_ratings_rdd comes from Q8
for row in min_movies_rdd.take(2):
  print(row)

(['Action', 'Adventure', 'Biography'], (6.0, 906))
(['Adventure', 'Drama', 'Fantasy'], (7.0, 3495))


Next, we map the ``min_movies_rdd`` to extract the genre combinations and their corresponding ratings with a count of 1 - indicating that this record represents one movie.

In [0]:
genre_rating_count_rdd = min_movies_rdd.map(lambda x: (tuple(x[0]), (x[1][0], 1)))
for row in genre_rating_count_rdd.take(2):
  print(row)


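Then we calculate the average rating for each unique combination of movie genres: we first sum the ratings and the counts per combination, and then divide the summed rating by the count.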


In [0]:
genre_rating_sum_count_rdd = genre_rating_count_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) #sum
for row in genre_rating_sum_count_rdd.take(2):
  print(row)


In [0]:
genre_avg_rating_rdd = genre_rating_sum_count_rdd.map(lambda x: (x[0], x[1][0] / x[1][1])) #dividing to average
for row in genre_avg_rating_rdd.take(2):
  print(row)


We find the genre combination with the highest average rating.

In [0]:
most_popular_genre_by_avg_rating = genre_avg_rating_rdd.max(key=lambda x: x[1])
print(most_popular_genre_by_avg_rating)

(('Action', 'Documentary', 'Mystery'), 8.3)
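The (sum, count) pattern behind this average can be sketched in plain Python on made-up ratings. Carrying the running sum and the count separately matters because partial averages cannot simply be averaged again when the groups have different sizes.

```python
# (genre combination, averageRating) per movie; made-up data
ratings = [
    (("Drama",), 8.0),
    (("Drama",), 6.0),
    (("Comedy",), 7.5),
]

# Accumulate (sum of ratings, number of movies) per combination
sum_count = {}
for combo, rating in ratings:
    total, n = sum_count.get(combo, (0.0, 0))
    sum_count[combo] = (total + rating, n + 1)

# Divide once at the end to get the mean rating per combination
averages = {combo: total / n for combo, (total, n) in sum_count.items()}
best = max(averages.items(), key=lambda kv: kv[1])
print(best)
```

In this toy data the winning combination is backed by a single movie, which is worth keeping in mind when reading a top average: a rare combination can lead on very few titles.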


Our final answer:

In [0]:
print("The combination of genres that has the highest average movie ratings, for the titles of the type 'movie' with more than 400 ratings is:", most_popular_genre_by_avg_rating)

The combination of genres that has the highest average movie ratings, for the titles of the type 'movie' with more than 400 ratings is: (('Action', 'Documentary', 'Mystery'), 8.3)


**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular as measured by number of votes. Votes for multiple genres count towards each genre listed. Hint: flatmap and pairedRDD's will be useful here.

We first convert the Dataframes to RDD's.

In [0]:
ratings_rdd = ratings_df.rdd
titles_rdd = titles_df.rdd

We filter only the movies.

In [0]:
movies_rdd = titles_rdd.filter(lambda row: row.titleType == 'movie')


We pair each movie's ID, 'tconst', with its corresponding row, and separately pair each 'tconst' with its number of votes. We then join the two paired RDDs and flatten the result with `flatMap`, which emits one (genre, number of votes) tuple per genre of the movie. This way, for movies with multiple genres, the movie's full vote count is counted towards each of its genres.

In [0]:
# The first map builds tuples with key = 'tconst' and value = the entire row
movie_ratings_rdd = movies_rdd.map(lambda row: (row.tconst, row)) \
                              .join(ratings_rdd.map(lambda row: (row.tconst, row.numVotes))) \
                              .flatMap(lambda x: [(genre, x[1][1]) for genre in x[1][0]['genres']])

After extracting genres and their corresponding number of votes from the joined rdd, we need to aggregate the total number of votes for each genre. So we use the `reduceByKey` function that groups the ``movie_ratings_rdd`` by key, genres, and then we sum the votes for each genre. This way we're capable of getting the highest total number of votes by using the `max` function.


In [0]:
genre_votes_sum_rdd = movie_ratings_rdd.reduceByKey(lambda x, y: x + y)


In [0]:

most_popular_genre = genre_votes_sum_rdd.max(key=lambda x: x[1])



Our final answer:

In [0]:
print("The individual genre which is the most popular as meaured by number of votes is: ", most_popular_genre)

The individual genre which is the most popular as meaured by number of votes is:  ('Drama', 572549123)
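The flatMap step can be imitated in plain Python with a nested comprehension, again on made-up data. The `\N` entry stands for a movie with no genre listed: it is counted as if it were a genre unless it is filtered out first.

```python
from collections import defaultdict

# (genres, numVotes) per movie; made-up data
movies = [
    (["Action", "Drama"], 100),
    (["Drama"], 50),
    (["\\N"], 10),  # placeholder used when no genre is listed
]

# "flatMap": one (genre, votes) pair per genre of every movie
pairs = [(genre, votes) for genres, votes in movies for genre in genres]

# "reduceByKey": total votes per individual genre
votes_by_genre = defaultdict(int)
for genre, votes in pairs:
    votes_by_genre[genre] += votes

best = max(votes_by_genre.items(), key=lambda kv: kv[1])
print(best)
```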


## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The average value for the cast of degrees of Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leave 30% of the data for testing, when splitting your data use the option seed=0.

**Q11** Provide the coefficients of the regression and the accuracy of your model on that test dataset according to RMSE.

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import avg, col, explode  # used by the feature cells below
from pyspark.sql.functions import sum as _sum

We start by creating a dataframe with only the IDs of this project, the 'tconst' - movie id - and the 'nconst' - actor names. This will serve as a type of a 'linking table': ``ids_df``.

To the ``total_movie_df`` we add a new column - 'nconst'- based on the values in the column 'Actor/Actress._1', and call this new datafram ``top_10_actors_aux``. Finally we drop the 'Actor/Actress' column, since we won't need it. We do this to prepare us for the next steps, them being: creating the sugested features.

For the first suggested feature: 
* Total number of movies in which the actors have acted: we create a new dataframe, ``total_movies_cast``, by left joining the ``ids_df`` with ``top_10_actors_aux`` on the 'nconst' column. After the join, we group the resulting df by 'tconst' and finally aggregate the grouped data using the '_sum' function applied to the ``Movie Count``.

For the second suggested feature:
* Average PageRank of the cast in each movie: We create a new dataframe, ``avg_pagerank``, by left joining the ``ids_df`` with pagerank.vertices, defined in Q5, on the condition ids_df.nconst == pagerank.vertices.id. Similarly to the first feature, after the join we group the resulting df by 'tconst' and aggregate it, now by the average of the pagerank values.

For the third suggested feature:
* The average outDegree of the cast in each movie: We first transform the rdd we created in Q6 to a dataframe, in order to perform the left join with ``ids_df`` on 'nconst'. We then group by 'tconst' and aggregate the average of the outDegree values, which results in the new dataframe ``avg_od``.

For the fourth suggested feature:
* The average value for the cast of degrees of Kevin Bacon: Similarly to all the features above, we left join the ``ids_df`` with kb_dist, calculated in Q7, on the condition ids_df.nconst == kb_dist.id. We then select the 'tconst' and 'name' columns and explode the 'distances' column into multiple rows, which creates one new row for each entry in 'distances'. Finally, we group by 'tconst' and average the exploded 'value' column, which results in ``avg_kb_dist_cast``.
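
All four features follow the same join-then-aggregate pattern. As a minimal plain-Python sketch of what a left join followed by `groupBy('tconst')` and `avg` computes (the ids and scores are made up for illustration, and `average_per_movie` is a hypothetical helper, not part of the notebook):

```python
from collections import defaultdict

# Hypothetical linking table: one (movie id, actor id) pair per cast member.
ids = [("tt1", "nm1"), ("tt1", "nm2"), ("tt2", "nm2"), ("tt3", "nm3")]

# Hypothetical per-actor score (stands in for pagerank, outDegree, ...).
# nm3 has no score, which mimics a null produced by the left join.
score = {"nm1": 4.0, "nm2": 2.0}


def average_per_movie(ids, score):
    """Average the cast's scores per movie, ignoring missing values."""
    grouped = defaultdict(list)
    for tconst, nconst in ids:
        scores = grouped[tconst]  # creates the movie entry even if empty
        if nconst in score:
            scores.append(score[nconst])
    # A movie whose whole cast lacks a score ends up as None (null).
    return {t: (sum(v) / len(v) if v else None) for t, v in grouped.items()}


avg_score = average_per_movie(ids, score)
print(avg_score)
```

The movie whose cast has no score at all keeps a missing value, which is why a fill step is needed before the features can be assembled into a vector.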

In [0]:
# Linking table: one (movie id, actor id) pair per cast member.
ids_df = final_df.select('tconst', 'nconst')

# Expose the actor id of total_movie_df (Q3) as a plain 'nconst' column.
top_10_actors_aux = total_movie_df.withColumn("nconst", col("Actor/Actress._1"))
top_10_actors_aux = top_10_actors_aux.drop("Actor/Actress")

#feature1: total number of movies of the cast
total_movies_cast = (ids_df.join(top_10_actors_aux, on='nconst', how='left')
                     .groupBy('tconst')
                     .agg(_sum('Movie Count').alias('total_movies_cast')))

#feature2: average pagerank of the cast
avg_pagerank = (ids_df.join(pagerank.vertices, on=(ids_df.nconst == pagerank.vertices.id), how='left')
                .groupby('tconst')
                .agg(avg('pagerank').alias('avg_pagerank')))

#feature3: average outDegree of the cast
od_df = od_rdd.toDF(["nconst", "outDegree"])
avg_od = (ids_df.join(od_df, on='nconst', how='left')
          .groupby('tconst')
          .agg(avg('outDegree').alias('avg_od')))

#feature4: average distance of the cast to Kevin Bacon
avg_kb_dist_cast = (ids_df.join(kb_dist, on=(ids_df.nconst == kb_dist.id), how='left')
                    .select('tconst', 'name', explode('distances'))
                    .groupBy('tconst')
                    .agg(avg(col('value')).alias('avg_distance')))

We create a dataframe with the movie ids and their corresponding average rating - the target variable.

In [0]:
final_df_aux = final_df.select(col('tconst'), col('averageRating'))

We create a new dataframe composed of our new features and the target variable.

In [0]:

reg_df = total_movies_cast \
    .join(avg_pagerank, 'tconst', 'left') \
    .join(avg_od, 'tconst', 'left') \
    .join(avg_kb_dist_cast, 'tconst', 'left') \
    .join(final_df_aux, 'tconst', 'left')

After joining all the previously created dataframes, we decided to fill any null value with the median of the corresponding column. To achieve this, we must guarantee all features are of type float. To get the median, we set the quantile to 0.5. Finally, we remove any duplicates that might have emerged.
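
As a minimal plain-Python sketch of the median fill described above (the values are made up, `fill_with_median` is a hypothetical helper, and `statistics.median_low` is our choice for the sketch; on even-sized columns Spark's `approxQuantile` may pick a different element):

```python
import statistics

# Hypothetical feature column; None plays the role of a null value.
avg_distance = [1.75, None, 3.0, 2.5, None, 4.0]


def fill_with_median(values):
    """Replace None entries with the median of the observed values."""
    observed = [v for v in values if v is not None]
    # median_low returns an observed value instead of interpolating.
    median = statistics.median_low(observed)
    return [median if v is None else v for v in values]


filled = fill_with_median(avg_distance)
print(filled)
```

The median is computed only from the observed values, so the missing entries do not influence the value they are replaced with.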

In [0]:
reg_df = reg_df.withColumn("total_movies_cast", col("total_movies_cast").cast("float")) \
               .withColumn("avg_pagerank", col("avg_pagerank").cast("float")) \
               .withColumn("avg_od", col("avg_od").cast("float")) \
               .withColumn("avg_distance", col("avg_distance").cast("float"))

In [0]:
median_outDegree = reg_df.approxQuantile('avg_od', [0.5], 0)[0]
median__tmc  = reg_df.approxQuantile('total_movies_cast', [0.5], 0)[0]
median__pg = reg_df.approxQuantile('avg_pagerank', [0.5], 0)[0]
median__distance = reg_df.approxQuantile('avg_distance', [0.5], 0)[0]


In [0]:
reg_df = reg_df.na.fill(median_outDegree, ['avg_od'])
reg_df = reg_df.na.fill(median__tmc, ['total_movies_cast'])
reg_df = reg_df.na.fill(median__pg, ['avg_pagerank'])
reg_df = reg_df.na.fill(median__distance, ['avg_distance'])

In [0]:
reg_df = reg_df.distinct()
reg_df_final = reg_df

In [0]:
reg_df_final.display()

tconst,total_movies_cast,avg_pagerank,avg_outDegree,avg_distance,averageRating
tt0380609,286.0,6.395001,199.0,1.75,6.1
tt0340104,59.0,2.0183005,52.0,3.0,4.3
tt0169886,37.0,1.736848,47.0,2.75,4.7
tt0250642,170.0,5.4823194,253.0,3.0,8.3
tt2933666,15.0,0.734422,10.5,4.0,5.6
tt0458204,460.0,4.558726,216.85715,3.0,5.4
tt0282141,203.0,4.644628,160.2,2.4,5.6
tt7893416,211.0,5.487861,179.75,2.5,4.8
tt7575440,209.0,4.2102356,121.42857,2.5714285,5.3
tt0317914,38.0,1.2421392,24.8,2.6,6.4


We save a new parquet here, since we know we'll need the ``reg_df_final`` dataframe later.

In [0]:
output_path = "/pratical_exam/reg_df_final.parquet"
reg_df_final.write.mode("overwrite").parquet(output_path)

In [0]:
avg_od.show(1)

+---------+-------------+
|   tconst|avg_outDegree|
+---------+-------------+
|tt0380609|        199.0|
+---------+-------------+
only showing top 1 row



In [0]:
avg_pagerank.show(1)

+---------+-----------------+
|   tconst|     avg_pagerank|
+---------+-----------------+
|tt0380609|6.395001135584797|
+---------+-----------------+
only showing top 1 row



In [0]:
reg_df.show(1)

+---------+-----------------+-----------------+-------------+------------+-------------+
|   tconst|total_movies_cast|     avg_pagerank|avg_outDegree|avg_distance|averageRating|
+---------+-----------------+-----------------+-------------+------------+-------------+
|tt0380609|              286|6.395001135584797|        199.0|        1.75|          6.1|
+---------+-----------------+-----------------+-------------+------------+-------------+
only showing top 1 row



After creating the suggested features and saving them in a new dataframe, it's time to run our linear regression model in order to predict the rating of the movie.

We start by defining a vector of the feature columns we created, using ``VectorAssembler``. Then we split the data as requested: 30% for testing, with seed=0.

We define the linear regression model with the feature columns and set the ``averageRating`` column as the target variable. We train the model using the pipeline created, and finally make predictions on the test set. To evaluate the model, we use ``RegressionEvaluator`` and calculate the RMSE of the predictions.

Finally, to extract the coefficients and intercept, we take the fitted model out of the pipeline (index -1 accesses the last stage) and read them directly.
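
For reference, the RMSE is the square root of the mean squared difference between labels and predictions. A minimal plain-Python sketch with made-up ratings (the `rmse` helper is hypothetical and only illustrates the metric, it is not how the evaluator is implemented):

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings and predictions, for illustration only.
labels = [6.0, 4.0, 8.0]
predictions = [5.0, 5.0, 6.0]

error = rmse(labels, predictions)
print(error)
```

Because the errors are squared before averaging, a single prediction that is off by 2 points weighs as much as four predictions that are off by 1 point.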

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

#Vector of Features:
feature_columns = ['total_movies_cast', 'avg_pagerank', 'avg_od', 'avg_distance']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

#Splitting the data:
train_data, test_data = reg_df_final.randomSplit([0.7, 0.3], seed=0)

#Model:
lr = LinearRegression(featuresCol='features', labelCol='averageRating')

pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = RegressionEvaluator(labelCol='averageRating', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on the test data is = {rmse}")

#Extracting the coefficients and intercept:
lr_model = model.stages[-1]

coefficients = lr_model.coefficients
intercept = lr_model.intercept

print(f"Coefficients of the lr model: {coefficients}")
print(f"Intercept of the lr model: {intercept}")


Root Mean Squared Error (RMSE) on the test data is = 1.3132565465195898
Coefficients of the lr model: [-0.0009544035615189366,-0.16157959943974337,0.006266902511958607,0.052887724256057075]
Intercept of the lr model: 5.838030428290657


Our final answer for Q11:

* Root Mean Squared Error (RMSE) on the test data is = 1.3132565465195898

* Coefficients of the lr model: [-0.0009544035615189366,-0.16157959943974337,0.006266902511958607,0.052887724256057075]

* Intercept of the lr model: 5.838030428290657
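
To read these numbers, a linear model predicts the intercept plus the sum of each coefficient times its feature. The sketch below applies the reported coefficients by hand to the first row of the table displayed earlier (tt0380609); the `predict` helper is hypothetical, and the feature values are the rounded ones shown in that table:

```python
# Coefficients and intercept as reported above, in feature order:
# total_movies_cast, avg_pagerank, average outDegree, avg_distance
coefficients = [-0.0009544035615189366, -0.16157959943974337,
                0.006266902511958607, 0.052887724256057075]
intercept = 5.838030428290657

# Feature values of tt0380609, copied from the displayed table.
features = [286.0, 6.395001, 199.0, 1.75]


def predict(features, coefficients, intercept):
    """Linear prediction: intercept + sum of coefficient * feature."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


prediction = predict(features, coefficients, intercept)
print(round(prediction, 2))
```

The result is about 5.87, against a recorded rating of 6.1 for that movie. The small coefficients keep most predictions close to the intercept.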

**Q12** What score would your model predict for the 1997 movie Titanic?

In order to predict the rating of the 1997 movie Titanic with our model, we first need to find its ID.

So we filter our ``final_df`` dataframe, since it's only made up of movies, for start year 1997 and primary title Titanic. We get multiple duplicates of the same ID, so we simply apply 'distinct' and get Titanic's ID.

In [0]:
filtered_title = final_df.filter((col('primaryTitle') == 'Titanic') & (col('startYear') == 1997))
titanic_id = filtered_title.select('tconst').distinct()
titanic_id.show()

+---------+
|   tconst|
+---------+
|tt0120338|
+---------+



After extracting the ID, we filter the ``reg_df_final`` dataframe, composed of the movie ID 'tconst' and the features created in Q11, by Titanic's ID and then select the feature columns.

Finally, we give them to the model as ``titanic_features`` and get a prediction.

Side note: titanic_id above is a dataframe composed of only one row and one column. This is why we redefine it as a string in the cell below.

In [0]:
titanic_id = "tt0120338"
titanic_features = reg_df_final.filter(reg_df_final.tconst == titanic_id).select(feature_columns)
titanic_prediction = model.transform(titanic_features)
titanic_prediction.select("prediction").show()

+-----------------+
|       prediction|
+-----------------+
|5.891509184144949|
+-----------------+



Our final answer for Q12 is a prediction of: 5.891509184144949

**Q13** Create dummy variables for each of the top 10 movie genres from Q10. These variables should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example, the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variables to the regression improve your results? What is the new RMSE and predicted rating for the 1997 movie Titanic?

To get the movie genres from Q10, we need to rerun it in order to obtain the top 10, as shown below. Then we simply keep the genre names and discard the vote totals.

In [0]:
#Imports:

from pyspark.sql.functions import col, when, array_contains
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


In [0]:
top10_genres = genre_votes_sum_rdd.sortBy(lambda x: x[1], ascending=False).take(10)
print(top10_genres)

[('Drama', 572549123), ('Action', 377623034), ('Comedy', 340738989), ('Adventure', 297375019), ('Crime', 217832411), ('Thriller', 190748214), ('Sci-Fi', 140284087), ('Romance', 137888149), ('Mystery', 120807705), ('Horror', 111143054)]


In [0]:
genres = [genre[0] for genre in top10_genres]
print(genres)

['Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']


In [0]:
titles_df.show(1)  # I'd delete this cell; it only served as a helper while working

+---------+---------+------------+-------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|primaryTitle|originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+------------+-------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|  Carmencita|   Carmencita|      0|     1894|   null|             1|[Documentary, Short]|
+---------+---------+------------+-------------+-------+---------+-------+--------------+--------------------+
only showing top 1 row



We use a for loop to create multiple columns, one for each genre. The 'when' condition inside 'withColumn' checks if the array of genres ``col("genres")`` contains the current genre. If it does, it assigns the value 1 to the genre column; otherwise, it assigns the value 0. This creates the dummy variables in titles_df.
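
As a minimal plain-Python sketch of the same dummy encoding (the `genre_dummies` helper is hypothetical; the genres of Titanic are the ones stated in the question):

```python
genres = ['Drama', 'Action', 'Comedy', 'Adventure', 'Crime',
          'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']


def genre_dummies(movie_genres, genres):
    """Return one 0/1 flag per top genre for a single movie."""
    present = set(movie_genres or [])  # a missing genre list yields all zeros
    return {g: 1 if g in present else 0 for g in genres}


titanic_dummies = genre_dummies(['Drama', 'Romance'], genres)
print(titanic_dummies)
```

A movie tagged only with genres outside the top 10, for example `['Documentary', 'Short']`, gets 0 in every column.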

In [0]:
for genre in genres:
    titles_df = titles_df.withColumn(genre, when(array_contains(col("genres"), genre), 1).otherwise(0))

Before moving on to the model's predictions, we first need to left join ``reg_df_final`` with titles_df (only its genre and ID columns) on the 'tconst' column, getting the new dataframe ``final_prediction_df``.

In [0]:
final_prediction_df = reg_df_final.join(titles_df.select(["tconst"] + genres), on="tconst", how="left")


We then search for any null value.

In [0]:
for genre in genres:
    null_count = final_prediction_df.filter(col(genre).isNull()).count()
    print(f"Null count in column {genre}: {null_count}")

Null count in column Drama: 0
Null count in column Action: 0
Null count in column Comedy: 0
Null count in column Adventure: 0
Null count in column Crime: 0
Null count in column Thriller: 0
Null count in column Sci-Fi: 0
Null count in column Romance: 0
Null count in column Mystery: 0
Null count in column Horror: 0


In [0]:
final_prediction_df.show()

+---------+-----------------+------------+-------------+------------+-------------+-----+------+------+---------+-----+--------+------+-------+-------+------+
|   tconst|total_movies_cast|avg_pagerank|avg_outDegree|avg_distance|averageRating|Drama|Action|Comedy|Adventure|Crime|Thriller|Sci-Fi|Romance|Mystery|Horror|
+---------+-----------------+------------+-------------+------------+-------------+-----+------+------+---------+-----+--------+------+-------+-------+------+
|tt0089218|            307.0|    7.027639|       203.75|        1.75|          7.7|    0|     0|     1|        1|    0|       0|     0|      0|      0|     0|
|tt0169886|             37.0|    1.736848|         47.0|        2.75|          4.7|    0|     0|     0|        0|    0|       0|     1|      0|      0|     0|
|tt0250642|            170.0|   5.4823194|        253.0|         3.0|          8.3|    1|     0|     0|        0|    0|       0|     0|      0|      0|     0|
|tt0282141|            203.0|    4.644628|    

Then, we update our ``feature_columns`` list.

In [0]:
feature_columns = ['total_movies_cast', 'avg_pagerank', 'avg_od', 'avg_distance'] + genres
print(feature_columns)

['total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']


And finally feed it to the model.

In [0]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features') #vectoring

train_data, test_data = final_prediction_df.randomSplit([0.7, 0.3], seed=0) #splitting

lr = LinearRegression(featuresCol='features', labelCol='averageRating') #model

pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

In [0]:
evaluator = RegressionEvaluator(labelCol='averageRating', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE) for model with dummy variables = {rmse}")

Root Mean Square Error (RMSE) for model with dummy variables = 1.243037997219378


In [0]:
# now predicting with the new model -> since we got a lower rmse (before: 1.3; after: 1.24)
features_titanic = final_prediction_df.filter(final_prediction_df.tconst == titanic_id).select(feature_columns)
new_predictions = model.transform(features_titanic)
new_predictions.select("prediction").show()

+-----------------+
|       prediction|
+-----------------+
|6.184449131926541|
+-----------------+



Our final answer for Q13 is:
* RMSE = 1.243037997219378
* Prediction = 6.184449131926541

As we can see, we achieved a lower RMSE than in Q11 (1.243 versus 1.313). This means adding the dummy variables was a good decision, giving us a better result.
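
To put the gain in perspective, the relative reduction in RMSE can be computed directly from the two reported values (a small arithmetic sketch; the variable names are ours):

```python
rmse_q11 = 1.3132565465195898  # four cast features only
rmse_q13 = 1.243037997219378   # cast features plus genre dummies

# Relative reduction of the error, in percent of the Q11 value.
relative_improvement = (rmse_q11 - rmse_q13) / rmse_q11 * 100
print(f"RMSE reduced by {relative_improvement:.1f}%")
```

The improvement is real but modest, a bit over 5%.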

In [0]:

final_prediction_df_path = "/pratical_exam/final_prediction_df.parquet"
final_prediction_df.write.mode("overwrite").parquet(final_prediction_df_path)

**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, changing the included features. What is the RMSE of your final model and what rating does it predict for the 1997 movie Titanic?

In [0]:
final_prediction_df_path ="/pratical_exam/final_prediction_df.parquet"
final_prediction_df = spark.read.parquet(final_prediction_df_path)

We define the feature columns used to train our model: 'total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', plus the dummy variables seen before. 'averageRating' remains the label.

In [0]:
feature_columns = ['total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']

In [0]:
#Imports:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor, GeneralizedLinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql.functions import col

Just like before, we start by transforming the features into a vector and split the data.

In [0]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
train_data, test_data = final_prediction_df.randomSplit([0.7, 0.3], seed=0)

As asked, we decided to try two different machine learning models to check whether we can obtain better results:
  * Random Forest Regressor
  * Decision Tree Regressor

In [0]:
rf = RandomForestRegressor(featuresCol='features', labelCol='averageRating')
dt = DecisionTreeRegressor(featuresCol='features', labelCol='averageRating')

We set up the parameter grids for each model.

In [0]:
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

paramGrid_dt = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 20]) \
    .addGrid(dt.minInstancesPerNode, [1, 2, 4]) \
    .build()

We set up the cross-validation needed to evaluate the models, as well as the evaluator we'll be using to get the RMSE.
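
The cost of this step grows with the grid size: cross-validation fits one model per parameter combination and per fold, before refitting the best combination on the full training data. A plain-Python sketch that counts the fits for the grids above (the `expand` helper and the dictionaries are ours, for illustration):

```python
from itertools import product

# Same values as in the parameter grids defined above.
rf_grid = {"numTrees": [10, 20, 50], "maxDepth": [5, 10]}
dt_grid = {"maxDepth": [5, 10, 20], "minInstancesPerNode": [1, 2, 4]}
num_folds = 3


def expand(grid):
    """List every combination of the grid as a dict of parameter values."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*grid.values())]


rf_candidates = expand(rf_grid)
dt_candidates = expand(dt_grid)

# One fit per candidate and per fold (the final refit is not counted).
rf_fits = len(rf_candidates) * num_folds
dt_fits = len(dt_candidates) * num_folds
print(f"Random Forest: {len(rf_candidates)} candidates, {rf_fits} fits")
print(f"Decision Tree: {len(dt_candidates)} candidates, {dt_fits} fits")
```

This is why adding even one more parameter value per grid noticeably increases the running time.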

In [0]:
evaluator = RegressionEvaluator(labelCol='averageRating', predictionCol='prediction', metricName='rmse')

cv_rf = CrossValidator(estimator=Pipeline(stages=[assembler, rf]), estimatorParamMaps=paramGrid_rf, evaluator=evaluator, numFolds=3)
cv_dt = CrossValidator(estimator=Pipeline(stages=[assembler, dt]), estimatorParamMaps=paramGrid_dt, evaluator=evaluator, numFolds=3)

Finally, we trained each model, ``RFR`` and ``DTR``, on our chosen columns.

In [0]:
model_rf = cv_rf.fit(train_data)

In [0]:
model_dt = cv_dt.fit(train_data)

After training our models, we evaluated them on the test data and compared their RMSE: the lower the RMSE, the better.

With LR we obtained an RMSE of 1.24, while the Random Forest Regressor reached 0.187 and the Decision Tree Regressor 0.107. This means the Decision Tree Regressor predicts the target value most precisely on the test set, so we will use this model when investigating whether other variables improve our results.

In [0]:
rmse_rf = evaluator.evaluate(model_rf.bestModel.transform(test_data))
rmse_dtr = evaluator.evaluate(model_dt.bestModel.transform(test_data))

print(f"Random Forest RMSE: {rmse_rf}")
print(f"Decision Tree Regressor RMSE: {rmse_dtr}")

best_model, best_rmse = min([
    ('Random Forest', rmse_rf),
    ('Decision Tree Regressor', rmse_dtr)
], key=lambda x: x[1])

print(f"The best model is {best_model} with an RMSE of {best_rmse}")

Random Forest RMSE: 0.1866352625893324
Decision Tree Regressor RMSE: 0.10687774117794545
The best model is Decision Tree Regressor with an RMSE of 0.10687774117794545


We saved the best model in case we need it later.

In [0]:
best_model_1 = model_dt

In order to use this model for further testing, we need to know the best hyperparameters found before.

In [0]:
best_params = model_dt.bestModel.stages[-1]
best_maxDepth = best_params.getMaxDepth()
best_minInstancesPerNode = best_params.getMinInstancesPerNode()
best_maxBins = best_params.getMaxBins()
print(f"Best parameters for Decision Tree are: maxDepth = {best_maxDepth}, minInstancesPerNode = {best_minInstancesPerNode}, maxBins = {best_maxBins}")

Best parameters for Decision Tree are: maxDepth = 10, minInstancesPerNode = 4, maxBins = 32


Now we are going to use two more columns: `startYear` and `runtimeMinutes`. The year of the movie and its length can give us interesting insights and may therefore improve the model's predictions.

So we join titles_df with final_prediction_df on 'tconst'. 

In [0]:
new_features_df = titles_df.join(final_prediction_df, "tconst")

We checked whether these two new columns had null values and, as can be seen below, they did. To correct this, we decided to replace them with the median; to do this, we first needed to change the types to float.

In [0]:
null_count_startYear = new_features_df.filter(col("startYear").isNull()).count()
print(f"Null count in column startYear: {null_count_startYear}")

Null count in column startYear: 22


In [0]:
null_count_run = new_features_df.filter(col("runtimeMinutes").isNull()).count()
print(f"Null count in column runtimeMinutes: {null_count_run}")

Null count in column runtimeMinutes: 17826


In [0]:
columns_to_cast = ['total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', 
                   'Drama', 'Action', 'Comedy', 'Adventure', 
                   'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 
                   'Horror', 'startYear', 'runtimeMinutes']

for column_name in columns_to_cast:
    new_features_df = new_features_df.withColumn(column_name, col(column_name).cast("float"))


In [0]:
median_startYear = new_features_df.approxQuantile('startYear', [0.5], 0)[0]
median_run  = new_features_df.approxQuantile('runtimeMinutes', [0.5], 0)[0]

new_features_df = new_features_df.na.fill(median_startYear, ['startYear'])
new_features_df = new_features_df.na.fill(median_run, ['runtimeMinutes'])

Now, with our two new columns prepared, we define the ``new_features`` list to feed to our model so we can finally see if they improve the performance of our ``DTR``.

In [0]:
new_features = ['total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror', 'startYear', 'runtimeMinutes']

Since a grid search for ``DTR`` with the two new columns was too computationally expensive, we trained the model with the hyperparameters found for the previous ``DTR``, which uses only the initial and dummy variables. We know this isn't the best approach, because the best hyperparameters may change with the feature set, but we accepted this trade-off.

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

assembler = VectorAssembler(inputCols=new_features, outputCol='features')
train_data, test_data = new_features_df.randomSplit([0.7, 0.3], seed=0)

dt = DecisionTreeRegressor(
    featuresCol='features',
    labelCol='averageRating',
    maxDepth=10,
    minInstancesPerNode=4,
    maxBins=32
)

pipeline = Pipeline(stages=[assembler, dt])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = RegressionEvaluator(labelCol='averageRating', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on the test data is = {rmse}")

dtr_model = model.stages[-1]

Root Mean Squared Error (RMSE) on the test data is = 1.217538351691175


 



So comparing the obtained RMSE of 1.217 and 0.106 on the ``DTR``with the initial variables and dummuy variables, after hypertunning, we are going to use the `best_model_1` -the second ``DTR``- to predict titanic.

In [0]:
feature_columns = ['total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']

In [0]:
titanic_id = "tt0120338"
features_titanic = final_prediction_df.filter(final_prediction_df.tconst == titanic_id).select(feature_columns)
new_predictions = best_model_1.transform(features_titanic)
new_predictions.select("prediction").show()


+-----------------+
|       prediction|
+-----------------+
|7.976712328767125|
+-----------------+



As can be seen, using ``DTR`` with hyperparameter tuning changes the predicted rating of Titanic from 6.184 to 7.977.

This shows that it is worth trying different approaches, tuning the model hyperparameters and revisiting the information fed into the model, as the results can differ significantly.

Our final answer for Q14:
* Model: ``DTR`` with initial and dummy variables, with the hyperparameters maxDepth = 10, minInstancesPerNode = 4, maxBins = 32.
* RMSE: 0.10687774117794545
* Variables: ['total_movies_cast', 'avg_pagerank', 'avg_outDegree', 'avg_distance', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']
* Prediction: 7.976712328767125
