# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline will be set for the 5th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set, but the assignment is individual. That is, you are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, secondly by the logic used in the trying to approach each step of the problem set;
5. Consider skipping questions that you are stuck in, and get back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected you have developed skills beyond writting SQL queries. Any question where you directly write a SQL query (then for example create a temporary table and use spark.sql to pass the query) will receive a 25% penalty. Using the spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all neccesary code so that when the notebook is run in databricks it should also replicate these results. This means the all data downloading and processing should be done in this notebook. It is also important you clearly indicate where your final answer to each question is when you are using multiple cells (for example you print "my final anwser is" before your answer or use cell comments).
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is present in the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisitng your RDDs or Dataframes, this may speed up performance.
    6. If you have trouble with graphframes in databricks (specifically the import statement) you need to make sure the graphframes package is installed on the cluster you are running. If you click home on the left, then click on the graphframes library which you loaded in Lab 9 you can install the package on your cluster (check the graphframes checkbox and click install)

#### Data Sources and Description
We will use data from IMDB. You can download raw datafiles
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv) You can find a
description of the each datafile in https://www.imdb.com/interfaces/

## Questions
### Data loading and preperation
Review the file descriptions and load the necessary data onto your databricks cluser and into spark dataframes. You will need to use shell commands to download the data, unzip the data, load the data into spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .tz files. The data files will then be tab seperated (.tsv), which you can load into a dataframe using the tab seperated option instead of the comma seperated option we have typically used in class: `.option(“sep”,”\t”)`

In [0]:
# Some of the cells were runned in different days or time spans because they take so long to run, for example, the display() cells

In [0]:
%sh wget 'https://datasets.imdbws.com/name.basics.tsv.gz'

--2022-06-07 23:49:57--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.98, 99.84.66.41, 99.84.66.56, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.98|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228625979 (218M) [binary/octet-stream]
Saving to: ‘name.basics.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 4.00M 54s
    50K .......... .......... .......... .......... ..........  0% 8.11M 41s
   100K .......... .......... .......... .......... ..........  0% 8.69M 35s
   150K .......... .......... .......... .......... ..........  0% 19.0M 29s
   200K .......... .......... .......... .......... ..........  0% 28.3M 25s
   250K .......... .......... .......... .......... ..........  0% 19.4M 23s
   300K .......... .......... .......... .......... ..........  0% 52.2M 20s
   350K .......... .......... .......... .......... ..........  0% 28.5M 19s


In [0]:
%sh wget 'https://datasets.imdbws.com/title.akas.tsv.gz'

--2022-06-07 23:50:10--  https://datasets.imdbws.com/title.akas.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.78, 99.84.66.98, 99.84.66.41, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.78|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 273528175 (261M) [binary/octet-stream]
Saving to: ‘title.akas.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 5.44M 48s
    50K .......... .......... .......... .......... ..........  0% 6.80M 43s
   100K .......... .......... .......... .......... ..........  0% 8.68M 39s
   150K .......... .......... .......... .......... ..........  0% 29.6M 31s
   200K .......... .......... .......... .......... ..........  0% 19.2M 28s
   250K .......... .......... .......... .......... ..........  0% 30.1M 25s
   300K .......... .......... .......... .......... ..........  0% 40.0M 22s
   350K .......... .......... .......... .......... ..........  0% 30.7M 20s
  

In [0]:
%sh wget 'https://datasets.imdbws.com/title.basics.tsv.gz'

--2022-06-07 23:50:21--  https://datasets.imdbws.com/title.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.56, 99.84.66.78, 99.84.66.98, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 156922207 (150M) [binary/octet-stream]
Saving to: ‘title.basics.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 1.73M 87s
    50K .......... .......... .......... .......... ..........  0% 7.66M 53s
   100K .......... .......... .......... .......... ..........  0% 8.63M 41s
   150K .......... .......... .......... .......... ..........  0% 26.7M 32s
   200K .......... .......... .......... .......... ..........  0% 17.1M 28s
   250K .......... .......... .......... .......... ..........  0% 33.5M 24s
   300K .......... .......... .......... .......... ..........  0% 35.8M 21s
   350K .......... .......... .......... .......... ..........  0% 28.1M 19

In [0]:
%sh wget 'https://datasets.imdbws.com/title.crew.tsv.gz'

--2022-06-07 23:50:28--  https://datasets.imdbws.com/title.crew.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.41, 99.84.66.56, 99.84.66.78, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.41|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 61020951 (58M) [binary/octet-stream]
Saving to: ‘title.crew.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 7.73M 8s
    50K .......... .......... .......... .......... ..........  0% 7.63M 8s
   100K .......... .......... .......... .......... ..........  0% 7.51M 8s
   150K .......... .......... .......... .......... ..........  0% 12.2M 7s
   200K .......... .......... .......... .......... ..........  0% 26.1M 6s
   250K .......... .......... .......... .......... ..........  0% 16.7M 6s
   300K .......... .......... .......... .......... ..........  0% 69.8M 5s
   350K .......... .......... .......... .......... ..........  0% 14.4M 5s
   400K ....

In [0]:
%sh wget 'https://datasets.imdbws.com/title.episode.tsv.gz'

--2022-06-07 23:50:30--  https://datasets.imdbws.com/title.episode.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.98, 99.84.66.41, 99.84.66.56, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.98|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 36617005 (35M) [binary/octet-stream]
Saving to: ‘title.episode.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 8.64M 4s
    50K .......... .......... .......... .......... ..........  0% 7.36M 4s
   100K .......... .......... .......... .......... ..........  0% 8.07M 4s
   150K .......... .......... .......... .......... ..........  0% 7.99M 4s
   200K .......... .......... .......... .......... ..........  0% 20.3M 4s
   250K .......... .......... .......... .......... ..........  0% 12.0M 4s
   300K .......... .......... .......... .......... ..........  0% 27.3M 3s
   350K .......... .......... .......... .......... ..........  1% 14.8M 3s
   400

In [0]:
%sh wget 'https://datasets.imdbws.com/title.principals.tsv.gz'

--2022-06-07 23:50:32--  https://datasets.imdbws.com/title.principals.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.78, 99.84.66.98, 99.84.66.41, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.78|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400651552 (382M) [binary/octet-stream]
Saving to: ‘title.principals.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 5.16M 74s
    50K .......... .......... .......... .......... ..........  0% 6.74M 65s
   100K .......... .......... .......... .......... ..........  0% 5.31M 68s
   150K .......... .......... .......... .......... ..........  0% 28.4M 54s
   200K .......... .......... .......... .......... ..........  0% 5.15M 58s
   250K .......... .......... .......... .......... ..........  0% 11.1M 54s
   300K .......... .......... .......... .......... ..........  0% 6.39M 55s
   350K .......... .......... .......... .......... ..........  0% 

In [0]:
%sh wget 'https://datasets.imdbws.com/title.ratings.tsv.gz'

--2022-06-07 23:50:46--  https://datasets.imdbws.com/title.ratings.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 99.84.66.56, 99.84.66.78, 99.84.66.98, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|99.84.66.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6244725 (6.0M) [binary/octet-stream]
Saving to: ‘title.ratings.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 6.33M 1s
    50K .......... .......... .......... .......... ..........  1% 7.91M 1s
   100K .......... .......... .......... .......... ..........  2% 5.24M 1s
   150K .......... .......... .......... .......... ..........  3% 6.45M 1s
   200K .......... .......... .......... .......... ..........  4% 7.57M 1s
   250K .......... .......... .......... .......... ..........  4% 10.5M 1s
   300K .......... .......... .......... .......... ..........  5% 11.7M 1s
   350K .......... .......... .......... .......... ..........  6% 8.23M 1s
   400

In [0]:
%%bash

ls

conf
eventlogs
ganglia
logs
metastore_db
name.basics.tsv.gz
preload_class.lst
title.akas.tsv.gz
title.basics.tsv.gz
title.crew.tsv.gz
title.episode.tsv.gz
title.principals.tsv.gz
title.ratings.tsv.gz


In [0]:
# storing the data into varibales

In [0]:
name = spark.read.csv('file:/databricks/driver/name.basics.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
akas = spark.read.csv('file:/databricks/driver/title.akas.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
basics = spark.read.csv('file:/databricks/driver/title.basics.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
crew = spark.read.csv('file:/databricks/driver/title.crew.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
episode = spark.read.csv('file:/databricks/driver/title.episode.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
principals = spark.read.csv('file:/databricks/driver/title.principals.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
ratings = spark.read.csv('file:/databricks/driver/title.ratings.tsv.gz', header = True, inferSchema = True, sep = '\t')

In [0]:
display(name)

nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000001,Fred Astaire,1899,1987,"soundtrack,actor,miscellaneous","tt0072308,tt0050419,tt0053137,tt0031983"
nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack","tt0037382,tt0038355,tt0071877,tt0117057"
nm0000003,Brigitte Bardot,1934,\N,"actress,soundtrack,music_department","tt0056404,tt0057345,tt0049189,tt0054452"
nm0000004,John Belushi,1949,1982,"actor,soundtrack,writer","tt0072562,tt0077975,tt0080455,tt0078723"
nm0000005,Ingmar Bergman,1918,2007,"writer,director,actor","tt0050976,tt0050986,tt0060827,tt0083922"
nm0000006,Ingrid Bergman,1915,1982,"actress,soundtrack,producer","tt0034583,tt0036855,tt0038109,tt0077711"
nm0000007,Humphrey Bogart,1899,1957,"actor,soundtrack,producer","tt0034583,tt0037382,tt0042593,tt0043265"
nm0000008,Marlon Brando,1924,2004,"actor,soundtrack,director","tt0070849,tt0078788,tt0047296,tt0068646"
nm0000009,Richard Burton,1925,1984,"actor,soundtrack,producer","tt0087803,tt0057877,tt0059749,tt0061184"
nm0000010,James Cagney,1899,1986,"actor,soundtrack,director","tt0031867,tt0042041,tt0035575,tt0029870"


In [0]:
display(akas)

titleId,ordering,title,region,language,types,attributes,isOriginalTitle
tt0000001,1,Карменсіта,UA,\N,imdbDisplay,\N,0
tt0000001,2,Carmencita,DE,\N,\N,literal title,0
tt0000001,3,Carmencita - spanyol tánc,HU,\N,imdbDisplay,\N,0
tt0000001,4,Καρμενσίτα,GR,\N,imdbDisplay,\N,0
tt0000001,5,Карменсита,RU,\N,imdbDisplay,\N,0
tt0000001,6,Carmencita,US,\N,imdbDisplay,\N,0
tt0000001,7,Carmencita,\N,\N,original,\N,1
tt0000001,8,カルメンチータ,JP,ja,imdbDisplay,\N,0
tt0000002,1,Le clown et ses chiens,\N,\N,original,\N,1
tt0000002,2,Le clown et ses chiens,FR,\N,imdbDisplay,\N,0


In [0]:
display(basics)

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"
tt0000006,short,Chinese Opium Den,Chinese Opium Den,0,1894,\N,1,Short
tt0000007,short,Corbett and Courtney Before the Kinetograph,Corbett and Courtney Before the Kinetograph,0,1894,\N,1,"Short,Sport"
tt0000008,short,Edison Kinetoscopic Record of a Sneeze,Edison Kinetoscopic Record of a Sneeze,0,1894,\N,1,"Documentary,Short"
tt0000009,short,Miss Jerry,Miss Jerry,0,1894,\N,40,"Romance,Short"
tt0000010,short,Leaving the Factory,La sortie de l'usine Lumière à Lyon,0,1895,\N,1,"Documentary,Short"


In [0]:
display(crew)

tconst,directors,writers
tt0000001,nm0005690,\N
tt0000002,nm0721526,\N
tt0000003,nm0721526,\N
tt0000004,nm0721526,\N
tt0000005,nm0005690,\N
tt0000006,nm0005690,\N
tt0000007,"nm0005690,nm0374658",\N
tt0000008,nm0005690,\N
tt0000009,nm0085156,nm0085156
tt0000010,nm0525910,\N


In [0]:
display(episode)

tconst,parentTconst,seasonNumber,episodeNumber
tt0020666,tt15180956,1,2
tt0020829,tt15180956,1,1
tt0021166,tt15180956,1,3
tt0021612,tt15180956,2,2
tt0021655,tt15180956,2,5
tt0021663,tt15180956,2,6
tt0021664,tt15180956,2,4
tt0021701,tt15180956,2,1
tt0021802,tt15180956,2,11
tt0022009,tt15180956,2,10


In [0]:
display(principals)

tconst,ordering,nconst,category,job,characters
tt0000001,1,nm1588970,self,\N,"[""Self""]"
tt0000001,2,nm0005690,director,\N,\N
tt0000001,3,nm0374658,cinematographer,director of photography,\N
tt0000002,1,nm0721526,director,\N,\N
tt0000002,2,nm1335271,composer,\N,\N
tt0000003,1,nm0721526,director,\N,\N
tt0000003,2,nm1770680,producer,producer,\N
tt0000003,3,nm1335271,composer,\N,\N
tt0000003,4,nm5442200,editor,\N,\N
tt0000004,1,nm0721526,director,\N,\N


In [0]:
display(ratings)

tconst,averageRating,numVotes
tt0000001,5.7,1883
tt0000002,5.9,250
tt0000003,6.5,1667
tt0000004,5.8,163
tt0000005,6.2,2489
tt0000006,5.2,166
tt0000007,5.4,774
tt0000008,5.4,2025
tt0000009,5.3,195
tt0000010,6.9,6808


### Network Inference, Let’s build a network
In the following questions you will look to summarise the data and build a network. We want to examine a network that abstracts how actors and actress are related through their co-participation in movies. To that end perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** the participants in those movies (i.e., actors, directors, etc … ), make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?

In [0]:
# Since its "all the Information", I decided to perform outer joins

df = principals.join(basics, ["tconst"], how = "outer")\
               .join(name, ["nconst"], how = "outer")\
               .join(ratings, ["tconst"], how = "outer")

In [0]:
display(df)

tconst,nconst,ordering,category,job,characters,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,primaryName,birthYear,deathYear,primaryProfession,knownForTitles,averageRating,numVotes
tt0000658,nm0169871,1.0,director,\N,\N,short,The Puppet's Nightmare,Le cauchemar de Fantoche,0,1908,\N,2,"Animation,Short",Émile Cohl,1857,1938,"director,animation_department,writer","tt1003497,tt0794279,tt1003447,tt1003400",6.5,230.0
tt0000839,nm0378408,2.0,producer,producer,\N,short,The Curse of Money,The Curse of Money,0,1909,\N,\N,"Drama,Short",Cecil M. Hepworth,1873,1953,"cinematographer,producer,director","tt0006129,tt0000420,tt0006866,tt0000309",,
tt0000839,nm0294276,1.0,director,\N,\N,short,The Curse of Money,The Curse of Money,0,1909,\N,\N,"Drama,Short",Theo Frenkel,1871,1956,"director,actor,writer","tt0010946,tt0010076,tt0011219,tt0014170",,
tt0001170,nm0607104,10.0,actor,\N,\N,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",Chick Morrison,1878,1924,actor,"tt0005070,tt0006849,tt0013759,tt0011973",,
tt0001170,nm0001908,1.0,actor,\N,"[""Frank Morrison""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",Gilbert M. 'Broncho Billy' Anderson,1880,1971,"director,actor,producer","tt0176832,tt0001706,tt0183803,tt0003719",,
tt0001170,nm0693055,9.0,actor,\N,\N,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",Victor Potel,1889,1947,"actor,writer,director","tt0037077,tt0034240,tt0312963,tt0022028",,
tt0001170,nm0789632,3.0,actor,\N,"[""Jess Gibbs""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",Brinsley Shaw,1876,1931,"actor,director,writer","tt0010907,tt0016694,tt0344400,tt0167005",,
tt0001170,nm1400009,6.0,actor,\N,\N,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",William A. Russell,1878,1914,actor,"tt1644591,tt0354954,tt0372255,tt0372421",,
tt0001170,nm0355582,4.0,actor,\N,"[""Will Morrison""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",Franklyn Hall,1886,\N,"actor,writer,director","tt0007558,tt0004996,tt0003442,tt0012551",,
tt0001170,nm0865178,5.0,actor,\N,\N,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,\N,\N,"Short,Western",Harry Todd,1863,1935,actor,"tt0001801,tt0367226,tt0014527,tt0025599",,


In [0]:
# Q:1 - Final Awnser --> Total Number of rows

df.count()

Out[20]: 58532262

In [0]:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import col,when

# Data Preprocessing of the \N values (into nulls)

df = df.select([when(col(c)==r"\N", None).otherwise(col(c)).alias(c) for c in df.columns])

In [0]:
display(df)

tconst,nconst,ordering,category,job,characters,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,primaryName,birthYear,deathYear,primaryProfession,knownForTitles,averageRating,numVotes
tt0000658,nm0169871,1.0,director,,,short,The Puppet's Nightmare,Le cauchemar de Fantoche,0,1908,,2.0,"Animation,Short",Émile Cohl,1857.0,1938.0,"director,animation_department,writer","tt1003497,tt0794279,tt1003447,tt1003400",6.5,230.0
tt0000839,nm0378408,2.0,producer,producer,,short,The Curse of Money,The Curse of Money,0,1909,,,"Drama,Short",Cecil M. Hepworth,1873.0,1953.0,"cinematographer,producer,director","tt0006129,tt0000420,tt0006866,tt0000309",,
tt0000839,nm0294276,1.0,director,,,short,The Curse of Money,The Curse of Money,0,1909,,,"Drama,Short",Theo Frenkel,1871.0,1956.0,"director,actor,writer","tt0010946,tt0010076,tt0011219,tt0014170",,
tt0001170,nm0607104,10.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",Chick Morrison,1878.0,1924.0,actor,"tt0005070,tt0006849,tt0013759,tt0011973",,
tt0001170,nm0001908,1.0,actor,,"[""Frank Morrison""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",Gilbert M. 'Broncho Billy' Anderson,1880.0,1971.0,"director,actor,producer","tt0176832,tt0001706,tt0183803,tt0003719",,
tt0001170,nm0693055,9.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",Victor Potel,1889.0,1947.0,"actor,writer,director","tt0037077,tt0034240,tt0312963,tt0022028",,
tt0001170,nm0789632,3.0,actor,,"[""Jess Gibbs""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",Brinsley Shaw,1876.0,1931.0,"actor,director,writer","tt0010907,tt0016694,tt0344400,tt0167005",,
tt0001170,nm1400009,6.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",William A. Russell,1878.0,1914.0,actor,"tt1644591,tt0354954,tt0372255,tt0372421",,
tt0001170,nm0355582,4.0,actor,,"[""Will Morrison""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",Franklyn Hall,1886.0,,"actor,writer,director","tt0007558,tt0004996,tt0003442,tt0012551",,
tt0001170,nm0865178,5.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0,1910,,,"Short,Western",Harry Todd,1863.0,1935.0,actor,"tt0001801,tt0367226,tt0014527,tt0025599",,


In [0]:
# Checking the types of each column

df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [0]:
# Converting the columns which were supose to be float into float

df = df.withColumn('birthYear', df.birthYear.cast(FloatType()))
df = df.withColumn('deathYear', df.deathYear.cast(FloatType()))
df = df.withColumn('startYear', df.startYear.cast(FloatType()))
df = df.withColumn('endYear', df.endYear.cast(FloatType()))
df = df.withColumn('isAdult', df.isAdult.cast(FloatType()))

In [0]:
df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: float (nullable = true)
 |-- startYear: float (nullable = true)
 |-- endYear: float (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: float (nullable = true)
 |-- deathYear: float (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?

In [0]:
# Filtering the DataFrame for the conditions mentioned above

df_filtered = df.filter((df["category"].isin(["actor", "actress"])) & (df["isAdult"] == "0") & (df["deathYear"].isNull()) & (df["birthYear"] >= 1920) & (df["titleType"] == "movie"))

In [0]:
display(df_filtered)

tconst,nconst,ordering,category,job,characters,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,primaryName,birthYear,deathYear,primaryProfession,knownForTitles,averageRating,numVotes
tt0087003,nm0000095,1,actor,,"[""Danny Rose""]",movie,Broadway Danny Rose,Broadway Danny Rose,0.0,1984.0,,84.0,Comedy,Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",7.4,25367.0
tt0104466,nm0000095,1,actor,,"[""Gabe Roth""]",movie,Husbands and Wives,Husbands and Wives,0.0,1992.0,,108.0,"Comedy,Drama,Romance",Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",7.5,29188.0
tt3538730,nm0000095,1,actor,,"[""Narrator""]",movie,"Barcelona, la rosa de foc","Barcelona, la rosa de foc",0.0,2014.0,,100.0,Documentary,Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",5.8,65.0
tt0065063,nm0000095,1,actor,,"[""Virgil Starkwell""]",movie,Take the Money and Run,Take the Money and Run,0.0,1969.0,,85.0,"Comedy,Crime",Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",7.2,29829.0
tt0079522,nm0000095,1,actor,,"[""Isaac""]",movie,Manhattan,Manhattan,0.0,1979.0,,96.0,"Comedy,Drama,Romance",Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",7.9,138982.0
tt0116242,nm0000095,1,actor,,"[""Joe""]",movie,Everyone Says I Love You,Everyone Says I Love You,0.0,1996.0,,101.0,"Comedy,Musical,Romance",Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",6.7,37103.0
tt0066808,nm0000095,1,actor,,"[""Fielding Mellish""]",movie,Bananas,Bananas,0.0,1971.0,,82.0,Comedy,Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",6.9,35664.0
tt0120587,nm0000095,1,actor,,"[""Z""]",movie,Antz,Antz,0.0,1998.0,,83.0,"Adventure,Animation,Comedy",Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",6.6,153856.0
tt5305230,nm0000095,4,actor,,,movie,CzechMate: In Search of Jirí Menzel,CzechMate: In Search of Jirí Menzel,0.0,2018.0,,448.0,Documentary,Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",7.8,43.0
tt0074554,nm0000095,1,actor,,"[""Howard Prince""]",movie,The Front,The Front,0.0,1976.0,,95.0,Drama,Woody Allen,1935.0,,"writer,director,actor","tt0079522,tt0075686,tt0091167,tt0118954",7.3,8959.0


In [0]:
# Q:2 - Final awnser --> Numer of rows of the filtered DataFrame

df_filtered.count()

Out[26]: 487566

In [0]:
# Using the advantages of the DBFS in order to persist my dataframes for quicker use

#df_filtered = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_filtered.csv")
#df_filtered.write.option("header", True).csv("dbfs:/databricks/driver/df_filtered.csv")

**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name, these could be different people.

In [0]:
# Converting into an rdd, and then performing a map reduce to get the counts for each actor/actress

filtered_rdd = df_filtered.rdd.map(lambda x: ((x.nconst, x.primaryName), 1)).reduceByKey(lambda x, y: x + y)

In [0]:
# Q:3 - Final awnser --> Sorting by the counts in descending order to get the top 10 actors/actresses according to the number of movies in which they appeared

filtered_rdd.sortBy(lambda x: x[1], ascending = False).take(10)

Out[26]: [(('nm0103977', 'Brahmanandam'), 795),
 (('nm0007123', 'Mammootty'), 364),
 (('nm0482320', 'Mohanlal'), 351),
 (('nm0149822', 'Mithun Chakraborty'), 329),
 (('nm0007106', 'Shakti Kapoor'), 321),
 (('nm0000616', 'Eric Roberts'), 313),
 (('nm0035067', 'Cüneyt Arkin'), 300),
 (('nm0415549', 'Jagathi Sreekumar'), 293),
 (('nm0004429', 'Dharmendra'), 270),
 (('nm0374974', 'Helen'), 269)]

**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique ID's). For every link we then need anytime a pair of actors were together in a movie as a link in each direction (A -> B and B -> A). However links should be distinct we do not need duplicates when two actors worked together in several movies.

In [0]:
# Using the self join to get the links between actors and actresses who worked on the same movies

df_q4 = df_filtered.alias("df1").join(df_filtered.alias("df2"), col("df1.tconst") == col("df2.tconst"), "inner").select(col("df1.nconst").alias("src"), col("df2.nconst").alias("dst"))

In [0]:
display(df_q4)

src,dst
nm0788396,nm0788396
nm0319855,nm0319855
nm0147802,nm0147802
nm0518178,nm0321407
nm0518178,nm0518178
nm0321407,nm0321407
nm0321407,nm0518178
nm0260854,nm0260854
nm0310989,nm0310989
nm0110067,nm0283004


In [0]:
df_q4.count()

Out[29]: 1367532

In [0]:
# using the distinct() function in order to drop duplicates

df_q4 = df_q4.distinct()

In [0]:
# Q:4 - Final awnser --> Removing, from the DataFrame, records in which the actors/actresses are the same in both columns

df_q4=df_q4.filter((df_q4.src) !=(df_q4.dst))
df_q4.count()

Out[31]: 776664

In [0]:
# Using the advantages of the DBFS in order to persist my dataframes for quicker use

#df_q4 = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_q4.csv")
#df_q4.write.option("header", True).csv("dbfs:/databricks/driver/df_q4.csv")

**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probility = 0.1.

List the top 10 actors / actresses by pagerank.

In [0]:
# Creating the vertices DataFrame and the edges DataFrame

df_vertices = df_q4.select(["src"]).withColumnRenamed("src", "id").distinct()

df_edges = df_q4

In [0]:
df_vertices.show(10, False)

+---------+
|id       |
+---------+
|nm0518178|
|nm0894636|
|nm0391096|
|nm0085751|
|nm0110067|
|nm0906723|
|nm0321407|
|nm0442809|
|nm0832415|
|nm0283004|
+---------+
only showing top 10 rows



In [0]:
df_edges.show(10, False)

+---------+---------+
|src      |dst      |
+---------+---------+
|nm0894636|nm0518178|
|nm0518178|nm0321407|
|nm0283004|nm0110067|
|nm0442809|nm0906723|
|nm0085751|nm0391096|
|nm0202766|nm0832415|
|nm0906723|nm0442809|
|nm0321407|nm0518178|
|nm0832415|nm0202766|
|nm0110067|nm0283004|
+---------+---------+
only showing top 10 rows



In [0]:
# creating the GraphFrame

from graphframes import *

gdf = GraphFrame(df_vertices, df_edges)

In [0]:
# performing the pagerank algorithm

pageRanks = gdf.pageRank(resetProbability=0.1, maxIter = 5)

In [0]:
# Q:5 - Final awnser --> Top 10 actors/actresses by pagerank

pageRanks.vertices.sort("pagerank",ascending=False).show(10,False)

+---------+------------------+
|id       |pagerank          |
+---------+------------------+
|nm0000616|46.624252283878434|
|nm0000514|24.70160325051762 |
|nm0001744|23.608189467168074|
|nm0001803|21.663831603134632|
|nm0920460|18.85254366634066 |
|nm0000448|18.637566387778907|
|nm0001595|18.414150133742098|
|nm0001698|17.170200616153387|
|nm0000367|17.061150119895036|
|nm0004193|16.645807104486504|
+---------+------------------+
only showing top 10 rows



**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.

In [0]:
gdf.outDegrees.show()

+---------+---------+
|       id|outDegree|
+---------+---------+
|nm0212204|       16|
|nm0000354|      109|
|nm0482652|       38|
|nm0000362|      110|
|nm0246820|        3|
|nm0165411|       18|
|nm0863599|       49|
|nm1522629|        3|
|nm0280355|        7|
|nm1367819|       43|
|nm1327166|       50|
|nm6981978|        8|
|nm3088934|       40|
|nm0076427|       16|
|nm0380489|       32|
|nm0877270|       37|
|nm1213235|       21|
|nm0113463|       10|
|nm0668241|       22|
|nm0118840|       34|
+---------+---------+
only showing top 20 rows



In [0]:
outDegree_rdd = gdf.outDegrees.rdd
outDegree_rdd.take(10)

Out[24]: [Row(id='nm1006993', outDegree=57),
 Row(id='nm0000198', outDegree=126),
 Row(id='nm0733586', outDegree=12),
 Row(id='nm0091035', outDegree=63),
 Row(id='nm1441714', outDegree=80),
 Row(id='nm0897197', outDegree=24),
 Row(id='nm0380489', outDegree=32),
 Row(id='nm0102522', outDegree=42),
 Row(id='nm0000767', outDegree=103),
 Row(id='nm3482594', outDegree=6)]

In [0]:
# Q:6 - Final awnser --> Top 10 actors by outdegree

outDegree_rdd.sortBy(lambda x: x[1], ascending=False).take(10)

Out[25]: [Row(id='nm0000616', outDegree=489),
 Row(id='nm0000514', outDegree=283),
 Row(id='nm0001744', outDegree=264),
 Row(id='nm0000367', outDegree=263),
 Row(id='nm0945189', outDegree=257),
 Row(id='nm0256628', outDegree=252),
 Row(id='nm0451600', outDegree=252),
 Row(id='nm0001803', outDegree=248),
 Row(id='nm0149822', outDegree=240),
 Row(id='nm0004109', outDegree=228)]

### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon, there are two actors named ‘Kevin Bacon’, we will use the one with the highest degree, that is, the one that participated in most titles;
1. Estimate the shortest path between every actor in the database actors and Kevin Bacon, keep a dataframe with this information as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degress from kevin bacon (you will need to deal with actors unconnected to kevin bacon, if not connected to Kevin Bacon given these actors / actresses a score/degree of 20).

In [0]:
gdf.degrees.show()

+---------+------+
|       id|degree|
+---------+------+
|nm0293255|     4|
|nm0212204|    32|
|nm0847265|    16|
|nm2658331|    34|
|nm0000354|   218|
|nm1816263|    40|
|nm0482652|    76|
|nm0000198|   252|
|nm0000362|   220|
|nm0246820|     6|
|nm0165411|    36|
|nm0863599|    98|
|nm1522629|     6|
|nm0668241|    44|
|nm0076427|    32|
|nm0280355|    14|
|nm1367819|    86|
|nm0091035|   126|
|nm1327166|   100|
|nm0113463|    20|
+---------+------+
only showing top 20 rows



In [0]:
degrees_df = gdf.degrees.toDF("id", "degree")

In [0]:
# Using the advantages of the DBFS in order to persist my dataframes for quicker use

#degrees_df = spark.read.option("header", True).csv("dbfs:/databricks/driver/degrees_df.csv")
#degrees_df.write.option("header", True).csv("dbfs:/databricks/driver/degrees_df.csv")

In [0]:
degrees_df.show(5)

+---------+------+
|       id|degree|
+---------+------+
|nm0293255|     4|
|nm0212204|    32|
|nm0847265|    16|
|nm2658331|    34|
|nm0000354|   218|
+---------+------+
only showing top 5 rows



In [0]:
# Joining the Dataframes in order to obtain the names of each id 

df_joined = df_filtered.join(degrees_df, df_filtered.nconst == degrees_df.id, "inner").select(degrees_df.id, df_filtered.primaryName, degrees_df.degree)

In [0]:
# Using the advantages of the DBFS in order to persist my dataframes for quicker use

#df_joined = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_joined.csv")
#df_joined.write.option("header", True).csv("dbfs:/databricks/driver/df_joined.csv")

In [0]:
## Performing a filtration and sorting by the degree in order to get Kevin Bacon's id (the one with the highest number of degrees)

df_joined.filter(df_joined.primaryName == "Kevin Bacon").sort("degree", ascending = False).take(1)

Out[40]: [Row(id='nm0000102', primaryName='Kevin Bacon', degree=266)]

In [0]:
# Q:7.1 - Final awnser --> This is Kevin Bacon's Id

kevin_id = df_joined.filter(df_joined.primaryName == "Kevin Bacon").sort("degree", ascending = False).take(1)[0][0]
print(kevin_id)

nm0000102


In [0]:
# Using the shortestPaths algorithm in order to retrieve the distances to a specific landmark (kevin_id in this case)

kevin_distances = gdf.shortestPaths(landmarks=[kevin_id])

In [0]:
display(kevin_distances)

id,distances
nm0463100,Map(nm0000102 -> 3)
nm0481731,Map(nm0000102 -> 4)
nm0091035,Map(nm0000102 -> 3)
nm0847128,Map(nm0000102 -> 3)
nm0847265,Map(nm0000102 -> 4)
nm0280355,Map(nm0000102 -> 4)
nm3088934,Map(nm0000102 -> 3)
nm1480555,Map(nm0000102 -> 3)
nm3244111,Map(nm0000102 -> 4)
nm0727382,Map(nm0000102 -> 3)


In [0]:
# building a function in order to deaggregate the dictionary into the respective distance values

def changer(element):
  if element.distances:
    return(element.id, list(element.distances.values())[0])
  else:
    return(element.id, 20)

In [0]:
kevin_distances = kevin_distances.rdd.map(changer)

In [0]:
# Q:7.2 - Final awnser --> Dataframes with estimated distances

kevin_distances_df = sqlContext.createDataFrame(kevin_distances, ["id", "distances"])

In [0]:
# Using the advantages of the DBFS in order to persist my dataframes for quicker use

#kevin_distances_df = spark.read.option("header", True).csv("dbfs:/databricks/driver/kevin_distances_df.csv")
#kevin_distances_df.write.option("header", True).csv("dbfs:/databricks/driver/kevin_distances_df.csv")

In [0]:
display(kevin_distances_df)

id,distances
nm0068551,3
nm0058547,5
nm0277020,4
nm5157721,4
nm2236012,20
nm0057651,5
nm1932274,3
nm4633806,4
nm0208444,3
nm0252309,4


In [0]:
kevin_distances_df.groupBy("distances").count().show()

+---------+-----+
|distances|count|
+---------+-----+
|        7|  375|
|        6| 2737|
|        9|   10|
|        5|14880|
|        1|  133|
|        3|19510|
|        8|   62|
|        2| 3552|
|        4|31190|
|       20| 6931|
|        0|    1|
+---------+-----+



In [0]:
# Q:7.3 - Final awnser ---> Performing a groupby distances and then using the count() function to get the total number of actors per distance to kevin bacon
# Sorting the dataframe for better visualization

kevin_distances_counts = kevin_distances_df.groupBy("distances").count().sort("distances").select([col("distances"), col("count").alias("total_count")])

In [0]:
display(kevin_distances_counts)

distances,total_count
0,1
1,133
2,3552
3,19510
4,31190
5,14880
6,2737
7,375
8,62
9,10


In [0]:
# Using the advantages of the DBFS in order to persist my dataframes for quicker use

#kevin_distances_counts = spark.read.option("header", True).csv("dbfs:/databricks/driver/kevin_distances_counts.csv")
#kevin_distances_counts.write.option("header", True).csv("dbfs:/databricks/driver/kevin_distances_counts.csv")

### Exploring the data with RDD's

Using RDDs and (not dataframes) answer the following questions (if you loaded your data into spark in a dataframe you can convert to an RDD of rows easily using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie' what is the combination of genres that is the most popluar (as measured by number of reviews). Hint: paired RDD's will be useful.

In [0]:
display(df)

tconst,nconst,ordering,category,job,characters,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,primaryName,birthYear,deathYear,primaryProfession,knownForTitles,averageRating,numVotes
tt0000658,nm0169871,1.0,director,,,short,The Puppet's Nightmare,Le cauchemar de Fantoche,0.0,1908.0,,2.0,"Animation,Short",Émile Cohl,1857.0,1938.0,"director,animation_department,writer","tt1003447,tt0794279,tt1003497,tt1003400",6.5,230.0
tt0000839,nm0378408,2.0,producer,producer,,short,The Curse of Money,The Curse of Money,0.0,1909.0,,,"Drama,Short",Cecil M. Hepworth,1873.0,1953.0,"cinematographer,producer,director","tt0000309,tt0006866,tt0000420,tt0006129",,
tt0000839,nm0294276,1.0,director,,,short,The Curse of Money,The Curse of Money,0.0,1909.0,,,"Drama,Short",Theo Frenkel,1871.0,1956.0,"director,actor,writer","tt0010076,tt0011219,tt0010946,tt0014170",,
tt0001170,nm0607104,10.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",Chick Morrison,1878.0,1924.0,actor,"tt0006849,tt0013759,tt0005070,tt0011973",,
tt0001170,nm0001908,1.0,actor,,"[""Frank Morrison""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",Gilbert M. 'Broncho Billy' Anderson,1880.0,1971.0,"director,actor,producer","tt0183803,tt0003719,tt0001706,tt0176832",,
tt0001170,nm0693055,9.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",Victor Potel,1889.0,1947.0,"actor,writer,director","tt0034240,tt0022028,tt0312963,tt0037077",,
tt0001170,nm0789632,3.0,actor,,"[""Jess Gibbs""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",Brinsley Shaw,1876.0,1931.0,"actor,director,writer","tt0010907,tt0016694,tt0167005,tt0344400",,
tt0001170,nm1400009,6.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",William A. Russell,1878.0,1914.0,actor,"tt1644591,tt0354954,tt0372255,tt0372421",,
tt0001170,nm0355582,4.0,actor,,"[""Will Morrison""]",short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",Franklyn Hall,1886.0,,"actor,writer,director","tt0007558,tt0004996,tt0012551,tt0003442",,
tt0001170,nm0865178,5.0,actor,,,short,A Cowboy's Vindication,A Cowboy's Vindication,0.0,1910.0,,,"Short,Western",Harry Todd,1863.0,1935.0,actor,"tt0367226,tt0014527,tt0001801,tt0025599",,


In [0]:
# Q:8 - Final awnser --> Finding which combination of genres is the most popular

# First we filter the dataframe for movie titles, then we drop all the null values. After that we convert to a paired rdd and use the map function to do it

genres_rdd = df.where(df.titleType == "movie").select("genres", "numVotes").dropna().rdd.map(lambda x: (x.genres, x.numVotes))

# Then, we use the rdd method reduceByKey to add all the numvotes and we sort them

genres_rdd = genres_rdd.reduceByKey(lambda count1, count2: count1 + count2).sortBy(lambda x: x[1], ascending = False)

genres_rdd.take(1)

Out[31]: [('Action,Adventure,Sci-Fi', 511279782)]

**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination). Hint: paired RDD's will be useful.

In [0]:
# https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth
# Q:9 Final awnser --> Finding which combination of genres has the highest avg movie rating

# Filtering by record of type "movie" and number of votes higher than 400

df_avg_ratings = df.where((df.titleType == "movie") & (df.numVotes > 400)).select("genres", "averageRating")

# Both lambda functions used inside the aggregateByKey function were inspired in the stack overflow link above. The mapvalus is used to calculate de mean

avg_ratings_rdd = df_avg_ratings.rdd.aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda x: x[0]/x[1])

# Sorting by the avg movie rating in order to get the highest one

avg_ratings_rdd = avg_ratings_rdd.sortBy(lambda x: x[1], ascending = False)

avg_ratings_rdd.take(1)

Out[32]: [('Music,Musical', 8.4)]

**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular as meaured by number of votes. Votes for multiple genres count towards each genre listed. Hint: flatmap and pairedRDD's will be useful here.

In [0]:
# Q:10 - Final awnser --> Finding which individual genre is the most popular

# Filterering for record of type "movie", droping the nulls and using only the genres and numvotes columns (coud have performed a select on the DF instead)

voting_genre_rdd = df.where(df.titleType == "movie").select("genres", "numVotes").dropna().rdd.map(lambda x: (x.genres, x.numVotes))

# spliting the aggregation of genres into individual ones

rdd_ind_genre = voting_genre_rdd.map(lambda x: (x[0].split(","), x[1]))

# using a for cicle to iterate through the list of individual genres and mantaining the associated num_votes

rdd_ind_genre = rdd_ind_genre.map(lambda x: [(i, x[1]) for i in x[0]])

# Using flat map to deaggregate the resulting list into different rows and then performing a reduce by key to add the corresponding num_votes. Sorting after that.

rdd_ind_genre = rdd_ind_genre.flatMap(lambda x: x).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending = False)

rdd_ind_genre.take(1)

Out[33]: [('Drama', 4847222150)]

## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The average value for for the cast of degrees of Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leave 30% of the data for testing, when splitting your data use the option seed=0.

**Q11** Provide the coefficients of the regression and the accuracy of your model on that test dataset according to RSME.

In [0]:
from pyspark.sql.functions import explode, avg, sum

In [0]:
# the number of movies an actor/actress has worked on

movie_count = df_filtered.rdd.map(lambda x: (x.nconst, 1)).reduceByKey(lambda count1, count2: count1 + count2)
movie_count.take(5)

Out[35]: [('nm0000375', 51),
 ('nm0000378', 27),
 ('nm0000425', 2),
 ('nm0000573', 9),
 ('nm0000699', 6)]

In [0]:
df_movie_count = sqlContext.createDataFrame(movie_count, ["nconst", "total_movies"])
display(df_movie_count)

nconst,total_movies
nm0000375,51
nm0000378,27
nm0000425,2
nm0000573,9
nm0000699,6
nm0000832,16
nm0000873,7
nm0000877,1
nm0000913,16
nm0001044,27


In [0]:
# creating an rdd with the movie title and respective actors/actresses to later join with the previous dataframe

movie_actors = df_filtered.rdd.map(lambda x: (x.tconst, [x.nconst])).reduceByKey(lambda count1, count2: count1 + count2)
movie_actors.take(5)

Out[37]: [('tt0065063', ['nm0000095']),
 ('tt0118954', ['nm0000095', 'nm0001114', 'nm0745029', 'nm0000506']),
 ('tt0070707', ['nm0000095', 'nm0065183', 'nm0000473']),
 ('tt0313792', ['nm0000095', 'nm0000362', 'nm0004755', 'nm0000207']),
 ('tt0105378', ['nm0000095', 'nm0001201'])]

In [0]:
# Converting the previous rdd in a Dataframe

df_movie_actors = sqlContext.createDataFrame(movie_actors, ["tconst", "nconst"])
display(df_movie_actors)

tconst,nconst
tt0065063,List(nm0000095)
tt0118954,"List(nm0000095, nm0001114, nm0745029, nm0000506)"
tt0070707,"List(nm0000095, nm0065183, nm0000473)"
tt0313792,"List(nm0000095, nm0000362, nm0004755, nm0000207)"
tt0105378,"List(nm0000095, nm0001201)"
tt0069097,"List(nm0000095, nm0731634, nm0000473, nm0480379)"
tt0107507,"List(nm0000095, nm0012178, nm0000473)"
tt0118689,"List(nm0000100, nm0001493, nm0715622)"
tt0453451,"List(nm0000100, nm0000353, nm0671487)"
tt0100944,"List(nm0000100, nm0001378, nm0279557)"


In [0]:
# joining the two previous dataframes 

df_totalmovie = df_movie_actors.select("tconst", explode("nconst").alias("nconst")).join(df_movie_count, "nconst", how = "inner" )
display(df_totalmovie)

nconst,tconst,total_movies
nm0000362,tt0313792,49
nm0000198,tt6111574,53
nm0000198,tt1626175,53
nm0000198,tt4357394,53
nm0000362,tt0091584,49
nm0000362,tt0418599,49
nm0000362,tt0094155,49
nm0350453,tt0443706,47
nm0000767,tt0239670,46
nm0000767,tt0101937,46


In [0]:
# Q: 11.1 - Final awnser --> performing a grouby in the movie title in order to sum all the movies in which the actors/actresses of that movie, worked on.

df_totalmovie = df_totalmovie.select("tconst", "total_movies").groupBy("tconst").agg(sum("total_movies"))
display(df_totalmovie)

tconst,sum(total_movies)
tt7657364,95
tt0489019,12
tt0177528,5
tt0393875,132
tt4342506,26
tt12094140,27
tt14949634,33
tt1874708,191
tt9174618,2
tt12218186,9


In [0]:
# Storing the dataframe with the pagerank per vertice into a new dataframe

df_pagerank = pageRanks.vertices.withColumnRenamed("id", "nconst")
display(df_pagerank)

nconst,pagerank
nm0463100,0.3792550716142751
nm0481731,0.5449251142407031
nm0091035,6.046573991663524
nm0847128,1.5889055132982228
nm0847265,0.4689647164122567
nm0280355,0.8217388751563005
nm3088934,1.927590311297771
nm1480555,0.6381449627704232
nm3244111,0.1679099540517935
nm0727382,1.1034591447510491


In [0]:
# load from file system
#df_pagerank = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_pagerank.csv")

#df_pagerank.write.option("header", True).csv("dbfs:/databricks/driver/df_pagerank.csv")

In [0]:
# Q:11.2 - Final awnser --> Following the same thinking as before. Performing a join between the df_movie_actors and the df_pagerank.
# after that, grouping by the title id and then performing an aggregation function (avg) to calculcate the average pagerank per movie cast

df_avg_pagerank = df_movie_actors.select("tconst", explode("nconst").alias("nconst")).join(df_pagerank, "nconst", how = "inner" )
df_avg_pagerank = df_avg_pagerank.select("tconst", "pagerank").groupBy("tconst").agg(avg("pagerank"))
display(df_avg_pagerank)

tconst,avg(pagerank)
tt7657364,7.788372297805875
tt1329350,0.7909512946374175
tt1874708,2.627737312969145
tt0193004,0.4547796104912912
tt0282141,3.846422531370159
tt5586322,2.038631959948668
tt0166160,3.0084309122208075
tt0098048,1.2055172518456867
tt0302361,4.223558385454064
tt0489019,1.0650711239026995


In [0]:
# load from file system
#df_avg_pagerank = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_avg_pagerank.csv")

#df_avg_pagerank.write.option("header", True).csv("dbfs:/databricks/driver/df_avg_pagerank.csv")

In [0]:
df_outdegrees = gdf.outDegrees.withColumnRenamed("id", "nconst")
display(df_outdegrees)

nconst,outDegree
nm1006993,57
nm0000198,126
nm0733586,12
nm0091035,63
nm1441714,80
nm0897197,24
nm0380489,32
nm0102522,42
nm0000767,103
nm3482594,6


In [0]:
# Q:11.3 - Final awnser --> Following the same procedures as before but now with the avg outdegree per cast

df_avg_outdegrees = df_movie_actors.select("tconst", explode("nconst").alias("nconst")).join(df_outdegrees, "nconst", how = "inner" )
df_avg_outdegrees = df_avg_outdegrees.select("tconst", "outDegree").groupBy("tconst").agg(avg("outDegree"))
display(df_avg_outdegrees)

tconst,avg(outDegree)
tt7657364,83.0
tt1329350,4.5
tt1874708,56.5
tt0193004,3.5
tt0282141,58.25
tt5586322,36.0
tt0166160,39.66666666666666
tt0098048,13.0
tt0302361,62.0
tt0489019,8.0


In [0]:
kevin_distances_df = kevin_distances_df.withColumnRenamed("id", "nconst")
display(kevin_distances_df)

nconst,distances
nm0068551,3
nm0058547,5
nm0277020,4
nm5157721,4
nm2236012,20
nm0057651,5
nm1932274,3
nm4633806,4
nm0208444,3
nm0252309,4


In [0]:
# Q:11.4 - Final awnser --> Following the same procedures as before but now with the avg distances per cast to Kevin Bacon node.

df_avg_kevindegrees = df_movie_actors.select("tconst", explode("nconst").alias("nconst")).join(kevin_distances_df, "nconst", how = "inner" )
df_avg_kevindegrees = df_avg_kevindegrees.select("tconst", "distances").groupBy("tconst").agg(avg("distances"))
display(df_avg_kevindegrees)

tconst,avg(distances)
tt7657364,2.0
tt1329350,3.0
tt1874708,3.5
tt0193004,4.0
tt0282141,2.5
tt5586322,4.0
tt0166160,2.6666666666666665
tt0098048,3.5
tt0302361,2.25
tt0489019,3.333333333333333


In [0]:
# I will need this DF for the labels (avg ratings)

df_ratings = ratings.select(["tconst", "averageRating"])

# Building the Final DataFrame with all the previous dataframes from exercise 11

df_Final_join = df_totalmovie.join(df_avg_pagerank, "tconst", how = "inner")\
                        .join(df_avg_outdegrees, "tconst", how = "inner")\
                        .join(df_avg_kevindegrees, "tconst", how = "inner")\
                        .join(df_ratings, "tconst", how = "inner")

# Renaming the columns for better understanding

df_Final = df_Final_join.select(col("sum(total_movies)").alias("total_movies"), col("avg(pagerank)").alias("avg_pagerank"), col("avg(outDegree)").alias("avg_outDegree"), col("avg(distances)").alias("avg_distances"), col("averageRating").alias("avg_ratings"))

display(df_Final)

total_movies,avg_pagerank,avg_outDegree,avg_distances,avg_ratings
95,7.788372297805875,83.0,2.0,3.5
8,0.7909512946374175,4.5,3.0,4.2
191,2.627737312969145,56.5,3.5,3.3
5,0.4547796104912912,3.5,4.0,5.6
122,3.846422531370159,58.25,2.5,5.6
40,2.038631959948668,36.0,4.0,7.7
50,3.0084309122208075,39.66666666666666,2.6666666666666665,5.8
19,1.2055172518456867,13.0,3.5,2.3
101,4.223558385454064,62.0,2.25,5.9
12,1.0650711239026995,8.0,3.333333333333333,3.2


In [0]:
# load from file system

#df_Final = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_Final.csv")
#df_Final.write.option("header", True).csv("dbfs:/databricks/driver/df_Final.csv")

In [0]:
# load from file system
#df_Final_join = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_Final_join.csv")

#df_Final_join.write.option("header", True).csv("dbfs:/databricks/driver/df_Final_join.csv")

In [0]:
# In this section, I should have used the sparkml pipeline for better time eficiency but I wanted to mannualy do this process to better understand some of the concepts learned in class

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Converting the df_Final DataFrame into dense vectors 

vector_rdd = df_Final.rdd.map(lambda x: Vectors.dense(x))

# Using the function learned in class to convert the features into dense vectors and leaving out the labels

def transData(data):
  # Combine columns to a dense vector (excluding the last column)
  dataFeaturesRDD = data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]])
  
  # Convert the RDD back to a DataFrame, labelling the columns
  featuresDF =  dataFeaturesRDD.toDF(['features','label'])
  
  return featuresDF

data_LR = transData(df_Final)
data_LR.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[95.0,7.788372297...|  3.5|
|[8.0,0.7909512946...|  4.2|
|[191.0,2.62773731...|  3.3|
|[5.0,0.4547796104...|  5.6|
|[122.0,3.84642253...|  5.6|
|[40.0,2.038631959...|  7.7|
|[50.0,3.008430912...|  5.8|
|[19.0,1.205517251...|  2.3|
|[101.0,4.22355838...|  5.9|
|[12.0,1.065071123...|  3.2|
|[15.0,1.220076786...|  5.0|
|[5.0,0.6816359314...|  5.8|
|[44.0,1.226160791...|  6.3|
|[132.0,5.56852483...|  4.1|
|[13.0,1.137542810...|  6.7|
|[199.0,2.81400383...|  2.7|
|[21.0,1.546371132...|  6.4|
|[26.0,1.268447008...|  7.1|
|[26.0,1.318565783...|  4.5|
|[20.0,1.185510982...|  5.5|
+--------------------+-----+
only showing top 20 rows



In [0]:
# Q:11 - Final awnser --> Coefficients and accuracy acording to the RMSE (1.26)

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Building the LR model

lr = LinearRegression()

# Spliting the data into train (70%) and test (30%)

train, test = data_LR.randomSplit([0.7, 0.3], 0)

# Fiting into the training data

model_LR = lr.fit(train, {lr.regParam:0.0})
print(">>>> model intercept: %r, model coefficient: %r" % (model_LR.intercept, model_LR.coefficients[0]))

# applying the model on the test set in order to get the predictions

predictions = model_LR.transform(test)

# Evaluation method (RMSE)

evaluator = RegressionEvaluator(metricName="rmse")

p_values = model_LR.summary.pValues

RMSE = evaluator.evaluate(predictions)
print("model_LR: RMSE = " + str(RMSE))
print("model_LR pvalue intercept: %r, pvalue coefficient: %r" % (p_values[1], p_values[0]))

>>>> model intercept: 5.87753960623849, model coefficient: -0.0007194734311468037
model_LR: RMSE = 1.2626402843183355
model_LR pvalue intercept: 0.0, pvalue coefficient: 0.0


**Q12** What score would your model predict for the 1997 movie Titanic.

In [0]:
# Filtering the dataframe for this Titanic movie

df_Titanic = basics.filter((basics.originalTitle == "Titanic") & (basics.startYear == 1997) & (basics.titleType == "movie"))
display(df_Titanic)

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0120338,movie,Titanic,Titanic,0,1997,\N,194,"Drama,Romance"


In [0]:
# Q:12 - Final awnser --> Model Prediction for the 1997 Titanic movie

# Filtering for the specific Titanic id found above

Titanic_row = df_Final_join.where(col("tconst") == "tt0120228").drop("tconst")

# Transforming into a vector

vector_Titanic = Titanic_row.rdd.map(lambda x: Vectors.dense(x))

# applying the TransData function 

data_Titanic = transData(Titanic_row)

# Predicting the Titanic's Label

prediction_Titanic = model_LR.transform(data_Titanic)

prediction_Titanic.show()

+--------------------+-----+-----------------+
|            features|label|       prediction|
+--------------------+-----+-----------------+
|[19.0,0.747531401...|  5.0|5.869251659104352|
+--------------------+-----+-----------------+



**Q13** Create dummy variables for each of the top 10 movie genres for Q10. These variable should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, and a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variable to the regression improve your results? What is the new RMSE and predicted rating for the 1997 movie Titanic.

In [0]:
rdd_ind_genre.take(10)

Out[61]: [('Drama', 4845971786),
 ('Action', 3233082384),
 ('Comedy', 2925654980),
 ('Adventure', 2598666640),
 ('Crime', 1880285263),
 ('Thriller', 1601437009),
 ('Sci-Fi', 1235494933),
 ('Romance', 1205642129),
 ('Mystery', 957925978),
 ('Horror', 892241761)]

In [0]:
# Performing a join in order to introduce the "genres" column

df_q13 = df.select(["tconst", "genres"])

df_dummy = df_Final_join.join(df_q13, "tconst", how = "inner")
display(df_dummy)

tconst,sum(total_movies),avg(pagerank),avg(outDegree),avg(distances),averageRating,genres
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama
tt0020652,16,0.3457380088569495,1.0,7.0,5.3,Drama


In [0]:
# load from file system

#df_dummy = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_dummy.csv")
#df_dummy.write.option("header", True).csv("dbfs:/databricks/driver/df_dummy.csv")

In [0]:
# Top 10 individual genres

rdd_ind_genre.map(lambda x: x[0]).take(10)

Out[70]: ['Drama',
 'Action',
 'Comedy',
 'Adventure',
 'Crime',
 'Thriller',
 'Sci-Fi',
 'Romance',
 'Mystery',
 'Horror']

In [0]:
# Adding dummy variables to the dataset
df_ML = df_dummy

import pyspark.sql.functions as F 

distinct_genres = rdd_ind_genre.map(lambda x: x[0]).take(10)

for genre in distinct_genres:
    function = udf(lambda x: 1 if (genre in str(x).split(',')) else 0, IntegerType())
    new_column_name = str(genre)
    df_ML = df_ML.withColumn(new_column_name, function(col("genres")))

display(df_ML)

tconst,sum(total_movies),avg(pagerank),avg(outDegree),avg(distances),averageRating,genres,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci-Fi,Romance,Mystery,Horror
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0
tt0030136,4,0.999999999999998,1.0,20.0,5.2,"Comedy,Drama",1,0,1,0,0,0,0,0,0,0


In [0]:
# load from file system

#df_ML = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_ML.csv")
#df_ML.write.option("header", True).csv("dbfs:/databricks/driver/df_ML.csv")

In [0]:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import col,when
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

def transData(data):
  # vectorizing the data and leaving the last column (target)
  dataFeaturesRDD = data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]])
  
  # Converting into an rdd 
  featuresDF =  dataFeaturesRDD.toDF(['features','label'])
  
  return featuresDF


In [0]:
# Changing the target (averageRating) to the last position in order to use the transdata function as it is

df_ML = df_ML.select(col("tconst"), col("sum(total_movies)").alias("total_movies"), col("avg(pagerank)").alias("avg_pagerank"), col("avg(outDegree)").alias("avg_outDegree"), col("avg(distances)").alias("avg_distances"), col("Drama"), col("Action"), col("Comedy"), col("Adventure"), col("Crime"), col("Thriller"), col("Sci-Fi").alias("Sci_Fi"), col("Romance"), col("Mystery"), col("Horror"), col("averageRating"))

In [0]:
# Removing duplicated rows

df_ML = df_ML.distinct()

display(df_ML)

tconst,total_movies,avg_pagerank,avg_outDegree,avg_distances,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating
tt0047562,21,0.611354371819591,6.0,3.0,1,1,0,0,0,0,0,1,0,0,5.6
tt0051587,41,4.09714786866391,45.0,4.0,1,0,0,0,0,0,0,0,0,0,4.7
tt0060560,10,0.9565383910725436,13.0,4.0,1,0,1,0,0,0,0,0,0,0,7.2
tt0063731,12,1.1792712437159192,6.333333333333333,5.333333333333333,1,0,0,0,0,0,0,0,0,0,7.5
tt0064265,58,1.5139578256620343,32.5,3.5,0,1,0,1,0,0,0,0,0,0,7.2
tt0052230,55,2.0623666024341363,19.0,4.0,1,0,0,0,0,0,0,0,0,0,5.2
tt0052951,8,0.8600370812355762,2.5,4.5,0,0,0,0,0,0,0,0,0,0,5.2
tt0057136,13,1.2993479028262007,9.0,4.0,1,0,0,0,0,0,0,0,0,0,5.4
tt0047598,16,1.5380986271445172,4.0,4.0,0,0,1,0,0,0,0,0,0,0,6.2
tt0062292,13,0.913192356235834,7.5,3.5,0,0,0,0,0,0,1,0,0,1,6.2


In [0]:
df_ML.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- total_movies: string (nullable = true)
 |-- avg_pagerank: string (nullable = true)
 |-- avg_outDegree: string (nullable = true)
 |-- avg_distances: string (nullable = true)
 |-- Drama: string (nullable = true)
 |-- Action: string (nullable = true)
 |-- Comedy: string (nullable = true)
 |-- Adventure: string (nullable = true)
 |-- Crime: string (nullable = true)
 |-- Thriller: string (nullable = true)
 |-- Sci_Fi: string (nullable = true)
 |-- Romance: string (nullable = true)
 |-- Mystery: string (nullable = true)
 |-- Horror: string (nullable = true)
 |-- averageRating: string (nullable = true)



In [0]:
# Converting string variables into float

df_ML = df_ML.withColumn('total_movies', df_ML.total_movies.cast(FloatType()))
df_ML = df_ML.withColumn('avg_pagerank', df_ML.avg_pagerank.cast(FloatType()))
df_ML = df_ML.withColumn('avg_outDegree', df_ML.avg_outDegree.cast(FloatType()))
df_ML = df_ML.withColumn('avg_distances', df_ML.avg_distances.cast(FloatType()))
df_ML = df_ML.withColumn('Drama', df_ML.Drama.cast(FloatType()))
df_ML = df_ML.withColumn('Action', df_ML.Action.cast(FloatType()))
df_ML = df_ML.withColumn('Comedy', df_ML.Comedy.cast(FloatType()))
df_ML = df_ML.withColumn('Adventure', df_ML.Adventure.cast(FloatType()))
df_ML = df_ML.withColumn('Crime', df_ML.Crime.cast(FloatType()))
df_ML = df_ML.withColumn('Thriller', df_ML.Thriller.cast(FloatType()))
df_ML = df_ML.withColumn('Sci_Fi', df_ML.Sci_Fi.cast(FloatType()))
df_ML = df_ML.withColumn('Romance', df_ML.Romance.cast(FloatType()))
df_ML = df_ML.withColumn('Mystery', df_ML.Mystery.cast(FloatType()))
df_ML = df_ML.withColumn('Horror', df_ML.Horror.cast(FloatType()))
df_ML = df_ML.withColumn('averageRating', df_ML.averageRating.cast(FloatType()))

In [0]:
# Droping the genres column because we dont need it

df_ML = df_ML.drop("genres")
df_ML_Final = df_ML.drop("tconst")

# vectorizing the data

df_ML_Final = transData(df_ML_Final)
df_ML_Final.show()

+--------------------+-----------------+
|            features|            label|
+--------------------+-----------------+
|[21.0,0.611354351...|5.599999904632568|
|[41.0,4.097147941...|4.699999809265137|
|[10.0,0.956538379...|7.199999809265137|
|[12.0,1.179271221...|              7.5|
|[58.0,1.513957858...|7.199999809265137|
|[32.0,1.548627257...|6.099999904632568|
|[11.0,1.113009810...|              5.0|
|[41.0,4.139506340...|6.800000190734863|
|[36.0,3.578929185...|              6.5|
|[3.0,0.2757430970...|4.800000190734863|
|[22.0,2.069422721...|6.099999904632568|
|[3.0,1.0,1.0,20.0...|6.099999904632568|
|[167.0,3.13122892...|5.699999809265137|
|[17.0,1.285108327...|              5.0|
|[84.0,3.114949226...|6.199999809265137|
|[98.0,6.172012805...|6.800000190734863|
|[21.0,1.437712430...|6.400000095367432|
|[12.0,1.502348303...|7.300000190734863|
|[40.0,1.250881910...|7.800000190734863|
|[34.0,1.239699602...|              7.5|
+--------------------+-----------------+
only showing top

In [0]:
# Q:13 - Final awnser --> Checking the results ---> The accuracy of the model improves with the implementation of dummy variables! From 1.26 to 1.19!

lr = LinearRegression()

# Spliting the data like before
train, test = df_ML_Final.randomSplit([0.7, 0.3], 0)

# Fiting into the training data

model_LR_dummy = lr.fit(train, {lr.regParam:0.0})
print(">>>> model intercept: %r, model coefficient: %r" % (model_LR_dummy.intercept, model_LR_dummy.coefficients[0]))

# retrieving the predictions

predictions_dummy = model_LR_dummy.transform(test)

# Evaluation method (RMSE)

evaluator = RegressionEvaluator(metricName="rmse")

p_values = model_LR_dummy.summary.pValues

RMSE = evaluator.evaluate(predictions_dummy)
print("model_LR: RMSE = " + str(RMSE))
print("model_LR pvalue intercept: %r, pvalue coefficient: %r" % (p_values[1], p_values[0]))

>>>> model intercept: 5.935210255803197, model coefficient: -0.0007095669210852347
model_LR: RMSE = 1.1997666867438908
model_LR pvalue intercept: 0.0, pvalue coefficient: 0.0


In [0]:
# Finding Titanic title id

display(df_ML.where(col("tconst") == "tt0120338"))

Titanic_dummy = df_ML.where(col("tconst") == "tt0120338")
Titanic_dummy = Titanic_dummy.drop("tconst")

tconst,total_movies,avg_pagerank,avg_outDegree,avg_distances,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating
tt0120338,206.0,8.224008,118.5,2.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,7.9


In [0]:
# Vectorizing Titanic row

Titanic_dummy = transData(Titanic_dummy)
Titanic_dummy.show()

+--------------------+-----------------+
|            features|            label|
+--------------------+-----------------+
|[206.0,8.22400760...|7.900000095367432|
+--------------------+-----------------+



In [0]:
# Q:13 - Final awnser ---> predicted rating for Titanic 1997 moviea and RMSE

pred_Titanic_dummy = model_LR_dummy.transform(Titanic_dummy)
pred_Titanic_dummy.show()

# RMSE

RMSE = evaluator.evaluate(pred_Titanic_dummy)
print("model_LR_dummy: RMSE = " + str(RMSE))

+--------------------+-----------------+-----------------+
|            features|            label|       prediction|
+--------------------+-----------------+-----------------+
|[206.0,8.22400760...|7.900000095367432|6.144256392511611|
+--------------------+-----------------+-----------------+

model_LR_dummy: RMSE = 1.7557437028558205


**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, changing the included features. What is the RMSE of you final model and what rating does it predict for the 1997 movie Titanic.

In [0]:
# In this question, I will try to implement some ML methodologies to improve the results and the chosen model

df_ML_pipeline = df_ML
display(df_ML_pipeline)

tconst,total_movies,avg_pagerank,avg_outDegree,avg_distances,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating
tt0047562,21.0,0.61135435,6.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,5.6
tt0051587,41.0,4.097148,45.0,4.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.7
tt0060560,10.0,0.9565384,13.0,4.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,7.2
tt0063731,12.0,1.1792712,6.3333335,5.3333335,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,7.5
tt0064265,58.0,1.5139579,32.5,3.5,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,7.2
tt0052230,55.0,2.0623665,19.0,4.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.2
tt0052951,8.0,0.8600371,2.5,4.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.2
tt0057136,13.0,1.2993479,9.0,4.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.4
tt0047598,16.0,1.5380986,4.0,4.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.2
tt0062292,13.0,0.91319233,7.5,3.5,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,6.2


In [0]:
# feature Engineering: I decided it would be insteresting to add the numVotes column because I think it might have some importance in getting the average Rating

df_ML_pipeline = df_ML_pipeline.join(df.select(["tconst", "numVotes"]), "tconst", how = "inner").distinct()
display(df_ML_pipeline)

tconst,total_movies,avg_pagerank,avg_outDegree,avg_distances,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating,numVotes
tt0054643,24.0,1.2163867,10.5,4.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,7.1,95
tt0058692,73.0,4.8818254,60.0,3.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,6.6,2326
tt0049309,45.0,1.4845567,12.0,4.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,80
tt0067305,44.0,2.1468558,16.333334,3.6666667,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.5,162
tt0043782,34.0,1.7889122,18.0,3.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,6.6,1092
tt0055207,53.0,2.619838,20.5,3.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,6.7,7452
tt0064197,71.0,2.4496396,27.0,3.5,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.3,73
tt0044913,49.0,1.8666121,17.0,3.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.8,160
tt0057239,34.0,1.7889122,18.0,3.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,7.3,1953
tt0064944,35.0,2.2275767,18.0,3.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.4,8


In [0]:
# More Feature Engineering: Creting a ration between avg_pagerank and numVotes called pagerank_per_votes

df_ML_pipeline = df_ML_pipeline.withColumn("pagerank_per_votes", col("avg_pagerank") / col("numVotes"))

display(df_ML_pipeline)

tconst,total_movies,avg_pagerank,avg_outDegree,avg_distances,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating,numVotes,pagerank_per_votes
tt0054643,24.0,1.2163867,10.5,4.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,7.1,95,0.0128040702719437
tt0058692,73.0,4.8818254,60.0,3.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,6.6,2326,0.0020988071569572
tt0049309,45.0,1.4845567,12.0,4.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0,80,0.0185569584369659
tt0067305,44.0,2.1468558,16.333334,3.6666667,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.5,162,0.013252196488557
tt0043782,34.0,1.7889122,18.0,3.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,6.6,1092,0.0016381979643643
tt0055207,53.0,2.619838,20.5,3.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,6.7,7452,0.0003515617283070145
tt0064197,71.0,2.4496396,27.0,3.5,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.3,73,0.0335567062848234
tt0044913,49.0,1.8666121,17.0,3.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.8,160,0.0116663254797458
tt0057239,34.0,1.7889122,18.0,3.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,7.3,1953,0.0009159816574940484
tt0064944,35.0,2.2275767,18.0,3.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.4,8,0.2784470915794372


In [0]:
# Saving the Titanic row because I will need it later

df_Titanic_pipeline = df_ML_pipeline.where(col("tconst") == "tt0120338")
display(df_Titanic_pipeline)

tconst,total_movies,avg_pagerank,avg_outDegree,avg_distances,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating,numVotes,pagerank_per_votes
tt0120338,206.0,8.224008,118.5,2.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,7.9,1137493,7.22994128887505e-06


In [0]:
# load from file system

#df_Titanic_pipeline = spark.read.option("header", True).csv("dbfs:/databricks/driver/df_Titanic_pipeline.csv")
#df_Titanic_pipeline.write.option("header", True).csv("dbfs:/databricks/driver/df_Titanic_pipeline.csv")

In [0]:
# Moving the label column to the last position and removing the tconst because we dont need it 

df_ML_pipeline = df_ML_pipeline.select("total_movies", "avg_pagerank", "pagerank_per_votes", "avg_outDegree", "avg_distances", "numVotes", "Drama", "Action", "Comedy", "Adventure", "Crime", "Thriller", "Sci_Fi", "Romance", "Mystery", "Horror", "averageRating")

display(df_ML_pipeline)

total_movies,avg_pagerank,pagerank_per_votes,avg_outDegree,avg_distances,numVotes,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci_Fi,Romance,Mystery,Horror,averageRating
24.0,1.2163867,0.0128040702719437,10.5,4.0,95,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,7.1
73.0,4.8818254,0.0020988071569572,60.0,3.0,2326,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,6.6
45.0,1.4845567,0.0185569584369659,12.0,4.0,80,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.0
44.0,2.1468558,0.013252196488557,16.333334,3.6666667,162,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.5
34.0,1.7889122,0.0016381979643643,18.0,3.0,1092,1.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,6.6
53.0,2.619838,0.0003515617283070145,20.5,3.0,7452,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,6.7
71.0,2.4496396,0.0335567062848234,27.0,3.5,73,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.3
49.0,1.8666121,0.0116663254797458,17.0,3.0,160,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6.8
34.0,1.7889122,0.0009159816574940484,18.0,3.0,1953,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,7.3
35.0,2.2275767,0.2784470915794372,18.0,3.0,8,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.4


In [0]:
df_ML_pipeline.printSchema()

root
 |-- total_movies: string (nullable = true)
 |-- avg_pagerank: string (nullable = true)
 |-- pagerank_per_votes: double (nullable = true)
 |-- avg_outDegree: string (nullable = true)
 |-- avg_distances: string (nullable = true)
 |-- numVotes: string (nullable = true)
 |-- Drama: string (nullable = true)
 |-- Action: string (nullable = true)
 |-- Comedy: string (nullable = true)
 |-- Adventure: string (nullable = true)
 |-- Crime: string (nullable = true)
 |-- Thriller: string (nullable = true)
 |-- Sci_Fi: string (nullable = true)
 |-- Romance: string (nullable = true)
 |-- Mystery: string (nullable = true)
 |-- Horror: string (nullable = true)
 |-- averageRating: string (nullable = true)



In [0]:
# Convert string variables that should be float into float

df_ML_pipeline = df_ML_pipeline.withColumn('total_movies', df_ML_pipeline.total_movies.cast(FloatType()))\
                      .withColumn('avg_pagerank', df_ML_pipeline.avg_pagerank.cast(FloatType()))\
                      .withColumn('avg_outDegree', df_ML_pipeline.avg_outDegree.cast(FloatType()))\
                      .withColumn('avg_distances', df_ML_pipeline.avg_distances.cast(FloatType()))\
                      .withColumn('numVotes', df_ML_pipeline.numVotes.cast(FloatType()))\
                      .withColumn('Drama', df_ML_pipeline.Drama.cast(FloatType()))\
                      .withColumn('Action', df_ML_pipeline.Action.cast(FloatType()))\
                      .withColumn('Comedy', df_ML_pipeline.Comedy.cast(FloatType()))\
                      .withColumn('Adventure', df_ML_pipeline.Adventure.cast(FloatType()))\
                      .withColumn('Crime', df_ML_pipeline.Crime.cast(FloatType()))\
                      .withColumn('Thriller', df_ML_pipeline.Thriller.cast(FloatType()))\
                      .withColumn('Sci_Fi', df_ML_pipeline.Sci_Fi.cast(FloatType()))\
                      .withColumn('Romance', df_ML_pipeline.Romance.cast(FloatType()))\
                      .withColumn('Mystery', df_ML_pipeline.Mystery.cast(FloatType()))\
                      .withColumn('Horror', df_ML_pipeline.Horror.cast(FloatType()))\
                      .withColumn('averageRating', df_ML_pipeline.averageRating.cast(FloatType()))

In [0]:
# Defining the continuous columns and target column

CONTINUOUS_COLUMNS = [
    "total_movies",
    "avg_pagerank",
    "pagerank_per_votes",
    "avg_outDegree",
    "avg_distances",
    "numVotes"
]

TARGET_COLUMN = ["averageRating"]

In [0]:
# Correlation between features
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler

continuous_features = VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS, outputCol="continuous_features"
)

vector_features = df_ML_pipeline.select(CONTINUOUS_COLUMNS)
for x in CONTINUOUS_COLUMNS:
    vector_features = vector_features.where(~F.isnull(F.col(x))) 

vector = continuous_features.transform(vector_features)

vector.select("continuous_features").show(3, False)

+--------------------------------------------------------------+
|continuous_features                                           |
+--------------------------------------------------------------+
|[24.0,1.2163866758346558,0.012804070271943744,10.5,4.0,95.0]  |
|[73.0,4.8818254470825195,0.002098807156957231,60.0,3.0,2326.0]|
|[85.0,1.9602729082107544,0.056007797377450125,42.5,4.0,35.0]  |
+--------------------------------------------------------------+
only showing top 3 rows



In [0]:
from pyspark.ml.stat import Correlation

correlation = Correlation.corr(
    vector, "continuous_features"
)

correlation.printSchema()

root
 |-- pearson(continuous_features): matrix (nullable = false)



In [0]:
# Correlation matrix between continuous features

import pandas as pd
correlation_array = correlation.head()[0].toArray()

correlation_pd = pd.DataFrame(
    correlation_array,  
    index=CONTINUOUS_COLUMNS,  
    columns=CONTINUOUS_COLUMNS, 
)

print(correlation_pd.iloc[:, :6])

                    total_movies  avg_pagerank  pagerank_per_votes  \
total_movies            1.000000      0.454966            0.210920   
avg_pagerank            0.454966      1.000000            0.277351   
pagerank_per_votes      0.210920      0.277351            1.000000   
avg_outDegree           0.679225      0.875955            0.277710   
avg_distances          -0.143966     -0.256829           -0.004359   
numVotes                0.056166      0.121904           -0.066710   

                    avg_outDegree  avg_distances  numVotes  
total_movies             0.679225      -0.143966  0.056166  
avg_pagerank             0.875955      -0.256829  0.121904  
pagerank_per_votes       0.277710      -0.004359 -0.066710  
avg_outDegree            1.000000      -0.289525  0.126591  
avg_distances           -0.289525       1.000000 -0.078632  
numVotes                 0.126591      -0.078632  1.000000  


In [0]:
# avg_pagerank is highly correlated with avg_outDegree --> I will remove the later since it has higher comulative correlation with the remaining columns

df_ML_pipeline = df_ML_pipeline.drop("avg_outDegree")

In [0]:
# Feature Normalization (minmax)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
import pyspark.ml.feature as MF

CONTINUOUS_COLUMNS = ["total_movies", "avg_pagerank", "avg_distances", "numVotes"]

BINARY_COLUMNS = ["Drama", "Action", "Comedy", "Adventure", "Crime", "Thriller", "Sci_Fi", "Romance", "Mystery", "Horror"]

# pagerank_per_votes is considered a ration here so will not be normalized

continuous_assembler = MF.VectorAssembler(  
    inputCols=CONTINUOUS_COLUMNS,
    outputCol="continuous",
)

continuous_scaler = MF.MinMaxScaler(  
    inputCol="continuous",
    outputCol="continuous_scaled",
)

# Initializing a sparkML pipeline 

imdb_pipeline = Pipeline(  
    stages=[continuous_assembler, continuous_scaler]
)

In [0]:
# Vectorizing all the data to feed it into the ML model

preml_assembler = MF.VectorAssembler(
    inputCols=BINARY_COLUMNS 
    + ["continuous_scaled"]
    + ["pagerank_per_votes"],
    outputCol="features",
)

imdb_pipeline.setStages(
    [continuous_assembler, continuous_scaler, preml_assembler])

Out[76]: Pipeline_d556ded7ab50

In [0]:
# train test split to fit the pipeline --> 70/30

train, test = df_ML_pipeline.randomSplit([0.7, 0.3], 0) # seed = 0 for reproducibility

# Fiting the pipeline to the train and transform the test

imdb_pipeline_model = imdb_pipeline.fit(train)  
imdb_features = imdb_pipeline_model.transform(test) 

In [0]:
# the data is now ready for the ML model

imdb_features.select("averageRating", "features").show(5, truncate=30)

+-------------+------------------------------+
|averageRating|                      features|
+-------------+------------------------------+
|          6.2|(15,[2,7,11,12,13,14],[1.0,...|
|          6.3|(15,[0,11,12,13,14],[1.0,5....|
|          8.2|(15,[11,12,13,14],[8.507249...|
|          6.1|(15,[0,2,7,11,12,13,14],[1....|
|          4.1|(15,[11,12,13,14],[0.001251...|
+-------------+------------------------------+
only showing top 5 rows



In [0]:
# Random Forest Regressor(base)

from pyspark.ml.regression import RandomForestRegressor

Forest_Regressor = RandomForestRegressor(labelCol="averageRating", featuresCol="features", predictionCol='prediction', seed = 0)

imdb_pipeline.setStages(
    [continuous_assembler, continuous_scaler, preml_assembler, Forest_Regressor])

Out[79]: Pipeline_d556ded7ab50

In [0]:
imdb_pipeline_model = imdb_pipeline.fit(train)
results_RF = imdb_pipeline_model.transform(test)

In [0]:
# Testing with a basic version of Random Forest --> RMSE: 1.17

evaluator = RegressionEvaluator(labelCol = "averageRating", metricName="rmse")

RMSE_RF = evaluator.evaluate(results_RF)
print("RMSE = %g" % RMSE_RF)

RMSE = 1.17229


In [0]:
# Testing a different variation of the Random forest
#https://towardsdatascience.com/random-forest-hyperparameters-and-how-to-fine-tune-them-17aee785ee0d
#  After some research and, based on the link above, I have found that the number of features to consider when splitting at each node is fundamental
# Therefore I will use the "sqrt" on the featureSubsetStrategy parameter

Forest_Regressor_2 = RandomForestRegressor(labelCol="averageRating", featuresCol="features", predictionCol='prediction', featureSubsetStrategy = "sqrt", seed = 0)

imdb_pipeline.setStages(
    [continuous_assembler, continuous_scaler, preml_assembler, Forest_Regressor_2])

Out[188]: Pipeline_54e18c999e8c

In [0]:
imdb_pipeline_model = imdb_pipeline.fit(train)
results_RF_2 = imdb_pipeline_model.transform(test)

In [0]:
# Testing with the second version of the Random forest described above --> RMSE = 1.17 (same as before)

evaluator = RegressionEvaluator(labelCol = "averageRating", metricName="rmse")

RMSE_RF_2 = evaluator.evaluate(results_RF_2)
print("RMSE = %g" % RMSE_RF_2)

RMSE = 1.17546


In [0]:
# Testing with a simple Regression Tree

from pyspark.ml.regression import DecisionTreeRegressor

Regression_Tree = DecisionTreeRegressor(labelCol="averageRating", featuresCol="features", predictionCol='prediction', seed = 0)

imdb_pipeline.setStages(
    [continuous_assembler, continuous_scaler, preml_assembler, Regression_Tree])

Out[192]: Pipeline_54e18c999e8c

In [0]:
imdb_pipeline_model = imdb_pipeline.fit(train)
results_tree = imdb_pipeline_model.transform(test)

In [0]:
# Testing with the baseline version of a Regression Tree --> RMSE = 1.17

evaluator = RegressionEvaluator(labelCol = "averageRating", metricName="rmse")

RMSE_tree = evaluator.evaluate(results_tree)
print("RMSE = %g" % RMSE_tree)

RMSE = 1.17674


In [0]:
# Q:14 --> Random Forest baseline was the best model with RMSE of 1.17229

In [0]:
# Now, we will try to implement this pipeline on the Titanic movie row in order to predict its rating 
# It is important to mention that, at this stage, we dont have the avg_outDegree because of the previous observed correlation

df_Titanic_pipeline = df_Titanic_pipeline.select("total_movies", "avg_pagerank", "pagerank_per_votes", "avg_distances", "numVotes", "Drama", "Action", "Comedy", "Adventure", "Crime", "Thriller", "Sci_Fi", "Romance", "Mystery", "Horror", "averageRating")

df_Titanic_pipeline = df_Titanic_pipeline.withColumn('total_movies', df_Titanic_pipeline.total_movies.cast(FloatType()))\
                      .withColumn('avg_pagerank', df_Titanic_pipeline.avg_pagerank.cast(FloatType()))\
                      .withColumn('avg_distances', df_Titanic_pipeline.avg_distances.cast(FloatType()))\
                      .withColumn('numVotes', df_Titanic_pipeline.numVotes.cast(FloatType()))\
                      .withColumn('Drama', df_Titanic_pipeline.Drama.cast(FloatType()))\
                      .withColumn('Action', df_Titanic_pipeline.Action.cast(FloatType()))\
                      .withColumn('Comedy', df_Titanic_pipeline.Comedy.cast(FloatType()))\
                      .withColumn('Adventure', df_Titanic_pipeline.Adventure.cast(FloatType()))\
                      .withColumn('Crime', df_Titanic_pipeline.Crime.cast(FloatType()))\
                      .withColumn('Thriller', df_Titanic_pipeline.Thriller.cast(FloatType()))\
                      .withColumn('Sci_Fi', df_Titanic_pipeline.Sci_Fi.cast(FloatType()))\
                      .withColumn('Romance', df_Titanic_pipeline.Romance.cast(FloatType()))\
                      .withColumn('Mystery', df_Titanic_pipeline.Mystery.cast(FloatType()))\
                      .withColumn('Horror', df_Titanic_pipeline.Horror.cast(FloatType()))\
                      .withColumn('averageRating', df_Titanic_pipeline.averageRating.cast(FloatType()))

imdb_pipeline.setStages(
    [continuous_assembler, continuous_scaler, preml_assembler, Forest_Regressor])

imdb_pipeline_model = imdb_pipeline.fit(train)
results_RF_Titanic = imdb_pipeline_model.transform(df_Titanic_pipeline)

In [0]:
# Q:14 - Final awnser - Predicting the Titanic averageRating

results_RF_Titanic.select(["averageRating", "prediction"]).show()

+-------------+-----------------+
|averageRating|       prediction|
+-------------+-----------------+
|          7.9|7.101463848184295|
+-------------+-----------------+

