# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline will be set for the 5th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set, but the assignment is individual. That is, you are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, and secondly by the logic used in trying to approach each step of the problem set;
5. Consider skipping questions that you are stuck on, and getting back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected you have developed skills beyond writing SQL queries. Any question where you directly write a SQL query (for example, by creating a temporary table and passing the query to spark.sql) will receive a 25% penalty. Using the spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all necessary code so that, when the notebook is run in databricks, it replicates these results. This means all data downloading and processing should be done in this notebook. It is also important you clearly indicate where your final answer to each question is when you are using multiple cells (for example you print "my final answer is" before your answer or use cell comments).
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is on the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisting your RDDs or Dataframes; this may speed up performance.
    6. If you have trouble with graphframes in databricks (specifically the import statement), you need to make sure the graphframes package is installed on the cluster you are running. If you click home on the left and then click on the graphframes library which you loaded in Lab 9, you can install the package on your cluster (check the graphframes checkbox and click install).

#### Data Sources and Description
We will use data from IMDB. You can download the raw data files
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv). You can find a
description of each data file at https://www.imdb.com/interfaces/

## Questions
### Data loading and preparation
Review the file descriptions and load the necessary data onto your databricks cluster and into spark dataframes. You will need to use shell commands to download the data, unzip the data, and load the data into spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .gz files. The data files will then be tab separated (.tsv), which you can load into a dataframe using the tab-separated option instead of the comma-separated option we have typically used in class: `.option("sep", "\t")`
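IMDb's TSV files write missing values as the literal string `\N`, which is why later cells compare columns against `'\\N'`. The following is a minimal pure-Python sketch (standard library only, on a one-row sample copied from `name.basics`) of parsing such a file and mapping `\N` to `None`. The helper name `parse_imdb_tsv` is hypothetical, and treating double quotes as ordinary characters is an assumption about the files. In Spark, the CSV reader option `nullValue` (for example `.option("nullValue", "\\N")`) should have a similar effect, although this notebook keeps the raw strings.

```python
import csv
import io

# IMDb's TSV dumps mark missing values with the literal two-character string \N
IMDB_NULL = "\\N"

def parse_imdb_tsv(text):
    """Parse IMDb-style TSV text into a list of dicts, mapping \\N to None."""
    # Assumption: fields are never quoted, so QUOTE_NONE treats " as an ordinary character
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    return [
        {key: (None if value == IMDB_NULL else value) for key, value in row.items()}
        for row in reader
    ]

sample = (
    "nconst\tprimaryName\tbirthYear\tdeathYear\n"
    "nm0000003\tBrigitte Bardot\t1934\t\\N\n"
)
rows = parse_imdb_tsv(sample)
print(rows[0]["deathYear"])  # None
```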

In [0]:
# Name: Francisco João Mourato Calha  |  id: m20210673

In [0]:
#libraries used

from graphframes import *
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import isnull, when, count, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
import pyspark.ml.feature as MF
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import RobustScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [0]:
#getting all the data and unzipping it

In [0]:
%sh

wget 'https://datasets.imdbws.com/name.basics.tsv.gz'
gunzip name.basics.tsv.gz

--2022-06-08 10:15:35--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.78, 99.84.66.98, 99.84.66.41, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.78|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228668577 (218M) [binary/octet-stream]
Saving to: ‘name.basics.tsv.gz’



In [0]:
%sh

wget 'https://datasets.imdbws.com/title.akas.tsv.gz'
gunzip title.akas.tsv.gz

--2022-06-08 10:15:57--  https://datasets.imdbws.com/title.akas.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.56, 99.84.66.78, 99.84.66.98, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 273528175 (261M) [binary/octet-stream]
Saving to: ‘title.akas.tsv.gz’


In [0]:
%sh

wget 'https://datasets.imdbws.com/title.basics.tsv.gz'
gunzip title.basics.tsv.gz

--2022-06-08 10:16:22--  https://datasets.imdbws.com/title.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.41, 99.84.66.56, 99.84.66.78, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.41|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 156966383 (150M) [binary/octet-stream]
Saving to: ‘title.basics.tsv.gz’


In [0]:
# title.crew is not needed for the questions below, so the download is left commented out:
# %sh
# wget 'https://datasets.imdbws.com/title.crew.tsv.gz'
# gunzip title.crew.tsv.gz

In [0]:
# title.episode is not needed for the questions below, so the download is left commented out:
# %sh
# wget 'https://datasets.imdbws.com/title.episode.tsv.gz'
# gunzip title.episode.tsv.gz

In [0]:
%sh

wget 'https://datasets.imdbws.com/title.principals.tsv.gz'
gunzip title.principals.tsv.gz

--2022-06-08 10:16:37--  https://datasets.imdbws.com/title.principals.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.78, 99.84.66.98, 99.84.66.41, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.78|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400651552 (382M) [binary/octet-stream]
Saving to: ‘title.principals.tsv.gz’


In [0]:
%sh

wget 'https://datasets.imdbws.com/title.ratings.tsv.gz'
gunzip title.ratings.tsv.gz

--2022-06-08 10:17:25--  https://datasets.imdbws.com/title.ratings.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.56, 99.84.66.78, 99.84.66.98, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6244725 (6.0M) [binary/octet-stream]
Saving to: ‘title.ratings.tsv.gz’


In [0]:
# The listing below confirms that the required files have been downloaded and unzipped

In [0]:
%sh ls

conf
eventlogs
ganglia
logs
metastore_db
name.basics.tsv
preload_class.lst
title.akas.tsv
title.basics.tsv
title.principals.tsv
title.ratings.tsv


In [0]:
name_basics = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .option("delimiter", "\t") \
                        .load("file:/databricks/driver/name.basics.tsv")
name_basics.cache()

Out[12]: DataFrame[nconst: string, primaryName: string, birthYear: string, deathYear: string, primaryProfession: string, knownForTitles: string]

In [0]:
display(name_basics)

nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000001,Fred Astaire,1899,1987,"soundtrack,actor,miscellaneous","tt0053137,tt0031983,tt0050419,tt0072308"
nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack","tt0038355,tt0071877,tt0117057,tt0037382"
nm0000003,Brigitte Bardot,1934,\N,"actress,soundtrack,music_department","tt0054452,tt0057345,tt0049189,tt0056404"
nm0000004,John Belushi,1949,1982,"actor,soundtrack,writer","tt0078723,tt0077975,tt0072562,tt0080455"
nm0000005,Ingmar Bergman,1918,2007,"writer,director,actor","tt0050976,tt0050986,tt0060827,tt0083922"
nm0000006,Ingrid Bergman,1915,1982,"actress,soundtrack,producer","tt0036855,tt0038109,tt0077711,tt0034583"
nm0000007,Humphrey Bogart,1899,1957,"actor,soundtrack,producer","tt0034583,tt0037382,tt0042593,tt0043265"
nm0000008,Marlon Brando,1924,2004,"actor,soundtrack,director","tt0070849,tt0068646,tt0047296,tt0078788"
nm0000009,Richard Burton,1925,1984,"actor,soundtrack,producer","tt0057877,tt0059749,tt0061184,tt0087803"
nm0000010,James Cagney,1899,1986,"actor,soundtrack,director","tt0029870,tt0042041,tt0035575,tt0031867"


In [0]:
title_akas = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .option("delimiter", "\t") \
                        .load("file:/databricks/driver/title.akas.tsv")
title_akas.cache()

Out[14]: DataFrame[titleId: string, ordering: int, title: string, region: string, language: string, types: string, attributes: string, isOriginalTitle: string]

In [0]:
display(title_akas)

titleId,ordering,title,region,language,types,attributes,isOriginalTitle
tt0000001,1,Карменсіта,UA,\N,imdbDisplay,\N,0
tt0000001,2,Carmencita,DE,\N,\N,literal title,0
tt0000001,3,Carmencita - spanyol tánc,HU,\N,imdbDisplay,\N,0
tt0000001,4,Καρμενσίτα,GR,\N,imdbDisplay,\N,0
tt0000001,5,Карменсита,RU,\N,imdbDisplay,\N,0
tt0000001,6,Carmencita,US,\N,imdbDisplay,\N,0
tt0000001,7,Carmencita,\N,\N,original,\N,1
tt0000001,8,カルメンチータ,JP,ja,imdbDisplay,\N,0
tt0000002,1,Le clown et ses chiens,\N,\N,original,\N,1
tt0000002,2,Le clown et ses chiens,FR,\N,imdbDisplay,\N,0


In [0]:
title_basics = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .option("delimiter", "\t") \
                        .load("file:/databricks/driver/title.basics.tsv")
title_basics.cache()

Out[16]: DataFrame[tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genres: string]

In [0]:
display(title_basics)

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"
tt0000006,short,Chinese Opium Den,Chinese Opium Den,0,1894,\N,1,Short
tt0000007,short,Corbett and Courtney Before the Kinetograph,Corbett and Courtney Before the Kinetograph,0,1894,\N,1,"Short,Sport"
tt0000008,short,Edison Kinetoscopic Record of a Sneeze,Edison Kinetoscopic Record of a Sneeze,0,1894,\N,1,"Documentary,Short"
tt0000009,short,Miss Jerry,Miss Jerry,0,1894,\N,40,"Romance,Short"
tt0000010,short,Leaving the Factory,La sortie de l'usine Lumière à Lyon,0,1895,\N,1,"Documentary,Short"


In [0]:
# title.crew is not needed for the questions below, so it is not loaded:
# title_crew = spark.read.format("csv") \
#                         .option("header", "true") \
#                         .option("inferSchema", "true") \
#                         .option("delimiter", "\t") \
#                         .load("file:/databricks/driver/title.crew.tsv")
# title_crew.cache()

In [0]:
# title.episode is not needed for the questions below, so it is not loaded:
# title_episode = spark.read.format("csv") \
#                         .option("header", "true") \
#                         .option("inferSchema", "true") \
#                         .option("delimiter", "\t") \
#                         .load("file:/databricks/driver/title.episode.tsv")
# title_episode.cache()

In [0]:
title_principals = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .option("delimiter", "\t") \
                        .load("file:/databricks/driver/title.principals.tsv")
title_principals.cache()

Out[20]: DataFrame[tconst: string, ordering: int, nconst: string, category: string, job: string, characters: string]

In [0]:
display(title_principals)

tconst,ordering,nconst,category,job,characters
tt0000001,1,nm1588970,self,\N,"[""Self""]"
tt0000001,2,nm0005690,director,\N,\N
tt0000001,3,nm0374658,cinematographer,director of photography,\N
tt0000002,1,nm0721526,director,\N,\N
tt0000002,2,nm1335271,composer,\N,\N
tt0000003,1,nm0721526,director,\N,\N
tt0000003,2,nm1770680,producer,producer,\N
tt0000003,3,nm1335271,composer,\N,\N
tt0000003,4,nm5442200,editor,\N,\N
tt0000004,1,nm0721526,director,\N,\N


In [0]:

title_ratings = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .option("delimiter", "\t") \
                        .load("file:/databricks/driver/title.ratings.tsv")
title_ratings.cache()

Out[22]: DataFrame[tconst: string, averageRating: double, numVotes: int]

In [0]:
display(title_ratings)

tconst,averageRating,numVotes
tt0000001,5.7,1884
tt0000002,5.9,250
tt0000003,6.5,1669
tt0000004,5.8,163
tt0000005,6.2,2490
tt0000006,5.2,166
tt0000007,5.4,776
tt0000008,5.4,2025
tt0000009,5.3,195
tt0000010,6.9,6812


### Network Inference, Let’s build a network
In the following questions you will look to summarise the data and build a network. We want to examine a network that abstracts how actors and actresses are related through their co-participation in movies. To that end, perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** about the participants in those movies (i.e., actors, directors, etc … ); make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?
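To see why a left join can produce more rows than there are titles, here is a small pure-Python sketch of the title_basics → title_principals → name_basics join on hypothetical toy rows: each title contributes one row per matching participant, and a title with no participants is still kept once, with empty participant columns. The function name `left_join_titles` and all ids are made up for illustration; this is not the Spark join itself.

```python
# Hypothetical toy rows standing in for title.basics, title.principals and name.basics
titles = [
    {"tconst": "tt1", "primaryTitle": "Movie A"},
    {"tconst": "tt2", "primaryTitle": "Movie B"},  # no principals listed
]
principals = [
    {"tconst": "tt1", "nconst": "nm1", "category": "actor"},
    {"tconst": "tt1", "nconst": "nm2", "category": "actress"},
]
names = {"nm1": {"primaryName": "Actor One"}, "nm2": {"primaryName": "Actress Two"}}

def left_join_titles(titles, principals, names):
    """Left-join titles -> principals -> names, keeping titles that have no participants."""
    by_title = {}
    for p in principals:
        by_title.setdefault(p["tconst"], []).append(p)
    result = []
    for t in titles:
        matches = by_title.get(t["tconst"])
        if not matches:
            # Left join: keep the title once, with empty participant columns
            result.append({**t, "nconst": None, "category": None, "primaryName": None})
            continue
        for p in matches:
            person = names.get(p["nconst"], {"primaryName": None})
            result.append({**t, **p, **person})
    return result

rows = left_join_titles(titles, principals, names)
print(len(rows))  # 3
```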

In [0]:
# Using only the 3 tables needed for the following questions, merged with two left joins. I chose left joins because we want ALL the information on each title (so ALL titles) and only the information on the participants in THOSE titles. title_principals is the table that connects title_basics and name_basics.

In [0]:
dataframe = title_basics.join(title_principals, on='tconst',how='left')\
                        .join(name_basics, on='nconst', how='left').cache()

In [0]:
display(dataframe)

nconst,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,ordering,category,job,characters,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000086,tt0083109,movie,The Cabbage Soup,La soupe aux choux,0,1981,\N,98,"Comedy,Drama,Sci-Fi",1,actor,\N,"[""Claude Ratinier (Le Glaude)""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt0062120,movie,The Little Bather,Le petit baigneur,0,1968,\N,96,Comedy,1,actor,\N,"[""Louis-Philippe Fourchaume""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt2949354,tvMovie,Monsieur de Funès,Monsieur de Funès,0,2013,\N,82,Documentary,1,archive_footage,\N,"[""Self""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt0057422,movie,Squeak-squeak,Pouic-Pouic,0,1963,\N,86,Comedy,1,actor,\N,"[""Léonard Monestier""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt6983458,tvEpisode,Chroniques de France N° 1,Chroniques de France N° 1,0,1964,\N,26,Documentary,3,self,\N,"[""Self - (segment \""Maurice Chevalier\"")""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt16374010,tvEpisode,Episode dated 16 June 1963,Episode dated 16 June 1963,0,1963,\N,\N,News,3,self,\N,"[""Self""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt0043691,tvMovie,The Gamblers,Les joueurs,0,1950,\N,50,Comedy,1,actor,\N,"[""Piotr Petrovitch""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt0335439,short,Station mondaine,Station mondaine,0,1951,\N,\N,"Documentary,Short",1,actor,\N,"[""Le narrateur""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt2223153,tvEpisode,Episode dated 14 May 1966,Episode dated 14 May 1966,0,1966,\N,\N,Documentary,6,self,\N,"[""Self""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"
nm0000086,tt2288018,tvEpisode,"Le Corniaud, Tous les enfants du monde","Le Corniaud, Tous les enfants du monde",0,1965,\N,43,Documentary,2,self,\N,"[""Self""]",Louis de Funès,1914,1983,"actor,writer,soundtrack","tt0074103,tt0079200,tt0069747,tt0064425"


My final answer is:

In [0]:
dataframe.count()

Out[27]: 51545700

**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?
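The five conditions can be written as a single predicate. This pure-Python sketch (hypothetical helper `keep_row`, toy rows) mirrors the filter on string-typed IMDb fields; the `\N` check on `birthYear` comes before the integer comparison, so the conversion is never attempted on a missing value.

```python
IMDB_NULL = "\\N"

def keep_row(row):
    """Return True if a joined row survives the five Q2 filters (fields are strings, as in IMDb)."""
    return (
        row["category"] in ("actor", "actress")
        and row["isAdult"] == "0"
        and row["deathYear"] == IMDB_NULL      # still alive
        and row["birthYear"] != IMDB_NULL      # birth year known (checked before int())
        and int(row["birthYear"]) >= 1920
        and row["titleType"] == "movie"
    )

# Hypothetical toy rows
rows = [
    {"category": "actor", "isAdult": "0", "deathYear": "\\N", "birthYear": "1958", "titleType": "movie"},
    {"category": "actor", "isAdult": "0", "deathYear": "\\N", "birthYear": "\\N", "titleType": "movie"},
    {"category": "director", "isAdult": "0", "deathYear": "\\N", "birthYear": "1970", "titleType": "movie"},
    {"category": "actress", "isAdult": "0", "deathYear": "\\N", "birthYear": "1934", "titleType": "tvEpisode"},
]
kept = [r for r in rows if keep_row(r)]
print(len(kept))  # 1
```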

In [0]:
# Filtering the dataframe according to the constraints

In [0]:
df = dataframe.filter(dataframe.category.isin('actor', 'actress')
                      & (dataframe.isAdult == 0)
                      & (dataframe.deathYear == '\\N')       # still alive
                      & (dataframe.birthYear != '\\N')       # birth year known
                      & (dataframe.birthYear >= 1920)
                      & (dataframe.titleType == 'movie')).cache()

In [0]:
display(df)

nconst,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,ordering,category,job,characters,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000198,tt0110116,movie,Immortal Beloved,Immortal Beloved,0,1994,\N,121,"Biography,Drama,Music",1,actor,\N,"[""Ludwig van Beethoven""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt1345836,movie,The Dark Knight Rises,The Dark Knight Rises,0,2012,\N,164,"Action,Drama",4,actor,\N,"[""Commissioner Gordon""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt0097125,movie,Criminal Law,Criminal Law,0,1988,\N,117,"Crime,Drama,Thriller",1,actor,\N,"[""Ben Chase""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt3239932,movie,True Crime: The Movie,True Crime: The Movie,0,2012,\N,87,Documentary,3,actor,\N,"[""Rocky"",""Agent Masterson""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt0208874,movie,The Contender,The Contender,0,2000,\N,126,"Drama,Thriller",2,actor,\N,"[""Shelly Runyon""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt0100519,movie,Rosencrantz & Guildenstern Are Dead,Rosencrantz & Guildenstern Are Dead,0,1990,\N,117,"Comedy,Drama",1,actor,\N,"[""Rosencrantz""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt6111574,movie,The Woman in the Window,The Woman in the Window,0,2021,\N,100,"Crime,Drama,Mystery",2,actor,\N,"[""Alistair Russell""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt0118571,movie,Air Force One,Air Force One,0,1997,\N,124,"Action,Drama,Thriller",2,actor,\N,"[""Ivan Korshunov""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt2103281,movie,Dawn of the Planet of the Apes,Dawn of the Planet of the Apes,0,2014,\N,130,"Action,Adventure,Drama",1,actor,\N,"[""Dreyfus""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"
nm0000198,tt10618286,movie,Mank,Mank,0,2020,\N,131,"Biography,Comedy,Drama",1,actor,\N,"[""Herman Mankiewicz""]",Gary Oldman,1958,\N,"actor,soundtrack,producer","tt0468569,tt1340800,tt0103874,tt4555426"


My final answer is:

In [0]:
df.count()

Out[31]: 487603

**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name; these could be different people.
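The map/reduce pattern can be emulated in plain Python to make the key choice explicit: keying by `(nconst, primaryName)` rather than by the name alone keeps two people who share a name apart. `count_by_key` is a hypothetical stand-in for `reduceByKey(lambda a, b: a + b)`, and the ids and names are invented.

```python
from collections import defaultdict

def count_by_key(pairs):
    """Emulate rdd.reduceByKey(lambda a, b: a + b) on an in-memory list of (key, value) pairs."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

# Hypothetical (nconst, primaryName) appearances: two different people share the name "Alex Smith"
appearances = [
    ("nm1", "Alex Smith"), ("nm1", "Alex Smith"), ("nm2", "Alex Smith"),
    ("nm3", "Sam Lee"),
]
pairs = [((nconst, name), 1) for nconst, name in appearances]   # the map step
counts = count_by_key(pairs)                                      # the reduce step
top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(top[0])  # (('nm1', 'Alex Smith'), 2)
```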

In [0]:
# First, convert the dataframe to an RDD and key each row by both the name (which we want to display) and the 'nconst' (which is unique), so that namesakes are counted separately.

In [0]:
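# Key each appearance by (nconst, primaryName); named fields instead of positions x[0], x[14]
paired_RDD = df.rdd.map(lambda row: ((row['nconst'], row['primaryName']), 1))
count_RDD = paired_RDD.reduceByKey(lambda a, b: a + b)
sorted_RDD = count_RDD.sortBy(lambda x: x[1], ascending=False)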

My final answer is:

In [0]:
top_10 = sorted_RDD.map(lambda x: (x[0][1], x[1]))
top_10.take(10)

Out[34]: [('Brahmanandam', 796),
 ('Mammootty', 364),
 ('Mohanlal', 351),
 ('Mithun Chakraborty', 329),
 ('Shakti Kapoor', 322),
 ('Eric Roberts', 313),
 ('Cüneyt Arkin', 300),
 ('Jagathi Sreekumar', 293),
 ('Dharmendra', 270),
 ('Helen', 269)]

**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique IDs). Whenever a pair of actors appeared together in a movie, we need a link in each direction (A -> B and B -> A). However, links should be distinct: we do not need duplicates when two actors worked together in several movies.
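One way to think about the link list: group the cast by movie and emit every ordered pair of distinct co-stars, collecting them in a set so repeated collaborations produce one link per direction. The sketch below is pure Python on hypothetical ids (`build_edges` is not part of any library); the notebook reaches the same result with a self-join on `tconst`, a filter on `n1 != n2`, and `distinct()`.

```python
from itertools import permutations

def build_edges(cast_rows):
    """Return the set of distinct directed links (src, dst) between co-stars.

    cast_rows: iterable of (tconst, nconst) pairs, one per appearance.
    """
    cast_by_movie = {}
    for tconst, nconst in cast_rows:
        cast_by_movie.setdefault(tconst, set()).add(nconst)
    edges = set()
    for cast in cast_by_movie.values():
        # Ordered pairs of length 2: both A -> B and B -> A, never A -> A
        edges.update(permutations(sorted(cast), 2))
    return edges

# Hypothetical appearances: nmA and nmB work together twice, nmC joins them once
rows = [("tt1", "nmA"), ("tt1", "nmB"), ("tt2", "nmA"), ("tt2", "nmB"), ("tt2", "nmC")]
edges = build_edges(rows)
print(len(edges))  # 6
```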

In [0]:
# Creating the edges

# As the question only asks for the links (edges), I will only create the vertices later, in the PageRank question

In [0]:
df_edges = (df.select(df['nconst'].alias('n1'),'tconst')).join(df.select(df['nconst'].alias('n2'),'tconst'), on='tconst')
df_filtered = df_edges.filter(df_edges.n1 != df_edges.n2).cache()    # drop self-links: n1 and n2 must be different people


In [0]:
# We can check that whenever A -> B occurs, B -> A also occurs, i.e. when n1 worked with n2 in a movie, the pair (n2, n1) is also present for the same movie.
# Example:

In [0]:
df_filtered.filter(((df_filtered.n1 == 'nm0000113') & (df_filtered.n2 == 'nm0358316')) |
                   ((df_filtered.n2 == 'nm0000113') & (df_filtered.n1 == 'nm0358316'))).display()

tconst,n1,n2
tt2293640,nm0000113,nm0358316
tt2293640,nm0358316,nm0000113


In [0]:
# Now I need to make the links distinct, so that two actors who worked together in several movies are linked only once in each direction.

In [0]:
# Rename to the src/dst columns GraphFrames expects and drop duplicate links,
# keeping the work distributed instead of collecting every edge to the driver
edgeDataFrame = df_filtered.select(col('n1').alias('src'), col('n2').alias('dst')) \
                           .distinct() \
                           .persist()

My final answer is:

In [0]:
edgeDataFrame.display()

src,dst
nm0051880,nm0502262
nm0343857,nm0092573
nm0001696,nm0507659
nm0009629,nm0796396
nm0516272,nm0009629
nm0148661,nm0811660
nm0001166,nm0329640
nm0012361,nm2588787
nm0560364,nm0004760
nm0000800,nm0004760


**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probability = 0.1.

List the top 10 actors / actresses by pagerank.
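For the RDD route, the iterative update can be sketched in plain Python. This sketch assumes the unnormalised form r(v) = a + (1 - a) * sum(r(u) / outdeg(u)) over links u -> v, with reset probability a and every vertex starting at 1.0, which as far as I understand is the style of update in GraphX's PageRank that GraphFrames builds on; GraphFrames' `pageRank(resetProbability=..., maxIter=...)` may differ in initialisation and final rescaling, so the numbers will not match exactly. In this form the ranks average about 1 instead of summing to 1, so well-connected actors can score far above 1. The function and the star graph are hypothetical.

```python
def pagerank(edges, reset_prob=0.1, max_iter=5):
    """Iterative, unnormalised PageRank over a list of directed (src, dst) links.

    r(v) = reset_prob + (1 - reset_prob) * sum(r(u) / outdeg(u) for u -> v),
    with every vertex starting at 1.0. Vertices are taken from the edge list.
    """
    vertices = {v for edge in edges for v in edge}
    out_deg = {v: 0 for v in vertices}
    for src, _ in edges:
        out_deg[src] += 1
    ranks = {v: 1.0 for v in vertices}
    for _ in range(max_iter):
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            # Each vertex splits its current rank evenly across its outgoing links
            incoming[dst] += ranks[src] / out_deg[src]
        ranks = {v: reset_prob + (1 - reset_prob) * incoming[v] for v in vertices}
    return ranks

# Hypothetical star: "hub" co-starred with three others, links in both directions
star = [("hub", x) for x in ("a", "b", "c")] + [(x, "hub") for x in ("a", "b", "c")]
ranks = pagerank(star)
print(max(ranks, key=ranks.get))  # hub
```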

In [0]:
# Creating the vertices: vertex ids must be unique, so duplicated ids are dropped.

In [0]:
# One vertex per distinct actor/actress, in the 'id' column GraphFrames expects (no collect to the driver)
verticesDataFrame = df.select(col('nconst').alias('id')).dropDuplicates().persist()

In [0]:
verticesDataFrame.display()

id
nm0000198
nm0000354
nm0000362
nm0000767
nm0002222
nm0003678
nm0004709
nm0004763
nm0005037
nm0005258


In [0]:
# creating a graph and then applying the page rank algorithm

In [0]:
ourGraph = GraphFrame(verticesDataFrame, edgeDataFrame)
pageRanks = ourGraph.pageRank(resetProbability=0.1, maxIter = 5)
pageRank_vertices = pageRanks.vertices.sort('pagerank',ascending = False)

My final answer is:

In [0]:
pageRank_vertices.show(10)

+---------+------------------+
|       id|          pagerank|
+---------+------------------+
|nm0000616| 53.69072773571389|
|nm0000514|28.450223793553228|
|nm0001744|27.191574641528177|
|nm0001803| 24.95031828455196|
|nm0920460|21.712645095656526|
|nm0000448| 21.46497421351268|
|nm0001595|21.208161365875714|
|nm0001698| 19.77463867014523|
|nm0000367|19.645853356534897|
|nm0004193|19.171056041079787|
+---------+------------------+
only showing top 10 rows



**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.
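Because every link is stored in both directions, an actor's out-degree is simply the number of distinct co-stars. Below is a pure-Python sketch with `collections.Counter` on hypothetical ids (the helper `out_degrees` is made up); on an RDD of edges the equivalent pattern would be `map(lambda e: (e[0], 1))` followed by `reduceByKey(lambda a, b: a + b)`.

```python
from collections import Counter

def out_degrees(edges):
    """Count outgoing links per source vertex, i.e. the (id, outDegree) pairs of Q6."""
    return Counter(src for src, _ in edges)

# Hypothetical symmetric links: nmA co-starred with nmB and with nmC
edges = [("nmA", "nmB"), ("nmB", "nmA"), ("nmA", "nmC"), ("nmC", "nmA")]
degrees = out_degrees(edges)
print(degrees.most_common(1))  # [('nmA', 2)]
```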

My final answer is:

In [0]:
RDD_outDegrees = ourGraph.outDegrees.rdd.map(lambda x: (x[0],x[1]))\
                                        .sortBy(lambda x: x[1], ascending=False)
RDD_outDegrees.take(10)

Out[48]: [('nm0000616', 486),
 ('nm0000514', 284),
 ('nm0001744', 265),
 ('nm0000367', 264),
 ('nm0945189', 257),
 ('nm0256628', 253),
 ('nm0451600', 252),
 ('nm0001803', 246),
 ('nm0149822', 240),
 ('nm0004109', 234)]

### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon. There are two actors named ‘Kevin Bacon’; we will use the one with the highest degree, that is, the one that participated in the most titles;
1. Estimate the shortest path between every actor in the database and Kevin Bacon, and keep a dataframe with this information as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degrees from Kevin Bacon (you will need to deal with actors unconnected to Kevin Bacon: if an actor / actress is not connected to Kevin Bacon, give them a score/degree of 20).
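Since each link exists in both directions, the distance from any actor to Kevin Bacon equals the number of hops in a breadth-first search started from Kevin Bacon, which is what `shortestPaths(landmarks=[...])` reports per vertex. The sketch below is plain Python on a hypothetical five-vertex graph (the vertex names and `degrees_from` are made up): it runs the BFS, assigns the score of 20 from the question to unreachable actors, and summarises the count per degree.

```python
from collections import Counter, deque

UNREACHABLE = 20  # score for actors not connected to Kevin Bacon, as the question specifies

def degrees_from(source, vertices, edges):
    """Breadth-first search from `source`; returns {vertex: hops}, UNREACHABLE if no path."""
    neighbours = {v: [] for v in vertices}
    for src, dst in edges:
        neighbours[src].append(dst)
    dist = {source: 0}
    queue = deque([source])
    while queue:
        v = queue.popleft()
        for w in neighbours[v]:
            if w not in dist:          # first visit is along a shortest path
                dist[w] = dist[v] + 1
                queue.append(w)
    return {v: dist.get(v, UNREACHABLE) for v in vertices}

# Hypothetical chain bacon - a - b - c, plus an actor with no links at all
vertices = ["bacon", "a", "b", "c", "loner"]
links = [("bacon", "a"), ("a", "bacon"), ("a", "b"), ("b", "a"), ("b", "c"), ("c", "b")]
degrees = degrees_from("bacon", vertices, links)
summary = sorted(Counter(degrees.values()).items())
print(summary)  # [(0, 1), (1, 1), (2, 1), (3, 1), (20, 1)]
```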

In [0]:
# 1.

# First, let's get the IDs of the actors named 'Kevin Bacon'; in the filtered dataframe (df) I only got one Kevin Bacon

In [0]:
# Count the rows per nconst for everyone named 'Kevin Bacon' (a comment cannot follow a trailing backslash)
kevin_bacon = df.filter(df['primaryName'] == 'Kevin Bacon') \
                .groupBy('nconst') \
                .count()
kevin_bacon.show()

+---------+-----+
|   nconst|count|
+---------+-----+
|nm0000102|   52|
+---------+-----+



In [0]:
id_kevin = (ourGraph.outDegrees
            .filter(ourGraph.outDegrees.id.isin(['nm0000102']))  # list every 'Kevin Bacon' id found above
            .sort('outDegree', ascending=False)                  # highest degree first
            .select('id')
            .collect()[0]['id'])

id_kevin
# This code was written assuming there would be two 'Kevin Bacon' ids, of which I had to choose the one with the highest degree. Only one id was found, so the list passed to isin() has a single entry; with two ids it would be ['nm0000102', '<second id>'], and the descending sort then picks the right one.

Out[51]: 'nm0000102'

In [0]:
#2.

shortest_path = ourGraph.shortestPaths(landmarks=[id_kevin]).cache()

In [0]:
shortest_path.display()

id,distances
nm0057029,Map(nm0000102 -> 4)
nm0064609,Map()
nm8760959,Map()
nm0253708,Map(nm0000102 -> 2)
nm2916662,Map(nm0000102 -> 3)
nm1597349,Map()
nm0057080,Map(nm0000102 -> 4)
nm1335115,Map(nm0000102 -> 4)
nm3135033,Map(nm0000102 -> 6)
nm0177208,Map(nm0000102 -> 4)


In [0]:
#3.

In [0]:
all_connected = shortest_path.rdd.map(lambda x: (x[0], x[1].get(id_kevin, 20)))
# Paired RDD of (nconst, degree): if the distances map contains Kevin Bacon we take that distance; otherwise the actor is unconnected and gets 20.

summary = all_connected.toDF(['Person','Degree'])\
                       .groupBy('Degree')\
                       .count()\
                       .sort('Degree')


My final answer is:

In [0]:
summary.show()

+------+-----+
|Degree|count|
+------+-----+
|     0|    1|
|     1|  133|
|     2| 3547|
|     3|19475|
|     4|31215|
|     5|14930|
|     6| 2782|
|     7|  376|
|     8|   55|
|     9|    9|
|    20|20518|
+------+-----+
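The Q7 procedure (hop distances to Kevin Bacon, then a count per degree with 20 for unconnected actors) can be illustrated outside Spark with a breadth-first search. This is a minimal sketch, assuming a small symmetric (undirected) co-star graph stored as a hypothetical adjacency dict, so that the distance from Kevin Bacon to an actor equals the distance from that actor to him; `shortestPaths` computes the same hop counts at scale.

```python
from collections import Counter, deque

UNCONNECTED = 20  # score for actors with no path to the landmark (from the question)

def degrees_from(graph, landmark, unconnected=UNCONNECTED):
    """Breadth-first search from `landmark` over an adjacency dict.

    Assumes the graph is symmetric, so distances from the landmark equal
    distances to it.
    """
    dist = {landmark: 0}
    queue = deque([landmark])
    while queue:
        node = queue.popleft()
        for neighbour in graph.get(node, ()):
            if neighbour not in dist:
                dist[neighbour] = dist[node] + 1
                queue.append(neighbour)
    # Every vertex the search never reached gets the fallback score.
    return {v: dist.get(v, unconnected) for v in graph}

# Toy graph with hypothetical ids: kb - a - b, plus an isolated pair x - y.
graph = {
    "kb": ["a"],
    "a": ["kb", "b"],
    "b": ["a"],
    "x": ["y"],
    "y": ["x"],
}
degrees = degrees_from(graph, "kb")
summary = sorted(Counter(degrees.values()).items())  # (degree, number of actors)
print(summary)  # [(0, 1), (1, 1), (2, 1), (20, 2)]
```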



### Exploring the data with RDDs

Using RDDs (not dataframes), answer the following questions (if you loaded your data into Spark as a dataframe, you can easily convert it to an RDD of rows using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie', what is the combination of genres that is the most popular (as measured by number of reviews)? Hint: paired RDDs will be useful.

In [0]:
# To answer this question only title_ratings and title_basics are necessary; they will be converted to paired RDDs.

In [0]:
RDD_basics = title_basics.rdd.filter(lambda x: x[1] == 'movie' and x[8] != '\\N')\
                             .map(lambda x: (x[0],x[8]))
# we only want movies, and we are not interested in movies without a genre

RDD_ratings = title_ratings.rdd.map(lambda x: (x[0],x[2]))

In [0]:
RDD_basics.take(5), RDD_ratings.take(5)

Out[58]: ([('tt0000574', 'Action,Adventure,Biography'),
  ('tt0000591', 'Drama'),
  ('tt0000615', 'Drama'),
  ('tt0000630', 'Drama'),
  ('tt0000675', 'Drama')],
 [('tt0000001', 1884),
  ('tt0000002', 250),
  ('tt0000003', 1669),
  ('tt0000004', 163),
  ('tt0000005', 2490)])

In [0]:
# let's merge both rdds and then transform it into a new paired RDD with (genre,numVotes) and sum the numVotes per genre

In [0]:
RDD_merged = RDD_basics.join(RDD_ratings)
RDD_genre_rating = RDD_merged.map(lambda x: (x[1][0], x[1][1]))\
                             .reduceByKey(lambda a,b: a + b)\
                             .sortBy(lambda x: x[1], ascending = False)

My final answer is:

In [0]:
RDD_genre_rating.take(1)[0]

Out[61]: ('Action,Adventure,Sci-Fi', 51140697)
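The `join` followed by `reduceByKey` above can be mimicked in plain Python to show what each step does to the (key, value) pairs. This sketch uses hypothetical rows and helper names (`join`, `reduce_by_key`) that only imitate the RDD methods of the same name; note that the inner join drops titles without a rating.

```python
from collections import defaultdict

def join(left, right):
    """Inner join of two lists of (key, value) pairs, like RDD.join."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

def reduce_by_key(pairs, func):
    """Combine the values that share a key with `func`, like RDD.reduceByKey."""
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return acc

# Hypothetical rows: (tconst, genres) and (tconst, numVotes).
basics = [("t1", "Drama"), ("t2", "Action,Sci-Fi"), ("t3", "Drama")]
ratings = [("t1", 100), ("t2", 250), ("t3", 120), ("t9", 999)]

votes = reduce_by_key(((g, n) for _t, (g, n) in join(basics, ratings)), lambda a, b: a + b)
best = max(votes.items(), key=lambda kv: kv[1])
print(best)  # ('Action,Sci-Fi', 250)
```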

**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination)? Hint: paired RDDs will be useful.

In [0]:
# Practically, we need to do the same as above but using the average rating of each movie instead of 'numVotes'. In the end, when grouping the ratings by genre combination, we need to average the results.

In [0]:
RDD_basics_ = title_basics.rdd.filter(lambda x: x[1] == 'movie' and x[8] != '\\N')\
                              .map(lambda x: (x[0],x[8]))
RDD_ratings_ = title_ratings.rdd.filter(lambda x: (x[2] > 400))\
                            .map(lambda x: (x[0],x[1]))

In [0]:
RDD_merged_ = RDD_basics_.join(RDD_ratings_)
RDD_averages = (RDD_merged_.map(lambda x: (x[1][0], x[1][1]))   # (genres, averageRating)
                           .groupByKey()
                           .mapValues(lambda x: sum(x) / len(x))    # average rating per genre combination
                           .sortBy(lambda x: x[1], ascending=False))

My answer is:

In [0]:
RDD_averages.take(1)

Out[65]: [('Music,Musical', 8.4)]
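`groupByKey` works here, but it collects every rating of a genre combination before averaging. A common alternative is to carry (sum, count) pairs and combine them with `reduceByKey`, dividing only at the end. The plain-Python sketch below shows that accumulator pattern on hypothetical (genres, averageRating) pairs; the PySpark chain in the docstring is the equivalent form using standard RDD methods.

```python
def average_by_key(pairs):
    """Per-key mean using (sum, count) accumulators instead of grouping all values.

    Equivalent PySpark pattern:
        rdd.mapValues(lambda r: (r, 1))
           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
           .mapValues(lambda s: s[0] / s[1])
    """
    totals = {}
    for key, value in pairs:
        s, c = totals.get(key, (0.0, 0))
        totals[key] = (s + value, c + 1)
    return {key: s / c for key, (s, c) in totals.items()}

# Hypothetical (genres, averageRating) pairs after the join and the >400-votes filter.
pairs = [("Drama", 7.0), ("Drama", 8.0), ("Music,Musical", 8.4), ("Comedy", 6.0)]
averages = average_by_key(pairs)
print(max(averages.items(), key=lambda kv: kv[1]))  # ('Music,Musical', 8.4)
```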

**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular as measured by number of votes? Votes for multiple genres count towards each genre listed. Hint: flatMap and paired RDDs will be useful here.

In [0]:
#using the rdds created in Q8

#flatMapValues: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.flatMapValues.html

In [0]:
RDD_individual_genre_rating = (RDD_merged.map(lambda x: (x[1][1], x[1][0]))   # store as (numVotes, genres) in order to apply flatMapValues
                                         .flatMapValues(lambda x: x.split(','))  # split 'genres' on ','
                                         .map(lambda x: (x[1], x[0]))           # genre as the key for reduceByKey
                                         .reduceByKey(lambda a, b: a + b)
                                         .sortBy(lambda x: x[1], ascending=False))

My answer is:

In [0]:
RDD_individual_genre_rating.take(1)

Out[68]: [('Drama', 487969283)]
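The flatMap step credits a movie's votes to every genre in its list. The sketch below shows the same logic in plain Python on hypothetical rows shaped like `RDD_merged`, i.e. (tconst, (genres, numVotes)). In PySpark the swap-and-swap-back could also be written as a single `flatMap`, for example `RDD_merged.flatMap(lambda x: [(g, x[1][1]) for g in x[1][0].split(',')])` followed by `reduceByKey`; that is an alternative, not the code used above.

```python
from collections import Counter

def votes_per_genre(merged):
    """Credit each movie's votes to every genre in its comma-separated list.

    `merged` holds (tconst, (genres, numVotes)) pairs, the shape of RDD_merged.
    """
    totals = Counter()
    for _tconst, (genres, num_votes) in merged:
        for genre in genres.split(","):   # the flatMap step
            totals[genre] += num_votes     # the reduceByKey step
    return totals

# Hypothetical merged rows.
merged = [("t1", ("Drama,Romance", 100)), ("t2", ("Action,Drama", 50)), ("t3", ("Action", 30))]
print(votes_per_genre(merged).most_common(1))  # [('Drama', 150)]
```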

## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The cast's average number of degrees of separation from Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leave 30% of the data for testing, when splitting your data use the option seed=0.

**Q11** Provide the coefficients of the regression and the accuracy of your model on that test dataset according to RMSE.

In [0]:
# we need to merge all information in a single dataframe. To do it, we will use 'nconst' and in the end we can drop it.

In [0]:
data_joins = (df.rdd.map(lambda x: (x[0], 1))
                .reduceByKey(lambda a, b: a + b)
                .toDF(['nconst', 'count'])                                          # feature 1: number of titles per actor
                .join(df.select(['nconst', 'tconst']), on='nconst')                 # 'tconst' is needed later to group by movie
                .join(pageRank_vertices.select(pageRank_vertices['id'].alias('nconst'), 'pagerank'), on='nconst')  # feature 2
                .join(RDD_outDegrees.toDF(['nconst', 'outDegrees']), on='nconst')  # feature 3
                .join(all_connected.toDF(['nconst', 'DegreeKB']), on='nconst')     # feature 4
                .drop('nconst')
                .cache())

In [0]:
# Now that all the information is in a single dataframe, we group by 'tconst' and aggregate each feature as requested.

In [0]:
data = data_joins.groupBy('tconst').agg({'count':'sum', 'pagerank':'avg', 'outDegrees':'avg', 'DegreeKB':'avg'})\
                 .join(title_ratings.select(title_ratings['averageRating'].alias('rating'),'tconst'), on = 'tconst')
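The groupBy/agg above turns per-actor rows into one row per movie: the title counts are summed and the other three features are averaged over the cast. A plain-Python sketch of that aggregation follows; the function name and toy data are hypothetical, and actors without features are skipped, mirroring how the inner joins above drop them.

```python
from collections import defaultdict
from statistics import mean

def movie_features(cast, actor_features):
    """Aggregate per-actor features into per-movie features.

    cast: list of (nconst, tconst) pairs.
    actor_features: nconst -> (count, pagerank, outDegrees, degreeKB).
    Actors missing from actor_features are dropped, as an inner join would.
    """
    by_movie = defaultdict(list)
    for nconst, tconst in cast:
        if nconst in actor_features:
            by_movie[tconst].append(actor_features[nconst])
    return {
        tconst: {
            "sum(count)": sum(f[0] for f in rows),
            "avg(pagerank)": mean(f[1] for f in rows),
            "avg(outDegrees)": mean(f[2] for f in rows),
            "avg(DegreeKB)": mean(f[3] for f in rows),
        }
        for tconst, rows in by_movie.items()
    }

cast = [("n1", "t1"), ("n2", "t1"), ("n3", "t1"), ("n2", "t2")]
actor_features = {"n1": (10, 1.0, 4, 2), "n2": (6, 3.0, 2, 4)}  # n3 has no features
features = movie_features(cast, actor_features)
print(features["t1"])
```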


In [0]:
data.show(10)

+---------+---------------+----------+-------------------+-------------+------+
|   tconst|avg(outDegrees)|sum(count)|      avg(pagerank)|avg(DegreeKB)|rating|
+---------+---------------+----------+-------------------+-------------+------+
|tt0036077|            1.0|        17| 0.1779822304598052|          5.0|   7.5|
|tt0042303|           13.5|        68| 1.8992518523345572|          3.0|   6.6|
|tt0044431|            1.0|        14|0.40176691408112764|          5.0|   6.1|
|tt0044770|            4.0|        20| 1.2466639401145558|          3.5|   6.6|
|tt0045015|            1.0|        12|  0.279047871039105|          4.0|   5.6|
|tt0048006|           13.0|        80|  1.742202628073934|          3.0|   5.9|
|tt0049309|           12.0|        45| 1.7129026129178544|          4.0|   6.0|
|tt0049448|           97.0|        99|   8.35048381562023|          3.0|   6.9|
|tt0050254|           31.0|        62| 3.4561042913959445|          4.0|   6.1|
|tt0050651|            1.0|         7|0.

In [0]:
#checking for possible outliers

In [0]:
CONTINUOUS_FEATURES = data.columns[1:5]
data.select(*CONTINUOUS_FEATURES).summary("mean","stddev","min","1%","5%","50%","95%","99%","max",).show()

+-------+------------------+------------------+------------------+------------------+
|summary|   avg(outDegrees)|        sum(count)|     avg(pagerank)|     avg(DegreeKB)|
+-------+------------------+------------------+------------------+------------------+
|   mean| 34.76085908361231| 68.79224072466913| 2.903557449688704|3.8457588344017646|
| stddev|33.330464459704714|101.97909624315612|2.3952855635422696| 2.602621615294829|
|    min|               1.0|                 2|0.1511677262880853|               0.5|
|     1%|               1.0|                 2|0.2737485961963287|              1.75|
|     5%|               2.0|                 4|0.5634925138378909|               2.0|
|    50%|              25.0|                37| 2.326754637709051|               3.5|
|    95%| 99.33333333333333|               232| 7.022442107032612|               5.0|
|    99%|             146.5|               487|10.875284460286021|              20.0|
|    max|             486.0|              1653| 53.690

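The columns "avg(outDegrees)", "sum(count)" and "avg(pagerank)" seem to have some outliers: their maximum values lie far above the 99th percentile (for example, 486 vs 146.5 for avg(outDegrees) and 1653 vs 487 for sum(count)).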

In [0]:
# Checking for missing values, to see whether we need to drop them or whether it is better to fill them

#inspiration for this code: https://stackoverflow.com/questions/44413132/count-the-number-of-missing-values-in-a-dataframe-spark

In [0]:
nan_columns = data.select([count(when(isnull(col), col)).alias(col) for col in data.columns]).toPandas()
nan_columns

   tconst  avg(outDegrees)  sum(count)  avg(pagerank)  avg(DegreeKB)  rating
0       0                0           0              0              0       0


There are no missing values.

Pipeline

In [0]:
# Note: I will use MinMaxScaler here (as we did in the practical classes), but since min-max scaling is sensitive to outliers, and so is linear regression, we should also try RobustScaler. I will explore it later in Q14.

In [0]:
vector_assembler = MF.VectorAssembler(  
    inputCols=CONTINUOUS_FEATURES,
    outputCol="vector",
)
 
vector_scaler = MF.MinMaxScaler(  
    inputCol="vector",
    outputCol="vector_scaled",
)

lr =  LinearRegression(
    featuresCol="vector_scaled", labelCol="rating", predictionCol="predictions"
)


movies_pipeline = Pipeline(  
    stages=[vector_assembler, vector_scaler,lr],
)
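To see why the outliers matter for the choice of scaler, the sketch below compares min-max scaling with quantile-based (robust) scaling on a hypothetical outDegree-like column containing one extreme value. It is plain Python, not the Spark implementation: as far as I know, Spark's `RobustScaler` defaults to `lower=0.25`, `upper=0.75` and `withCentering=False` and uses approximate quantiles, whereas this sketch computes exact linearly interpolated percentiles.

```python
def percentile(values, q):
    """Linear-interpolation percentile, q in [0, 1] (exact, unlike Spark's approximate quantiles)."""
    xs = sorted(values)
    pos = q * (len(xs) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(xs) - 1)
    return xs[lo] + (xs[hi] - xs[lo]) * (pos - lo)

def min_max_scale(values):
    """Rescale to [0, 1] using min and max, like MinMaxScaler's default range."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

def robust_scale(values, lower=0.25, upper=0.75, with_centering=False):
    """Divide by the inter-quantile range, optionally subtracting the median first.

    Assumes the inter-quantile range is non-zero.
    """
    iqr = percentile(values, upper) - percentile(values, lower)
    centre = percentile(values, 0.5) if with_centering else 0.0
    return [(v - centre) / iqr for v in values]

# Hypothetical column: five ordinary values and one extreme one.
col = [1.0, 2.0, 3.0, 4.0, 5.0, 486.0]
print(min_max_scale(col))  # the five ordinary values are squeezed into [0, 0.01)
print(robust_scale(col))   # they stay spread out; only the outlier gets a large value
```

With min-max scaling a single extreme value compresses every other observation into a tiny interval, which is the concern raised in the note above.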

In [0]:
#train-test split

In [0]:
train, test = data.randomSplit([0.7, 0.3], seed = 0)
train.cache()
test.cache()

Out[74]: DataFrame[tconst: string, avg(outDegrees): double, sum(count): bigint, avg(pagerank): double, avg(DegreeKB): double, rating: double]

In [0]:
ourModel = movies_pipeline.fit(train)
predictions = ourModel.transform(test)

evaluator = RegressionEvaluator(
    labelCol="rating",  
    predictionCol="predictions", 
    metricName="rmse",
)

rmse = evaluator.evaluate(predictions)
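For reference, the RMSE that `RegressionEvaluator(metricName="rmse")` reports is the square root of the mean squared difference between label and prediction. A minimal plain-Python version, with made-up numbers:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length, non-empty sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels))

print(rmse([7.0, 6.0, 8.0], [6.0, 6.0, 10.0]))  # sqrt((1 + 0 + 4) / 3), about 1.291
```

Because errors are squared before averaging, a few badly predicted movies weigh more than many small misses.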

My final answer is:

In [0]:
print('RMSE of the model: '+str(rmse)+'\nValue of the intercept of the model: ' +str(ourModel.stages[2].intercept)+ '\nValues of the coefficients of the model: '+str(ourModel.stages[2].coefficients.values))

RMSE of the model: 1.2586001057640366
Value of the intercept of the model: 5.88131831475802
Values of the coefficients of the model: [ 4.62602759 -1.13612923 -6.18224448  0.28724882]
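The coefficients above follow the order of `CONTINUOUS_FEATURES` (avg(outDegrees), sum(count), avg(pagerank), avg(DegreeKB)) and apply to min-max scaled inputs, so each one is the predicted change in rating when that feature moves from its training minimum to its training maximum. The sketch below converts such coefficients back to original units; it assumes MinMaxScaler's default [0, 1] output range, and the per-feature minima and maxima would come from the fitted scaler (in PySpark, `MinMaxScalerModel` exposes `originalMin` and `originalMax`). The numbers in the example are hypothetical.

```python
def unscale_coefficients(intercept, coefs, mins, maxs):
    """Express a linear model fitted on [0, 1] min-max scaled features in original units.

    With x_s = (x - min) / (max - min), the model y = b0 + sum(b_i * x_s_i) becomes
    y = (b0 - sum(b_i * min_i / range_i)) + sum((b_i / range_i) * x_i).
    Assumes no feature is constant (range_i != 0).
    """
    ranges = [hi - lo for lo, hi in zip(mins, maxs)]
    raw_coefs = [b / r for b, r in zip(coefs, ranges)]
    raw_intercept = intercept - sum(b * lo / r for b, lo, r in zip(coefs, mins, ranges))
    return raw_intercept, raw_coefs

# Hypothetical single-feature model: intercept 5.0, coefficient 2.0, feature range [1, 5].
print(unscale_coefficients(5.0, [2.0], [1.0], [5.0]))  # (4.5, [0.5])
```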


**Q12** What score would your model predict for the 1997 movie Titanic?

In [0]:
titanicID = title_basics.filter((title_basics.primaryTitle == 'Titanic') & (title_basics.startYear == 1997) & (title_basics.titleType == 'movie')).collect()[0][0]
#getting Titanic ID

dfTitanic = data.filter(data.tconst == titanicID).cache()
#dataframe with Titanic information

featTitanic = dfTitanic.drop('rating')
labelTitanic = dfTitanic.select('rating')
# Splitting dfTitanic into the features, which we feed to the model, and the real label (to compare the results)

predictions_titanic = ourModel.transform(featTitanic)
predictedValue = predictions_titanic.collect()[0]['predictions']
labelValue = labelTitanic.collect()[0]['rating']

My final answer:

In [0]:
print(f" Predicted rating for Titanic {predictedValue} ") #predicted rating
print(f" Rating of Titanic = {labelValue} ") #real rating

 Predicted rating for Titanic 5.807227841387321 
 Rating of Titanic = 7.9 


**Q13** Create dummy variables for each of the top 10 movie genres from Q10. These variables should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example, the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variables to the regression improve your results? What are the new RMSE and the predicted rating for the 1997 movie Titanic?

In [0]:
# Creating top10_genres from the RDD already created for individual genres and votes

In [0]:
top10_genres = RDD_individual_genre_rating.map(lambda x: x[0]).take(10)
print(top10_genres)

['Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']


In [0]:
# To create the dummy variables I took inspiration from the link below.
#https://stackoverflow.com/questions/42805663/e-num-get-dummies-in-pyspark

In [0]:
ourDummies = [when(col("genres").contains(genre),1).otherwise(0).alias(genre) for genre in top10_genres]      
dummiesDF = title_basics.select("tconst", "genres", *ourDummies)

dataDummies = data.join(dummiesDF, on='tconst').cache()

dataDummies.show(10)

+---------+---------------+----------+-------------------+-------------+------+--------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+
|   tconst|avg(outDegrees)|sum(count)|      avg(pagerank)|avg(DegreeKB)|rating|              genres|Drama|Action|Comedy|Adventure|Crime|Thriller|Sci-Fi|Romance|Mystery|Horror|
+---------+---------------+----------+-------------------+-------------+------+--------------------+-----+------+------+---------+-----+--------+------+-------+-------+------+
|tt0036077|            1.0|        17| 0.1779822304598052|          5.0|   7.5|               Drama|    1|     0|     0|        0|    0|       0|     0|      0|      0|     0|
|tt0042303|           13.5|        68| 1.8992518523345572|          3.0|   6.6|               Drama|    1|     0|     0|        0|    0|       0|     0|      0|      0|     0|
|tt0044431|            1.0|        14|0.40176691408112764|          5.0|   6.1|      Comedy,Musical|    0|     0|     1|

In [0]:
print(dataDummies.columns[1:5] + dataDummies.columns[7:])

['avg(outDegrees)', 'sum(count)', 'avg(pagerank)', 'avg(DegreeKB)', 'Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']
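The dummies above use `contains`, which is a substring test. For the current top 10 list that is fine, but it would misfire for genre pairs such as 'Music' and 'Musical' (both appear in this dataset, see Q9), because 'Musical' contains 'Music'. A sketch of exact token matching in plain Python follows; the function name is hypothetical. In Spark the same idea could be expressed with `array_contains(split(col("genres"), ","), genre)`.

```python
def genre_dummies(genres, top_genres):
    """One 0/1 flag per top genre, matching whole comma-separated tokens only."""
    tokens = set(genres.split(",")) if genres else set()
    return {g: int(g in tokens) for g in top_genres}

top10_genres = ['Drama', 'Action', 'Comedy', 'Adventure', 'Crime',
                'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']
titanic = genre_dummies("Drama,Romance", top10_genres)   # Titanic's genres as stated in Q13
print([g for g, flag in titanic.items() if flag])         # ['Drama', 'Romance']
print(genre_dummies("Musical", ["Music", "Musical"]))     # {'Music': 0, 'Musical': 1}
```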


In [0]:
BINARY_FEATURES = dataDummies.columns[7:] 
CONTINUOUS_FEATURES =dataDummies.columns[1:5]

In [0]:
vector_continuous = MF.VectorAssembler(  
    inputCols=CONTINUOUS_FEATURES,
    outputCol="vector_continuous",
)
 
vector_scaler_minmax = MF.MinMaxScaler(  
    inputCol="vector_continuous",
    outputCol="vector_scaled_minmax",
)

vector_assembler = MF.VectorAssembler(
    inputCols=BINARY_FEATURES 
    + ["vector_scaled_minmax"],
    outputCol="features",
)

lr =  LinearRegression(
    featuresCol="features", labelCol="rating", predictionCol="predictions"
)


movies_pipeline = Pipeline(  
    stages=[vector_continuous,vector_scaler_minmax,vector_assembler,lr],
)

In [0]:
train, test = dataDummies.randomSplit([0.7, 0.3], seed = 0)
train.cache()
test.cache()

ourModel_ = movies_pipeline.fit(train)
predictions_ = ourModel_.transform(test)

dfTitanic_ = dataDummies.filter(dataDummies.tconst == titanicID).cache()

featTitanic = dfTitanic_.drop('rating')
labelTitanic = dfTitanic_.select('rating')

predictions_titanic = ourModel_.transform(featTitanic)
predictedValue = predictions_titanic.collect()[0]['predictions']
labelValue = labelTitanic.collect()[0]['rating']

In [0]:
rmse_ = evaluator.evaluate(predictions_)

My final answer is:

In [0]:
print(f" RMSE of the model {rmse_} ")
print(f" Predicted rating for Titanic {predictedValue} ")
print(f" Rating of Titanic = {labelValue} ")

 RMSE of the model 1.1978463942251345 
 Predicted rating for Titanic 6.145087052276939 
 Rating of Titanic = 7.9 


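We can conclude that the genres of a movie help predict its average rating: on this split the RMSE decreased from 1.2586 to 1.1978, and the Titanic prediction moved from 5.81 to 6.15, closer to its real rating of 7.9. We therefore keep the genres in the following question.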

**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, and changing the included features. What is the RMSE of your final model, and what rating does it predict for the 1997 movie Titanic?

We already know that including the top 10 genres in our model improved the results, which makes sense. The number of versions of a movie (its alternative titles listed in title_akas, e.g. for regional releases) may also be informative: we expect a movie with more versions to have a higher rating, because many versions can indicate that the movie was popular.

In [0]:
numbVersion = title_akas.select(title_akas['titleId'].alias('tconst')).groupBy('tconst').count()
numbVersion.show(3)
# title_akas: where we have the alternative titles ('versions') of each movie

+---------+-----+
|   tconst|count|
+---------+-----+
|tt0000658|    5|
|tt0000839|    2|
|tt0001170|    2|
+---------+-----+
only showing top 3 rows



In [0]:
data_new = dataDummies.join(numbVersion, on = 'tconst')

BINARY_FEATURES = data_new.columns[7:-1]
CONTINUOUS_FEATURES =data_new.columns[1:5] + [data_new.columns[-1]]     #all previous continuous features and the number of versions

vector_continuous = MF.VectorAssembler(  
    inputCols=CONTINUOUS_FEATURES,
    outputCol="vector_continuous",
)
 
vector_scaler_robust = MF.RobustScaler(  
    inputCol="vector_continuous",
    outputCol="vector_scaled_robust",
)

vector_assembler = MF.VectorAssembler(
    inputCols=BINARY_FEATURES 
    + ["vector_scaled_robust"],
    outputCol="features",
)

#checking also the results with the RobustScaler for linear regression which might be more appropriate in our case
#note: parameters of Gradient-boosted tree regressor explained below

lr =  LinearRegression(
    featuresCol="features", labelCol="rating", predictionCol="predictions"
)

rf = RandomForestRegressor(
    featuresCol="features", labelCol="rating", predictionCol="predictions"
)

gbt = GBTRegressor(
    featuresCol="features", labelCol="rating", predictionCol="predictions",maxDepth=5,maxIter = 10
)

movies_pipeline_1 = Pipeline(  
    stages=[vector_continuous,vector_scaler_robust,vector_assembler,lr],
)

movies_pipeline_2 = Pipeline(  
    stages=[vector_continuous,vector_scaler_robust,vector_assembler,rf],
)

movies_pipeline_3 = Pipeline(  
    stages=[vector_continuous,vector_scaler_robust,vector_assembler, gbt],
)

evaluator = RegressionEvaluator(
    labelCol="rating",  
    predictionCol="predictions", 
    metricName="rmse",
)

In [0]:
train, test = data_new.randomSplit([0.7, 0.3], seed = 0)
train.cache()
test.cache()

dfTitanic_NEW = data_new.filter(data_new.tconst == titanicID).cache()

featTitanic = dfTitanic_NEW.drop('rating')
labelTitanic = dfTitanic_NEW.select('rating')

For Linear Model

In [0]:
model_lr = movies_pipeline_1.fit(train)
predictions1 = model_lr.transform(test)
rmse1 = evaluator.evaluate(predictions1)
predictions_Titanic1 = model_lr.transform(featTitanic)

print('RMSE of the model: ' +str(rmse1)+ '\nPredicted rating: ' +str(predictions_Titanic1.collect()[0]['predictions']))

RMSE of the model: 1.1778759514052892
Predicted rating: 7.380385996092974


For Random Forest

In [0]:
model_rf = movies_pipeline_2.fit(train)
predictions2 = model_rf.transform(test)
rmse2 = evaluator.evaluate(predictions2)

In [0]:
predictions_Titanic2 = model_rf.transform(featTitanic)

print('RMSE of the model: ' +str(rmse2)+ '\nPredicted rating: ' +str(predictions_Titanic2.collect()[0]['predictions']))

RMSE of the model: 1.1796799439607009
Predicted rating: 6.635893289095067


For Gradient-boosted tree

This model was really difficult to run: the cluster died after a while, so I had to reduce the parameters and run some experiments, which led to an interesting conclusion:

- first, I tried to run it with the default parameters, which was impossible;
- then I tried it on a sample of my dataframe, but the results were poor because the sample had to be too small for it to run;
- next, I changed the default parameters, mainly max depth to 2 and max iterations to only 5; now it ran, but the results were not good;
- finally, as the results were improving, I set max depth back to its default value (5) and increased max iterations to 10; the results improved and this became my best model, so further exploring the parameters would probably give even better results.

In [0]:
model_gbt = movies_pipeline_3.fit(train)

In [0]:
predictions3 = model_gbt.transform(test)
rmse3 = evaluator.evaluate(predictions3)

In [0]:
predictions_Titanic3 = model_gbt.transform(featTitanic)

print('RMSE of the model: ' +str(rmse3)+ '\nPredicted rating: ' +str(predictions_Titanic3.collect()[0]['predictions']))

RMSE of the model: 1.1600398440373452
Predicted rating: 7.467804159050522


Grid Search CV

To do this exercise properly I should run a grid search for the three chosen models and compare the results. However, the cluster dies when running a grid search with CV for the chosen ensemble models (for the gradient-boosted tree it is impossible with the community version, and for the random forest I had to limit the parameters to a very small number). So I will only run a grid search with CV for linear regression, to try to improve its results further (this model is much faster than the others).

In [0]:
grid_search = (
    ParamGridBuilder()
    # .addGrid(lr.regParam, [0.1, 0.01])  # while regParam stays at its default (0.0), elasticNetParam has no effect
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

#grid search for RF
# grid_search = (
#     ParamGridBuilder() 
#     .addGrid(rf.numTrees, [25,50])
#     .addGrid(rf.maxDepth, [2, 4, 10])
#     .build()
# )

#grid search for gbt
# grid_search = (
#     ParamGridBuilder() 
#     .addGrid(gbt.maxIter, [10, 20, 30])
#     .addGrid(gbt.maxDepth, [5, 10])
#     .build()
# )


cv = CrossValidator(
    estimator=movies_pipeline_1,
    estimatorParamMaps=grid_search,
    evaluator=evaluator,
    numFolds=2,
    seed=13,
)

In [0]:
cv_model = cv.fit(train)


In [0]:
predicts = cv_model.transform(test)

rmse1_update = evaluator.evaluate(predicts)
predicts_Titanic = cv_model.transform(featTitanic)

print('RMSE of the model: ' +str(rmse1_update)+ '\nPredicted rating: ' +str(predicts_Titanic.collect()[0]['predictions']))

RMSE of the model: 1.1778759514052894
Predicted rating: 7.380385996092976
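The cross-validated model matches the earlier linear model almost exactly (RMSE 1.1778759514052894 vs 1.1778759514052892). One likely reason is that the `regParam` grid line is commented out: Spark's elastic-net penalty is `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`, and `regParam` defaults to 0.0, so every value of `elasticNetParam` yields the same unregularized fit. The plain-Python sketch below evaluates that penalty for weights roughly the size of the Q11 coefficients.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Spark-style elastic-net penalty:
    regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), where alpha = elasticNetParam.
    """
    l1 = sum(abs(w) for w in weights)
    l2 = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2)

w = [4.6, -1.1, -6.2, 0.3]
# regParam = 0.0: the penalty vanishes for every elasticNetParam in the grid.
print([elastic_net_penalty(w, 0.0, a) for a in (0.0, 0.5, 1.0)])  # [0.0, 0.0, 0.0]
# regParam = 0.1: the three grid points now penalize the weights differently.
print([elastic_net_penalty(w, 0.1, a) for a in (0.0, 0.5, 1.0)])
```

Uncommenting the `regParam` grid would give the search distinct models to compare.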


Final Results

In [0]:
simpleDF = spark.createDataFrame(
    [
        (1, "lr + GridSearchCV", rmse1_update, predicts_Titanic.collect()[0]['predictions'],labelTitanic.collect()[0]['rating']),  
        (2, "rf", rmse2, predictions_Titanic2.collect()[0]['predictions'],labelTitanic.collect()[0]['rating']),
        (3, "gbt", rmse3, predictions_Titanic3.collect()[0]['predictions'],labelTitanic.collect()[0]['rating']),
    ],
    ["id", "model","RMSE","predTitanic","ratingTtanic"] 
)

simpleDF.show()

+---+-----------------+------------------+-----------------+------------+
| id|            model|              RMSE|      predTitanic|ratingTtanic|
+---+-----------------+------------------+-----------------+------------+
|  1|lr + GridSearchCV|1.1778759514052894|7.380385996092976|         7.9|
|  2|               rf|1.1796799439607009|6.635893289095067|         7.9|
|  3|              gbt|1.1600398440373452|7.467804159050522|         7.9|
+---+-----------------+------------------+-----------------+------------+



Final Conclusions:

If I had to pick one model, I would choose the gradient-boosted tree regressor: it achieved the best results (RMSE = 1.1600 and a predicted rating of 7.47 for Titanic, whose real rating is 7.9). I would really like to see how far this model could go if I could run it properly, because the results kept improving as I increased the parameters, but I was limited by time and by the community version.
The random forest regressor was indeed the worst model (RMSE = 1.1797 and a predicted rating of 6.64 for Titanic); however, a grid search with CV could probably improve its results.
Finally, I would like to highlight the final linear regression with RobustScaler, which also achieved good results; the improvement over Q13 is visible (RMSE from 1.1978 to 1.1779, and the Titanic prediction from 6.15 to 7.38). The grid search with CV brought no improvement, which is not surprising: besides the very limited search space, regParam was left at its default of 0.0 (its grid line is commented out), so elasticNetParam has no effect and every grid point fits the same unregularized model.

One final note: the newly included feature (the number of versions of a movie) was also important to achieve these results; I ran tests with and without it, and the results favoured including it. So, as expected, the number of versions of a movie appears to help predict its average rating.