# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline shall be set for the 10th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set. You are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, and secondly by the logic used in trying to approach each step of the problem set;
5. Consider skipping questions that you are stuck on, and come back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected you have developed skills beyond writing SQL queries. Any question where you directly write a SQL query (then for example create a temporary table and use spark.sql to pass the query) will receive a 25% penalty. Using the Spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty. Comment your code in a reasonable fashion.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all necessary code so that when the notebook is run in databricks it also replicates these results. This means that all data downloading and processing should be done in this notebook. It is also important that you clearly indicate where your final answer to each question is when you are using multiple cells (for example, print "my final answer is" before your answer or use cell comments). Please make sure to name your file in the following way: *[student_number1]_[student_number2]_submission.ipynb*. As an example: *19740001_197400010_submission.ipynb*
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is present on the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisting your RDDs or Dataframes, this may speed up performance.
    6. If you have trouble with graphframes in databricks (specifically the import statement), make sure the graphframes package is installed on the cluster you are running. Click home on the left, then click on the graphframes library; from there you can install the package on your cluster (check the graphframes checkbox and click install). Another installation option is to use the JAR with the graphframes library that is available on Moodle.
10. **Note**: Including the name and student number of each group member in the submission notebook will be considered a declaration of authorship.

#### Data Sources and Description
We will use data from IMDB. You can download the raw data files
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv). You can find a
description of each data file at https://www.imdb.com/interfaces/

## Authors
Carlos Rodrigues - 20230543

Devora Cavaleiro - 20230794

## Questions
### Data loading and preparation
Review the file descriptions and load the necessary data onto your databricks cluster and into spark dataframes. You will need to use shell commands to download the data, unzip the data, and load the data into spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .gz files. The data files will then be tab separated (.tsv), which you can load into a dataframe using the tab-separated option instead of the comma-separated option we have typically used in class: `.option("sep", "\t")`

# Cluster configuration

We've used a cluster with the "graphframes" library installed.

In [None]:
import os
from itertools import combinations
from pyspark.sql import functions as f
from pyspark.sql.functions import col, collect_list
from graphframes import *
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# Download and persist files into DBFS

In [None]:
spark = SparkSession.builder.appName("Project").getOrCreate()

In [None]:
def file_exists_dbfs(path):
  # dbutils.fs.ls raises an exception when the path does not exist
  try:
    dbutils.fs.ls(path)
    return True
  except Exception:
    return False

In [None]:
def download_file_and_store_in_dbfs(url):
  file_name = url.split('/')[-1]
  unziped_file_name = file_name[:-3]
  dbfs_file_path = "/datasets/" + unziped_file_name

  if file_exists_dbfs(dbfs_file_path):
    print(f"File {dbfs_file_path} already exists in DBFS. Skipping...")
    return
  
  if os.path.isfile(f"/dbfs/tmp/{file_name}"):
    print(f"File {file_name} already exists. Skipping download...")
  else:    
    print(f"Downloading file from {url}...")
    get_ipython().run_cell_magic('sh', '', f'wget -O /dbfs/tmp/{file_name} {url}')

  if os.path.isfile(f"/dbfs/tmp/{unziped_file_name}"):
    print(f"File {unziped_file_name} already exists. Skipping unzipping...")
  else:
    print("Unzipping file...")
    get_ipython().run_cell_magic('sh', '', f'gunzip /dbfs/tmp/{file_name}')

  print("Copying file to DBFS...")
  dbutils.fs.mv(f"dbfs:/tmp/{unziped_file_name}", f"dbfs:{dbfs_file_path}")

  print("Done!")

## title.principals.tsv.gz

In [None]:
download_file_and_store_in_dbfs("https://datasets.imdbws.com/title.principals.tsv.gz")

File /datasets/title.principals.tsv already exists in DBFS. Skipping...


## title.basics.tsv.gz

In [None]:
download_file_and_store_in_dbfs("https://datasets.imdbws.com/title.basics.tsv.gz")

File /datasets/title.basics.tsv already exists in DBFS. Skipping...


## title.ratings.tsv.gz

In [None]:
download_file_and_store_in_dbfs("https://datasets.imdbws.com/title.ratings.tsv.gz")

File /datasets/title.ratings.tsv already exists in DBFS. Skipping...


## name.basics.tsv.gz

In [None]:
download_file_and_store_in_dbfs("https://datasets.imdbws.com/name.basics.tsv.gz")

File /datasets/name.basics.tsv already exists in DBFS. Skipping...


The datasets **"title.episode.tsv.gz"**, **"title.akas.tsv.gz"** and **"title.crew.tsv.gz"** weren't downloaded because, after analysing the content provided, we concluded that they aren't needed to answer the questions in this project.

# Data Cleaning and Pre-processing

## Title Principals Dataset

In [None]:
title_principals_df = spark.read \
    .format("csv") \
    .options(inferSchema = "True", header = "True", sep = "\t", nullValue='\\N') \
    .load("dbfs:/datasets/title.principals.tsv")
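As a rough illustration of what the `sep` and `nullValue` options do, the sketch below parses a small tab-separated sample in plain Python and turns the `\N` placeholder into `None`. The two sample rows mirror rows shown in the output further down, assuming the raw file writes empty fields as `\N`. The helper name `parse_imdb_tsv` is hypothetical and is not part of Spark.

```python
import csv
import io

def parse_imdb_tsv(text):
    """Parse tab-separated text, mapping the null marker \\N to None."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    return [
        {key: (None if value == r"\N" else value) for key, value in row.items()}
        for row in reader
    ]

# Two rows in the raw layout (tabs between fields, \N for missing values)
sample = (
    "tconst\tordering\tnconst\tcategory\tjob\tcharacters\n"
    "tt0000001\t2\tnm0005690\tdirector\t\\N\t\\N\n"
    "tt0000001\t3\tnm0005690\tproducer\tproducer\t\\N\n"
)

rows = parse_imdb_tsv(sample)
print(rows[0]["job"], rows[1]["job"])
```

Unlike `inferSchema`, this sketch leaves every value as a string; type conversion would be a separate step.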

In [None]:
display(title_principals_df.head(20))

tconst,ordering,nconst,category,job,characters
tt0000001,1,nm1588970,self,,"[""Self""]"
tt0000001,2,nm0005690,director,,
tt0000001,3,nm0005690,producer,producer,
tt0000001,4,nm0374658,cinematographer,director of photography,
tt0000002,1,nm0721526,director,,
tt0000002,2,nm1335271,composer,,
tt0000003,1,nm0721526,director,,
tt0000003,2,nm1770680,producer,producer,
tt0000003,3,nm0721526,producer,producer,
tt0000003,4,nm1335271,composer,,


In [None]:
title_principals_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



## Title Basics Dataset

In [None]:
title_basics_df = spark.read \
    .format("csv") \
    .options(inferSchema = "True", header = "True", sep = "\t", nullValue='\\N') \
    .load("dbfs:/datasets/title.basics.tsv")

In [None]:
title_basics_df.show(20)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   null|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|   null|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|   null|             5|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|   null|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|   null|             1|        Comedy

In [None]:
title_basics_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



Convert column `runtimeMinutes` from `string` to `integer`

In [None]:
title_basics_df = title_basics_df.withColumn("runtimeMinutes", col("runtimeMinutes").cast(IntegerType()))
title_basics_df.show(20)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   null|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|   null|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|   null|             5|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|   null|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|   null|             1|        Comedy

In [None]:
title_basics_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genres: string (nullable = true)



The `genres` column is a comma-separated string. We need to split it and transform it into an array.

We also sort the array so that the same combination of genres listed in a different order is not treated as a different combination (e.g. `['Drama', 'Romance']` and `['Romance', 'Drama']`).

In [None]:
title_basics_df_preprocessed = title_basics_df \
  .withColumn('genres', f.array_sort(f.split('genres', ',')))

In [None]:
title_basics_df_preprocessed.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   null|             1|[Documentary, Short]|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|   null|             5|  [Animation, Short]|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|   null|             5|[Animation, Comed...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|   null|            12|  [Animation, Short]|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|   null|             1|     [Comedy, 

In [None]:
title_basics_df_preprocessed.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = false)



## Name basics Dataset

In [None]:
name_basics_df = spark.read \
    .format("csv") \
    .options(inferSchema = "True", header = "True", sep = "\t", nullValue='\\N') \
    .load("dbfs:/datasets/name.basics.tsv")

display(name_basics_df.take(20))

nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
nm0000001,Fred Astaire,1899,1987.0,"actor,miscellaneous,producer","tt0072308,tt0050419,tt0053137,tt0027125"
nm0000002,Lauren Bacall,1924,2014.0,"actress,soundtrack,archive_footage","tt0037382,tt0075213,tt0117057,tt0038355"
nm0000003,Brigitte Bardot,1934,,"actress,music_department,producer","tt0057345,tt0049189,tt0056404,tt0054452"
nm0000004,John Belushi,1949,1982.0,"actor,writer,music_department","tt0072562,tt0077975,tt0080455,tt0078723"
nm0000005,Ingmar Bergman,1918,2007.0,"writer,director,actor","tt0050986,tt0083922,tt0050976,tt0069467"
nm0000006,Ingrid Bergman,1915,1982.0,"actress,producer,soundtrack","tt0034583,tt0036855,tt0038109,tt0038787"
nm0000007,Humphrey Bogart,1899,1957.0,"actor,producer,miscellaneous","tt0034583,tt0042593,tt0043265,tt0037382"
nm0000008,Marlon Brando,1924,2004.0,"actor,director,writer","tt0078788,tt0068646,tt0047296,tt0070849"
nm0000009,Richard Burton,1925,1984.0,"actor,producer,director","tt0061184,tt0087803,tt0059749,tt0057877"
nm0000010,James Cagney,1899,1986.0,"actor,director,producer","tt0029870,tt0031867,tt0042041,tt0035575"


## Title ratings Dataset

In [None]:
title_ratings_df = spark.read \
    .format("csv") \
    .options(inferSchema = "True", header = "True", sep = "\t", nullValue='\\N') \
    .load("dbfs:/datasets/title.ratings.tsv")

display(title_ratings_df.take(20))

tconst,averageRating,numVotes
tt0000001,5.7,2059
tt0000002,5.6,277
tt0000003,6.5,2020
tt0000004,5.3,180
tt0000005,6.2,2786
tt0000006,5.1,187
tt0000007,5.4,872
tt0000008,5.4,2201
tt0000009,5.4,212
tt0000010,6.8,7587


# Project questions

### Network Inference, Let’s build a network
In the following questions you will look to summarise the data and build a network. We want to examine a network that abstracts how actors and actresses are related through their co-participation in movies. To that end perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** on the participants in those movies (i.e., actors, directors, etc … ). Make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?

In [None]:
# Join both on 'nconst'
actors_roles = name_basics_df.join(title_principals_df, "nconst", "inner")

# Join previous table on 'tconst'
actors_movie = actors_roles.join(title_basics_df_preprocessed, "tconst", "inner")

# Now join with ratings, on 'tconst' as well
actors_movie_rating = actors_movie.join(title_ratings_df, "tconst", "inner")

# Final dataframe after joins
actors_movie_rating.show()

+---------+---------+--------------------+---------+---------+--------------------+--------------------+--------+---------------+--------+--------------------+---------+--------------------+--------------------+-------+---------+-------+--------------+------------------+-------------+--------+
|   tconst|   nconst|         primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|ordering|       category|     job|          characters|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|            genres|averageRating|numVotes|
+---------+---------+--------------------+---------+---------+--------------------+--------------------+--------+---------------+--------+--------------------+---------+--------------------+--------------------+-------+---------+-------+--------------+------------------+-------------+--------+
|tt0000658|nm0169871|          Émile Cohl|     1857|     1938|director,animatio...|tt1003400,tt10034...|       1|  

In [None]:
# Count the rows
row_count = actors_movie_rating.count()
print(f"The number of rows in the DataFrame is: {row_count}")

The number of rows in the DataFrame is: 20090057


In [None]:
# save df into dbfs
# actors_movie_rating.write.parquet("dbfs:/datasets/actors_movie_rating_parquet")

In [None]:
# Load the file from dbfs into a df
#actors_movie_rating = spark.read.parquet("dbfs:/datasets/actors_movie_rating_parquet")

**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?

In [None]:
filtered_df = actors_movie_rating.where(
    (col("category").isin("actor", "actress")) &
    (col("isAdult") == 0) &
    (col("deathYear").isNull()) &
    ((col("birthYear") >= 1920) & (col("birthYear").isNotNull())) &
    (col("titleType") == "movie")
)

filtered_df.show()

+---------+---------+--------------------+---------+---------+--------------------+--------------------+--------+--------+----+--------------------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+-------------+--------+
|   tconst|   nconst|         primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|ordering|category| job|          characters|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|averageRating|numVotes|
+---------+---------+--------------------+---------+---------+--------------------+--------------------+--------+--------+----+--------------------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+-------------+--------+
|tt0026455|nm0172237|    Cora Sue Collins|     1927|     null|  actress,soundtrack|tt0024895,tt00334...|      10| actress|null|   ["Marian Fos

In [None]:
# save df into dbfs
# filtered_df.write.parquet("dbfs:/datasets/actors_movie_rating_filtered_parquet")

In [None]:
# Load the file from dbfs into a df
#filtered_df = spark.read.parquet("dbfs:/datasets/actors_movie_rating_filtered_parquet")

In [None]:
row_count = filtered_df.count()
print(f"The number of rows in the DataFrame is: {row_count}")

The number of rows in the DataFrame is: 749075


**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name, these could be different people.

In [None]:
# Convert DataFrame to RDD
rdd = filtered_df.rdd

# Build a paired RDD keyed by (nconst, primaryName) with a count of 1 per movie.
# 'nconst' is the unique identifier of a person, so different actors/actresses
# who share the same name are counted separately.
# distinct() avoids counting duplicated rows for the same title + actor/actress.
paired_rdd = rdd.map(lambda row: (row["nconst"], row["primaryName"], row["tconst"])) \
                .distinct() \
                .map(lambda row: ((row[0], row[1]), 1))

# Reduce by key (actor/actress) to count the number of movies
movies_count_rdd = paired_rdd.reduceByKey(lambda a, b: a + b)

# Sort by movie count (X[1]) in descending order and take the top 10
top_10_actors = movies_count_rdd.sortBy(lambda x: x[1], ascending=False) \
                                .take(10)

In [None]:

# top 10 names
print("Top 10 actors/actresses according to the number of movies in which they appeared:")
for actor, count in top_10_actors:
    print(f"{actor[1]}: {count} movies")

Top 10 actors/actresses according to the number of movies in which they appeared
Brahmanandam: 791 movies
Jagathy Sreekumar: 505 movies
Shakti Kapoor: 458 movies
Mammootty: 394 movies
Nassar: 362 movies
Tanikella Bharani: 361 movies
Aruna Irani: 359 movies
Mohanlal: 351 movies
Anupam Kher: 351 movies
Eric Roberts: 347 movies
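The map/reduce logic used above can be sketched in plain Python on toy data. This is only an illustration of the idea (deduplicate, key by the unique ID, sum the ones); the IDs and names below are made up and the code does not use Spark.

```python
from collections import defaultdict

# Toy (nconst, primaryName, tconst) rows; IDs and names are made up.
rows = [
    ("nm1", "Alex Kim", "tt1"),
    ("nm1", "Alex Kim", "tt1"),  # duplicated row: same person, same title
    ("nm1", "Alex Kim", "tt2"),
    ("nm2", "Alex Kim", "tt3"),  # a different person who shares the name
]

# "distinct" step: drop repeated (person, title) rows
unique_rows = set(rows)

# "map" step emits ((nconst, name), 1); "reduce" step sums the ones per key
movie_counts = defaultdict(int)
for nconst, name, _tconst in unique_rows:
    movie_counts[(nconst, name)] += 1

# Sort by count, highest first
top = sorted(movie_counts.items(), key=lambda kv: kv[1], reverse=True)
print(top)
```

Keying on the ID rather than the name alone is what keeps the two people called "Alex Kim" apart.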


**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique IDs). Whenever a pair of actors appeared together in a movie, we need a link in each direction (A -> B and B -> A). However, links should be distinct: we do not need duplicates when two actors worked together in several movies.

Display a DataFrame with the first 10 edges.

In [None]:
# Keep only the movie (tconst) and actor/actress (nconst) identifiers
actors_movies_df = filtered_df.select("tconst", "nconst")

display(actors_movies_df.take(10))

tconst,nconst
tt0026455,nm0172237
tt0027663,nm1359047
tt0028674,nm0124638
tt0029638,nm0038535
tt0032160,nm0092008
tt0032746,nm0751255
tt0033006,nm0190681
tt0033532,nm0905464
tt0033822,nm1168072
tt0034461,nm0798333


In [None]:
# Group by movie (tconst) and collect all actors (nconst)
grouped_df = actors_movies_df.groupBy("tconst").agg(collect_list("nconst").alias("actors"))

display(grouped_df.take(10))

tconst,actors
tt0026455,List(nm0172237)
tt0027663,List(nm1359047)
tt0028674,List(nm0124638)
tt0029638,List(nm0038535)
tt0032160,List(nm0092008)
tt0032746,List(nm0751255)
tt0033006,List(nm0190681)
tt0033532,List(nm0905464)
tt0033822,List(nm1168072)
tt0034461,List(nm0798333)


First 10 edges:

In [None]:
# Generate all possible pairs of actors for each movie
pairs_rdd = grouped_df.rdd.flatMap(lambda row: [(a, b) for a, b in combinations(row["actors"], 2)])

# Create a DataFrame from the pairs RDD
pairs_df = pairs_rdd.toDF(["actor1", "actor2"])

# Create bidirectional links (A -> B and B -> A)
bidirectional_links_rdd = pairs_df.rdd.flatMap(lambda row: [(row["actor1"], row["actor2"]), (row["actor2"], row["actor1"])])

# Remove duplicates to ensure unique links
edgeDataFrame = bidirectional_links_rdd.distinct().toDF(["src", "dst"])

# Show the first 10 edges
edgeDataFrame.show(10)

+---------+---------+
|      src|      dst|
+---------+---------+
|nm0001352|nm0657190|
|nm0657190|nm0001352|
|nm0756292|nm0473530|
|nm0807519|nm0818215|
|nm0189774|nm0674540|
|nm0151228|nm0886638|
|nm0450326|nm0450300|
|nm0450326|nm0351600|
|nm0395012|nm0336390|
|nm0132577|nm0829813|
+---------+---------+
only showing top 10 rows
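The edge construction can be illustrated on a toy cast list in plain Python. This is a sketch under assumptions: the IDs are made up, and the `set(actors)` deduplication inside each cast is an extra safeguard against self-loops that is not part of the notebook code above.

```python
from itertools import combinations

# Toy cast lists keyed by movie ID; all IDs are made up.
casts = {
    "tt1": ["nmA", "nmB", "nmC"],
    "tt2": ["nmA", "nmB"],  # A and B worked together twice
    "tt3": ["nmD"],         # a single-actor cast produces no edge
}

edges = set()  # a set keeps each directed link only once
for actors in casts.values():
    # set() drops an actor listed twice in the same cast, avoiding self-loops
    for a, b in combinations(sorted(set(actors)), 2):
        edges.add((a, b))  # A -> B
        edges.add((b, a))  # B -> A

print(sorted(edges))
```

Three co-stars yield three unordered pairs and therefore six directed links; the second movie shared by `nmA` and `nmB` adds nothing new.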



In [None]:
edgeDataFrame = bidirectional_links_rdd.distinct().toDF(["src", "dst"]).persist()
verticesDataFrame = actors_movies_df.select(col('nconst').alias('id')).distinct().persist()

graph = GraphFrame(verticesDataFrame, edgeDataFrame)
graph.vertices.show()
graph.edges.show()



+---------+
|       id|
+---------+
|nm0788120|
|nm0852670|
|nm0463100|
|nm0481731|
|nm0091035|
|nm0155923|
|nm0847265|
|nm0280355|
|nm3088934|
|nm0102522|
|nm1966962|
|nm1480555|
|nm0727382|
|nm4993862|
|nm7029055|
|nm3236159|
|nm0360305|
|nm0090120|
|nm0000767|
|nm0812556|
+---------+
only showing top 20 rows

+---------+---------+
|      src|      dst|
+---------+---------+
|nm0561155|nm0949942|
|nm0279524|nm0517642|
|nm0253349|nm0515581|
|nm0402505|nm0311357|
|nm0474926|nm0904537|
|nm0391096|nm0493048|
|nm0556146|nm0094544|
|nm0140153|nm0881605|
|nm0765783|nm0846998|
|nm1977074|nm0005033|
|nm0060288|nm0625115|
|nm0145061|nm0023134|
|nm0135742|nm0307491|
|nm0040152|nm0813961|
|nm0436157|nm0173318|
|nm0000047|nm0139206|
|nm0116532|nm0885446|
|nm0355246|nm0194357|
|nm0554718|nm0690362|
|nm0131939|nm0188051|
+---------+---------+
only showing top 20 rows



**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probability = 0.1.

List the top 10 actors / actresses by pagerank.

In [None]:
pageRanks = graph.pageRank(resetProbability=0.1, maxIter = 5)



Top actors/actresses by Pagerank:

In [None]:
pageRanksDf = pageRanks.vertices.join(name_basics_df, name_basics_df['nconst'] == pageRanks.vertices['id']) \
  .select('nconst', 'primaryName', 'pagerank') \
  .orderBy(f.desc('pagerank'))

display(pageRanksDf.take(10))

nconst,primaryName,pagerank
nm0000616,Eric Roberts,50.92364805311795
nm0000514,Michael Madsen,31.601431340559397
nm0001803,Danny Trejo,23.346304188390643
nm0202966,Keith David,22.531306846826777
nm0261724,Joe Estevez,21.841306390240263
nm0000448,Lance Henriksen,21.702360457117138
nm0000532,Malcolm McDowell,21.208998343240047
nm0001595,Michael Paré,20.82491382545784
nm0726223,Richard Riehle,20.06968885970805
nm0000367,Gérard Depardieu,19.563157341971092
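For reference, the iterative alternative mentioned in the question can be sketched in plain Python. The sketch assumes the classic un-normalised formulation (every rank starts at 1.0 and is updated to `reset + (1 - reset) * sum of contributions`); it is not claimed to reproduce the numbers returned by GraphFrames, and the toy graph is made up.

```python
def pagerank(neighbors, reset_probability=0.1, max_iter=5):
    """Iterative PageRank over an adjacency dict {node: [out-neighbours]}."""
    ranks = {node: 1.0 for node in neighbors}
    for _ in range(max_iter):
        contributions = {node: 0.0 for node in neighbors}
        for node, outs in neighbors.items():
            if not outs:
                continue  # a node without out-links contributes nothing
            share = ranks[node] / len(outs)  # rank is split evenly over out-links
            for dst in outs:
                contributions[dst] += share
        ranks = {
            node: reset_probability + (1.0 - reset_probability) * contributions[node]
            for node in neighbors
        }
    return ranks

# Toy star graph: a hub linked to three leaves, with links in both directions
graph = {"hub": ["a", "b", "c"], "a": ["hub"], "b": ["hub"], "c": ["hub"]}
ranks = pagerank(graph)
print(ranks)
```

With only five iterations the ranks on this small graph have not converged yet, which is also worth keeping in mind when reading the top-10 list above.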


**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.

In [None]:
outDegreesDf = graph.outDegrees.join(name_basics_df, name_basics_df['nconst'] == graph.outDegrees['id']) \
  .select('nconst', 'primaryName', 'outDegree')

outDegreesRdd = outDegreesDf.rdd \
  .sortBy(lambda row: row['outDegree'], ascending=False)

display(outDegreesRdd.take(10))

nconst,primaryName,outDegree
nm0000616,Eric Roberts,1117
nm0000514,Michael Madsen,791
nm0451600,Anupam Kher,723
nm0000367,Gérard Depardieu,657
nm0202966,Keith David,657
nm0621937,Nassar,642
nm0410902,Renji Ishibashi,624
nm0256628,Akira Emoto,617
nm0695177,Prakash Raj,606
nm0000168,Samuel L. Jackson,605


### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon. There are two actors named ‘Kevin Bacon’; we will use the one with the highest degree, that is, the one that participated in the most titles;
1. Estimate the shortest path between every actor in the database and Kevin Bacon. Keep a dataframe with this information, as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degrees from Kevin Bacon (you will need to deal with actors unconnected to Kevin Bacon: give these actors / actresses a score/degree of 20).

In [None]:
kevin_bacon_df = name_basics_df.join(graph.outDegrees, graph.outDegrees['id'] == name_basics_df['nconst']) \
  .where(col('primaryName') == 'Kevin Bacon') \
  .orderBy(f.desc('outDegree')) \
  .select('nconst')

Get Kevin Bacon ID and store it into a variable:

In [None]:
kevin_bacon_id = kevin_bacon_df.take(1)[0]['nconst']
kevin_bacon_id

Out[34]: 'nm0000102'

Calculate the shortest path between each actor/actress and Kevin Bacon:

In [None]:
shortest_path_df = graph.shortestPaths(landmarks=[kevin_bacon_id])

display(shortest_path_df.take(10))

id,distances
nm0046770,Map(nm0000102 -> 2)
nm2570728,Map(nm0000102 -> 3)
nm0057029,Map(nm0000102 -> 3)
nm4969323,Map(nm0000102 -> 3)
nm0165411,Map(nm0000102 -> 3)
nm3974605,Map(nm0000102 -> 3)
nm0622637,Map(nm0000102 -> 4)
nm1682363,Map(nm0000102 -> 3)
nm0911320,Map(nm0000102 -> 2)
nm0738722,Map(nm0000102 -> 3)


Summarize the data:

In [None]:
#Create new column to store only the distance, replacing nulls with 20
distance_to_kevin_bacon_df = shortest_path_df \
  .withColumn("distance_to_kevin_bacon", f.coalesce(f.col("distances").getItem(kevin_bacon_id), f.lit(20))) \
  .withColumnRenamed('id', 'nconst')
  
summarized_shortest_path_df = distance_to_kevin_bacon_df.groupBy('distance_to_kevin_bacon') \
  .count() \
  .orderBy(f.desc('count'))

display(summarized_shortest_path_df.take(20))

distance_to_kevin_bacon,count
3,54190
4,38606
2,13315
20,10283
5,4048
1,334
6,319
7,32
8,3
0,1
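The idea behind the distances and the default score of 20 can be sketched with a breadth-first search in plain Python. This is an illustrative sketch on a made-up toy graph, not the algorithm GraphFrames runs internally; because every link exists in both directions, the distance from the landmark equals the distance to it.

```python
from collections import deque

UNREACHABLE = 20  # score the assignment asks for when there is no path

def distances_from(source, neighbors):
    """Breadth-first search: hop count from source to every node in neighbors."""
    dist = {source: 0}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for nxt in neighbors.get(node, []):
            if nxt not in dist:  # first visit is the shortest path in BFS
                dist[nxt] = dist[node] + 1
                queue.append(nxt)
    # Nodes never reached get the default score
    return {node: dist.get(node, UNREACHABLE) for node in neighbors}

# Toy bidirectional graph; "loner" has no co-stars
graph = {
    "bacon": ["a"],
    "a": ["bacon", "b"],
    "b": ["a"],
    "loner": [],
}

distances = distances_from("bacon", graph)
print(distances)
```

The landmark itself ends up at distance 0, which corresponds to the single row with distance 0 in the summary above.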


### Exploring the data with RDD's

Using RDDs (and not dataframes), answer the following questions (if you loaded your data into spark in a dataframe you can convert to an RDD of rows easily using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie', what is the combination of genres that is the most popular (as measured by number of reviews)? Hint: paired RDD's will be useful.

In [None]:
# Convert DataFrames to RDD
title_basics_rdd = title_basics_df_preprocessed.rdd
title_ratings_rdd = title_ratings_df.rdd

In [None]:
# filter only the movies
movies_rdd = title_basics_rdd.filter(lambda row: row["titleType"] == "movie")

In [None]:
# Select necessary columns
movies_rdd = movies_rdd.map(lambda row: (row['tconst'], row['genres']))
ratings_rdd = title_ratings_rdd.map(lambda row: (row['tconst'], (row['numVotes'])))

In [None]:
# join
movies_ratings_rdd = movies_rdd.join(ratings_rdd)

In [None]:
# filter none type
movies_ratings_rdd = movies_ratings_rdd.filter(lambda x: x[1][0] is not None and x[1][1] is not None)

In [None]:
# Use the (already sorted) genre list as a hashable tuple key and pair it with the number of votes
genre_votes_rdd = movies_ratings_rdd.map(lambda x: (tuple(x[1][0]), x[1][1]))

In [None]:
# Reduce by genre to sum the number of votes
genre_votes_sum_rdd = genre_votes_rdd.reduceByKey(lambda x, y: x + y)

In [None]:
# sort in descending order of votes and taking the first
most_popular_genre_combination = genre_votes_sum_rdd.sortBy(lambda x: x[1], ascending=False).first()

print(f"The most popular genre combination is: {most_popular_genre_combination}")

The most popular genre combination is: (('Action', 'Adventure', 'Sci-Fi'), 54342547)


**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination). Hint: paired RDD's will be useful.

In [None]:
# filter only the movies
movies_rdd = title_basics_rdd.filter(lambda row: row["titleType"] == "movie")

In [None]:
# Join movies and ratings RDDs
movies_rdd = movies_rdd.map(lambda row: (row['tconst'], row['genres']))
ratings_rdd = title_ratings_rdd.map(lambda row: (row['tconst'], (row['numVotes'], row['averageRating'])))

movies_ratings_rdd = movies_rdd.join(ratings_rdd)

In [None]:
# Keep movies with non-null genres and more than 400 votes (numVotes is the first element of the ratings tuple)
movies_more_rated = movies_ratings_rdd.filter(lambda row: row[1][1][0] > 400 if row[1][0] is not None else False)

In [None]:
# Map to (genre_combination, averageRating) pairs
genres_and_rating_rdd = movies_more_rated.map(lambda row: (tuple(row[1][0]), row[1][1][1]))

In [None]:
# Average the ratings within each genre combination and keep the highest average
highest_avg_genre_combination = genres_and_rating_rdd.groupByKey() \
  .mapValues(lambda values: sum(values) / len(values)) \
  .max(lambda x: x[1])

print("Genre combination with the highest average rating:", highest_avg_genre_combination)

Genre combination with the highest average rating: (('Music', 'Musical'), 8.3)
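
A per-key average can also be computed by carrying a running `(sum, count)` pair for each key instead of collecting all ratings first. The plain-Python sketch below shows that idea on hypothetical data; `pairs` and `average_by_key` are illustrative names, not part of the notebook. In Spark the same shape would typically be a `mapValues` to `(rating, 1)` followed by a `reduceByKey` that adds the pairs element-wise.

```python
# Hypothetical (genre_combination, averageRating) pairs.
pairs = [
    (("Drama",), 7.0),
    (("Drama",), 8.0),
    (("Comedy", "Romance"), 6.0),
    (("Drama",), 6.0),
]


def average_by_key(pairs):
    """Accumulate (sum, count) per key, then divide once at the end."""
    acc = {}
    for key, rating in pairs:
        total, count = acc.get(key, (0.0, 0))
        acc[key] = (total + rating, count + 1)
    return {key: total / count for key, (total, count) in acc.items()}


averages = average_by_key(pairs)
best = max(averages.items(), key=lambda kv: kv[1])
print(best)
```

The accumulator stays the same size no matter how many movies share a combination, which is why this form is often preferred over grouping all values per key.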


**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular as measured by number of votes? Votes for multiple genres count towards each genre listed. Hint: flatMap and paired RDDs will be useful here.

In [None]:
# Filter out movies with None genres
movies_with_genres = movies_ratings_rdd.filter(lambda x: x[1][0] is not None)

In [None]:
# flatten genres - create pairs of (genre, votes)
genres_votes = movies_with_genres.flatMap(lambda x: [(genre, x[1][1][0]) for genre in x[1][0]])

In [None]:
# ReduceByKey to sum votes for each genre
genre_vote_totals = genres_votes.reduceByKey(lambda x, y: x + y)

In [None]:
# Find the genre with the maximum votes
most_popular_genre = genre_vote_totals.max(lambda x: x[1])

print("Most popular genre by number of votes:", most_popular_genre)

Most popular genre by number of votes: ('Drama', 572280968)
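
The flatMap step in Q10 emits one `(genre, votes)` pair per listed genre, so a movie's votes count once towards each of its genres. A minimal plain-Python sketch of that counting rule, on hypothetical records (`records` and `votes_per_genre` are illustrative names):

```python
from collections import Counter

# Hypothetical (genres, numVotes) records.
records = [
    (["Drama", "Romance"], 100),
    (["Drama"], 50),
    (["Action", "Sci-Fi"], 120),
]


def votes_per_genre(records):
    """Credit each movie's votes to every genre it lists."""
    totals = Counter()
    for genres, votes in records:
        for genre in genres:  # the flatMap step: one pair per listed genre
            totals[genre] += votes
    return totals


totals = votes_per_genre(records)
print(totals.most_common(1))
```

Because votes are counted once per genre, the per-genre totals add up to more than the total number of votes cast.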


## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The average value for the cast of degrees of Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leaving 30% of the data for testing; when splitting your data, use the option seed=0.

**Q11** Provide the coefficients of the regression and the accuracy of your model on that test dataset according to RMSE.

Transform movies_count_rdd into a DataFrame:

In [None]:
movie_count_schema = StructType([       
    StructField('actor', StructType([
      StructField('nconst', StringType(), True),
      StructField('primaryName', StringType(), True)  
    ]), True),
    StructField('n_movies', StringType(), True)
])

movie_counts_df = movies_count_rdd.toDF(schema=movie_count_schema)

In [None]:
aggregated_df = title_principals_df.join(movie_counts_df, movie_counts_df['actor']['nconst'] == title_principals_df['nconst']) \
  .join(pageRanksDf, "nconst") \
  .join(outDegreesDf, "nconst") \
  .join(distance_to_kevin_bacon_df, "nconst") \
  .groupBy('tconst') \
  .agg({
    'n_movies': 'sum',
    'pagerank': 'mean',
    'outDegree': 'mean',
    'distance_to_kevin_bacon': 'mean'
  })

aggregated_df = aggregated_df.join(title_ratings_df, 'tconst')
  
# Rename the columns to be easier to use in the pipeline:

aggregated_df = aggregated_df \
  .withColumnRenamed('sum(n_movies)', 'n_movies') \
  .withColumnRenamed('avg(distance_to_kevin_bacon)', 'avg_distance_to_kevin_bacon') \
  .withColumnRenamed('avg(outDegree)', 'avg_out_degree') \
  .withColumnRenamed('avg(pagerank)', 'avg_pagerank') \
  .withColumnRenamed('averageRating', 'avg_rating') \
  .cache()

display(aggregated_df.head(5))

tconst,n_movies,avg_distance_to_kevin_bacon,avg_out_degree,avg_pagerank,avg_rating,numVotes
tt0040483,2.0,20.0,1.0,1.0502618046180614,4.8,19
tt0040695,19.0,3.0,12.0,2.173982233151124,7.1,4589
tt0043858,8.0,4.0,2.0,0.2139640354995338,4.9,20
tt0044431,17.0,3.0,5.0,0.6110460740928226,6.1,284
tt0044770,22.0,2.5,4.5,0.4732553094990344,6.3,77


### Linear Regression

Create the ML Pipeline

In [None]:
imputer = MF.Imputer(  
    strategy="mean",
    inputCols=["n_movies", "avg_distance_to_kevin_bacon", "avg_out_degree", "avg_pagerank"],
    outputCols=["n_movies_i", "avg_distance_to_kevin_bacon_i", "avg_out_degree_i", "avg_pagerank_i"],
)

continuous_assembler = MF.VectorAssembler(  
    inputCols=["n_movies_i", "avg_distance_to_kevin_bacon_i", "avg_out_degree_i", "avg_pagerank_i"],
    outputCol="continuous",
)

continuous_scaler = MF.MinMaxScaler(  
    inputCol="continuous",
    outputCol="features",
)

lr = LinearRegression(
    featuresCol="features", labelCol="avg_rating", predictionCol="prediction"
)

pipeline = Pipeline(stages=[imputer, continuous_assembler, continuous_scaler, lr])
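
The scaling stage rescales each continuous feature to the range [0, 1] using the column's minimum and maximum. As a rough plain-Python sketch of that formula (the helper `min_max_scale` is hypothetical; the sample values are the `n_movies` column from the five rows displayed earlier):

```python
def min_max_scale(values):
    """Rescale values to [0, 1] with (v - min) / (max - min)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Assumption for this sketch: a constant column maps to the midpoint 0.5.
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


n_movies = [2.0, 19.0, 8.0, 17.0, 22.0]
scaled = min_max_scale(n_movies)
print(scaled)
```

In a fitted pipeline the minimum and maximum come from the training split only, so a test row can fall slightly outside [0, 1] if its value lies beyond the training range.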

Train/Test split + Predictions

In [None]:
train, test = aggregated_df.randomSplit([0.7, 0.3], seed = 0)

train.cache()

pipeline_model = pipeline.fit(train)
results = pipeline_model.transform(test)

In [None]:
intercept = pipeline_model.stages[-1].intercept
coefficients = pipeline_model.stages[-1].coefficients

print(f"Linear Regression Model intercept: {intercept}, coefficients: {coefficients}")

Linear Regression Model intercept: 7.156209262374765, coefficients: [-3.8495174061719855,-0.4761521698071885,4.762847671493903,-7.138299027008857]
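
To read these numbers: the model's prediction is the intercept plus the dot product of the coefficients with the scaled feature vector, in the order the assembler received the features (`n_movies`, `avg_distance_to_kevin_bacon`, `avg_out_degree`, `avg_pagerank`). The sketch below applies that formula by hand; the `predict` helper and `hypothetical_row` are illustrative only and do not correspond to any real movie.

```python
# Values copied from the fitted model's output above.
intercept = 7.156209262374765
coefficients = [
    -3.8495174061719855,  # n_movies (scaled)
    -0.4761521698071885,  # avg_distance_to_kevin_bacon (scaled)
    4.762847671493903,    # avg_out_degree (scaled)
    -7.138299027008857,   # avg_pagerank (scaled)
]


def predict(scaled_features):
    """Linear prediction: intercept + sum(coefficient * feature)."""
    if len(scaled_features) != len(coefficients):
        raise ValueError("expected one value per coefficient")
    return intercept + sum(c * x for c, x in zip(coefficients, scaled_features))


# Hypothetical min-max-scaled feature vector, for illustration only.
hypothetical_row = [0.1, 0.05, 0.2, 0.01]
print(predict(hypothetical_row))
```

Since the features are min-max scaled, each coefficient is the change in predicted rating when that feature moves from its training minimum to its training maximum, holding the others fixed.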


In [None]:
evaluator = RegressionEvaluator(labelCol="avg_rating", predictionCol="prediction", metricName="rmse")

In [None]:
rmse = evaluator.evaluate(results)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

Root Mean Squared Error (RMSE) on test data: 1.349
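
RMSE is the square root of the mean squared difference between actual and predicted ratings, so it is expressed in rating points. A small worked sketch with hypothetical values (the `rmse` helper and both lists are illustrative, not taken from the notebook's data):

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical ratings: the errors are 1, 0 and -2 rating points.
actual = [7.9, 6.0, 5.0]
predicted = [6.9, 6.0, 7.0]
print(round(rmse(actual, predicted), 3))
```

Because the errors are squared before averaging, the single 2-point miss contributes four times as much as the 1-point miss.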


**Q12** What score would your model predict for the 1997 movie Titanic?

In [None]:
def get_titanic_df(feats_df):
  # We use aliases here because the join duplicates columns such as runtimeMinutes,
  # and this function should only return the columns already present in the feature df
  titanic_df = feats_df.alias("feats_df").join(title_basics_df_preprocessed.alias("title_basics_df"), "tconst") \
    .where(
      (col("title_basics_df.primaryTitle") == "Titanic") &
      (col("title_basics_df.startYear") == 1997) &
      (col("title_basics_df.titleType") == "movie")
    ).select("feats_df.*") # This line does the trick

  return titanic_df

In [None]:
def view_titanic_actual_rating_vs_prediction(titanic_df, model):
  titanic_results = model.transform(titanic_df)
  titanic_prediction = titanic_results \
    .select(
      col('avg_rating').alias('Actual Avg Rating'),
      col('prediction').alias('Predicted Avg Rating')
    ).collect()

  display(titanic_prediction)

View Actual Rating vs Predicted Rating for Titanic:

In [None]:
titanic_df = get_titanic_df(aggregated_df)
view_titanic_actual_rating_vs_prediction(titanic_df, pipeline_model)

Actual Avg Rating,Predicted Avg Rating
7.9,6.625432087706436


**Q13** Create dummy variables for each of the top 10 movie genres from Q10. These variables should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example, the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variables to the regression improve your results? What are the new RMSE and the predicted rating for the 1997 movie Titanic?

In [None]:
top10_genres = genre_vote_totals.sortBy(lambda x: x[1], ascending=False) \
  .map(lambda x: x[0]) \
  .take(10)

print(top10_genres)

['Drama', 'Action', 'Comedy', 'Adventure', 'Crime', 'Thriller', 'Sci-Fi', 'Romance', 'Mystery', 'Horror']


In [None]:
aggregated_df_with_genres = aggregated_df.join(title_basics_df_preprocessed, "tconst")

for genre in top10_genres:
  aggregated_df_with_genres = aggregated_df_with_genres.withColumn(genre, f.when(f.array_contains(f.col('genres'), genre), f.lit(1)).otherwise(f.lit(0)))

In [None]:
# Print Titanic information to check if the dummies are correct
aggregated_df_with_genres.where(col("tconst") == 'tt0120338').display()

tconst,n_movies,avg_distance_to_kevin_bacon,avg_out_degree,avg_pagerank,avg_rating,numVotes,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,Drama,Action,Comedy,Adventure,Crime,Thriller,Sci-Fi,Romance,Mystery,Horror
tt0120338,317.0,2.0,292.6666666666667,9.702508039423831,7.9,1286735,movie,Titanic,Titanic,0,1997,,194,"List(Drama, Romance)",1,0,0,0,0,0,0,1,0,0
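
The dummy-variable rule is simple enough to check by hand: each of the top 10 genres becomes a 0/1 column that is 1 only when the movie lists that genre. A plain-Python sketch of the rule (the `genre_dummies` helper is hypothetical; the genre list is the one printed above):

```python
top10_genres = [
    "Drama", "Action", "Comedy", "Adventure", "Crime",
    "Thriller", "Sci-Fi", "Romance", "Mystery", "Horror",
]


def genre_dummies(genres, vocabulary):
    """Return a 0/1 indicator per vocabulary genre; missing genres give all zeros."""
    present = set(genres or [])
    return {genre: int(genre in present) for genre in vocabulary}


titanic = genre_dummies(["Drama", "Romance"], top10_genres)
print(titanic)
```

Genres outside the top 10 are simply ignored, so a movie with only such genres gets a row of zeros, the same as a movie with no genre information.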


Create the ML pipeline using the categorical variables:

In [None]:
imputer = MF.Imputer(  
    strategy="mean",
    inputCols=["n_movies", "avg_distance_to_kevin_bacon", "avg_out_degree", "avg_pagerank"],
    outputCols=["n_movies_i", "avg_distance_to_kevin_bacon_i", "avg_out_degree_i", "avg_pagerank_i"],
)

continuous_assembler = MF.VectorAssembler(  
    inputCols=["n_movies_i", "avg_distance_to_kevin_bacon_i", "avg_out_degree_i", "avg_pagerank_i"],
    outputCol="continuous",
)

continuous_scaler = MF.MinMaxScaler(  
    inputCol="continuous",
    outputCol="features_scaled",
)

categorical_assembler = MF.VectorAssembler(
    inputCols=top10_genres + ["features_scaled"],
    outputCol="features"
)

lr = LinearRegression(
    featuresCol="features", labelCol="avg_rating", predictionCol="prediction"
)

pipeline_w_dummies = Pipeline(stages=[imputer, continuous_assembler, continuous_scaler, categorical_assembler, lr])

In [None]:
train_w_dummies, test_w_dummies = aggregated_df_with_genres.randomSplit([0.7, 0.3], seed = 0)

train_w_dummies.cache()

linear_regression_w_dummies = pipeline_w_dummies.fit(train_w_dummies)
results_w_dummies = linear_regression_w_dummies.transform(test_w_dummies)

In [None]:
rmse = evaluator.evaluate(results_w_dummies)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

Root Mean Squared Error (RMSE) on test data: 1.322


View Actual Rating vs Predicted Rating for Titanic:

In [None]:
titanic_df = get_titanic_df(aggregated_df_with_genres)
view_titanic_actual_rating_vs_prediction(titanic_df, linear_regression_w_dummies)

Actual Avg Rating,Predicted Avg Rating
7.9,6.65981918791074


**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, and changing the included features. What is the RMSE of your final model, and what rating does it predict for the 1997 movie Titanic?

In [None]:
# Just renaming
features_df = aggregated_df_with_genres

In [None]:
# Store Titanic DF that will be used by the models later, also cache it as it will be reused
titanic_df = get_titanic_df(features_df).cache()

In [None]:
train, test = features_df.randomSplit([0.7, 0.3], seed = 0)

train.cache()

Out[88]: DataFrame[tconst: string, n_movies: double, avg_distance_to_kevin_bacon: double, avg_out_degree: double, avg_pagerank: double, avg_rating: double, numVotes: int, titleType: string, primaryTitle: string, originalTitle: string, isAdult: int, startYear: int, endYear: int, runtimeMinutes: int, genres: array<string>, Drama: int, Action: int, Comedy: int, Adventure: int, Crime: int, Thriller: int, Sci-Fi: int, Romance: int, Mystery: int, Horror: int]

In [None]:

imputer = MF.Imputer(  
    strategy="mean",
    inputCols=["n_movies", "numVotes", "runtimeMinutes", "avg_distance_to_kevin_bacon", "avg_out_degree", "avg_pagerank"],
    outputCols=["n_movies_i", "numVotes_i", "runtimeMinutes_i", "avg_distance_to_kevin_bacon_i", "avg_out_degree_i", "avg_pagerank_i"],
)

continuous_assembler = MF.VectorAssembler(  
    inputCols=["n_movies_i", "numVotes_i", "runtimeMinutes_i", "avg_distance_to_kevin_bacon_i", "avg_out_degree_i", "avg_pagerank_i"],
    outputCol="continuous",
)

continuous_scaler = MF.MinMaxScaler(  
    inputCol="continuous",
    outputCol="features_scaled",
)

categorical_assembler = MF.VectorAssembler(
    inputCols=top10_genres + ["features_scaled"],
    outputCol="features"
)

### Random Forest Regressor

Build model:

In [None]:
random_forest = RandomForestRegressor(featuresCol="features", labelCol="avg_rating", seed=42, numTrees=20, maxDepth=5)

random_forest_pipeline = Pipeline(stages=[imputer, continuous_assembler, continuous_scaler, categorical_assembler, random_forest])

random_forest_pipeline_model = random_forest_pipeline.fit(train)
results_random_forest =  random_forest_pipeline_model.transform(test)

Calculate RMSE:

In [None]:
rmse = evaluator.evaluate(results_random_forest)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

Root Mean Squared Error (RMSE) on test data: 1.231


View Actual Rating vs Predicted Rating for Titanic:

In [None]:
view_titanic_actual_rating_vs_prediction(titanic_df, random_forest_pipeline_model)

Actual Avg Rating,Predicted Avg Rating
7.9,6.551805073412251


### Gradient-Boosted Trees

Build model:

In [None]:
gbt = GBTRegressor(featuresCol="features", labelCol="avg_rating", maxDepth=5, seed=42)

gbt_pipeline = Pipeline(stages=[imputer, continuous_assembler, continuous_scaler, categorical_assembler, gbt])

gbt_pipeline_model = gbt_pipeline.fit(train)
results_gbt = gbt_pipeline_model.transform(test)

Calculate RMSE:

In [None]:
rmse = evaluator.evaluate(results_gbt)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

Root Mean Squared Error (RMSE) on test data: 1.206


View Actual Rating vs Predicted Rating for Titanic:

In [None]:
view_titanic_actual_rating_vs_prediction(titanic_df, gbt_pipeline_model)

Actual Avg Rating,Predicted Avg Rating
7.9,7.113691138540517


### Hyperparameter tuning

Let's apply hyperparameter tuning to GBT

In [None]:
grid_search = (
    ParamGridBuilder() 
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 30]) 
    .build()
)

In [None]:
cv = CrossValidator(
    estimator=gbt_pipeline,
    estimatorParamMaps=grid_search,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
    collectSubModels=True,
    parallelism=2 # Comment this line or set parallelism to 1 if using single node cluster
)

In [None]:
cv_model = cv.fit(train)
print(cv_model.avgMetrics)

[1.2389238750900478, 1.2299268411510222, 1.211564077225533, 1.1945191709475644]
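
Each entry in this list is the cross-validated RMSE of one parameter combination. The sketch below pairs the metrics with the combinations, assuming the grid is enumerated as the Cartesian product of the values in the order the `addGrid` calls were made (`maxDepth` first, then `maxIter`). That ordering is an assumption of this sketch, though it agrees with the best model reported further down; the variable names are illustrative.

```python
from itertools import product

max_depths = [2, 5]
max_iters = [10, 30]

# Average RMSE per combination, copied from the output above.
avg_metrics = [
    1.2389238750900478,
    1.2299268411510222,
    1.211564077225533,
    1.1945191709475644,
]

# Assumption: combinations follow the Cartesian product in addGrid order.
grid = [{"maxDepth": d, "maxIter": i} for d, i in product(max_depths, max_iters)]
scored = list(zip(grid, avg_metrics))

# RMSE is an error metric, so the best combination has the smallest value.
best_params, best_rmse = min(scored, key=lambda pair: pair[1])
print(best_params, round(best_rmse, 4))
```

In the notebook itself, zipping `grid_search` with `cv_model.avgMetrics` gives the same pairing without relying on the ordering assumption.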


In [None]:
cv_best_model = cv_model.bestModel
# Read the tuned values through the fitted model's public getters rather than the private _java_obj
print("Best model maxDepth = ", cv_best_model.stages[-1].getMaxDepth())
print("Best model maxIter = ", cv_best_model.stages[-1].getMaxIter())

Best model maxDepth =  5
Best model maxIter =  30


In [None]:
results_cv =  cv_best_model.transform(test)

Calculate RMSE:

In [None]:
rmse = evaluator.evaluate(results_cv)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

Root Mean Squared Error (RMSE) on test data: 1.198


View Actual Rating vs Predicted Rating for Titanic:

In [None]:
view_titanic_actual_rating_vs_prediction(titanic_df, cv_best_model)

Actual Avg Rating,Predicted Avg Rating
7.9,7.032068008456022
