# Six Degrees of Kevin Bacon
**Introduction** - Six Degrees of Kevin Bacon is a game based on the "six degrees of separation"
concept, which posits that any two people on Earth are six or fewer acquaintance links apart. Movie
buffs challenge each other to find the shortest path between an arbitrary actor and prolific actor
Kevin Bacon. It rests on the assumption that anyone involved in the film industry can be linked
through their film roles to Bacon within six steps.
The analysis of social networks can be a computationally intensive task, especially when dealing with
large volumes of data. It is also a challenging problem to devise a correct methodology to infer an
informative social network structure. Here, we will analyze a social network of actors and actresses
that co-participated in movies. We will do some simple descriptive analysis, and in the end try to
relate an actor/actress’s position in the social network with the success of the movies in which they
participate.

#### Rules & Notes - Please take your time to read the following points:

1. The submission deadline shall be set for the 10th of June at 23:59.
2. It is acceptable that you **discuss** with your colleagues different approaches to solve each step of the problem set. You are responsible for writing your own code, and analysing the results. Clear cases of cheating will be penalized with 0 points in this assignment;
3. After review of your submission files, and before a mark is attributed, you might be called to orally defend your submission;
4. You will be scored first and foremost by the number of correct answers, secondly by the logic used in the trying to approach each step of the problem set;
5. Consider skipping questions that you are stuck in, and get back to them later;
6. Expect computations to take a few minutes to finish in some of the steps.
7. **IMPORTANT** It is expected you have developed skills beyond writting SQL queries. Any question where you directly write a SQL query (then for example create a temporary table and use spark.sql to pass the query) will receive a 25% penalty. Using the Spark syntax (for example dataframe.select("\*").where("conditions")) is acceptable and does not incur this penalty. Comment your code in a reasonable fashion.
8. **Questions** – Any questions about this assignment should be posted in the Forum@Moodle. The last class will be an open office session for anyone with questions concerning the assignment. 
9. **Delivery** - To fulfil this activity you will have to upload the following materials to Moodle:
    1. An exported IPython notebook. The notebook should be solved (have results displayed), but should contain all neccesary code so that when the notebook is run in databricks it should also replicate these results. This means the all data downloading and processing should be done in this notebook. It is also important you clearly indicate where your final answer to each question is when you are using multiple cells (for example you print "my final anwser is" before your answer or use cell comments). Please make sure to name your file in the following way: *[student_number1]_[student_number2]_submission.ipynb*. As an example: *19740001_197400010_submission.ipynb*
    2. **Delivery** - You will also need to provide a signed statement of authorship, which is present in the last page;
    3. It is recommended you read the whole assignment before starting.
    4. You can add as many cells as you like to answer the questions.
    5. You can make use of caching or persisting your RDDs or Dataframes, this may speed up performance.
    6. If you have trouble with graphframes in databricks (specifically the import statement) you need to make sure the graphframes package is installed on the cluster you are running. If you click home on the left, then click on the graphframes library, from where you can install the package on your cluster (check the graphframes checkbox and click install). Another installation option is using the JAR available on Moodle with the graphframes library.
10. **Note**: By including the name and student number of each group member  in the submission notebook, this will be considered as a declaration of authorship.

#### Data Sources and Description
We will use data from IMDB. You can download raw datafiles
from https://datasets.imdbws.com. Note that the files are tab delimited (.tsv) You can find a
description of the each datafile in https://www.imdb.com/interfaces/

## Questions
### Data loading and preperation
Review the file descriptions and load the necessary data onto your databricks cluser and into spark dataframes. You will need to use shell commands to download the data, unzip the data, load the data into spark. Note that the data might require parsing and preprocessing to be ready for the questions below.

**Hints** You can use 'gunzip' to unzip the .tz files. The data files will then be tab seperated (.tsv), which you can load into a dataframe using the tab seperated option instead of the comma seperated option we have typically used in class: `.option(“sep”,”\t”)`

In [0]:
%sh

wget "https://datasets.imdbws.com/name.basics.tsv.gz" -O /tmp/name.basics.tsv.gz
wget "https://datasets.imdbws.com/title.akas.tsv.gz" -O /tmp/title.akas.tsv.gz
wget "https://datasets.imdbws.com/title.basics.tsv.gz" -O /tmp/title.basics.tsv.gz
wget "https://datasets.imdbws.com/title.crew.tsv.gz" -O /tmp/title.crew.tsv.gz
wget "https://datasets.imdbws.com/title.episode.tsv.gz" -O /tmp/title.episode.tsv.gz
wget "https://datasets.imdbws.com/title.principals.tsv.gz" -O /tmp/title.principals.tsv.gz
wget "https://datasets.imdbws.com/title.ratings.tsv.gz" -O /tmp/title.ratings.tsv.gz

--2024-06-08 04:37:56--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 18.245.253.70, 18.245.253.117, 18.245.253.85, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|18.245.253.70|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 267595640 (255M) [binary/octet-stream]
Saving to: ‘/tmp/name.basics.tsv.gz’

     0K .......... .......... .......... .......... ..........  0% 19.8M 13s
    50K .......... .......... .......... .......... ..........  0% 23.0M 12s
   100K .......... .......... .......... .......... ..........  0% 12.6M 15s
   150K .......... .......... .......... .......... ..........  0% 83.5M 12s
   200K .......... .......... .......... .......... ..........  0% 13.1M 13s
   250K .......... .......... .......... .......... ..........  0% 49.4M 12s
   300K .......... .......... .......... .......... ..........  0% 25.6M 12s
   350K .......... .......... .......... .......... .......... 

In [0]:
%sh

gunzip -f /tmp/name.basics.tsv.gz
gunzip -f /tmp/title.akas.tsv.gz
gunzip -f /tmp/title.basics.tsv.gz
gunzip -f /tmp/title.crew.tsv.gz
gunzip -f /tmp/title.episode.tsv.gz
gunzip -f /tmp/title.principals.tsv.gz
gunzip -f /tmp/title.ratings.tsv.gz

In [0]:
dbutils.fs.mkdirs("/mnt/data/test")

Out[3]: True

In [0]:
dbutils.fs.mv("file:/tmp/name.basics.tsv", "dbfs:/mnt/data/test")
dbutils.fs.mv("file:/tmp/title.akas.tsv", "dbfs:/mnt/data/test")
dbutils.fs.mv("file:/tmp/title.basics.tsv", "dbfs:/mnt/data/test")
dbutils.fs.mv("file:/tmp/title.crew.tsv", "dbfs:/mnt/data/test")
dbutils.fs.mv("file:/tmp/title.episode.tsv", "dbfs:/mnt/data/test")
dbutils.fs.mv("file:/tmp/title.principals.tsv", "dbfs:/mnt/data/test")
dbutils.fs.mv("file:/tmp/title.ratings.tsv", "dbfs:/mnt/data/test")

Out[4]: True

In [0]:
name_basics_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/name.basics.tsv')
title_akas_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/title.akas.tsv')
title_basics_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/title.basics.tsv')
title_crew_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/title.crew.tsv')
title_episode_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/title.episode.tsv')
title_principals_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/title.principals.tsv')
title_ratings_df = spark.read.option('inferSchema', 'true').option('header', 'true').option('sep', '\t').csv('dbfs:/mnt/data/test/title.ratings.tsv')

### Network Inference, Let’s build a network
In the following questions you will look to summarise the data and build a network. We want to examine a network that abstracts how actors and actress are related through their co-participation in movies. To that end perform the following steps:

**Q1** Create a DataFrame that combines **all the information** on each of the titles (i.e., movies, tv-shows, etc …) and **all of the information** the participants in those movies (i.e., actors, directors, etc … ), make sure the actual names of the movies and participants are included. It may be worth reviewing the following questions to see how this dataframe will be used.

How many rows does your dataframe have?

In [0]:
from pyspark.sql.functions import col 

In [0]:
# Aliasing DataFrames for clarity
titles_df = title_basics_df.alias("titles")
principals_df = title_principals_df.alias("principals")
names_df = name_basics_df.alias("names")
ratings_df = title_ratings_df.alias("ratings")

# Join titles with principals on tconst
titles_with_principals = titles_df.join(
    principals_df,
    col("titles.tconst") == col("principals.tconst"),
    "left"
).select(
    col("titles.tconst").alias("title_tconst"),
    col("titles.*"),
    col("principals.*")
)

# Join the result with names on nconst
titles_principals_names = titles_with_principals.join(
    names_df,
    col("principals.nconst") == col("names.nconst"),
    "left_outer"
).select(
    col("title_tconst"),
    col("titles.*"),
    col("principals.nconst").alias("principal_nconst"),
    col("principals.*"),
    col("names.*")
)

# Join with ratings on tconst
full_title_info = titles_principals_names.join(
    ratings_df,
    col("title_tconst") == col("ratings.tconst"),
    "left"
).select(
    col("title_tconst"),
    col("titles.*"),
    col("principal_nconst"),
    col("names.nconst").alias("name_nconst"),
    col("names.*"),
    col("ratings.*"),
    col("principals.*")
)

# Final selection to avoid duplicate columns
final_df = full_title_info.select(
    col("title_tconst").alias("tconst"),
    col("principal_nconst"),
    col("name_nconst"),
    col("ratings.averageRating"),
    col("ratings.numVotes"),
    col("names.primaryName"),
    col("names.birthYear"),
    col("names.deathYear"),
    col("names.primaryProfession"),
    col("titles.primaryTitle"),
    col("titles.originalTitle"),
    col("titles.isAdult"),
    col("titles.startYear"),
    col("titles.endYear"),
    col("titles.runtimeMinutes"),
    col("titles.genres"),
    col("principals.category"),
    col("principals.job"),
    col("principals.characters")
)

# Show some of the data to verify correctness
final_df.show(5)

+---------+----------------+-----------+-------------+--------+-----------------+---------+---------+--------------------+---------------+---------------+-------+---------+-------+--------------+--------------------+---------------+---+--------------------+
|   tconst|principal_nconst|name_nconst|averageRating|numVotes|      primaryName|birthYear|deathYear|   primaryProfession|   primaryTitle|  originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|       category|job|          characters|
+---------+----------------+-----------+-------------+--------+-----------------+---------+---------+--------------------+---------------+---------------+-------+---------+-------+--------------+--------------------+---------------+---+--------------------+
|tt0969831|       nm0000007|  nm0000007|          8.3|     144|  Humphrey Bogart|     1899|     1957|actor,producer,mi...|Humphrey Bogart|Humphrey Bogart|      0|     2003|     \N|            43|Biography,Documen...|archive_fo

In [0]:
# Q1 FINAL ANSWER 

# Show the number of rows
print(f"My dataframe has {final_df.count()} rows.")

My dataframe has 87263790 rows.


**Q2** Create a new DataFrame based on the previous step, with the following removed:
1. Any participant that is not an actor or actress (as measured by the category column);
1. All adult movies;
1. All dead actors or actresses;
1. All actors or actresses born before 1920 or with no date of birth listed;
1. All titles that are not of the type movie.

How many rows does your dataframe have?

In [0]:
# Filter for actors and actresses
filtered_df = final_df.filter(col("category").isin(["actor", "actress"]))

# Filter out adult movies
filtered_df = filtered_df.filter(col("isAdult") == 0)

# Filter out dead actors/actresses
filtered_df = filtered_df.filter(col("deathYear")=="\\N")

# Filter out actors/actresses born before 1920 or with no date of birth listed
filtered_df = filtered_df.filter((col("birthYear") >= 1920) & (col("birthYear") != "\\N"))

# Filter for movie titles only
filtered_df = filtered_df.filter(col("titleType") == "movie")

In [0]:
# Q2 FINAL ANSWER

# Show the number of rows
print(f"The filtered dataframe has {filtered_df.count()} rows.")

The filtered dataframe has 930625 rows.


**Q3** Convert the above Dataframe to an RDD. Use map and reduce to create a paired RDD which counts how many movies each actor / actress appears in.

Display names of the top 10 actors/actresses according to the number of movies in which they appeared. Be careful to deal with different actors / actresses with the same name, these could be different people.

In [0]:
# Convert DataFrame to RDD
filtered_rdd = filtered_df.rdd

In [0]:
# Create a paired RDD (actor_id, 1) and reduce by key to count the movies
actor_movie_counts_rdd = filtered_rdd.map(lambda row: (row['name_nconst'], 1)).reduceByKey(lambda a, b: a + b)

In [0]:
# Convert the filtered DataFrame to RDD to get actor names
actor_names_rdd = filtered_df.select("name_nconst", "primaryName").distinct().rdd.map(lambda row: (row.name_nconst, row.primaryName))

In [0]:
# Join actor_movie_rdd with actor_names_rdd
actor_movie_count_with_names_rdd = actor_movie_counts_rdd.join(actor_names_rdd)

In [0]:
# Sort by movie count in descending order and take the top 10
top_10_actors_rdd = actor_movie_count_with_names_rdd.sortBy(lambda x: x[1][0], ascending=False).take(10)

In [0]:
# Q3 FINAL ANSWER

# Display names of the top 10 actors/actresses
for actor, (count, name) in top_10_actors_rdd:
    print(f"Actor: {name}, Movie Count: {count}")

Actor: Brahmanandam, Movie Count: 1130
Actor: Jagathy Sreekumar, Movie Count: 659
Actor: Shakti Kapoor, Movie Count: 600
Actor: Eric Roberts, Movie Count: 492
Actor: Aruna Irani, Movie Count: 467
Actor: Nassar, Movie Count: 440
Actor: Mammootty, Movie Count: 437
Actor: Helen, Movie Count: 433
Actor: Tanikella Bharani, Movie Count: 412
Actor: Mohanlal, Movie Count: 409


**Q4** Start with the dataframe from Q2. Generate a DataFrame that lists all links of your network. Here we shall consider that a link connects a pair of actors/actresses if they participated in at least one movie together (actors / actresses should be represented by their unique ID's). For every link we then need anytime a pair of actors were together in a movie as a link in each direction (A -> B and B -> A). However links should be distinct we do not need duplicates when two actors worked together in several movies. 

Display a DataFrame with the first 10 edges.

In [0]:
from pyspark.sql.functions import col, collect_list, explode

# Filtered DataFrame from Q2
df_q2 = filtered_df

In [0]:
# Select necessary columns
actor_movie_df = filtered_df.select("tconst", "name_nconst")

# Group by movie to get the list of actors in each movie
movie_actors_df = actor_movie_df.groupBy("tconst").agg(collect_list("name_nconst").alias("actors"))

# Explode the list of actors to create actor pairs
# First create a new DataFrame where each row contains the movie ID and each pair of actors
def create_pairs(actors):
    pairs = []
    for i in range(len(actors)):
        for j in range(i + 1, len(actors)):
            pairs.append((actors[i], actors[j]))
            pairs.append((actors[j], actors[i]))
    return pairs

# Register the function as a UDF
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import udf

pair_schema = ArrayType(StructType([
    StructField("src", StringType(), False),
    StructField("dst", StringType(), False)
]))

create_pairs_udf = udf(create_pairs, pair_schema)

# Apply the UDF to create pairs
actor_pairs_df = movie_actors_df.withColumn("pairs", explode(create_pairs_udf(col("actors"))))

# Separate src and dst
bidirectional_pairs_df = actor_pairs_df.select(col("pairs.src").alias("src"), col("pairs.dst").alias("dst")).distinct()

In [0]:
# Q4 FINAL ANSWER

# Show the first 10 edges
bidirectional_pairs_df.show(10)

+---------+---------+
|      src|      dst|
+---------+---------+
|nm0009777|nm0904537|
|nm0175356|nm0001549|
|nm0292383|nm0661715|
|nm0542278|nm0826430|
|nm0352020|nm0749195|
|nm0070458|nm0632288|
|nm0001549|nm0175356|
|nm0826430|nm0542278|
|nm0904537|nm0009777|
|nm0749195|nm0352020|
+---------+---------+
only showing top 10 rows



**Q5** Compute the page rank of each actor. This can be done using GraphFrames or
by using RDDs and the iterative implementation of the PageRank algorithm. Do not take
more than 5 iterations and use reset probility = 0.1.

List the top 10 actors / actresses by pagerank.

In [0]:
from graphframes import GraphFrame
from pyspark.sql.functions import col 

In [0]:
# Create vertices DataFrame
vertices_df = filtered_df.select(col("name_nconst").alias("id")).distinct()

In [0]:
# Create edges DataFrame from bidirectional_pairs_df
edges_df = bidirectional_pairs_df.select(col("src").alias("src"), col("dst").alias("dst"))

In [0]:
# Initialize GraphFrame
g = GraphFrame(vertices_df, edges_df)



In [0]:
# Compute PageRank
pagerank_results = g.pageRank(resetProbability=0.1, maxIter=5)



In [0]:
# Join PageRank results with filtered_df to get actor names
pagerank_with_names = pagerank_results.vertices.join(filtered_df, pagerank_results.vertices.id == filtered_df.name_nconst, "inner") \
                                               .select("id", "pagerank", "primaryName") \
                                               .distinct()

In [0]:
# Q5 FINAL ANSWER

# Display top 10 actors by PageRank
top_10_actors = pagerank_with_names.orderBy(col("pagerank").desc()).select("id", "primaryName", "pagerank").limit(10)
top_10_actors.show()

+---------+----------------+------------------+
|       id|     primaryName|          pagerank|
+---------+----------------+------------------+
|nm0000616|    Eric Roberts| 62.76730717370119|
|nm0000514|  Michael Madsen|  33.8515453234617|
|nm0001803|     Danny Trejo| 26.52740605401198|
|nm0202966|     Keith David|24.799935707466872|
|nm0001595|    Michael Paré| 24.30138288871596|
|nm0261724|     Joe Estevez|23.869279087265877|
|nm0726223|  Richard Riehle|22.930329311633848|
|nm0000532|Malcolm McDowell|22.827677732238232|
|nm0442207|   Lloyd Kaufman|22.669291479635138|
|nm0000448| Lance Henriksen|22.250573940649378|
+---------+----------------+------------------+



**Q6**: Create an RDD with the number of outDegrees for each actor. Display the top 10 by outdegrees.

In [0]:
# Convert DataFrame to RDD
pairs_rdd = bidirectional_pairs_df.rdd

In [0]:
# Map to paired RDD with (actor_id, 1) for each out-degree connection
outdegrees_rdd = pairs_rdd.map(lambda row: (row.src, 1)).reduceByKey(lambda a, b: a + b)

In [0]:
# Convert the filtered DataFrame to RDD to get actor names
actor_names_rdd = filtered_df.select("name_nconst", "primaryName").distinct().rdd.map(lambda row: (row.name_nconst, row.primaryName))

In [0]:
# Join outDegrees with actor names
outdegrees_with_names_rdd = outdegrees_rdd.join(actor_names_rdd)

In [0]:
# Sort by outDegrees in descending order
top_10_outdegrees = outdegrees_with_names_rdd.sortBy(lambda x: x[1][0], ascending=False).take(10)

In [0]:
# Q6 FINAL ANSWER

# Display the top 10 actors by outDegrees with names
for actor, (outdegree, name) in top_10_outdegrees:
    print(f"Actor: {name}, OutDegrees: {outdegree}")

Actor: Eric Roberts, OutDegrees: 1338
Actor: Michael Madsen, OutDegrees: 842
Actor: Anupam Kher, OutDegrees: 761
Actor: Keith David, OutDegrees: 708
Actor: Renji Ishibashi, OutDegrees: 704
Actor: Nassar, OutDegrees: 689
Actor: Gérard Depardieu, OutDegrees: 677
Actor: Danny Trejo, OutDegrees: 664
Actor: Akira Emoto, OutDegrees: 659
Actor: Prakash Raj, OutDegrees: 649


### Let’s play Kevin’s own game

**Q7** Start with the graphframe / dataframe you developed in the previous questions. Using Spark GraphFrame and/or Spark Core library perform the following steps:

1. Identify the id of Kevin Bacon, there are two actors named ‘Kevin Bacon’, we will use the one with the highest degree, that is, the one that participated in most titles;
1. Estimate the shortest path between every actor in the database actors and Kevin Bacon, keep a dataframe with this information as you will need it later;
1. Summarise the data, that is, count the number of actors at each number of degress from kevin bacon (you will need to deal with actors unconnected to kevin bacon, if not connected to Kevin Bacon given these actors / actresses a score/degree of 20).

In [0]:
from graphframes import GraphFrame
from pyspark.sql.functions import col, explode, when, lit

# Identify the ID of Kevin Bacon with the highest degree
filtered_df.filter(col("primaryName") == "Kevin Bacon").show()

+----------+----------------+-----------+-------------+--------+-----------+---------+---------+--------------------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+--------+---+--------------------+
|    tconst|principal_nconst|name_nconst|averageRating|numVotes|primaryName|birthYear|deathYear|   primaryProfession|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|category|job|          characters|
+----------+----------------+-----------+-------------+--------+-----------+---------+---------+--------------------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+--------+---+--------------------+
| tt0112453|       nm0000102|  nm0000102|          7.1|   48294|Kevin Bacon|     1958|       \N|actor,producer,di...|               Balto|               Balto|      0|     1995|     \N|            78|Adventure,Animati...|   actor| \

In [0]:
graph = GraphFrame(vertices_df, edges_df)

# Kevin Bacon's ID
kevin_bacon_id = "nm0000102"

# Estimate shortest paths from Kevin Bacon to all other actors
shortest_paths = graph.shortestPaths(landmarks=[kevin_bacon_id])

# Extract and process the shortest path distances
shortest_paths = shortest_paths.select("id", col("distances").getItem(kevin_bacon_id).alias("distance"))

# Handle unconnected actors by assigning a distance of 20
shortest_paths = shortest_paths.withColumn("distance", col("distance").cast("int"))
shortest_paths = shortest_paths.na.fill(20, subset=["distance"])

# Cache the shortest paths DataFrame as it will be used later
shortest_paths.cache()

# Show the shortest paths DataFrame
shortest_paths.show(10)

+---------+--------+
|       id|distance|
+---------+--------+
|nm0043720|       4|
|nm0053910|       4|
|nm1826454|       4|
|nm0192053|       3|
|nm1333193|       3|
|nm3804601|       4|
|nm4154286|       4|
|nm0955459|       5|
|nm5293695|       4|
|nm7441300|       4|
+---------+--------+
only showing top 10 rows



In [0]:
# Q7 FINAL ANSWER

degree_summary = shortest_paths.groupBy("distance").count().orderBy("distance")

# Show the degree summary
degree_summary.show(15)

+--------+-----+
|distance|count|
+--------+-----+
|       0|    1|
|       1|  354|
|       2|14171|
|       3|58564|
|       4|42443|
|       5| 4843|
|       6|  510|
|       7|   56|
|       8|   20|
|       9|    3|
|      20|15332|
+--------+-----+



### Exploring the data with RDD's

Using RDDs and (not dataframes) answer the following questions (if you loaded your data into spark in a dataframe you can convert to an RDD of rows easily using `.rdd`):

**Q8** Movies can have multiple genres. Considering only titles of the type 'movie' what is the combination of genres that is the most popluar (as measured by number of reviews). Hint: paired RDD's will be useful.

In [0]:
from itertools import combinations
from pyspark.sql.functions import col

# Convert DataFrame to RDD and filter out movies with null numVotes
movies_rdd = final_df.filter((col("titleType") == "movie") & (col("numVotes").isNotNull())).select("genres", "numVotes").rdd

In [0]:
# Function to create genre combinations
def create_genre_combinations(row):
    genres = row.genres.split(',')
    combinations = []
    for i in range(len(genres)):
        for j in range(i + 1, len(genres)):
            combinations.append((f"{genres[i]},{genres[j]}", row.numVotes))
    return combinations

In [0]:
# Create paired RDD with genre combinations and review counts
genre_combinations_rdd = movies_rdd.flatMap(create_genre_combinations)

In [0]:
# Sum the number of reviews for each genre combination
genre_combination_counts = genre_combinations_rdd.reduceByKey(lambda a, b: a + b)

In [0]:
# Find the most popular genre combination
most_popular_genre_combination = genre_combination_counts.max(lambda x: x[1])

In [0]:
# Q8 FINAL ANSWER

# Display the most popular genre combination and the number of reviews
print(f"Most Popular Genre Combination: {most_popular_genre_combination[0]}. Number of Reviews: {most_popular_genre_combination[1]}")

Most Popular Genre Combination: Action,Adventure. Number of Reviews: 4403757243


**Q9** Movies can have multiple genres. Considering only titles of the type 'movie', and movies with more than 400 ratings, what is the combination of genres that has the highest **average movie rating** (you can average the movie rating for each movie in that genre combination). Hint: paired RDD's will be useful.

In [0]:
from itertools import combinations
from pyspark.sql import Row

# Convert DataFrames to RDDs
title_basics_rdd = title_basics_df.rdd
title_ratings_rdd = title_ratings_df.rdd

In [0]:
# Filter for movies only and movies with more than 400 ratings
movies_rdd = title_basics_rdd.filter(lambda row: row['titleType'] == 'movie')
highly_rated_movies_rdd = title_ratings_rdd.filter(lambda row: row['numVotes'] > 400)

In [0]:
# Join RDDs on tconst to get movie ratings
movie_ratings_rdd = movies_rdd.map(lambda row: (row['tconst'], row)) \
                              .join(highly_rated_movies_rdd.map(lambda row: (row['tconst'], (row['averageRating'], row['numVotes'])))) \
                              .map(lambda x: (x[1][0]['genres'], x[1][1]))

In [0]:
# Function to create genre combinations and map to ratings
def genre_combinations(row):
    genres = row[0]
    rating = row[1][0]
    if genres:
        genre_list = genres.split(',')
        for i in range(1, len(genre_list) + 1):
            for combo in combinations(genre_list, i):
                yield (','.join(sorted(combo)), (rating, 1))

In [0]:
# Create genre combinations and map to ratings
genre_combinations_rdd = movie_ratings_rdd.flatMap(genre_combinations)

In [0]:
# Reduce by key to calculate the sum of ratings and the count for each genre combination
genre_ratings_count_rdd = genre_combinations_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

In [0]:
# Calculate the average rating for each genre combination
genre_avg_ratings_rdd = genre_ratings_count_rdd.mapValues(lambda v: v[0] / v[1])

In [0]:
# Find the genre combination with the highest average rating
highest_avg_rating_genre_combo = genre_avg_ratings_rdd.sortBy(lambda x: x[1], ascending=False).take(1)

In [0]:
# Q9 FINAL ANSWER

# Show the highest average rating genre combination
for combo, avg_rating in highest_avg_rating_genre_combo:
    print(f"Highest Average Rating Genre Combination: {combo} with an average rating of {avg_rating}")

Highest Average Rating Genre Combination: Action,Documentary,Mystery with an average rating of 8.3


**Q10** Movies can have multiple genres. What is **the individual genre** which is the most popular as meaured by number of votes. Votes for multiple genres count towards each genre listed. Hint: flatmap and pairedRDD's will be useful here.

In [0]:
# Convert DataFrames to RDDs
title_basics_rdd = title_basics_df.rdd
title_ratings_rdd = title_ratings_df.rdd

In [0]:
# Filter for movies only
movies_rdd = title_basics_rdd.filter(lambda row: row['titleType'] == 'movie')

In [0]:
# Join RDDs on tconst to get number of votes
movie_votes_rdd = movies_rdd.map(lambda row: (row['tconst'], row)) \
                            .join(title_ratings_rdd.map(lambda row: (row['tconst'], row['numVotes']))) \
                            .map(lambda x: (x[1][0]['genres'], x[1][1]))

In [0]:
# Function to create individual genre records and map to votes
def explode_genres(row):
    genres = row[0]
    numVotes = row[1]
    if genres:
        genre_list = genres.split(',')
        for genre in genre_list:
            yield (genre, numVotes)

In [0]:
# Create individual genre records and map to votes
genres_votes_rdd = movie_votes_rdd.flatMap(explode_genres)

In [0]:
# Reduce by key to sum the number of votes for each genre
genre_votes_rdd = genres_votes_rdd.reduceByKey(lambda a, b: a + b)

In [0]:
# Find the most popular genre by votes
most_popular_genre = genre_votes_rdd.sortBy(lambda x: x[1], ascending=False).take(1)

In [0]:
# Q10 FINAL ANSWER

# Show the most popular genre
for genre, votes in most_popular_genre:
    print(f"Most Popular Genre: {genre} with {votes} votes")

Most Popular Genre: Drama with 572280968 votes


## Engineering the perfect cast
We have created a number of potential features for predicting the rating of a movie based on its cast. Use sparkML to build a simple linear model to predict the rating of a movie based on the following features:

1. The total number of movies in which the actors / actresses have acted (based on Q3)
1. The average pagerank of the cast in each movie (based on Q5)
1. The average outDegree of the cast in each movie (based on Q6)
1. The average value for for the cast of degrees of Kevin Bacon (based on Q7).

You will need to create a dataframe with the required features and label. Use a pipeline to create the vectors required by sparkML and apply the model. Remember to split your dataset, leave 30% of the data for testing, when splitting your data use the option seed=0.

**Q11** Provide the coefficients of the regression and the accuracy of your model on that test dataset according to RSME.

In [0]:
from pyspark.sql.functions import col, avg
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Extract features
# Q3: Total number of movies each actor has acted in
actor_movie_counts_df = actor_movie_counts_rdd.toDF(["name_nconst", "total_movies"])

In [0]:
# Q5: Average PageRank of the cast in each movie
pagerank_df = pagerank_results.vertices.select(col("id").alias("name_nconst"), col("pagerank"))

In [0]:
# Q6: Average outDegree of the cast in each movie
# Extracting the first element of the struct which contains the numeric value
outdegrees_df = outdegrees_with_names_rdd.map(lambda row: (row[0], row[1][0], row[1][1])).toDF(["name_nconst", "outdegree", "primaryName"])

In [0]:
# Q7: Average degrees of separation from Kevin Bacon
shortest_paths_df = shortest_paths.select(col("id").alias("name_nconst"), col("distance").alias("distance_to_kevin_bacon"))

In [0]:
# Convert columns to numeric types where necessary
actor_movie_counts_df = actor_movie_counts_df.withColumn("total_movies", col("total_movies").cast("double"))
pagerank_df = pagerank_df.withColumn("pagerank", col("pagerank").cast("double"))
outdegrees_df = outdegrees_df.withColumn("outdegree", col("outdegree").cast("double"))
shortest_paths_df = shortest_paths_df.withColumn("distance_to_kevin_bacon", col("distance_to_kevin_bacon").cast("double"))

In [0]:
# Combine all features for each movie
features_df = filtered_df.select("tconst", "name_nconst").distinct()
features_df = features_df.join(actor_movie_counts_df, "name_nconst", "left")
features_df = features_df.join(pagerank_df, "name_nconst", "left")
features_df = features_df.join(outdegrees_df, "name_nconst", "left")
features_df = features_df.join(shortest_paths_df, "name_nconst", "left")

In [0]:
# Aggregate features for each movie
aggregated_features_df = features_df.groupBy("tconst").agg(
    avg("total_movies").alias("avg_total_movies"),
    avg("pagerank").alias("avg_pagerank"),
    avg("outdegree").alias("avg_outdegree"),
    avg("distance_to_kevin_bacon").alias("avg_distance_to_kevin_bacon")
)

In [0]:
# Fill null values with 0 or a meaningful default
aggregated_features_df = aggregated_features_df.na.fill(0)

In [0]:
# Join with ratings to get the labels
ratings_df = final_df.select("tconst", "averageRating").distinct()
data = aggregated_features_df.join(ratings_df, "tconst")

In [0]:
# Check for and handle any remaining null values
data = data.na.fill(0)

In [0]:
# Prepare features and label
assembler = VectorAssembler(
    inputCols=["avg_total_movies", "avg_pagerank", "avg_outdegree", "avg_distance_to_kevin_bacon"],
    outputCol="features"
)

In [0]:
# Aggregate features for each movie
aggregated_features_df = features_df.groupBy("tconst").agg(
    avg("total_movies").alias("avg_total_movies"),
    avg("pagerank").alias("avg_pagerank"),
    avg("outdegree").alias("avg_outdegree"),
    avg("distance_to_kevin_bacon").alias("avg_distance_to_kevin_bacon")
)

In [0]:
# Split data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Define linear regression model
lr = LinearRegression(featuresCol="features", labelCol="averageRating")

In [0]:
# Create pipeline
pipeline = Pipeline(stages=[assembler, lr])

In [0]:
# Train the model
model = pipeline.fit(train_data)

In [0]:
# Make predictions
predictions = model.transform(test_data)

In [0]:
# Evaluate the model
evaluator = RegressionEvaluator(labelCol="averageRating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [0]:
# Get model coefficients
lr_model = model.stages[-1]
coefficients = lr_model.coefficients
intercept = lr_model.intercept

In [0]:
# Q11 FINAL ANSWER

# Display the results
print(f"Coefficients: {coefficients}")
print(f"Intercept: {intercept}")
print(f"RMSE: {rmse}")

Coefficients: [-0.012578245261461634,-0.12734451300732474,0.012555778878938552,-0.05478911587070738]
Intercept: 4.184387990495217
RMSE: 2.900264657664528


**Q12** What score would your model predict for the 1997 movie Titanic.

In [0]:
# Filter for the title "Titanic" and year 1997
titanic_df = title_basics_df.filter((col("primaryTitle") == "Titanic") & (col("startYear") == "1997"))

# Show the filtered result
titanic_df.show()

# Collect the ID for Titanic movie
titanic_id = titanic_df.select("tconst").collect()[0][0]

+---------+---------+------------+-------------+-------+---------+-------+--------------+-----------------+
|   tconst|titleType|primaryTitle|originalTitle|isAdult|startYear|endYear|runtimeMinutes|           genres|
+---------+---------+------------+-------------+-------+---------+-------+--------------+-----------------+
|tt0120338|    movie|     Titanic|      Titanic|      0|     1997|     \N|           194|    Drama,Romance|
|tt0594950|tvEpisode|     Titanic|      Titanic|      0|     1997|     \N|            \N|Documentary,Short|
|tt5722820|tvEpisode|     Titanic|      Titanic|      0|     1997|     \N|            \N| Documentary,News|
+---------+---------+------------+-------------+-------+---------+-------+--------------+-----------------+



In [0]:
# Filter the data for Titanic
titanic_features = data.filter(col("tconst") == titanic_id)

In [0]:
# Step 3: Use the trained model to predict Titanic's rating
titanic_prediction = model.transform(titanic_features)

In [0]:
# Q12 FINAL ANSWER

# Extract and print the predicted rating
predicted_rating = titanic_prediction.select("prediction").collect()[0][0]
print(f"Predicted rating for Titanic (1997): {round(predicted_rating,2)}")

Predicted rating for Titanic (1997): 5.86


**Q13** Create dummy variables for each of the top 10 movie genres for Q10. These variable should have a value of 1 if the movie was rated with that genre and 0 otherwise. For example the 1997 movie Titanic should have a 1 in the dummy variable column for Romance, and a 1 in the dummy variable column for Drama, and 0's in all the other dummy variable columns.

Does adding these variable to the regression improve your results? What is the new RMSE and predicted rating for the 1997 movie Titanic.

In [0]:
from pyspark.sql.functions import col, when, split, sum as _sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Filter for movies only
movies_df = final_df.filter(col("titleType") == "movie")

In [0]:
# Explode genres
movies_with_genres_df = movies_df.withColumn("genre", explode(split(col("genres"), ",")))

In [0]:
# Calculate total votes for each genre
genre_votes_df = movies_with_genres_df.groupBy("genre").agg(_sum("numVotes").alias("total_votes"))

In [0]:
# Sort by total votes and select top 10 genres
top_10_genres_df = genre_votes_df.orderBy(col("total_votes").desc()).limit(10)
top_10_genres = [row['genre'] for row in top_10_genres_df.collect()]


In [0]:
# Display the top 10 genres
print("Top 10 genres by number of votes:")
for genre in top_10_genres:
    print(genre)

Top 10 genres by number of votes:
Drama
Action
Comedy
Adventure
Crime
Thriller
Sci-Fi
Romance
Mystery
Horror


In [0]:
# Create dummy variables for the top 10 genres in final_df
for genre in top_10_genres:
    final_df = final_df.withColumn(f"genre_{genre}", when(col("genres").contains(genre), 1).otherwise(0))

In [0]:
# Q3: Total number of movies each actor has acted in
actor_movie_counts_df = actor_movie_counts_rdd.toDF(["name_nconst", "total_movies"])

In [0]:
# Q5: Average PageRank of the cast in each movie
pagerank_df = pagerank_results.vertices.select(col("id").alias("name_nconst"), col("pagerank"))

In [0]:
# Q6: Average outDegree of the cast in each movie
outdegrees_df = outdegrees_with_names_rdd.map(lambda row: Row(name_nconst=row[0], outdegree=row[1][0])).toDF()

In [0]:
# Q7: Average degrees of separation from Kevin Bacon
shortest_paths_df = shortest_paths.select(col("id").alias("name_nconst"), col("distance").alias("distance_to_kevin_bacon"))

In [0]:
# Combine all features for each movie
features_df = filtered_df.select("tconst", "name_nconst").distinct()
features_df = features_df.join(actor_movie_counts_df, "name_nconst", "left")
features_df = features_df.join(pagerank_df, "name_nconst", "left")
features_df = features_df.join(outdegrees_df, "name_nconst", "left")
features_df = features_df.join(shortest_paths_df, "name_nconst", "left")

In [0]:
# Aggregate features for each movie
aggregated_features_df = features_df.groupBy("tconst").agg(
    avg("total_movies").alias("avg_total_movies"),
    avg("pagerank").alias("avg_pagerank"),
    avg("outdegree").alias("avg_outdegree"),
    avg("distance_to_kevin_bacon").alias("avg_distance_to_kevin_bacon")
)


In [0]:
# Fill null values with 0
aggregated_features_df = aggregated_features_df.na.fill(0)

In [0]:
# Join with ratings to get the labels
ratings_df = final_df.select("tconst", "averageRating").distinct()
data = aggregated_features_df.join(ratings_df, "tconst")

In [0]:
# Join with dummy variables for genres
for genre in top_10_genres:
    genre_col = f"genre_{genre}"
    genre_df = final_df.select("tconst", genre_col).distinct()
    data = data.join(genre_df, "tconst", "left").na.fill(0)

In [0]:
# Check for and handle any remaining null values
data = data.na.fill(0)

In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Prepare feature set
assembler = VectorAssembler(
    inputCols=["avg_total_movies", "avg_pagerank", "avg_outdegree", "avg_distance_to_kevin_bacon"] + [f"genre_{genre}" for genre in top_10_genres],
    outputCol="features"
)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [0]:
# Split data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Define linear regression model
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="averageRating")

In [0]:
# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

In [0]:
model = pipeline.fit(train_data)

In [0]:
# Make predictions
predictions = model.transform(test_data)

In [0]:
# Evaluate the model
evaluator = RegressionEvaluator(labelCol="averageRating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [0]:
# Q13 FINAL ANSWER PART 1

# Display the RMSE
print(f"New RMSE with genre dummy variables: {round(rmse,4)}")
print(f"This is an improvement in relation to the model without genre dummies")

New RMSE with genre dummy variables: 2.7867
This is an improvement in relation to the model without genre dummies


In [0]:
# Predict the rating for Titanic using the updated model
titanic_id = final_df.filter(col("primaryTitle") == "Titanic").select("tconst").distinct().first()[0]

In [0]:
# Extract features for Titanic
titanic_features_df = data.filter(col("tconst") == titanic_id)

In [0]:
titanic_features_df.drop("averageRating")

Out[111]: DataFrame[tconst: string, avg_total_movies: double, avg_pagerank: double, avg_outdegree: double, avg_distance_to_kevin_bacon: double, genre_Drama: int, genre_Action: int, genre_Comedy: int, genre_Adventure: int, genre_Crime: int, genre_Thriller: int, genre_Sci-Fi: int, genre_Romance: int, genre_Mystery: int, genre_Horror: int]

In [0]:
# Fill null values with 0
titanic_features_df = titanic_features_df.na.fill(0)

In [0]:
# # Prepare features for prediction
# titanic_features_vector = assembler.transform(titanic_features_df)

In [0]:
# # Ensure no conflicting columns for prediction
# titanic_features_vector = titanic_features_vector.drop("features")

In [0]:
# # Assemble features for Titanic again
# titanic_assembler = VectorAssembler(
#     inputCols=["avg_total_movies", "avg_pagerank", "avg_outdegree", "avg_distance_to_kevin_bacon"] + [f"genre_{genre}" for genre in top_10_genres],
#     outputCol="features"
# )

In [0]:
#titanic_features_vector = titanic_assembler.transform(titanic_features_df)

In [0]:
#titanic_features_vector = titanic_features_vector.drop("features")

In [0]:
# Make prediction for Titanic
titanic_prediction = model.transform(titanic_features_df)

In [0]:
# Extract and print the predicted rating
predicted_rating = titanic_prediction.select("prediction").collect()

In [0]:
# Q13 FINAL ANSWER PART 2
if predicted_rating:
    print(f"Predicted rating for Titanic (1997): {predicted_rating[0][0]}")
else:
    print("No prediction available for Titanic (1997)")


# We could not get a prediction for Titanic in this question and we could not understand why. We tried everything but to no avail.

No prediction available for Titanic (1997)


**Q14 - Open Question**: Improve your model by testing different machine learning algorithms, using hyperparameter tuning on these algorithms, changing the included features. What is the RMSE of you final model and what rating does it predict for the 1997 movie Titanic.

In [0]:
from pyspark.sql.functions import col, avg, when, lit
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
# Convert RDDs to DataFrames
actor_movie_counts_df = actor_movie_counts_rdd.toDF(["name_nconst", "total_movies"])
pagerank_df = pagerank_results.vertices.select(col("id").alias("name_nconst"), col("pagerank"))
outdegrees_df = outdegrees_with_names_rdd.map(lambda row: Row(name_nconst=row[0], outdegree=row[1][0])).toDF()
shortest_paths_df = shortest_paths.select(col("id").alias("name_nconst"), col("distance").alias("distance_to_kevin_bacon"))

In [0]:
# Combine all features for each movie
features_df = filtered_df.select("tconst", "name_nconst").distinct()
features_df = features_df.join(actor_movie_counts_df, "name_nconst", "left")
features_df = features_df.join(pagerank_df, "name_nconst", "left")
features_df = features_df.join(outdegrees_df, "name_nconst", "left")
features_df = features_df.join(shortest_paths_df, "name_nconst", "left")


In [0]:
# Combine all features for each movie
features_df = filtered_df.select("tconst", "name_nconst").distinct()
features_df = features_df.join(actor_movie_counts_df, "name_nconst", "left")
features_df = features_df.join(pagerank_df, "name_nconst", "left")
features_df = features_df.join(outdegrees_df, "name_nconst", "left")
features_df = features_df.join(shortest_paths_df, "name_nconst", "left")

In [0]:
# Aggregate features for each movie
aggregated_features_df = features_df.groupBy("tconst").agg(
    avg("total_movies").alias("avg_total_movies"),
    avg("pagerank").alias("avg_pagerank"),
    avg("outdegree").alias("avg_outdegree"),
    avg("distance_to_kevin_bacon").alias("avg_distance_to_kevin_bacon")
)


In [0]:
# Fill null values with 0
aggregated_features_df = aggregated_features_df.na.fill(0)

In [0]:
# Join with ratings to get the labels
ratings_df = final_df.select("tconst", "averageRating").distinct()
data = aggregated_features_df.join(ratings_df, "tconst")

In [0]:
# Check for and handle any remaining null values
data = data.na.fill(0)

In [0]:
# Prepare feature set
assembler = VectorAssembler(
    inputCols=["avg_total_movies", "avg_pagerank", "avg_outdegree", "avg_distance_to_kevin_bacon"],
    outputCol="features4"
)
scaler = StandardScaler(inputCol="features4", outputCol="scaledFeatures4", withStd=True, withMean=False)

In [0]:
# Split data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Define models
lr = LinearRegression(featuresCol="scaledFeatures4", labelCol="averageRating")
rf = RandomForestRegressor(featuresCol="scaledFeatures4", labelCol="averageRating")
gbt = GBTRegressor(featuresCol="scaledFeatures4", labelCol="averageRating")

In [0]:
# Define parameter grids for hyperparameter tuning
paramGridLR = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 0.5]).build()
paramGridRF = ParamGridBuilder().addGrid(rf.numTrees, [20, 50, 100]).build()
paramGridGBT = ParamGridBuilder().addGrid(gbt.maxIter, [10, 20, 50]).build()

In [0]:
# Define evaluators
evaluator = RegressionEvaluator(labelCol="averageRating", predictionCol="prediction", metricName="rmse")

In [0]:
# Cross-validation for each model
crossvalLR = CrossValidator(estimator=Pipeline(stages=[assembler, scaler, lr]),
                            estimatorParamMaps=paramGridLR,
                            evaluator=evaluator,
                            numFolds=5)
crossvalRF = CrossValidator(estimator=Pipeline(stages=[assembler, scaler, rf]),
                            estimatorParamMaps=paramGridRF,
                            evaluator=evaluator,
                            numFolds=5)
crossvalGBT = CrossValidator(estimator=Pipeline(stages=[assembler, scaler, gbt]),
                             estimatorParamMaps=paramGridGBT,
                             evaluator=evaluator,
                             numFolds=5)

In [0]:
# Fit models
cvModelLR = crossvalLR.fit(train_data)
cvModelRF = crossvalRF.fit(train_data)
cvModelGBT = crossvalGBT.fit(train_data)

In [0]:
# Evaluate models
predictionsLR = cvModelLR.transform(test_data)
predictionsRF = cvModelRF.transform(test_data)
predictionsGBT = cvModelGBT.transform(test_data)

rmseLR = evaluator.evaluate(predictionsLR)
rmseRF = evaluator.evaluate(predictionsRF)
rmseGBT = evaluator.evaluate(predictionsGBT)



In [0]:
print(f"RMSE for Linear Regression: {rmseLR}")
print(f"RMSE for Random Forest: {rmseRF}")
print(f"RMSE for Gradient Boosted Trees: {rmseGBT}")



In [0]:
# Q14 FINAL ANSWER PART 1

# Choose the best model
bestModel = min((cvModelLR, rmseLR), (cvModelRF, rmseRF), (cvModelGBT, rmseGBT), key=lambda x: x[1])[0]

print(f"Our best model is {bestModel}")



In [0]:
# Predict the rating for Titanic using the best model

# Extract features for Titanic
titanic_features_df = filtered_df.filter(col("tconst") == titanic_tconst).select("tconst", "name_nconst").distinct()



In [0]:
# Perform the joins
titanic_features_df = titanic_features_df.join(actor_movie_counts_df, "name_nconst", "left")
titanic_features_df = titanic_features_df.join(pagerank_df, "name_nconst", "left")
titanic_features_df = titanic_features_df.join(outdegrees_df, "name_nconst", "left")
titanic_features_df = titanic_features_df.join(shortest_paths_df, "name_nconst", "left")

# Aggregate features for Titanic
titanic_aggregated_features_df = titanic_features_df.groupBy("tconst").agg(
    avg("total_movies").alias("avg_total_movies"),
    avg("pagerank").alias("avg_pagerank"),
    avg("outdegree").alias("avg_outdegree"),
    avg("distance_to_kevin_bacon").alias("avg_distance_to_kevin_bacon")
)



In [0]:
# Add dummy variables for Titanic
for genre in top_10_genres:
    titanic_aggregated_features_df = titanic_aggregated_features_df.withColumn(f"genre_{genre}", lit(1) if genre in ["Romance", "Drama"] else lit(0))

# Fill null values with 0
titanic_aggregated_features_df = titanic_aggregated_features_df.na.fill(0)



In [0]:
# Prepare features for prediction
titanic_features_vector = assembler.transform(titanic_aggregated_features_df)
titanic_features_vector = scaler.transform(titanic_features_vector)



In [0]:
# Make prediction for Titanic
titanic_prediction = bestModel.transform(titanic_features_vector).select("prediction").first()[0]



In [0]:
# Q14 FINAL ANSWER PART 2

# Display the prediction for Titanic
print(f"Predicted Rating for Titanic with the best model: {round(titanic_prediction,2)}")

