# Setting up the environment
installing the necessary components in the google colab env

In [None]:
!pip install pyspark findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("IMDB").getOrCreate()

writing some tests to see that the current spark intallation has all the functionalities that will be needed

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 1. Create a small test DataFrame
data = [(1, "Alice", 1980), (2, "Bob", 1990)]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("birthYear", IntegerType(), True)
])
df = spark.createDataFrame(data, schema)
df.show()

# 2. Write to Parquet and read back
test_path = "/content/test_parquet"
df.write.mode("overwrite").parquet(test_path)
df_parquet = spark.read.parquet(test_path)
print("✅ Parquet read/write successful")
df_parquet.show()

# 3. Simulate reading a TSV (like IMDB datasets)
tsv_path = "/content/test_people.tsv"
with open(tsv_path, "w") as f:
    f.write("nconst\tprimaryName\tbirthYear\n")
    f.write("nm0000001\tFred Astaire\t1899\n")
    f.write("nm0000002\tLauren Bacall\t1924\n")

df_tsv = spark.read.option("header", True).option("sep", "\t").csv(tsv_path)
print("✅ TSV file loaded with inferred schema:")
df_tsv.printSchema()
df_tsv.show()

print("✅ All tests passed. Spark is ready for the IMDB project")

+---+-----+---------+
| id| name|birthYear|
+---+-----+---------+
|  1|Alice|     1980|
|  2|  Bob|     1990|
+---+-----+---------+

✅ Parquet read/write successful
+---+-----+---------+
| id| name|birthYear|
+---+-----+---------+
|  1|Alice|     1980|
|  2|  Bob|     1990|
+---+-----+---------+

✅ TSV file loaded with inferred schema:
root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)

+---------+-------------+---------+
|   nconst|  primaryName|birthYear|
+---------+-------------+---------+
|nm0000001| Fred Astaire|     1899|
|nm0000002|Lauren Bacall|     1924|
+---------+-------------+---------+

✅ All tests passed. Spark is ready for the IMDB project


# The data
the data has been uploaded to a google [drive folder](https://drive.google.com/drive/folders/1mTUgXJaaOItl44Y91fYVoxSX95L_dL5W?usp=sharing)


In [None]:
#https://drive.google.com/drive/folders/1mTUgXJaaOItl44Y91fYVoxSX95L_dL5W?usp=sharing
!pip install -q gdown

!gdown --folder --id 1mTUgXJaaOItl44Y91fYVoxSX95L_dL5W

Retrieving folder contents
Processing file 1LQMYnwYI6ClspulTrTZvq1_WdQKRbT0v name.basics.tsv.gz
Processing file 1cBsbI-GGyspZpK6K9IAmzNr2UMspuBzF title.akas.tsv.gz
Processing file 1J3Tt5yprc7oN7QP6YPL9BlCWP4_CkVgS title.basics.tsv.gz
Processing file 1zlsfX_2roQwS3H1DziTJsyxlDnmdCX3r title.crew.tsv.gz
Processing file 1u3f71O-oSZM1sXic4pl4iZxZbH6S5rRk title.episode.tsv.gz
Processing file 1zHMfFEs6IWuD2ZUSsx9U85zkZYJmCVfu title.principals.tsv.gz
Processing file 1YqsHPfa6Kz2b1xOiJCgtQG1wS_DLJnPe title.ratings.tsv.gz
Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From (original): https://drive.google.com/uc?id=1LQMYnwYI6ClspulTrTZvq1_WdQKRbT0v
From (redirected): https://drive.google.com/uc?id=1LQMYnwYI6ClspulTrTZvq1_WdQKRbT0v&confirm=t&uuid=7a9c2f29-eabb-4c01-a34a-1180121b8d50
To: /content/imdb-data/name.basics.tsv.gz
100% 283M/283M [00:01<00:00, 166MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1

In [None]:
import os
print(os.listdir("/content/imdb-data"))

['name.basics.tsv.gz', 'title.akas.tsv.gz', 'title.ratings.tsv.gz', 'title.crew.tsv.gz', 'title.basics.tsv.gz', 'title.episode.tsv.gz', 'title.principals.tsv.gz']


## Inspecting the data and structure

In [None]:
from pyspark.sql import DataFrame

# Define path and file list
imdb_path = "/content/imdb-data"
files = {
    "name.basics": "name.basics.tsv.gz",
    "title.basics": "title.basics.tsv.gz",
    "title.ratings": "title.ratings.tsv.gz",
    "title.akas": "title.akas.tsv.gz",
    "title.principals": "title.principals.tsv.gz",
    "title.crew": "title.crew.tsv.gz",
    "title.episode": "title.episode.tsv.gz"
}

# Function to inspect structure
def inspect_file(name: str, filename: str):
    print(f"\n📁 folder: {name} — {filename}")
    df = spark.read.option("header", True).option("sep", "\t").csv(os.path.join(imdb_path, filename))
    print("▶ Columns:", df.columns)
    print("▶ Schema:")
    df.printSchema()
    print("▶ Row count (approx):", df.count())
    print("▶ Sample rows:")
    df.show(5, truncate=False)

# Loop through and inspect all files
for name, filename in files.items():
    inspect_file(name, filename)



📁 folder: name.basics — name.basics.tsv.gz
▶ Columns: ['nconst', 'primaryName', 'birthYear', 'deathYear', 'primaryProfession', 'knownForTitles']
▶ Schema:
root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)

▶ Row count (approx): 14323600
▶ Sample rows:
+---------+---------------+---------+---------+----------------------------------+---------------------------------------+
|nconst   |primaryName    |birthYear|deathYear|primaryProfession                 |knownForTitles                         |
+---------+---------------+---------+---------+----------------------------------+---------------------------------------+
|nm0000001|Fred Astaire   |1899     |1987     |actor,miscellaneous,producer      |tt0072308,tt0050419,tt0027125,tt0031983|
|nm0000002|Lauren Bacall  |1924     |2014 


This folder contains the core `.tsv.gz` files provided by IMDb at [https://datasets.imdbws.com](https://datasets.imdbws.com), which are used throughout this project.

### Dataset Files & Their Structure

| File | Description | Key Columns |
|------|-------------|-------------|
| **name.basics.tsv.gz** | People info (actors, directors, writers, etc.) | `nconst`, `primaryName`, `birthYear`, `primaryProfession`, `knownForTitles` |
| **title.basics.tsv.gz** | Title info (movies, shows, etc.) | `tconst`, `titleType`, `primaryTitle`, `startYear`, `runtimeMinutes`, `genres` |
| **title.ratings.tsv.gz** | Ratings and votes | `tconst`, `averageRating`, `numVotes` |
| **title.akas.tsv.gz** | Alternate titles across regions/languages | `titleId`, `title`, `region`, `types`, `isOriginalTitle` |
| **title.principals.tsv.gz** | People involved in each title + their role | `tconst`, `nconst`, `category`, `job`, `characters` |
| **title.crew.tsv.gz** | Directors and writers per title | `tconst`, `directors`, `writers` |
| **title.episode.tsv.gz** | Episode-specific info for TV shows | `tconst`, `parentTconst`, `seasonNumber`, `episodeNumber` |

### Notes on Format

- All files are tab-separated (`.tsv`) and compressed with Gzip (`.gz`)
- Null values are represented as `\N`
- All fields are loaded as strings initially and should be cast to appropriate types when needed (e.g., integers for `birthYear`, floats for `averageRating`)


# Question 2-13

**2**. How many total people in data set

in this code we can see that we have used `distinct()`.

In theory, `nconst` should be unique per row — but using `distinct()` ensures we account for any accidental duplicates.



**using a Spark DataFrame to compute the answer**

In [None]:
# Load the name.basics data
people_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/name.basics.tsv.gz")
# - 'option("header", True)' tells Spark to treat the first line of the file as column names.
# - 'option("sep", "\t")' specifies that the file is tab-separated (TSV format).
# - '.csv(...)' is used even for TSV files because Spark treats it as a general delimited format.

# Count unique people using nconst
unique_people_count = people_df.select("nconst").distinct().count()
# - 'select("nconst")' extracts just the person ID column.
# - 'distinct()' ensures that we only count unique IDs (in case of duplicates).
# - 'count()' returns the total number of distinct people.

print(f" Total unique people in the dataset: {unique_people_count}")


 Total unique people in the dataset: 14323600


total people in the data set: **14323600**

**3.4.5.** Earliest birth year

this question will be answered using **SQL commands**

In [None]:
#this is using dataframes just as a reference

from pyspark.sql.functions import col

# Filter out null birth years and cast to int
valid_births_df = people_df.filter(people_df.birthYear != "\\N") \
                           .withColumn("birthYear", col("birthYear").cast("int"))

# Find the earliest birth year
earliest_birth_year = valid_births_df.agg({"birthYear": "min"}).collect()[0][0]

print(f" Earliest birth year in the dataset: {earliest_birth_year}")


 Earliest birth year in the dataset: 4


To answer this question, we used **Spark SQL** by first creating a temporary view of the people dataset. We then ran a SQL query to:

- Filter out invalid birth years (represented as `\N`)
- Cast the `birthYear` column from string to integer
- Select the **minimum birth year** using the `MIN()` aggregation function

This demonstrates how SQL queries can be executed on Spark DataFrames using the `spark.sql()` interface.


In [None]:
# Create or replace a temporary SQL view for the people dataset (it is just creating here, but kept in case of rerun and overall consistency)
people_df.createOrReplaceTempView("people")

# Use Spark SQL to get the earliest birth year, filtering out nulls (\N)
earliest_birth_year_sql = spark.sql("""
    SELECT MIN(CAST(birthYear AS INT)) AS earliest_birth_year
    FROM people
    WHERE birthYear != '\\N'
""")

# Show the result
earliest_birth_year_sql.show()


+-------------------+
|earliest_birth_year|
+-------------------+
|                  4|
+-------------------+



while this seems like a suspect result since the person is born more than **2020~2021 years ago**.
we'll check out more details before coming to a conclusion

In [None]:
spark.sql("""
    SELECT nconst, primaryName, birthYear
    FROM people
    WHERE birthYear = '4'
""").show()

+---------+------------------+---------+
|   nconst|       primaryName|birthYear|
+---------+------------------+---------+
|nm0784172|Lucio Anneo Seneca|        4|
+---------+------------------+---------+



explaining the result:

Using Spark SQL, we found that the **earliest year of birth** recorded in the dataset is **4 AD**.

This result may seem surprising at first, but it makes sense given the nature of IMDb's data. IMDb includes not only modern actors and filmmakers but also historical figures who are referenced in documentaries, educational series, or dramatized content.

For example in this case, the person with `birthYear = 4` is:

- **Lucio Anneo Seneca**, a Roman philosopher, listed in the dataset with `nconst = nm0784172`.

These historical entries are valid, but depending on the scope of the analysis, we might choose to **filter out extremely early birth years** (e.g., before the year 1000) for more modern-focused insights.

we don't know if we should filter them out so far, so they will be kept unless we encouter an issue


**6.** latest year of birth


In [None]:
# Latest year of birth using Spark SQL
latest_birth_year_sql = spark.sql("""
    SELECT MAX(CAST(birthYear AS INT)) AS latest_birth_year
    FROM people
    WHERE birthYear != '\\N'
""")

latest_birth_year_sql.show()

+-----------------+
|latest_birth_year|
+-----------------+
|             2024|
+-----------------+



We used a **Spark SQL query** to determine the most recent birth year in the dataset. The steps included:

- Filtering out null values (`\N`) in the `birthYear` column
- Casting the `birthYear` values from string to integer
- Using the SQL `MAX()` function to find the most recent year

This gives us insight into how up-to-date the dataset is in terms of the people it includes. The latest birth year often corresponds to very young individuals who have recently appeared in films or television.


**7.** People with missing birth year

In [None]:
# Count how many people have no recorded birth year
missing_birth_count = people_df.filter(people_df.birthYear == "\\N").count()
# - IMDb represents missing values as the string '\N', not as null.
# - 'filter(...)' selects rows where the birthYear equals '\N'.
# - 'count()' returns the total number of such rows.
print(f"Number of people with no birth year: {missing_birth_count}")


Number of people with no birth year: 13680662


people with no birth year in the dataset: **13680662**

**8.9.** Length of longest short and shortest movie after 1900

We'll use `title.basics.tsv.gz`, which includes:

- `titleType` (to filter for "short" or "movie")

- `startYear` (to filter for titles after 1900)

- `runtimeMinutes` (to find the min/max length)



In [None]:
# Load title.basics data
titles_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/title.basics.tsv.gz")

In [None]:
# Filter out invalid/null runtime and startYear values, and cast them to integers
# we could also add a condition on the year here, but this cell is kept for null values only and a condition on the year will be with the length on the next cells
clean_titles_df = titles_df.filter((col("startYear") != "\\N") & (col("runtimeMinutes") != "\\N")) \
    .withColumn("startYear", col("startYear").cast("int")) \
    .withColumn("runtimeMinutes", col("runtimeMinutes").cast("int"))

In [None]:
# longest "short" after 1900

# - Filter where titleType is "short" and startYear > 1900
# - Use max() to get the longest runtime
longest_short_runtime = clean_titles_df \
    .filter((col("titleType") == "short") & (col("startYear") > 1900)) \
    .agg({"runtimeMinutes": "max"}) \
    .collect()[0][0]

# this step is optional but can help make things more readable or in a more standard format
# convert runtime to hours and minutes
hours = longest_short_runtime // 60
minutes = longest_short_runtime % 60
formatted_runtime = f"{hours}h {minutes}m" if hours > 0 else f"{minutes}m"

print(f"Longest short after 1900: {longest_short_runtime} minutes or {formatted_runtime}")

Longest short after 1900: 250 minutes or 4h 10m


Longest short after 1900: 250 minutes or 4h 10m

In [None]:
# shortest "movie" after 1900

# - Filter where titleType is "movie" and startYear > 1900
# - Use min() to get the shortest runtime
shortest_movie_runtime = clean_titles_df \
    .filter((col("titleType") == "movie") & (col("startYear") > 1900)) \
    .agg({"runtimeMinutes": "min"}) \
    .collect()[0][0]

# convert runtime to hours and minutes
hours = shortest_movie_runtime // 60
minutes = shortest_movie_runtime % 60
formatted_runtime = f"{hours}h {minutes}m" if hours > 0 else f"{minutes}m"

print(f"Shortest movie after 1900: {shortest_movie_runtime} minutes")

Shortest movie after 1900: 1 minutes


Shortest movie after 1900: 1 minutes

while this seems very short, it's not a mistake and we can look up movies with a minute length to check that it is indeed not a mistake

In [None]:
# Find titles of 1-minute movies released after 1900
shortest_movie_df = clean_titles_df \
    .filter((col("titleType") == "movie") &
            (col("startYear") > 1900) &
            (col("runtimeMinutes") == 1)) \
    .select("tconst", "primaryTitle", "startYear", "runtimeMinutes")

shortest_movie_df.show(truncate=False)


+----------+-------------------------+---------+--------------+
|tconst    |primaryTitle             |startYear|runtimeMinutes|
+----------+-------------------------+---------+--------------+
|tt0025166 |George White's Scandals  |1934     |1             |
|tt0469119 |Love Trap                |2005     |1             |
|tt0810779 |Bound by Blood           |2007     |1             |
|tt0848384 |Nikkatsu on Parade       |1930     |1             |
|tt12893768|If I Die Tomorrow        |2020     |1             |
|tt26348770|Dancing Boy              |2023     |1             |
|tt32276067|Honest Vikky (Life Coach)|2024     |1             |
+----------+-------------------------+---------+--------------+



**10.** list of all of the genres represented


In [None]:
from pyspark.sql.functions import explode, split

# Step 1: Filter out rows with null genres
non_null_genres_df = titles_df.filter(titles_df.genres != "\\N")

# Step 2: Split the comma-separated genres into arrays
split_genres_df = non_null_genres_df.withColumn("genre", explode(split("genres", ",")))

# Step 3: Select distinct genres
unique_genres_df = split_genres_df.select("genre").distinct()

# Show all unique genres
unique_genres_df.show(100 , truncate=False)


+-----------+
|genre      |
+-----------+
|Crime      |
|Romance    |
|Thriller   |
|Adventure  |
|Drama      |
|War        |
|Documentary|
|Reality-TV |
|Family     |
|Fantasy    |
|Game-Show  |
|Adult      |
|History    |
|Mystery    |
|Musical    |
|Animation  |
|Music      |
|Film-Noir  |
|Short      |
|Horror     |
|Western    |
|Biography  |
|Comedy     |
|Sport      |
|Action     |
|Talk-Show  |
|Sci-Fi     |
|News       |
+-----------+



by default, the number of rows displayed by `show()` is limited to 20, we set the limit to 100 manually which is enough because it was set retroactively after the following cells from which we know the length is actually 28.

But we e can do it another way if we don't know the length

In [None]:
genres = [row["genre"] for row in unique_genres_df.collect()]
print(genres)

['Crime', 'Romance', 'Thriller', 'Adventure', 'Drama', 'War', 'Documentary', 'Reality-TV', 'Family', 'Fantasy', 'Game-Show', 'Adult', 'History', 'Mystery', 'Musical', 'Animation', 'Music', 'Film-Noir', 'Short', 'Horror', 'Western', 'Biography', 'Comedy', 'Sport', 'Action', 'Talk-Show', 'Sci-Fi', 'News']


In [None]:
len(genres)

28

To answer this question, we extracted the unique genres from the `title.basics.tsv.gz` file using the following steps:

1. **Filtered out null genre entries** (`\N`)
2. **Split** the `genres` column, which contains comma-separated strings (e.g., `"Action,Drama"`), into arrays
3. **Exploded** the arrays so that each genre appears in its own row
4. Selected the **distinct** genre values

This allowed us to get a clean list of all genres represented across titles in the dataset.


**11.** highest rated comedy movie in the dataset

In [None]:
# Load title basics and ratings
ratings_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/title.ratings.tsv.gz")
titles_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/title.basics.tsv.gz")

# Filter titles: type = 'movie', genre includes 'Comedy', startYear is not null
comedy_movies_df = titles_df \
    .filter((col("titleType") == "movie") &
            (col("genres").contains("Comedy")) &
            (col("startYear") != "\\N"))

# Join with ratings on tconst
comedy_rated_df = comedy_movies_df.join(ratings_df, on="tconst")

# Cast ratings and votes to numeric types
comedy_rated_df = comedy_rated_df \
    .withColumn("averageRating", col("averageRating").cast("double")) \
    .withColumn("numVotes", col("numVotes").cast("int"))

# Find the highest rated comedy movie, breaking ties by highest number of votes
# this can be done directly with the following line, but we will make things more explicit on the output
# top_comedy = comedy_rated_df.orderBy(col("averageRating").desc(), col("numVotes").desc()).limit(1)
# top_comedy.select("primaryTitle", "averageRating", "numVotes", "tconst").show(truncate=False)

from pyspark.sql.functions import max as spark_max

# the highest rating value for comedy movies
max_rating = comedy_rated_df.agg(spark_max("averageRating")).collect()[0][0]

# all movies with that same rating
tied_movies_df = comedy_rated_df.filter(col("averageRating") == max_rating) \
    .orderBy(col("numVotes").desc())

# all tied top-rated comedy movies
print(" Movies tied with the highest comedy rating:")
tied_movies_df.select("primaryTitle", "averageRating", "numVotes", "tconst").show(truncate=False)

# the top one by vote count
top_comedy_movie = tied_movies_df.limit(1)

print(" Final selected highest-rated comedy movie (tie broken by votes):")
top_comedy_movie.select("primaryTitle", "averageRating", "numVotes", "tconst").show(truncate=False)


 Movies tied with the highest comedy rating:
+----------------------+-------------+--------+----------+
|primaryTitle          |averageRating|numVotes|tconst    |
+----------------------+-------------+--------+----------+
|Here They Go          |10.0         |8       |tt20115996|
|Planet Disagreements 8|10.0         |8       |tt21360086|
|The Premiere          |10.0         |8       |tt26771891|
|Meet the Radebes      |10.0         |8       |tt6999602 |
+----------------------+-------------+--------+----------+

 Final selected highest-rated comedy movie (tie broken by votes):
+------------+-------------+--------+----------+
|primaryTitle|averageRating|numVotes|tconst    |
+------------+-------------+--------+----------+
|Here They Go|10.0         |8       |tt20115996|
+------------+-------------+--------+----------+



To determine the highest rated comedy movie, we used two datasets:

- `title.basics.tsv.gz` — contains title metadata (type, genres, name)
- `title.ratings.tsv.gz` — provides average IMDb ratings and vote counts

#### Steps:
1. **Filtered** titles to include only:
   - Movies (`titleType = "movie"`)
   - With "Comedy" in the `genres` column
   - That have a valid `startYear`
2. **Joined** this filtered set with the ratings data on the `tconst` column.
3. **Cast** `averageRating` and `numVotes` to appropriate numeric types.
4. **Computed** the maximum `averageRating` among the filtered results.
5. **Filtered again** to get all comedy movies with this top rating.
6. **Displayed all tied movies**, sorted by number of votes.
7. **Selected the final winner** by choosing the one with the highest number of votes.

This approach ensures that if there's a tie in rating, we explicitly show all tied titles and then clearly identify the winning movie based on popularity (most votes).

while we did pick a movie, we can notice that they had ties at 10, and that the number of votes is also the same at 8.

breaking the tie is actually impossible but since one had to be picked, the first one that came up on the table was.

**12.** director of the top rated movie

In [None]:
# Get the tconst of the top-rated comedy movie
top_movie_tconst = top_comedy_movie.select("tconst").collect()[0][0]
# - 'top_comedy_movie' is a DataFrame containing the top movie's details.
# - '.select("tconst")' selects the 'tconst' column, which is the unique identifier for titles.
# - '.collect()' collects the result into a list of Row objects.
# - '[0]' accesses the first (and only) row in the result.
# - '[0]' again accesses the 'tconst' value from the Row object (Row objects behave like tuples).

# Load the title.crew dataset
crew_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/title.crew.tsv.gz")

# Filter for the movie's director(s)
director_df = crew_df.filter(crew_df.tconst == top_movie_tconst).select("directors")

# Get the director's nconst from the previous step
director_nconst = director_df.collect()[0][0]

# Load the name.basics dataset to get the director's name
people_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/name.basics.tsv.gz")

# Filter the dataset to get the director's name using the nconst
director_name_df = people_df.filter(people_df.nconst == director_nconst).select("primaryName")

# Show the director's name
director_name_df.show(truncate=False)


+------------+
|primaryName |
+------------+
|Bryan Bostic|
+------------+



**13.** alternate titles for the movie

We'll need to pull data from the `title.akas.tsv.gz` file, which contains alternate titles in different languages or regions.

We already have the `tconst` for the top comedy movie from Question 11, so we'll filter for that and list the alternate titles.



In [None]:
# Load the title.akas dataset
akas_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/title.akas.tsv.gz")

# Filter for the top-rated comedy movie tconst
alternate_titles_df = akas_df.filter(akas_df.titleId == top_movie_tconst).select("title")

# Show all alternate titles
alternate_titles_df.show(truncate=False)


+------------+
|title       |
+------------+
|Here They Go|
|Here They Go|
+------------+



# Degrees of separation


In [None]:
# Load name.basics dataset to find the name of nconst nm0000102
people_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/name.basics.tsv.gz")

# Filter to find the person with nconst = nm0000102
person_info = people_df.filter(people_df.nconst == "nm0000102").select("primaryName")

# Show the result (the name of the person with nconst = nm0000102)
person_info.show(truncate=False)


+-----------+
|primaryName|
+-----------+
|Kevin Bacon|
+-----------+



this will be the *origin* of our degrees of separation.

we can see it as a graph where each person is a node/vertex, and these are linked by edges which represent movies/shorts/shows.


### **Algorithm Overview: Calculating Degrees of Separation**

To model the degrees of separation between people in the dataset, we use a **Breadth-First Search (BFS)** approach.

1. **Starting Point**: We start from a specific **person** (referred to as the **origin**), represented by their **nconst** (e.g., `nm0000102`).

2. **Degree 0**: The origin itself is considered to be at **Degree 0** (the first degree of separation). We initialize a **visited set** to track the people we've encountered and begin with the origin person.

3. **Degree 1**: From the origin, we look at all **directly connected people** — these are the people who worked on the same movies as the origin (e.g., co-actors, directors, etc.). These people form **Degree 1**.

4. **Track Visited Nodes**: As we explore each degree, we maintain a **set of visited nodes** to ensure that we don't count the same person more than once. If a person has already been encountered in a previous degree, we skip them.
(If nodes A and B at degree `n` both link to node C which makes it link to degree `n+1`, we only need to keep one of them (for example, node A). This is because functionally, both nodes provide the same connection — the only point of reference that matters to us is the origin.)

5. **Degree 2, Degree 3, and so on**: For each person in **Degree 1**, we find all **linked people** (people who worked on movies with them) to form **Degree 2**. We repeat this process for each subsequent degree, going as deep as necessary (up to **Degree 6**).

6. **BFS Process**: At each degree, we:
   - Find the people linked to those from the previous degree.
   - **Filter out** people who have already been visited.
   - Add these new people to the **visited set** and the current degree.
   
   This ensures that no one is counted multiple times and we explore the degrees layer by layer, starting from the origin and moving outward.

7. **Termination**: The process continues until we've reached **Degree 6** (or whatever the maximum degree is if we want to be more general). Each degree's set of people is saved as a table for further analysis.

---

- **BFS Traversal**: This algorithm explores people level by level (degree by degree), ensuring all connections are explored systematically ([wiki](https://en.wikipedia.org/wiki/Breadth-first_search)).
- **Visited Set**: Prevents revisiting nodes that have already been explored in previous degrees, maintaining the accuracy of the degree count.
- **Linked Nodes**: These are the people connected through movies (shared `tconst`), forming edges in the graph of relationships.


In [None]:
from pyspark.sql.functions import col
from collections import deque

# Define the starting person
starting_person = 'nm0000102'

# Create a set to track visited people (global to all degrees)
visited_people = set()
current_degree_people = set([starting_person])  # Start with the starting person

In [None]:
# Load title.principals dataset to get people linked to movies
title_principals_df = spark.read.option("header", True).option("sep", "\t").csv("/content/imdb-data/title.principals.tsv.gz")


In [None]:
# Output the size (number of rows) of the data
size = title_principals_df.count()
print(f"Size of the dataset: {size} rows")
# this outputs Size of the dataset: 91938655 rows and takes a long time to count

Size of the dataset: 91938655 rows


In [None]:
# Print the column names of the DataFrame
print("Column names in the DataFrame:")
print(title_principals_df.columns)

# Optionally, display the first few rows of the DataFrame to inspect its structure
title_principals_df.show(5, truncate=False)


Column names in the DataFrame:
['tconst', 'ordering', 'nconst', 'category', 'job', 'characters']
+---------+--------+---------+---------------+-----------------------+----------+
|tconst   |ordering|nconst   |category       |job                    |characters|
+---------+--------+---------+---------------+-----------------------+----------+
|tt0000001|1       |nm1588970|self           |\N                     |["Self"]  |
|tt0000001|2       |nm0005690|director       |\N                     |\N        |
|tt0000001|3       |nm0005690|producer       |producer               |\N        |
|tt0000001|4       |nm0374658|cinematographer|director of photography|\N        |
|tt0000002|1       |nm0721526|director       |\N                     |\N        |
+---------+--------+---------+---------------+-----------------------+----------+
only showing top 5 rows



In [None]:
# Filter for only actors and directors
filtered_df = title_principals_df.filter(
    title_principals_df.category.isin("actor", "director")
)

# Show a preview of the filtered data
filtered_df.show(5, truncate=False)

# Output the size (number of rows) of the filtered dataset
filtered_size = filtered_df.count()
print(f"Size of the filtered dataset (only actors and directors): {filtered_size} rows")
#this outputs: Size of the filtered dataset (only actors and directors): 29777295 rows, and it takes a long time


+---------+--------+---------+--------+---+--------------+
|tconst   |ordering|nconst   |category|job|characters    |
+---------+--------+---------+--------+---+--------------+
|tt0000001|2       |nm0005690|director|\N |\N            |
|tt0000002|1       |nm0721526|director|\N |\N            |
|tt0000003|1       |nm0721526|director|\N |\N            |
|tt0000004|1       |nm0721526|director|\N |\N            |
|tt0000005|1       |nm0443482|actor   |\N |["Blacksmith"]|
+---------+--------+---------+--------+---+--------------+
only showing top 5 rows

Size of the filtered dataset (only actors and directors): 29777295 rows


In [None]:
# Randomly sample 10,000 rows from the filtered DataFrame
sampled_df = filtered_df.sample(fraction=10000 / filtered_df.count(), seed=110)

# Show the top 5 rows of the sampled data
sampled_df.show(5, truncate=False)

# Output the size (number of rows) of the sampled dataset
sampled_size = sampled_df.count()
print(f"Size of the sampled dataset: {sampled_size} rows")


+---------+--------+---------+--------+---+--------------------------------------------+
|tconst   |ordering|nconst   |category|job|characters                                  |
+---------+--------+---------+--------+---+--------------------------------------------+
|tt0001121|7       |nm0163559|actor   |\N |["The Preacher"]                            |
|tt0002055|6       |nm0784407|director|\N |\N                                          |
|tt0002282|5       |nm0649211|actor   |\N |["Jack's Sweetheart's Father"]              |
|tt0002920|1       |nm0000779|actor   |\N |["Fatty"]                                   |
|tt0003748|1       |nm0853336|actor   |\N |["Robert W. Wainwright AKA Captain Alvarez"]|
+---------+--------+---------+--------+---+--------------------------------------------+
only showing top 5 rows

Size of the sampled dataset: 10068 rows


In [None]:
# A function to find all people connected to a given person in a particular degree (using title.principals)
def find_connected_people(person_nconst):
    print(f"Processing person: {person_nconst}")

    # Get all the titles (tconst) the person worked on
    titles = filtered_df.filter(filtered_df.nconst == person_nconst).select("tconst")
    # Get all people who worked on those titles (connected people)
    connected_people = filtered_df.join(titles, "tconst").select("nconst").distinct()

    # Collect distinct connected people
    return connected_people.rdd.map(lambda row: row['nconst']).collect()


In [None]:
#just to track progress
!pip install tqdm



In [None]:
# For each degree (1 to 6)
for degree in range(1, 7):
    print(f"Starting Degree {degree}...")
    new_degree_people = set()  # New set to track people for this degree
    degree_count = 0  # Track the number of new people added in this degree
    for person in current_degree_people:
        # Get all people connected to the current person (who haven't been visited)
        connected_people = find_connected_people(person)

        for new_person in connected_people:
            # Only add the person if they haven't been visited already
            if new_person not in visited_people:
                new_degree_people.add(new_person)
                visited_people.add(new_person)  # Mark them as visited
                degree_count += 1  # Count new person added

    # If no new people were added to this degree, print a warning (indicating we might be stuck or at the limit)
    if degree_count == 0:
        print(f"WARNING: No new people found for Degree {degree}. The traversal might be stuck or finished.")
        break

    # Move to the next degree
    current_degree_people = new_degree_people

    # Create a DataFrame of the current degree people
    degree_df = spark.createDataFrame([(p,) for p in current_degree_people], ["nconst"])

    # Save the DataFrame to a Parquet file for this degree
    degree_df.write.mode("overwrite").parquet(f"/content/degree_{degree}.parquet")

    # Print the saved degree for verification
    print(f"Degree {degree} saved to /content/degree_{degree}.parquet")
    print(f"Number of new people in Degree {degree}: {degree_count}")  # Print the number of new people added in this degree


Starting Degree 1...
Processing person: nm0000102
Degree 1 saved to /content/degree_1.parquet
Number of new people in Degree 1: 661
Starting Degree 2...
Processing person: nm1032600
Processing person: nm0571106
Processing person: nm0206097
Processing person: nm0128370
Processing person: nm0318889
Processing person: nm0198408
Processing person: nm0188975
Processing person: nm0805127
Processing person: nm0681785
Processing person: nm0347375


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 