In [0]:
# Install wget
%pip install wget

Python interpreter will be restarted.
Collecting wget
  Downloading wget-3.2.zip (10 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py): started
  Building wheel for wget (setup.py): finished with status 'done'
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9672 sha256=a3244a34fe0e8a834894fbfee26e845eeb8789cfa022681cc464c29c17968ab1
  Stored in directory: /root/.cache/pip/wheels/04/5f/3e/46cc37c5d698415694d83f607f833f83f0149e49b3af9d0f38
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
Python interpreter will be restarted.


In [0]:
#Load The Data
import wget
from pyspark import SparkFiles

# Define the URLs for the IMDb datasets
name_basics_url = "https://datasets.imdbws.com/name.basics.tsv.gz"
title_akas_url = "https://datasets.imdbws.com/title.akas.tsv.gz"
title_basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
title_crew_url = "https://datasets.imdbws.com/title.crew.tsv.gz"
title_episode_url = "https://datasets.imdbws.com/title.episode.tsv.gz"
title_principals_url = "https://datasets.imdbws.com/title.principals.tsv.gz"
title_ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"

# List of dataset URLs
data_set_urls = [
    name_basics_url,
    title_akas_url,
    title_basics_url,
    title_crew_url,
    title_episode_url,
    title_principals_url,
    title_ratings_url
]

# Download and copy datasets to DBFS
for url in data_set_urls: 
    # Get the file name from the URL
    f_start = url.rfind("/") 
    f_name = url[(f_start + 1):]
    
    # Download the dataset
    tsv_path = wget.download(url, f_name)
    
    # Copy to DBFS
    dbutils.fs.cp(f"file:/databricks/driver/{f_name}", f"dbfs:/FileStore/{f_name}")

# Display the files in the FileStore directory
display(dbutils.fs.ls('dbfs:/FileStore/'))


path,name,size,modificationTime
dbfs:/FileStore/degrees/,degrees/,0,0
dbfs:/FileStore/name.basics.tsv.gz,name.basics.tsv.gz,282889413,1744299075000
dbfs:/FileStore/tables/,tables/,0,0
dbfs:/FileStore/title.akas.tsv.gz,title.akas.tsv.gz,450103193,1744299104000
dbfs:/FileStore/title.basics.tsv.gz,title.basics.tsv.gz,204295954,1744299121000
dbfs:/FileStore/title.crew.tsv.gz,title.crew.tsv.gz,75652116,1744299127000
dbfs:/FileStore/title.episode.tsv.gz,title.episode.tsv.gz,49112241,1744299131000
dbfs:/FileStore/title.principals.tsv.gz,title.principals.tsv.gz,711208252,1744299174000
dbfs:/FileStore/title.ratings.tsv.gz,title.ratings.tsv.gz,7830233,1744299181000


**1. Load the data**

In [0]:
# Base path
base_path = "/FileStore/"

# Load GZ-compressed TSV files directly
name_basics_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "name.basics.tsv.gz")
title_akas_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "title.akas.tsv.gz")
title_basics_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "title.basics.tsv.gz")
title_crew_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "title.crew.tsv.gz")
title_episode_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "title.episode.tsv.gz")
title_principals_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "title.principals.tsv.gz")
title_ratings_df = spark.read.option("header", "true").option("sep", "\t").option("nullValue", "\\N").csv(base_path + "title.ratings.tsv.gz")


In [0]:
dfs = {
    "title_basics_df": title_basics_df,
    "title_ratings_df": title_ratings_df,
    "title_crew_df": title_crew_df,
    "title_principals_df": title_principals_df,
    "title_episode_df": title_episode_df,
    "title_akas_df": title_akas_df,
    "name_basics_df": name_basics_df
}

for name, df in dfs.items():
    print(f"\n== Schema for {name} ==")
    df.printSchema()
    print(f"\n== Preview of {name} ==")
    df.show(5)



== Schema for title_basics_df ==
root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)


== Preview of title_basics_df ==
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   null|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown 

**2. How many total people in data set**
- Total number of people in the IMDb dataset: 14325453

In [0]:
total_people = name_basics_df.count()
print(f"Total number of people in the IMDb dataset: {total_people}")

Total number of people in the IMDb dataset: 14325453


In [0]:
from pyspark.sql.functions import countDistinct

distinct_people = name_basics_df.select("nconst").distinct().count()
print(f"Total distinct people (nconst): {distinct_people}")

Total distinct people (nconst): 14325453


**3. What is the earliest year of birth**
- Lucio Anneo Seneca| 4        |writer 

In [0]:
from pyspark.sql.functions import col

# Step 1: Cast 'birthYear' to Integer
name_basics_df = name_basics_df.withColumn("birthYear", col("birthYear").cast("int"))

# Step 2: Find the earliest (unfiltered) birth year
earliest_birth = name_basics_df \
    .filter(col("birthYear").isNotNull()) \
    .orderBy("birthYear") \
    .select("birthYear") \
    .first()["birthYear"]

print(f"The earliest recorded birth year is: {earliest_birth}")

# Step 3: Show the person(s) born in that year
name_basics_df \
    .filter(col("birthYear") == earliest_birth) \
    .select("primaryName", "birthYear", "primaryProfession") \
    .show(truncate=False)


The earliest recorded birth year is: 4
+------------------+---------+-----------------+
|primaryName       |birthYear|primaryProfession|
+------------------+---------+-----------------+
|Lucio Anneo Seneca|4        |writer           |
+------------------+---------+-----------------+



**4. How many years ago was this person born**
- Lucio Anneo Seneca was born 2021 years ago.


In [0]:
from datetime import datetime

# Get the current year
current_year = datetime.now().year

# Use previously found earliest_birth value (which was 4)
years_ago = current_year - earliest_birth

print(f"Lucio Anneo Seneca was born {years_ago} years ago.")


Lucio Anneo Seneca was born 2021 years ago.


**5. Using only the data in the data set is this date of birth correct . explain the answer**
- Yes, based only on the IMDb dataset, the date of birth (year 4) is considered valid.
There's no conflict in the data: both birth and death years are present, and IMDb includes historical figures when they’re credited in titles. The dataset does not verify historical accuracy — it only reflects credits from the entertainment industry.
Yes, based only on the IMDb dataset, the date of birth (year 4) is considered valid. There's no conflict in the data: both birth and death years are present, and IMDb includes historical figures when they’re credited in titles. The dataset does not verify historical accuracy — it only reflects credits from the entertainment industry.

In [0]:
# Find Seneca by name (case-insensitive contains)
seneca_df = name_basics_df.filter(col("primaryName").like("%Lucio Anneo Seneca%"))

seneca_df.select("nconst", "primaryName", "birthYear", "deathYear", "primaryProfession", "knownForTitles") \
         .show(truncate=False)


+---------+------------------+---------+---------+-----------------+---------------------------------------+
|nconst   |primaryName       |birthYear|deathYear|primaryProfession|knownForTitles                         |
+---------+------------------+---------+---------+-----------------+---------------------------------------+
|nm0784172|Lucio Anneo Seneca|4        |65       |writer           |tt0043802,tt0218822,tt0049203,tt0972562|
+---------+------------------+---------+---------+-----------------+---------------------------------------+



**6. What is the latest data of birth**
- The most recent recorded birth year is: 2024


In [0]:
# Step 1: Cast 'birthYear' to Integer
name_basics_df = name_basics_df.withColumn("birthYear", col("birthYear").cast("int"))

# Step 2: Find the latest (unfiltered) birth year
latest_birth = name_basics_df \
    .filter(col("birthYear").isNotNull()) \
    .orderBy(col("birthYear").desc()) \
    .select("birthYear") \
    .first()["birthYear"]

print(f"The most recent recorded birth year is: {latest_birth}")

# Step 3: Show the person(s) born in that year
name_basics_df \
    .filter(col("birthYear") == latest_birth) \
    .select("primaryName", "birthYear", "primaryProfession") \
    .show(truncate=False)


The most recent recorded birth year is: 2024
+--------------------+---------+-------------------------------+
|primaryName         |birthYear|primaryProfession              |
+--------------------+---------+-------------------------------+
|Ronnie Lordi        |2024     |actor,writer,producer          |
|Aris A. Stavropoulos|2024     |producer                       |
|Christina Goitia    |2024     |actress                        |
|Turner Samuel       |2024     |null                           |
|Moo Deng            |2024     |archive_footage                |
|Alexandra Ardelyan  |2024     |writer,editor,director         |
|Jerzy Ficowski      |2024     |writer,miscellaneous,soundtrack|
+--------------------+---------+-------------------------------+



**7. How many people do not have a year of birth**
- Number of people without a recorded birth year: 13682359


In [0]:
# Ensure birthYear is cast to integer
name_basics_df = name_basics_df.withColumn("birthYear", col("birthYear").cast("int"))

# Count people where birthYear is null
missing_birth_year_count = name_basics_df.filter(col("birthYear").isNull()).count()

print(f"Number of people without a recorded birth year: {missing_birth_year_count}")


Number of people without a recorded birth year: 13682359


**8. What is the length of the longest short after 1900**
- The longest short film after 1900 is 'Tanha Dar Mazrae' (2024) with a runtime of 250 minutes.


In [0]:
# Ensure correct data types
title_basics_df = title_basics_df.withColumn("startYear", col("startYear").cast("int")) \
                                 .withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))

# Filter for short films after 1900 with valid runtime
longest_short_df = title_basics_df.filter(
    (col("titleType") == "short") &
    (col("startYear") > 1900) &
    (col("runtimeMinutes").isNotNull())
)

# Get the longest short film
longest_short = longest_short_df.orderBy(col("runtimeMinutes").desc()).select(
    "primaryTitle", "startYear", "runtimeMinutes"
).first()

# Show result
print(f"The longest short film after 1900 is '{longest_short['primaryTitle']}' "
      f"({longest_short['startYear']}) with a runtime of {longest_short['runtimeMinutes']} minutes.")


The longest short film after 1900 is 'Tanha Dar Mazrae' (2024) with a runtime of 250 minutes.


SQL

In [0]:
title_basics_df.createOrReplaceTempView("title_basics")

In [0]:
%sql
-- Filter for short films after 1900 with valid runtime
WITH longest_short AS (
    SELECT primaryTitle, startYear, runtimeMinutes
    FROM title_basics
    WHERE titleType = 'short'
      AND startYear > 1900
      AND runtimeMinutes IS NOT NULL
)
-- Get the longest short film
SELECT primaryTitle, startYear, runtimeMinutes
FROM longest_short
ORDER BY runtimeMinutes DESC
LIMIT 1;


primaryTitle,startYear,runtimeMinutes
Tanha Dar Mazrae,2024,250


**9. What is the length of the shortest movie after 1900**
- The shortest movie after 1900 is 'George White's Scandals' (1934) with a runtime of 1 minutes.


In [0]:
# Ensure correct data types
title_basics_df = title_basics_df.withColumn("startYear", col("startYear").cast("int")) \
                                 .withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))

# Filter for full-length movies after 1900 with valid runtime
shortest_movie_df = title_basics_df.filter(
    (col("titleType") == "movie") &
    (col("startYear") > 1900) &
    (col("runtimeMinutes").isNotNull()) &
    (col("runtimeMinutes") > 0)  # avoid weird zero-minute entries
)

# Get the shortest movie
shortest_movie = shortest_movie_df.orderBy(col("runtimeMinutes").asc()).select(
    "primaryTitle", "startYear", "runtimeMinutes"
).first()

# Show result
print(f"The shortest movie after 1900 is '{shortest_movie['primaryTitle']}' "
      f"({shortest_movie['startYear']}) with a runtime of {shortest_movie['runtimeMinutes']} minutes.")


The shortest movie after 1900 is 'George White's Scandals' (1934) with a runtime of 1 minutes.


SQL

In [0]:
title_basics_df.createOrReplaceTempView("title_basics")


In [0]:
%sql
-- Filter for full-length movies after 1900 with valid runtime
WITH shortest_movie AS (
    SELECT primaryTitle, startYear, runtimeMinutes
    FROM title_basics
    WHERE titleType = 'movie'
      AND startYear > 1900
      AND runtimeMinutes IS NOT NULL
      AND runtimeMinutes > 0  -- Avoid zero-minute entries
)
-- Get the shortest movie
SELECT primaryTitle, startYear, runtimeMinutes
FROM shortest_movie
ORDER BY runtimeMinutes ASC
LIMIT 1;


primaryTitle,startYear,runtimeMinutes
George White's Scandals,1934,1


**10. Provide a list of all of the genres represented**
- |Action|Adult|Adventure  |Animation|Biography|Comedy|Crime|Documentary|Drama|Family|Fantasy|Film-Noir|Game-Show|History|Horror|Music|Musical|Mystery|News|Reality-TV

In [0]:
from pyspark.sql.functions import split, explode

# Filter nulls and split genre strings into arrays
genres_df = title_basics_df \
    .filter(col("genres").isNotNull()) \
    .withColumn("genre", explode(split(col("genres"), ",")))

# Get distinct genres
unique_genres = genres_df.select("genre").distinct().orderBy("genre")

# Show all genres
unique_genres.show(truncate=False)


+-----------+
|genre      |
+-----------+
|Action     |
|Adult      |
|Adventure  |
|Animation  |
|Biography  |
|Comedy     |
|Crime      |
|Documentary|
|Drama      |
|Family     |
|Fantasy    |
|Film-Noir  |
|Game-Show  |
|History    |
|Horror     |
|Music      |
|Musical    |
|Mystery    |
|News       |
|Reality-TV |
+-----------+
only showing top 20 rows



**11. What is the higest rated comedy movie in the dataset . note, if there is a tie, the tie shall be broken by the movie with the most votes.**
- The highest-rated comedy movie is 'Here They Go' (2022) with a rating of 10.0 based on 8 votes. Genres: Comedy,Family


In [0]:
# Ensure correct data types
title_ratings_df = title_ratings_df.withColumn("averageRating", col("averageRating").cast("float")) \
                                   .withColumn("numVotes", col("numVotes").cast("int"))

# Filter movies that include 'Comedy' as one of the genres
comedy_movies_df = title_basics_df.filter(
    (col("titleType") == "movie") &
    (col("genres").isNotNull()) &
    (col("genres").like("%Comedy%"))
)

# Join with ratings
comedy_with_ratings = comedy_movies_df.join(title_ratings_df, on="tconst", how="inner")

# Order by rating, then number of votes
top_comedy = comedy_with_ratings.orderBy(
    col("averageRating").desc(),
    col("numVotes").desc()
).select("primaryTitle", "startYear", "averageRating", "numVotes", "genres").first()

# Show result
print(f"The highest-rated comedy movie is '{top_comedy['primaryTitle']}' "
      f"({top_comedy['startYear']}) with a rating of {top_comedy['averageRating']} "
      f"based on {top_comedy['numVotes']} votes. Genres: {top_comedy['genres']}")


The highest-rated comedy movie is 'Here They Go' (2022) with a rating of 10.0 based on 8 votes. Genres: Comedy,Family


**12. Who was the director of the movie.**
- Bryan Bostic|director,writer,editor

In [0]:
# Step 1: Get the tconst of the highest-rated comedy movie
top_comedy_tconst = comedy_with_ratings.orderBy(
    col("averageRating").desc(),
    col("numVotes").desc()
).select("tconst").first()["tconst"]

# Step 2: Get the director nconst(s) from title_crew_df
director_ids = title_crew_df.filter(col("tconst") == top_comedy_tconst) \
    .select("directors").first()["directors"]

# Step 3: Handle multiple directors (split by comma)
director_id_list = director_ids.split(",")

# Step 4: Get director names from name_basics_df
directors_df = name_basics_df.filter(col("nconst").isin(director_id_list)) \
    .select("primaryName", "primaryProfession")

directors_df.show(truncate=False)

+------------+----------------------+
|primaryName |primaryProfession     |
+------------+----------------------+
|Bryan Bostic|director,writer,editor|
+------------+----------------------+



**13. List, if any, the alternate titles for the movie .**
- Alternate titles for the movie with tconst = tt20115996:
- |Here They Go|null  |null    |original   |null      |1           
- |Here They Go|US    |null    |imdbDisplay|null      |0 

In [0]:
# Use the same tconst from earlier (already stored in top_comedy_tconst)

# Filter title.akas for that tconst
alternate_titles_df = title_akas_df.filter(col("titleId") == top_comedy_tconst)

# Check if there are any alternate titles
if alternate_titles_df.count() > 1:  # more than one = at least one alternate besides the original
    print(f"Alternate titles for the movie with tconst = {top_comedy_tconst}:\n")
    alternate_titles_df.select("title", "region", "language", "types", "attributes", "isOriginalTitle") \
        .show(truncate=False)
else:
    print(f"No alternate titles found for the movie with tconst = {top_comedy_tconst}.")


Alternate titles for the movie with tconst = tt20115996:

+------------+------+--------+-----------+----------+---------------+
|title       |region|language|types      |attributes|isOriginalTitle|
+------------+------+--------+-----------+----------+---------------+
|Here They Go|null  |null    |original   |null      |1              |
|Here They Go|US    |null    |imdbDisplay|null      |0              |
+------------+------+--------+-----------+----------+---------------+



**14. Build the degrees of seperation**

In [0]:
name_basics_df.cache()
title_principals_df.cache()
title_basics_df.cache()

Out[109]: DataFrame[tconst: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: int, endYear: string, runtimeMinutes: int, genres: string]

In [0]:
degree_0 = name_basics_df.filter(name_basics_df.nconst == "nm0000102")
degree_0.write.mode("overwrite").parquet("/FileStore/imdb/degree_0")
degree_0.show()


+---------+-----------+---------+---------+--------------------+--------------------+
|   nconst|primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+-----------+---------+---------+--------------------+--------------------+
|nm0000102|Kevin Bacon|     1958|     null|actor,producer,di...|tt0087277,tt01640...|
+---------+-----------+---------+---------+--------------------+--------------------+



In [0]:
# Get titles featuring nm0000102
actor_titles = title_principals_df.filter(title_principals_df.nconst == "nm0000102") \
                               .select("tconst").distinct()

# Find co-actors in the same titles
degree_1 = title_principals_df.join(actor_titles, "tconst") \
                           .filter(title_principals_df.nconst != "nm0000102") \
                           .select("nconst").distinct()

degree_1.write.mode("overwrite").parquet("/FileStore/imdb/degree_1")
degree_1.show()


+---------+
|   nconst|
+---------+
|nm1139759|
|nm0251041|
|nm5016878|
|nm1635586|
|nm0000123|
|nm1057101|
|nm0000741|
|nm0000704|
|nm5812920|
|nm3994408|
|nm0000119|
|nm0137272|
|nm3666749|
|nm1296595|
|nm0004854|
|nm0086301|
|nm1284039|
|nm0227759|
|nm5441137|
|nm0166921|
+---------+
only showing top 20 rows



In [0]:
degree_1 = spark.read.parquet("/FileStore/imdb/degree_1")

# Find titles of Degree 1 actors
degree_1_titles = title_principals_df.join(degree_1, "nconst").select("tconst").distinct()

# Find Degree 2 actors
degree_2 = title_principals_df.join(degree_1_titles, "tconst") \
                           .select("nconst").distinct() \
                           .subtract(degree_1).subtract(spark.createDataFrame([("nm0000102",)], ["nconst"]))

degree_2.write.mode("overwrite").parquet("/FileStore/imdb/degree_2")
degree_2.show()

+----------+
|    nconst|
+----------+
| nm0005706|
| nm0020967|
| nm0811256|
| nm0085812|
| nm0522844|
| nm0213142|
| nm0732440|
| nm0068551|
| nm0420949|
| nm0110951|
| nm0936837|
| nm1297550|
| nm1219967|
|nm10647687|
| nm1706032|
| nm0652288|
| nm0165145|
| nm0574206|
| nm0560754|
| nm0385591|
+----------+
only showing top 20 rows



In [0]:

degree_2 = spark.read.parquet("/FileStore/imdb/degree_2")

degree_2_titles = title_principals_df.join(degree_2, "nconst").select("tconst").distinct()

degree_3 = title_principals_df.join(degree_2_titles, "tconst") \
                           .select("nconst").distinct() \
                           .subtract(degree_2).subtract(degree_1).subtract(spark.createDataFrame([("nm0000102",)], ["nconst"]))

degree_3.write.mode("overwrite").parquet("/FileStore/imdb/degree_3")
degree_3.show()


+----------+
|    nconst|
+----------+
| nm6128855|
| nm1012680|
|nm11521609|
|nm12198811|
|nm11960936|
| nm0701465|
| nm2748252|
| nm2770056|
|nm11968849|
|nm11969975|
| nm0516714|
| nm1296585|
| nm1690431|
| nm0126962|
|nm11524843|
| nm4695808|
| nm1791085|
|nm11832961|
| nm0240345|
|nm10946376|
+----------+
only showing top 20 rows



In [0]:

degree_3 = spark.read.parquet("/FileStore/imdb/degree_3")

degree_3_titles = title_principals_df.join(degree_3, "nconst").select("tconst").distinct()

degree_4 = title_principals_df.join(degree_3_titles, "tconst") \
                           .select("nconst").distinct() \
                           .subtract(degree_3).subtract(degree_2).subtract(degree_1).subtract(spark.createDataFrame([("nm0000102",)], ["nconst"]))

degree_4.write.mode("overwrite").parquet("/FileStore/imdb/degree_4")
degree_4.show()


+----------+
|    nconst|
+----------+
|nm10379575|
|nm10393973|
|nm10406493|
|nm10418659|
|nm10420177|
|nm10425330|
|nm10473888|
|nm10485417|
|nm10511950|
|nm10519654|
|nm10526909|
|nm10556435|
|nm10569784|
|nm10577666|
|nm10592182|
|nm10647654|
| nm1066295|
|nm10683152|
|nm10695122|
|nm10703799|
+----------+
only showing top 20 rows



In [0]:
degree_1 = spark.read.parquet("/FileStore/imdb/degree_1")
degree_2 = spark.read.parquet("/FileStore/imdb/degree_2")
degree_3 = spark.read.parquet("/FileStore/imdb/degree_3")
degree_4 = spark.read.parquet("/FileStore/imdb/degree_4")

degree_4_titles = title_principals_df.join(degree_4, "nconst").select("tconst").distinct()

degree_5 = title_principals_df.join(degree_4_titles, "tconst") \
                           .select("nconst").distinct() \
                           .subtract(degree_4).subtract(degree_3).subtract(degree_2).subtract(degree_1).subtract(spark.createDataFrame([("nm0000102",)], ["nconst"]))

degree_5.write.mode("overwrite").parquet("/FileStore/imdb/degree_5")
degree_5.show()


+---------+
|   nconst|
+---------+
|nm0007376|
|nm0041906|
|nm0076839|
|nm0105401|
|nm0078364|
|nm0068274|
|nm0101690|
|nm0029547|
|nm0089739|
|nm0070150|
|nm0045562|
|nm0071535|
|nm0072291|
|nm0094111|
|nm0056673|
|nm0024304|
|nm0121462|
|nm0069436|
|nm0097740|
|nm0120748|
+---------+
only showing top 20 rows



In [0]:
degree_5 = spark.read.parquet("/FileStore/imdb/degree_5")

degree_5_titles = title_principals_df.join(degree_5, "nconst").select("tconst").distinct()

degree_6 = title_principals_df.join(degree_5_titles, "tconst") \
                           .select("nconst").distinct() \
                           .subtract(degree_5).subtract(degree_4).subtract(degree_3).subtract(degree_2).subtract(degree_1).subtract(spark.createDataFrame([("nm0000102",)], ["nconst"]))

degree_6.write.mode("overwrite").parquet("/FileStore/imdb/degree_6")
degree_6.show()


+----------+
|    nconst|
+----------+
| nm0046221|
| nm0729441|
|nm10034173|
|nm10036721|
|nm10109309|
| nm0326818|
| nm0602752|
| nm0208166|
|nm10036713|
|nm10119873|
| nm0865695|
|nm10029633|
| nm0375367|
|nm10102709|
|nm10039383|
| nm0275066|
|nm10125415|
| nm0867061|
|nm10050922|
|nm10045212|
+----------+
only showing top 20 rows



**15. Which degree of seperation contains the most people**
- DEGREE 3

In [0]:
degrees = []
for i in range(7):
    df = spark.read.parquet(f"/FileStore/imdb/degree_{i}")
    count = df.count()
    degrees.append((f"Degree {i}", count))

# Convert to DataFrame for better display
degree_counts_df = spark.createDataFrame(degrees, ["Degree", "Count"])
degree_counts_df.show()

+--------+-------+
|  Degree|  Count|
+--------+-------+
|Degree 0|      1|
|Degree 1|   4769|
|Degree 2| 723186|
|Degree 3|3902063|
|Degree 4|1587019|
|Degree 5| 159851|
|Degree 6|  16758|
+--------+-------+



In [0]:
from pyspark.sql.functions import lit
from functools import reduce

degree_dfs = []

for i in range(7):
    path = f"/FileStore/imdb/degree_{i}"
    try:
        df = spark.read.parquet(path)
        # Ensure only nconst is kept before adding degree column
        df = df.select("nconst").withColumn("degree", lit(i))
        degree_dfs.append(df)
    except Exception as e:
        print(f"Skipping degree_{i}: {e}")

if degree_dfs:
    all_degrees_df = reduce(lambda df1, df2: df1.union(df2), degree_dfs)
    all_degrees_df.groupBy("degree").count().orderBy("count", ascending=False).show()
else:
    print("No degree data was available for analysis.")


+------+-------+
|degree|  count|
+------+-------+
|     3|3902063|
|     4|1587019|
|     2| 723186|
|     5| 159851|
|     6|  16758|
|     1|   4769|
|     0|      1|
+------+-------+



**16. Aside from Degree 0, which Degree containes the most people**
- DEGREE 3

In [0]:
# Group by degree and count the number of people, excluding Degree 0
degree_counts = all_degrees_df.filter(all_degrees_df.degree != 0) \
    .groupBy("degree") \
    .count() \
    .orderBy("count", ascending=False)

degree_counts.show(1)  # Display the degree with the most people

+------+-------+
|degree|  count|
+------+-------+
|     3|3902063|
+------+-------+
only showing top 1 row



**17. Which contains the least**
- DEGREE 1

In [0]:
# Group by degree and count the number of people, excluding Degree 0
degree_counts_least = all_degrees_df.filter(all_degrees_df.degree != 0) \
    .groupBy("degree") \
    .count() \
    .orderBy("count", ascending=True)

degree_counts_least.show(1)  # Display the degree with the least people


+------+-----+
|degree|count|
+------+-----+
|     1| 4769|
+------+-----+
only showing top 1 row



**18. Is the person from question 3 within 6 Degrees of nm0000102, if so, how many ?**
- The earliest year of birth is: 4
- The nconst of this person is: nm0784172
- The person from question 3 (nconst: nm0784172) is within 3 degrees of nm0000102

In [0]:
# Filter out null birth years and cast to int
earliest_birth_year_df = name_basics_df \
    .filter(col("birthYear").isNotNull()) \
    .withColumn("birthYear", col("birthYear").cast("int"))

# Find the earliest birth year
earliest_birth_year = earliest_birth_year_df.agg({"birthYear": "min"}).collect()[0][0]

# Find the nconst for the earliest birth year
earliest_person = earliest_birth_year_df \
    .filter(col("birthYear") == earliest_birth_year) \
    .select("nconst") \
    .collect()[0][0]

print(f"The earliest year of birth is: {earliest_birth_year}")
print(f"The nconst of this person is: {earliest_person}")

The earliest year of birth is: 4
The nconst of this person is: nm0784172


In [0]:
# Is the person from question 3 within 6 Degrees of nm0000102, if so, how many ?
found = False
for i in range(1, 7):
    degree_df = spark.read.parquet(f"/FileStore/imdb/degree_{i}")
    if degree_df.filter(degree_df.nconst == earliest_person).count() > 0:
        print(f"The person from question 3 (nconst: {earliest_person}) is within {i} degrees of nm0000102.")
        found = True
        break

if not found:
    print(f"The person from question 3 (nconst: {earliest_person}) is NOT within 6 degrees of nm0000102.")

The person from question 3 (nconst: nm0784172) is within 3 degrees of nm0000102.


**19. Is nm0000102 within 6 degrees of the movie from question 11, if so, how many ?**
-     The tconst of the highest rated comedy movie is: tt10867894
-     Kevin Bacon (nm0000102) is within 2 degrees of the top-rated comedy movie (tconst: tt10867894)



In [0]:
# Join comedy movies with ratings (explicit join keys)
comedy_with_ratings = comedy_movies.join(
    title_ratings_df, 
    comedy_movies["tconst_basics"] == title_ratings_df["tconst"], 
    how="inner"
)

# Order by averageRating and numVotes
from pyspark.sql.functions import col

top_comedy_movie_tconst = comedy_with_ratings.orderBy(
    col("averageRating").desc(), 
    col("numVotes").desc()
).select("tconst").limit(1).collect()[0][0]

print(f"The tconst of the highest rated comedy movie is: {top_comedy_movie_tconst}")


The tconst of the highest rated comedy movie is: tt10867894


In [0]:
from pyspark.sql.functions import col

# Load actors of the top comedy movie
comedy_movie_actors_df = title_principals_df.filter(col("tconst") == top_comedy_movie_tconst).select("nconst").distinct()
comedy_movie_actors = [row["nconst"] for row in comedy_movie_actors_df.collect()]

found = False
for degree in range(1, 7):
    try:
        degree_df = spark.read.parquet(f"/FileStore/imdb/degree_{degree}")
        match_df = degree_df.filter(col("nconst").isin(comedy_movie_actors))
        if match_df.count() > 0:
            print(f" Kevin Bacon (nm0000102) is within {degree} degrees of the top-rated comedy movie (tconst: {top_comedy_movie_tconst})")
            found = True
            break
    except Exception as e:
        print(f" Could not read degree_{degree}: {e}")

if not found:
    print(f" Kevin Bacon (nm0000102) is NOT within 6 degrees of the top-rated comedy movie (tconst: {top_comedy_movie_tconst})")


 Kevin Bacon (nm0000102) is within 2 degrees of the top-rated comedy movie (tconst: tt10867894)
