# Student: Beatriz Gonçalves - 20210695

# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline will be set for the 5th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set, but the assignment is individual. That is, you are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, and secondly by the logic used in approaching each step of the problem set;
5. Consider skipping questions that you are stuck on, and come back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected that you have developed skills beyond writing SQL queries. Any question where you directly write a SQL query (for example, creating a temporary table and using spark.sql to pass the query) will receive a 25% penalty. Using the spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all necessary code so that when the notebook is run in databricks it replicates these results. This means that all data downloading and processing should be done in this notebook. It is also important that you clearly indicate where your final answer to each question is when you are using multiple cells (for example, print "my final answer is" before your answer, or use cell comments).
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is present in the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisting your RDDs or Dataframes; this may speed up performance.
    6. If you have trouble with graphframes in databricks (specifically the import statement), make sure the graphframes package is installed on the cluster you are running. If you click home on the left and then click on the graphframes library which you loaded in Lab 9, you can install the package on your cluster (check the graphframes checkbox and click install).

#### Data Sources and Description
We will use data from IMDB. You can download raw datafiles
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv). You can find a
description of each datafile at https://www.imdb.com/interfaces/

## Questions
### Data loading and preparation
Review the file descriptions and load the necessary data onto your databricks cluster and into spark dataframes. You will need to use shell commands to download the data, unzip the data, and load the data into spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .gz files. The data files will then be tab separated (.tsv), which you can load into a dataframe using the tab separated option instead of the comma separated option we have typically used in class: `.option("sep","\t")`
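
As a minimal sketch of what "parsing and preprocessing" can involve, the plain-Python snippet below reads tab-separated text and maps the `\N` marker (which appears in the data shown later in this notebook wherever a value is missing) to `None`. The sample rows, the `SAMPLE_TSV` name and the `read_tsv` helper are made up for illustration; the real files are far larger and would be read with Spark.

```python
import csv
import io

# Hypothetical two-row sample in the IMDb column layout (not real data).
SAMPLE_TSV = (
    "nconst\tprimaryName\tbirthYear\tdeathYear\n"
    "nm0000001\tExample Person\t1950\t\\N\n"
    "nm0000002\tAnother Person\t\\N\t1999\n"
)

def read_tsv(text):
    """Parse tab-separated text, turning the missing-value marker \\N into None."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    return [
        {key: (None if value == r"\N" else value) for key, value in row.items()}
        for row in reader
    ]

rows = read_tsv(SAMPLE_TSV)
print(rows[0]["deathYear"])  # None: the first person has no death year listed
```

In Spark, a comparable effect can probably be obtained by adding `.option("nullValue", "\\N")` to the CSV reader, although the cells below keep `\N` as a string and compare against it directly.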

In [0]:
%%bash
# Download the data
wget 'https://datasets.imdbws.com/name.basics.tsv.gz'
wget 'https://datasets.imdbws.com/title.basics.tsv.gz'
wget 'https://datasets.imdbws.com/title.principals.tsv.gz'
wget 'https://datasets.imdbws.com/title.ratings.tsv.gz'

--2022-06-07 16:42:58--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.56, 99.84.66.78, 99.84.66.98, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228625979 (218M) [binary/octet-stream]
Saving to: ‘name.basics.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 6.52M 33s
    50K .......... .......... .......... .......... ..........  0% 6.83M 33s
   100K .......... .......... .......... .......... ..........  0% 6.12M 34s
   150K .......... .......... .......... .......... ..........  0%  133M 26s
   200K .......... .......... .......... .......... ..........  0% 65.3M 21s
   250K .......... .......... .......... .......... ..........  0% 33.7M 19s
   300K .......... .......... .......... .......... ..........  0% 20.5M 18s
   350K .......... .......... .......... .......... ..........  0% 18.8M 17s


In [0]:
%%bash
# Check if the files were downloaded
ls

conf
derby.log
eventlogs
logs
metastore_db
name.basics.tsv.gz
preload_class.lst
title.basics.tsv.gz
title.principals.tsv.gz
title.ratings.tsv.gz


In [0]:
%%bash
# Unzip the files
gunzip 'name.basics.tsv.gz'
gunzip 'title.basics.tsv.gz'
gunzip 'title.principals.tsv.gz'
gunzip 'title.ratings.tsv.gz'

In [0]:
%%bash
# Check if the files were unzipped
ls -sh

total 3.5G
4.0K conf
4.0K derby.log
4.0K eventlogs
4.0K logs
4.0K metastore_db
668M name.basics.tsv
1.3M preload_class.lst
733M title.basics.tsv
2.1G title.principals.tsv
 21M title.ratings.tsv


In [0]:
# Load the data into spark
name_basics = spark.read.format("csv") \
          .option("header", "true") \
          .option("inferSchema", "true") \
          .option("sep","\t") \
          .load("file:/databricks/driver/name.basics.tsv")
title_basics = spark.read.format("csv") \
          .option("header", "true") \
          .option("inferSchema", "true") \
          .option("sep","\t") \
          .load("file:/databricks/driver/title.basics.tsv")
title_principals = spark.read.format("csv") \
          .option("header", "true") \
          .option("inferSchema", "true") \
          .option("sep","\t") \
          .load("file:/databricks/driver/title.principals.tsv")
title_ratings = spark.read.format("csv") \
          .option("header", "true") \
          .option("inferSchema", "true") \
          .option("sep","\t") \
          .load("file:/databricks/driver/title.ratings.tsv")

### Network Inference, Let’s build a network
In the following questions you will look to summarise the data and build a network. We want to examine a network that abstracts how actors and actresses are related through their co-participation in movies. To that end perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** on the participants in those movies (i.e., actors, directors, etc … ). Make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?

In [0]:
all_info = title_basics.join(title_principals, 'tconst').join(name_basics, 'nconst')
all_info.count()

Out[6]: 50619511

**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?
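
The five conditions can be read as one predicate that every surviving row must satisfy. The sketch below spells that out in plain Python on made-up rows; the `keep` and `make_row` helpers and the sample values are hypothetical, and it assumes missing values have already been mapped to `None`.

```python
def keep(row):
    """Return True if a joined title/participant row survives all five filters."""
    birth = row["birthYear"]
    return (
        row["category"] in ("actor", "actress")       # actors and actresses only
        and row["isAdult"] == "0"                      # no adult titles
        and row["deathYear"] is None                   # no death year listed
        and birth is not None and int(birth) >= 1920   # known birth year, 1920 or later
        and row["titleType"] == "movie"                # movies only
    )

def make_row(name, **overrides):
    """Build a hypothetical row that passes every filter unless overridden."""
    row = {"name": name, "category": "actor", "isAdult": "0",
           "deathYear": None, "birthYear": "1950", "titleType": "movie"}
    row.update(overrides)
    return row

sample = [
    make_row("kept"),
    make_row("director", category="director"),
    make_row("no_birth_year", birthYear=None),
    make_row("born_1919", birthYear="1919"),
    make_row("tv_series", titleType="tvSeries"),
    make_row("deceased", deathYear="1999"),
]
kept = [row["name"] for row in sample if keep(row)]
print(kept)
```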

In [0]:
df_filtered = all_info.filter(((all_info.category == 'actor') | (all_info.category == 'actress')) # Keep only actors and actresses
                        & (all_info.isAdult == 0) # Remove adult movies
                        & (all_info.deathYear == '\\N') # Keep only living actors/actresses (no death year listed)
                        & (all_info.birthYear.cast('int') >= 1920) # Remove those born before 1920; a missing birth year casts to null and is dropped too
                        & (all_info.titleType == 'movie')) # Keep only titles of the type movie

In [0]:
# Check if everything is correct
df_filtered.display()

nconst,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,ordering,category,job,characters,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000095,tt0087003,movie,Broadway Danny Rose,Broadway Danny Rose,0,1984,\N,84,Comedy,1,actor,\N,"[""Danny Rose""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0104466,movie,Husbands and Wives,Husbands and Wives,0,1992,\N,108,"Comedy,Drama,Romance",1,actor,\N,"[""Gabe Roth""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt3538730,movie,"Barcelona, la rosa de foc","Barcelona, la rosa de foc",0,2014,\N,100,Documentary,1,actor,\N,"[""Narrator""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0065063,movie,Take the Money and Run,Take the Money and Run,0,1969,\N,85,"Comedy,Crime",1,actor,\N,"[""Virgil Starkwell""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0079522,movie,Manhattan,Manhattan,0,1979,\N,96,"Comedy,Drama,Romance",1,actor,\N,"[""Isaac""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0116242,movie,Everyone Says I Love You,Everyone Says I Love You,0,1996,\N,101,"Comedy,Musical,Romance",1,actor,\N,"[""Joe""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0066808,movie,Bananas,Bananas,0,1971,\N,82,Comedy,1,actor,\N,"[""Fielding Mellish""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0097965,movie,New York Stories,New York Stories,0,1989,\N,124,"Comedy,Drama,Romance",1,actor,\N,"[""Sheldon (segment \""Oedipus Wrecks\"")""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0118954,movie,Deconstructing Harry,Deconstructing Harry,0,1997,\N,96,Comedy,1,actor,\N,"[""Harry Block""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"
nm0000095,tt0120587,movie,Antz,Antz,0,1998,\N,83,"Adventure,Animation,Comedy",1,actor,\N,"[""Z""]",Woody Allen,1935,\N,"writer,director,actor","tt0075686,tt0079522,tt0091167,tt0118954"


In [0]:
df_filtered.count()

Out[8]: 487572

**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name, these could be different people.
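
To illustrate why the key should include the unique ID and not just the name, here is a small plain-Python sketch of the same map and reduce idea. The rows are invented: two different people share the name "Alex Kim", and keying on `(id, name)` keeps their counts separate.

```python
from collections import Counter

# Hypothetical (actor_id, name, movie_id) rows; two different people share a name.
appearances = [
    ("id1", "Alex Kim", "m1"),
    ("id1", "Alex Kim", "m2"),
    ("id2", "Alex Kim", "m3"),
    ("id3", "Sam Lee", "m1"),
]

# "Map": emit ((id, name), 1) for every appearance; "reduce": add up the ones per key.
counts = Counter()
for actor_id, name, _movie_id in appearances:
    counts[(actor_id, name)] += 1

top = counts.most_common(2)
print(top)
```

Had the key been the name alone, the two "Alex Kim" entries would have been merged into a single count of 3.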

In [0]:
# Convert DataFrame to RDD
new_rdd = df_filtered.rdd
paired_rdd = new_rdd.map(lambda x: ((x.nconst, x.primaryName), 1))
# Do the count
countMovies = paired_rdd.reduceByKey(lambda a, b: a + b).map(lambda x: (x[0][0], x[0][1], x[1])).sortBy(lambda x: x[2], ascending=False)
countMovies.take(10)

Out[9]: [('nm0103977', 'Brahmanandam', 795),
 ('nm0007123', 'Mammootty', 364),
 ('nm0482320', 'Mohanlal', 351),
 ('nm0149822', 'Mithun Chakraborty', 329),
 ('nm0007106', 'Shakti Kapoor', 321),
 ('nm0000616', 'Eric Roberts', 313),
 ('nm0035067', 'Cüneyt Arkin', 300),
 ('nm0415549', 'Jagathi Sreekumar', 293),
 ('nm0004429', 'Dharmendra', 270),
 ('nm0374974', 'Helen', 269)]

**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique IDs). Whenever a pair of actors appeared together in a movie, we need a link in each direction (A -> B and B -> A). However, links should be distinct: we do not need duplicates when two actors worked together in several movies.
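
A small plain-Python sketch of the link definition, using made-up casts: every ordered pair of distinct cast members of a movie becomes a link, and a set removes the repeats that arise when two people share several movies. The `build_links` helper and the sample data are hypothetical.

```python
from itertools import permutations

# Hypothetical casts: movie id -> list of actor ids.
casts = {
    "m1": ["A", "B", "C"],
    "m2": ["A", "B"],   # A and B worked together twice
    "m3": ["D"],        # a single-actor movie produces no link
}

def build_links(casts):
    """Return the set of directed links (src, dst) between co-stars."""
    links = set()
    for cast in casts.values():
        # permutations(..., 2) yields both (x, y) and (y, x), never (x, x).
        links.update(permutations(sorted(set(cast)), 2))
    return links

links = build_links(casts)
print(sorted(links))
```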

In [0]:
from pyspark.sql.functions import col

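# Self-join on tconst pairs up cast members of the same movie; distinct() keeps one link per direction
df_links = (df_filtered.alias('a')
            .join(df_filtered.alias('b'), col('a.tconst') == col('b.tconst'))
            .where(col('a.nconst') != col('b.nconst'))  # drop self-links
            .select(col('a.nconst'), col('b.nconst')).distinct())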
df_links.show(5)

+---------+---------+
|   nconst|   nconst|
+---------+---------+
|nm0407094|nm0820615|
|nm0566478|nm0388155|
|nm0001989|nm0001012|
|nm0280772|nm0744675|
|nm0089392|nm0610092|
+---------+---------+
only showing top 5 rows



**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probability = 0.1.

List the top 10 actors / actresses by pagerank.
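
For intuition, the sketch below runs the classic iterative PageRank update in plain Python on a tiny made-up graph, with reset probability 0.1 and 5 iterations. It follows the textbook formulation (each node splits its rank equally among its out-links) and is only an illustration: GraphFrames may differ in initialisation and normalisation details, so its numbers need not match this sketch.

```python
def pagerank(edges, reset_prob=0.1, iterations=5):
    """Iterative PageRank over a list of directed (src, dst) edges."""
    out_links = {}
    nodes = set()
    for src, dst in edges:
        out_links.setdefault(src, []).append(dst)
        nodes.update((src, dst))

    ranks = {node: 1.0 for node in nodes}  # every node starts with rank 1
    for _ in range(iterations):
        contribs = {node: 0.0 for node in nodes}
        for src, dsts in out_links.items():
            share = ranks[src] / len(dsts)  # rank is split evenly over out-links
            for dst in dsts:
                contribs[dst] += share
        ranks = {node: reset_prob + (1 - reset_prob) * contribs[node] for node in nodes}
    return ranks

# Hypothetical star-shaped co-star network: C worked with A, B and D (links in both directions).
edges = [("C", "A"), ("A", "C"), ("C", "B"), ("B", "C"), ("C", "D"), ("D", "C")]
ranks = pagerank(edges)
print(max(ranks, key=ranks.get))  # the hub C ends up with the highest rank
```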

In [0]:
from graphframes import *
from pyspark.sql.types import *
from pyspark.sql import Row

# Vertices: every actor that appears in at least one link, kept distributed (no collect to the driver)
vertices_df = df_links.select(col('a.nconst').alias('id')).distinct().persist()

vertices_df.show(5)

+---------+
|       id|
+---------+
|nm0132436|
|nm0706537|
|nm0090120|
|nm0682893|
|nm0481714|
+---------+
only showing top 5 rows



In [0]:
# Edges: rename the two link columns to the src/dst names GraphFrames expects (no collect to the driver)
edge_df = df_links.toDF('src', 'dst').persist()

edge_df.show(5)

+---------+---------+
|      src|      dst|
+---------+---------+
|nm0407094|nm0820615|
|nm0566478|nm0388155|
|nm0001989|nm0001012|
|nm0280772|nm0744675|
|nm0089392|nm0610092|
+---------+---------+
only showing top 5 rows



In [0]:
# I decided that all this graph's vertices should have connections, therefore I have defined the vertices as the actors that have links
new_graph = GraphFrame(vertices_df, edge_df)
pr = new_graph.pageRank(resetProbability=0.1, maxIter=5)

In [0]:
# Dataframe of the top 10 actors/actresses by pagerank
df_pr_actors = pr.vertices.sort('pagerank', ascending=False)
df_pr_actors.limit(10).show()

+---------+------------------+
|       id|          pagerank|
+---------+------------------+
|nm0000616| 46.87753329142539|
|nm0000514| 24.69066219903076|
|nm0001744| 23.69573016818671|
|nm0001803| 21.62880700690356|
|nm0000448|18.706657861444917|
|nm0920460| 18.64503263688061|
|nm0001595|18.257579936107135|
|nm0001698|17.172650322174956|
|nm0000367|16.966985485914577|
|nm0004193|16.796797887604676|
+---------+------------------+



**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.

In [0]:
rdd_degrees = new_graph.outDegrees.sort('outDegree', ascending=False).rdd
rdd_degrees.take(10)

Out[15]: [Row(id='nm0000616', outDegree=489),
 Row(id='nm0000514', outDegree=283),
 Row(id='nm0001744', outDegree=265),
 Row(id='nm0000367', outDegree=263),
 Row(id='nm0945189', outDegree=257),
 Row(id='nm0256628', outDegree=253),
 Row(id='nm0451600', outDegree=252),
 Row(id='nm0001803', outDegree=247),
 Row(id='nm0149822', outDegree=240),
 Row(id='nm0004109', outDegree=233)]

### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon, there are two actors named ‘Kevin Bacon’, we will use the one with the highest degree, that is, the one that participated in most titles;
1. Estimate the shortest path between every actor in the database and Kevin Bacon; keep a dataframe with this information, as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degrees from Kevin Bacon (you will need to deal with actors unconnected to Kevin Bacon: give these actors / actresses a score/degree of 20).
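
The steps above can be sketched in plain Python as a breadth-first search from the landmark, followed by a count per distance. The graph and the `degrees_from` helper are made up for illustration; unreachable nodes receive the score of 20 requested in the question.

```python
from collections import Counter, deque

def degrees_from(source, edges, unreachable=20):
    """Breadth-first search distances from `source`; unreachable nodes get a fixed score."""
    neighbours = {}
    for src, dst in edges:
        neighbours.setdefault(src, set()).add(dst)
        neighbours.setdefault(dst, set())

    dist = {source: 0}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for nxt in neighbours.get(node, ()):
            if nxt not in dist:  # first visit is the shortest path in an unweighted graph
                dist[nxt] = dist[node] + 1
                queue.append(nxt)
    return {node: dist.get(node, unreachable) for node in neighbours}

# Hypothetical network: KB - A - B form a chain, while X - Y are disconnected from KB.
edges = [("KB", "A"), ("A", "KB"), ("A", "B"), ("B", "A"), ("X", "Y"), ("Y", "X")]
degrees = degrees_from("KB", edges)
summary = Counter(degrees.values())
print(sorted(summary.items()))
```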

In [0]:
# Find Kevin Bacon in the dataframe
df_filtered.filter(df_filtered.primaryName == 'Kevin Bacon').select('nconst', 'primaryName').distinct().display()
kevin_bacon = df_filtered.filter(df_filtered.primaryName == 'Kevin Bacon').select('nconst').distinct().rdd.flatMap(lambda x: x).collect()

nconst,primaryName
nm0000102,Kevin Bacon


In [0]:
# Estimate the shortest path between every actor in the database and Kevin Bacon
shortest_paths = new_graph.shortestPaths(landmarks=kevin_bacon)
shortest_paths.show()

+---------+----------------+
|       id|       distances|
+---------+----------------+
|nm1521381|{nm0000102 -> 4}|
|nm1282664|              {}|
|nm2585918|{nm0000102 -> 4}|
|nm1989804|{nm0000102 -> 2}|
|nm4934471|{nm0000102 -> 4}|
|nm0252309|{nm0000102 -> 4}|
|nm0821850|{nm0000102 -> 5}|
|nm0779012|{nm0000102 -> 6}|
|nm3591758|{nm0000102 -> 6}|
|nm0433977|{nm0000102 -> 4}|
|nm0185819|{nm0000102 -> 2}|
|nm0510924|{nm0000102 -> 4}|
|nm2428370|{nm0000102 -> 3}|
|nm0186030|{nm0000102 -> 4}|
|nm0938194|{nm0000102 -> 4}|
|nm0167684|              {}|
|nm3148001|{nm0000102 -> 3}|
|nm0913335|{nm0000102 -> 6}|
|nm0698940|{nm0000102 -> 5}|
|nm3089916|{nm0000102 -> 3}|
+---------+----------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import explode_outer
# Actors unconnected to kevin bacon have a score/degree of 20
df_shortest_paths_kevin = shortest_paths.select("id", explode_outer("distances")).fillna(20)
# Count the number of actors at each number of degrees from Kevin Bacon
df_shortest_paths_kevin_count = df_shortest_paths_kevin.groupBy("value").count()
df_shortest_paths_kevin_count.display()

value,count
1,133
6,2747
3,19532
20,6919
5,14904
4,31205
7,370
2,3552
8,59
9,10


### Exploring the data with RDD's

Using RDDs (and not dataframes), answer the following questions (if you loaded your data into spark in a dataframe you can convert to an RDD of rows easily using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie', what is the combination of genres that is the most popular (as measured by number of reviews)? Hint: paired RDDs will be useful.

In [0]:
# Join dataframes and select only titles of the type 'movie'
df_movies = title_basics.join(title_ratings, 'tconst').where(col('titleType') == 'movie')
df_movies.display()

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,averageRating,numVotes
tt0000630,movie,Hamlet,Amleto,0,1908,\N,\N,Drama,3.9,25
tt0000675,movie,Don Quijote,Don Quijote,0,1908,\N,\N,Drama,4.9,19
tt0000862,movie,Faldgruben,Faldgruben,0,1909,\N,\N,\N,5.1,16
tt0000941,movie,Locura de amor,Locura de amor,0,1909,\N,\N,Drama,4.8,23
tt0001112,movie,Amleto,Amleto,0,1910,\N,\N,Drama,3.8,41
tt0001531,movie,"Captain Starlight, or Gentleman of the Road","Captain Starlight, or Gentleman of the Road",0,1911,\N,\N,\N,4.6,14
tt0001706,movie,The Infant at Snakeville,The Infant at Snakeville,0,1911,\N,\N,\N,5.2,16
tt0001790,movie,"Les Misérables, Part 1: Jean Valjean",Les misérables - Époque 1: Jean Valjean,0,1913,\N,60,Drama,6.0,44
tt0001812,movie,Oedipus Rex,Oedipus Rex,0,1911,\N,56,Drama,6.3,13
tt0001911,movie,Nell Gwynne,Sweet Nell of Old Drury,0,1911,\N,50,"Biography,Drama,History",4.5,22


In [0]:
rdd_sum = df_movies.select('genres', 'numVotes').rdd.reduceByKey(lambda a, b: (a + b)).sortBy(lambda x: x[1], ascending=False)
rdd_sum.take(1)

Out[20]: [('Action,Adventure,Sci-Fi', 51131332)]

**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination). Hint: paired RDD's will be useful.
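
The averaging step can be sketched in plain Python by carrying a running (sum, count) pair per genre combination and dividing at the end, which is the same idea as an aggregate-by-key on a paired RDD. The ratings below are invented.

```python
# Hypothetical (genre combination, average rating) pairs, one per movie.
ratings = [("Drama", 7.0), ("Drama", 8.0), ("Comedy,Drama", 6.0)]

# Accumulate a (sum, count) pair per key, then divide once at the end.
totals = {}
for genres, rating in ratings:
    running_sum, running_count = totals.get(genres, (0.0, 0))
    totals[genres] = (running_sum + rating, running_count + 1)

averages = {genres: s / c for genres, (s, c) in totals.items()}
best = max(averages, key=averages.get)
print(best, averages[best])
```

Keeping the sum and the count separate until the end matters because averages of partial averages are not, in general, the overall average.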

In [0]:
# Select only movies with more than 400 ratings
df_movies_filtered = df_movies.where(col('numVotes') > 400).select('genres', 'averageRating')
# Aggregate genres and calculate the sum and count of averageRating
rdd_sum_count = df_movies_filtered.rdd.aggregateByKey((0, 0), lambda a, b: (a[0] + b, a[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1]))
# Calculate the average with sum and count previously calculated and sort the result in descending order
rdd_avg = rdd_sum_count.mapValues(lambda x: x[0] / x[1]).sortBy(lambda x: x[1], ascending=False)
rdd_avg.take(1)

Out[21]: [('Music,Musical', 8.4)]

**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular, as measured by number of votes? Votes for multiple genres count towards each genre listed. Hint: flatMap and paired RDDs will be useful here.
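
As a plain-Python sketch of the flat-map idea, each movie's genre string is split into individual genres and the movie's votes are credited to every one of them. The vote counts are made up.

```python
from collections import defaultdict

# Hypothetical (genre combination, number of votes) pairs, one per movie.
movies = [("Action,Adventure", 100), ("Drama", 40), ("Action,Drama", 30)]

# "Flat map": one (genre, votes) pair per listed genre; "reduce": sum per genre.
votes = defaultdict(int)
for genres, num_votes in movies:
    for genre in genres.split(","):
        votes[genre] += num_votes

top_genre = max(votes, key=votes.get)
print(top_genre, votes[top_genre])
```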

In [0]:
# Get the rdd from question 8 that has the sum of numVotes for genres combination
df_genre = rdd_sum.toDF()
df_genre.display()

_1,_2
"Action,Adventure,Sci-Fi",51131332
Drama,40167852
"Adventure,Animation,Comedy",31886658
"Comedy,Drama,Romance",31841847
Comedy,28846951
"Action,Crime,Drama",28068566
"Action,Adventure,Fantasy",27460245
"Drama,Romance",26372511
"Comedy,Drama",25135430
"Action,Adventure,Comedy",21474394


In [0]:
from pyspark.sql.functions import explode, split
# Split the genres combination to get the individual genre
rdd_genre = df_genre.select(explode(split('_1', ',')), '_2').rdd
# Sum numVotes of each individual genre and get the highest
rdd_genre_sum = rdd_genre.map(lambda a: (a.col, a._2)).reduceByKey(lambda x, y: x + y).sortBy(lambda a: a[1], ascending=False)
rdd_genre_sum.take(1)

Out[23]: [('Drama', 487802390)]

## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The average value for the cast of degrees of Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leave 30% of the data for testing, when splitting your data use the option seed=0.

**Q11** Provide the coefficients of the regression and the accuracy of your model on that test dataset according to RMSE.
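
For reference, RMSE is the square root of the mean squared difference between labels and predictions. The sketch below computes it in plain Python for a few invented held-out values; the `rmse` helper and the numbers are hypothetical.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical test-set ratings and the model's predictions for them.
test_labels = [6.0, 7.0, 8.0]
test_predictions = [6.5, 7.0, 7.5]
test_rmse = rmse(test_labels, test_predictions)
print(round(test_rmse, 4))
```

In Spark ML the test-set value is usually obtained by applying `pyspark.ml.evaluation.RegressionEvaluator` (with `metricName="rmse"`) to the transformed test dataframe; the `summary` attribute of a fitted linear regression model describes the training data instead.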

In [0]:
df_filtered2 = df_filtered.join(df_movies, 'tconst').select('nconst', 'tconst', 'averageRating')
df_countMovies = countMovies.toDF()
df_degrees = rdd_degrees.toDF()

# Base table: one row per (actor, movie) with the actor's pagerank and movie count plus the movie's rating
data_feat = (df_pr_actors
             .join(df_countMovies, df_countMovies._1 == df_pr_actors.id)
             .drop('_1', '_2')
             .withColumnRenamed("_3", "num_movies")
             .withColumnRenamed("id", "nconst")
             .join(df_filtered2, 'nconst'))

# The total number of movies in which the actors / actresses have acted
df_avg_movie = data_feat.groupBy('tconst').agg({'num_movies': 'sum'}).withColumnRenamed("sum(num_movies)", "num_movies")

# The average pagerank of the cast in each movie
df_avg_pr = data_feat.groupBy('tconst').agg({'pagerank': 'mean'}).withColumnRenamed("avg(pagerank)", "pagerank")

# The average outDegree of the cast in each movie
df_avg_d = data_feat.join(df_degrees, df_degrees.id == data_feat.nconst).drop('id')
df_avg_d = df_avg_d.groupBy('tconst').agg({'outDegree': 'mean'}).withColumnRenamed("avg(outDegree)", "outDegree")

# The average value for the cast of degrees of Kevin Bacon
df_distances = data_feat.join(df_shortest_paths_kevin, df_shortest_paths_kevin.id == data_feat.nconst).drop('id')
df_distances = df_distances.groupBy('tconst').agg({'value': 'mean'}).withColumnRenamed("avg(value)", "distances")

# Join all tables
data_feat = data_feat.drop('pagerank', 'num_movies').join(df_avg_pr, 'tconst').join(df_avg_d, 'tconst').join(df_distances, 'tconst').drop('id').join(df_avg_movie, 'tconst')

# Select the 4 features and the label (averageRating)
data_feat_final = data_feat.select('num_movies', 'pagerank', 'outDegree', 'distances', 'averageRating').distinct()
data_feat_final.display()

num_movies,pagerank,outDegree,distances,averageRating
6,0.6440841500116486,3.0,3.6666666666666665,5.1
30,2.1463785915058557,20.0,4.0,6.9
2,0.4987616202323367,2.0,5.0,6.8
57,4.334195563898708,61.0,1.5,7.0
41,2.593041270600765,19.0,3.25,6.8
40,1.485466455629052,13.25,3.5,5.8
28,4.192923571250432,38.0,3.0,6.3
30,1.8523839363936896,23.33333333333333,3.0,6.0
85,1.7353666554549136,33.0,4.0,8.0
21,2.854407537514598,30.0,3.5,5.3


In [0]:
# Check if there's any nan value that justifies using Imputer method
from pyspark.sql.functions import col,isnan, when, count
data_feat_final.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_feat_final.columns]).display()

num_movies,pagerank,outDegree,distances,averageRating
0,0,0,0,0


Since the dataframe has no missing values, there was no need to use the Imputer.

I used MinMaxScaler so that all features are on the same [0, 1] scale. Note that min-max scaling only rescales each feature; it does not change the shape of its distribution.
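
As a quick illustration of what min-max scaling does to a single feature, here is a plain-Python sketch with made-up values. The `min_max_scale` helper is hypothetical, and returning 0.0 for a constant feature is a choice made for this sketch only.

```python
def min_max_scale(values):
    """Rescale values linearly so the smallest maps to 0 and the largest to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]  # constant feature: arbitrary choice for this sketch
    return [(v - lo) / (hi - lo) for v in values]

scaled = min_max_scale([2, 6, 10])
print(scaled)  # [0.0, 0.5, 1.0]
```

The spacing between the values is preserved up to a constant factor, which is why the shape of the distribution stays the same.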

In [0]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

continuous_assembler = VectorAssembler(
    inputCols=['num_movies', 'pagerank', 'outDegree', 'distances'],
    outputCol="continuous",
)
continuous_scaler = MinMaxScaler(  
    inputCol="continuous",
    outputCol="continuous_scaled",
)
movies_pipeline = Pipeline(  
    stages=[continuous_assembler, continuous_scaler]
)
feature_assembler = VectorAssembler(
    inputCols=['continuous_scaled'],
    outputCol="features"
)

In [0]:
# Import LinearRegression class
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Define LinearRegression algorithm
lr = LinearRegression(featuresCol="features", labelCol="averageRating")

# Split data into training and test set
(training_q11, test_q11) = data_feat_final.randomSplit([0.7, 0.3], seed=0)

# Create pipeline and train
movies_pipeline.setStages([continuous_assembler, continuous_scaler, feature_assembler, lr])
model_q11 = movies_pipeline.fit(training_q11)

# Predict results
predictions_q11 = model_q11.transform(test_q11)

# Print the fitted model parameters
print(">>>> Model q11 intercept: ", (model_q11.stages[-1].intercept), ', coefficients: ', (model_q11.stages[-1].coefficients))

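# Print RMSE (note: `summary` is the training summary, so this RMSE is measured on the training split, not on test_q11)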
RMSE_q11 = model_q11.stages[-1].summary.rootMeanSquaredError
print("Model q11: Root Mean Squared Error = " + str(RMSE_q11))

>>>> Model q11 intercept:  5.874296028544863 , coefficients:  [-1.262644612844831,-5.977013723676303,4.569067934461575,0.26674501450934046]
Model q11: Root Mean Squared Error = 1.2622074334054092


**Q12** What score would your model predict for the 1997 movie Titanic?

In [0]:
# Get the information about the 1997 movie Titanic
df_titanic = df_movies.filter((df_movies.primaryTitle == 'Titanic') & (df_movies.startYear == 1997)).select('tconst')

# Select the feature columns for the Titanic movie (the fitted pipeline assembles and scales the vector)
data_feat_titanic = df_titanic.join(data_feat, 'tconst').select('num_movies', 'pagerank', 'outDegree', 'distances', 'averageRating').distinct()

In [0]:
predictions_q12 = model_q11.transform(data_feat_titanic)
# Print the results
predictions_q12.select('features', 'averageRating', 'prediction').display()

features,averageRating,prediction
"Map(vectorType -> dense, length -> 4, values -> List(0.12310491206791996, 0.1721576428790993, 0.23975409836065575, 0.07692307692307693))",7.9,5.805841290628794


**Q13** Create dummy variables for each of the top 10 movie genres from Q10. These variables should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, and a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variables to the regression improve your results? What is the new RMSE and predicted rating for the 1997 movie Titanic?
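
A minimal plain-Python sketch of the dummy-variable idea: split the genre string into its individual genres and emit a 1 or 0 per genre of interest. The `TOP_GENRES` list is shortened and hypothetical, as is the `genre_dummies` helper. Splitting first means that, for instance, "Music" is not matched inside "Musical".

```python
# Hypothetical, shortened list of genres of interest.
TOP_GENRES = ["Drama", "Romance", "Comedy"]

def genre_dummies(genres, top_genres=TOP_GENRES):
    """Map a comma-separated genre string to {genre: 1 or 0} for each genre of interest."""
    present = set(genres.split(","))
    return {genre: int(genre in present) for genre in top_genres}

titanic = genre_dummies("Drama,Romance")
print(titanic)  # {'Drama': 1, 'Romance': 1, 'Comedy': 0}
```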

In [0]:
# Get list of top 10 genres
data_itr_10 = rdd_genre_sum.toDF().limit(10).rdd.toLocalIterator()
list_10_genres = []

for row in data_itr_10:
  list_10_genres.append(row['_1'])

In [0]:
from pyspark.sql.functions import lit, udf, array
from pyspark.sql.types import IntegerType

# Define genres categories
def condition(arr):
  genres = arr[0]
  row = int(arr[1])
  if list_10_genres[row] in genres:
    return 1
  else:
    return 0

# Add to the previous dataframe the column genres
df_filtered3 = df_filtered.select('tconst', 'genres')
df_g = data_feat.join(df_filtered3, 'tconst').select('tconst', 'num_movies', 'pagerank', 'outDegree', 'distances', 'genres', 'averageRating').distinct()

for idx, row in enumerate(list_10_genres):
  # Create new column with genre name and set the index of the column name in the list as value
  df_g = df_g.withColumn(row, lit(idx))
  # Change the value of each row in the column created above based on the movie's genres
  genre_udf = udf(lambda arr: condition(arr), IntegerType())
  df_g = df_g.withColumn(row, genre_udf(array('genres', row)))
  
df_g.display()

tconst,num_movies,pagerank,outDegree,distances,genres,averageRating,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci-Fi,Romance,Mystery,Horror
tt0031508,64,2.958519688782,35.0,3.0,"Comedy,Drama",6.3,1,0,1,0,0,0,0,0,0,0
tt0034920,10,0.3930861356011321,2.0,4.0,"Drama,Romance,War",6.2,1,0,0,0,0,0,0,1,0,0
tt0036077,17,0.1547693409835153,1.0,5.0,Drama,7.5,1,0,0,0,0,0,0,0,0,0
tt0038476,64,2.958519688782,35.0,3.0,"Drama,Romance",7.0,1,0,0,0,0,0,0,1,0,0
tt0038834,14,0.505470247741928,2.0,5.0,"Adventure,Drama,Romance",5.0,1,0,0,1,0,0,0,1,0,0
tt0038936,39,1.6762764597475022,16.0,3.0,Western,6.5,0,0,0,0,0,0,0,0,0,0
tt0039038,22,1.0915647736479674,5.0,3.0,"Comedy,Musical,Romance",5.9,0,0,1,0,0,0,0,1,0,0
tt0039752,3,1.0000000000000042,1.0,20.0,"Crime,Drama",7.4,1,0,0,0,1,0,0,0,0,0
tt0040352,34,2.073880807744936,21.0,4.0,Comedy,6.4,0,0,1,0,0,0,0,0,0,0
tt0041639,16,1.5381623849249215,4.0,4.0,Drama,5.8,1,0,0,0,0,0,0,0,0,0


In [0]:
# Assemble feature columns
continuous_assembler = VectorAssembler(  
    inputCols=['num_movies', 'pagerank', 'outDegree', 'distances', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror'],
    outputCol="continuous",
)
continuous_scaler = MinMaxScaler(  
    inputCol="continuous",
    outputCol="continuous_scaled",
)
# The pipeline starts with two stages; the feature assembler and the regressor are added later with setStages
movies_pipeline = Pipeline(  
    stages=[continuous_assembler, continuous_scaler]
)
feature_assembler = VectorAssembler(
    inputCols=["continuous_scaled"],
    outputCol="features"
)

In [0]:
# Split data into training and test set
(training_q13, test_q13) = df_g.randomSplit([0.7, 0.3], seed=0)

# Create pipeline and train
movies_pipeline.setStages([continuous_assembler, continuous_scaler, feature_assembler, lr])
model_q13 = movies_pipeline.fit(training_q13)

# Predict results
predictions_q13 = model_q13.transform(test_q13)

# Print the fitted model parameters
print(">>>> Model q13 intercept: ", (model_q13.stages[-1].intercept), ', coefficients: ', (model_q13.stages[-1].coefficients))

# Print RMSE (read from the model's training summary, so it is measured on the training set)
RMSE_q13 = model_q13.stages[-1].summary.rootMeanSquaredError
print("Model q13: Root Mean Squared Error = " + str(RMSE_q13))

>>>> Model q13 intercept:  5.94247940384839 , coefficients:  [-1.0891688818072187,-5.032100485977795,3.9584772488393374,0.3329847977099062,0.29866942859927026,-0.32816053387714433,-0.21886865606986716,-0.053780616812158025,-0.028194686872704537,-0.3163332009781402,-0.3465631586963895,-0.07256858205313894,0.04788635423413515,-0.8680866590149617]
Model q13: Root Mean Squared Error = 1.2051008307552862


In [0]:
# Select features from Titanic movie
data_titanic = df_titanic.join(df_g, 'tconst').drop('tconst').distinct()

# Predict averageRating
predictions_q13_titanic = model_q13.transform(data_titanic)
predictions_q13_titanic.select('features', 'averageRating', 'prediction').display()

features,averageRating,prediction
"Map(vectorType -> sparse, length -> 14, indices -> List(0, 1, 2, 3, 4, 11), values -> List(0.12310491206791996, 0.17218378719809643, 0.23975409836065575, 0.06896551724137932, 1.0, 1.0))",7.9,6.140077704219715


To conclude, adding these variables to the regression improved my results. The new RMSE is slightly smaller than the one I calculated in question 11, and the predicted rating for the 1997 movie Titanic is also closer to the label than it was in question 12.
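
For reference, the RMSE used in this comparison is the square root of the mean squared difference between the labels and the predictions. Below is a minimal plain-Python sketch of that calculation. The function name `rmse` and the toy values are illustrative assumptions, not taken from the dataset. Note that `summary.rootMeanSquaredError` on a fitted Spark `LinearRegression` model reports this quantity on the training data, whereas a `RegressionEvaluator` applied to `predictions_q13` would report it on the test set.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Toy ratings, for illustration only (not taken from the dataset)
print(rmse([7.0, 6.0, 8.0], [6.0, 6.0, 7.0]))
```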

**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, and changing the included features. What is the RMSE of your final model, and what rating does it predict for the 1997 movie Titanic?

In [0]:
# Create a dataframe with two more features: the movie's year and its duration in minutes
df_year = df_movies.select('startYear', 'runtimeMinutes', 'tconst').filter((df_movies.startYear != '\\N') & (df_movies.runtimeMinutes != '\\N'))
df_feat_q14 = df_year.join(df_g, 'tconst')

# Cast the values of the two new columns to int
df_feat_q14 = df_feat_q14.withColumn("startYear", df_feat_q14["startYear"].cast(IntegerType())).withColumn("runtimeMinutes", df_feat_q14["runtimeMinutes"].cast(IntegerType()))

continuous_assembler = VectorAssembler(  
    inputCols=['num_movies', 'pagerank', 'outDegree', 'distances', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller',
               'Sci-Fi', 'Romance', 'Mystery', 'Horror', 'startYear', 'runtimeMinutes'],
    outputCol="continuous",
)
continuous_scaler = MinMaxScaler(  
    inputCol="continuous",
    outputCol="continuous_scaled",
)
movies_pipeline = Pipeline(  
    stages=[continuous_assembler, continuous_scaler]
)
# Assemble the scaled vector into the final features column
feature_assembler = VectorAssembler(
    inputCols=["continuous_scaled"],
    outputCol="features"
)

# Split data into training and test set
(training_q14, test_q14) = df_feat_q14.randomSplit([0.7, 0.3], seed=0)

In [0]:
# Create pipeline and train
movies_pipeline.setStages([continuous_assembler, continuous_scaler, feature_assembler, lr])
model_q14_lr = movies_pipeline.fit(training_q14)

# Predict results
predictions_q14 = model_q14_lr.transform(test_q14)

# Print RMSE (read from the model's training summary, so it is measured on the training set)
RMSE_q14 = model_q14_lr.stages[-1].summary.rootMeanSquaredError
print("Model q14 LinearRegression: Root Mean Squared Error = " + str(RMSE_q14))

Model q14 LinearRegression: Root Mean Squared Error = 1.1621493313542206


In [0]:
# Import GBTRegressor class
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Define GBTRegressor algorithm
gb = GBTRegressor(maxDepth=2, seed=0, featuresCol="features", labelCol="averageRating")
gb.setMaxIter(10)
# Minimum fraction of the weighted sample count that each child must have after split
gb.setMinWeightFractionPerNode(0.1)

# Create pipeline and train
movies_pipeline.setStages([continuous_assembler, continuous_scaler, feature_assembler, gb])
model_q14 = movies_pipeline.fit(training_q14)

# Predict results
predictions_q14 = model_q14.transform(test_q14)

# Print RMSE
evaluator = RegressionEvaluator(labelCol="averageRating", predictionCol="prediction", metricName="rmse")
RMSE_q14 = evaluator.evaluate(predictions_q14)
print("Model q14 GBTRegressor: Root Mean Squared Error = " + str(RMSE_q14))

Model q14 GBTRegressor: Root Mean Squared Error = 1.1791994465201496


In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Train a RandomForest model
rf = RandomForestRegressor(maxDepth=5, seed=0, featuresCol="features", labelCol="averageRating")

# Create pipeline and train
movies_pipeline.setStages([continuous_assembler, continuous_scaler, feature_assembler, rf])
model_q14_rf = movies_pipeline.fit(training_q14)

# Predict results
predictions_q14 = model_q14_rf.transform(test_q14)

# Print RMSE
RMSE_q14 = evaluator.evaluate(predictions_q14)
print("Model q14 RandomForestRegressor: Root Mean Squared Error = " + str(RMSE_q14))

Model q14 RandomForestRegressor: Root Mean Squared Error = 1.146828537766463


In [0]:
# Select features from Titanic movie
df_q14_titanic = df_titanic.join(df_feat_q14, 'tconst').drop('tconst').distinct()

# Predict averageRating of Titanic movie with the best model
predictions_q14_titanic = model_q14_rf.transform(df_q14_titanic)
predictions_q14_titanic.select('features', 'averageRating', 'prediction').display()

features,averageRating,prediction
"Map(vectorType -> sparse, length -> 16, indices -> List(0, 1, 2, 3, 4, 11, 14, 15), values -> List(0.14646464646464646, 0.1721576428790993, 0.23975409836065575, 0.07692307692307693, 1.0, 1.0, 0.7282608695652174, 0.12348993288590604))",7.9,6.518153184809899


In conclusion, I decided to add two features, 'startYear' and 'runtimeMinutes', because they can influence the rating a user gives to a movie.

First, I fitted a Linear Regression again to compare the RMSE obtained with the previous features against the RMSE obtained with the new features added. The RMSE improved slightly (from 1.2051 to 1.1621), but not significantly.

Then, I used GBT Regressor. Since it is regarded as fast to train and efficient, I expected it to give better predictions than the Linear Regression. However, even though I defined it with 10 iterations and seed=0, it had the highest RMSE of the three models (1.1792).

Finally, the last model I tried was Random Forest, because it combines multiple decision trees into its final prediction. This was the best model, with a Root Mean Squared Error (RMSE) on the test set of 1.146828537766463 and a closer rating prediction for the Titanic movie (6.518153184809899, against a label of 7.9).
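
The comparison can be summarised in a few lines of plain Python. This is a minimal sketch: the numbers are copied from the cell outputs above, and the variable names are illustrative. One caveat applies. The Linear Regression figure comes from the model's training summary, while the GBT and Random Forest figures come from the evaluator on the test set, so the three values are not strictly comparable.

```python
# RMSE values copied from the cell outputs above
rmse_by_model = {
    "LinearRegression": 1.1621493313542206,       # from the training summary
    "GBTRegressor": 1.1791994465201496,           # evaluator on the test set
    "RandomForestRegressor": 1.146828537766463,   # evaluator on the test set
}
# Model with the lowest reported RMSE
best_model = min(rmse_by_model, key=rmse_by_model.get)

# Titanic label and predictions copied from the outputs of Q13 and Q14
titanic_label = 7.9
titanic_predictions = {"q13": 6.140077704219715, "q14": 6.518153184809899}
titanic_abs_error = {
    question: abs(titanic_label - prediction)
    for question, prediction in titanic_predictions.items()
}

print(best_model)
for question, error in titanic_abs_error.items():
    print(question, round(error, 3))
```

The absolute error on Titanic drops from about 1.76 in Q13 to about 1.38 in Q14.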