![banner.png](banner.png)

<h2 style="color:#de4a48; background-color:#fce19a; padding: 10px; text-align:left; border: 1px solid #fce19a;">Imports</h2>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, min, max, count
import os

In [18]:
spark = SparkSession.builder \
    .appName("IMDbProject") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
# spark.conf.set("spark.sql.shuffle.partitions", "4")
# spark.conf.set("spark.executor.memory", "2g")

<h2 style="color:#de4a48; background-color:#fce19a; padding: 10px; text-align:left; border: 1px solid #fce19a;">Load Datasets</h2>

In [None]:
# Paths
IMDB_PATH = '../data/raw/imdb'
TMDB_PATH = '../data/raw/tmdb'
RAW_PARQUET_PATH = '../data/raw_parquet'
PROCESSED_PATH = '../data/processed'

In [4]:
# Make sure output folders exist
os.makedirs(RAW_PARQUET_PATH, exist_ok=True)
os.makedirs(PROCESSED_PATH, exist_ok=True)

imdb_files = {
    "basics": "title.basics.tsv",
    "akas": "title.akas.tsv",
    "crew": "title.crew.tsv",
    "episode": "title.episode.tsv",
    "principals": "title.principals.tsv",
    "ratings": "title.ratings.tsv",
    "names": "name.basics.tsv"
}

### Load IMDB data to Spark Dataframes

In [5]:
imdb_dfs = {
    key: spark.read.csv(
        os.path.join(IMDB_PATH, filename),
        sep='\t',
        header=True,
        inferSchema=True,
        nullValue='\\N'
    )
    for key, filename in imdb_files.items()
}

for name, df in imdb_dfs.items():
    print(f"IMDb - {name}:")
    df.show(5)

IMDb - basics:
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   NULL|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|   NULL|             5|     Animation,Short|
|tt0000003|    short|        Poor Pierrot|      Pauvre Pierrot|      0|     1892|   NULL|             5|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|   NULL|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|   NULL|             1

### Save unfiltered IMDB files to Parquet

In [None]:
# for name, df in imdb_dfs.items():
#     parquet_path = os.path.join(RAW_PARQUET_PATH, f'{name}.parquet')
#     df.write.mode('overwrite').parquet(parquet_path)
#     print(f"Saved {name} to {parquet_path}")

### Load TMDB data to Spark Dataframes

In [7]:
tmdb_path = os.path.join(TMDB_PATH, 'TMDB_movie_dataset_v11.csv')

tmdb_df = spark.read.csv(
    tmdb_path,
    header=True,
    inferSchema=True,
    nullValue='\\N'
)

print("TMDb Data Preview:")
tmdb_df.show(5)

TMDb Data Preview:
+------+---------------+------------+----------+--------+------------+----------+-------+-----+--------------------+---------+--------------------+---------+-----------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|          title|vote_average|vote_count|  status|release_date|   revenue|runtime|adult|       backdrop_path|   budget|            homepage|  imdb_id|original_language| original_title|            overview|          popularity|         poster_path|             tagline|              genres|production_companies|production_countries|    spoken_languages|            keywords|
+------+---------------+------------+----------+--------+------------+----------+-------+-----+--------------------+---------+--------------------+---------+-----------------+---------------+--------------------+-------

### Save TMDB file to Parquet

In [None]:
# TMDB_PARQUET_PATH = '../data/raw_parquet'
# os.makedirs(TMDB_PARQUET_PATH, exist_ok=True)

# tmdb_df.write.mode('overwrite').parquet(os.path.join(TMDB_PARQUET_PATH, 'TMDB_movie_dataset_v11.parquet'))
# print("Saved TMDb to parquet")

<h2 style="color:#de4a48; background-color:#fce19a; padding: 10px; text-align:left; border: 1px solid #fce19a;">Exploratory Data Analysis</h2>

### Preliminary EDA

In [8]:
imdb_basics = imdb_dfs['basics']

# Total counts for each dataset
print("Row counts per file:")
for name, df in imdb_dfs.items():
    print(f"{name}: {df.count():,} rows")

# Date range in imdb_basics
date_range = imdb_basics.select(min(col("startYear")).alias("min_year"), max(col("startYear")).alias("max_year"))
date_range.show()

# Top genres
genres_exploded = imdb_basics.withColumn("genre", explode(split(col("genres"), ",")))
top_genres = genres_exploded.groupBy("genre").count().orderBy(col("count").desc())
top_genres.show(10)

Row counts per file:
basics: 11,485,855 rows
akas: 51,496,209 rows
crew: 11,485,855 rows
episode: 8,832,680 rows
principals: 91,154,932 rows
ratings: 1,539,107 rows
names: 14,217,591 rows
+--------+--------+
|min_year|max_year|
+--------+--------+
|    1874|    2031|
+--------+--------+

+-----------+-------+
|      genre|  count|
+-----------+-------+
|      Drama|3237636|
|     Comedy|2236773|
|  Talk-Show|1424798|
|      Short|1228429|
|Documentary|1096234|
|       News|1080456|
|    Romance|1072271|
|     Family| 844672|
| Reality-TV| 643344|
|  Animation| 572468|
+-----------+-------+
only showing top 10 rows



### Loading Using Parquet Files

In [None]:
# # Load from saved Parquets (raw)
# imdb_dfs = {}
# imdb_files = ["basics", "akas", "crew", "episode", "principals", "ratings", "names"]

# for name in imdb_files:
#     path = os.path.join(RAW_PARQUET_PATH, f'{name}.parquet')
#     imdb_dfs[name] = spark.read.parquet(path)

### Filter to `movies` only

In [10]:
imdb_basics_filtered = imdb_dfs['basics'].filter(col("titleType") == "movie")

# # Save filtered basics immediately to processed folder
# imdb_basics.write.mode('overwrite').parquet(os.path.join(PROCESSED_PARQUET_PATH, 'basics.parquet'))

imdb_basics_filtered.count()

707625

In [17]:
# Filter akas
imdb_akas_filtered = imdb_dfs['akas'] \
    .join(imdb_basics_filtered.select("tconst"), imdb_dfs['akas'].titleId == imdb_basics_filtered.tconst, "inner") \
    .drop("tconst")

# imdb_akas.write.mode('overwrite').parquet(os.path.join(PROCESSED_PARQUET_PATH, 'akas.parquet'))

# Filter ratings
imdb_ratings_filtered = imdb_dfs['ratings'] \
    .join(imdb_basics_filtered.select("tconst"), "tconst", "inner")

# imdb_ratings.write.mode('overwrite').parquet(os.path.join(PROCESSED_PARQUET_PATH, 'ratings.parquet'))

# Filter principals
imdb_principals_filtered = imdb_dfs['principals'] \
    .join(imdb_basics_filtered.select("tconst"), "tconst", "inner")

# imdb_principals.write.mode('overwrite').parquet(os.path.join(PROCESSED_PARQUET_PATH, 'principals.parquet'))

# Filter crew
imdb_crew_filtered = imdb_dfs['crew'] \
    .join(imdb_basics_filtered.select("tconst"), "tconst", "inner")

# imdb_crew.write.mode('overwrite').parquet(os.path.join(PROCESSED_PARQUET_PATH, 'crew.parquet'))

# For now, leaving `names` alone since it's a people table not directly tied to titleType.
imdb_names = imdb_dfs['names']
# imdb_dfs['names'].write.mode('overwrite').parquet(os.path.join(PROCESSED_PARQUET_PATH, 'names.parquet'))

# Episodes is usually for TV shows, so we can skip it, or handle if needed later.
imdb_episodes = imdb_dfs['episode']

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

#### Record Counts

In [16]:
imdb_basics_filtered.count()
imdb_akas_filtered.count()
imdb_ratings_filtered.count()
imdb_principals_filtered.count()
imdb_crew_filtered.count()

Py4JJavaError: An error occurred while calling o179.count.
: org.apache.spark.SparkException: Multiple failures in stage materialization.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.multiFailuresInStageMaterializationError(QueryExecutionErrors.scala:2076)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:821)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:335)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3684/82042809.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3615)
	at org.apache.spark.sql.Dataset$$Lambda$3607/376411126.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.Dataset$$Lambda$2021/1075174821.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset$$Lambda$1673/717076099.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1684/1192951288.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1674/262707506.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3615)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
	Suppressed: org.apache.spark.SparkException: Job 53 cancelled because SparkContext was shut down
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1253)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1251)
		at org.apache.spark.scheduler.DAGScheduler$$Lambda$4766/903179147.apply(Unknown Source)
		at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
		at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1251)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:3087)
		at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:2973)
		at org.apache.spark.scheduler.DAGScheduler$$Lambda$4765/1916375042.apply$mcV$sp(Unknown Source)
		at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
		at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2973)
		at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2258)
		at org.apache.spark.SparkContext$$Lambda$4762/500915728.apply$mcV$sp(Unknown Source)
		at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
		at org.apache.spark.SparkContext.stop(SparkContext.scala:2258)
		at org.apache.spark.SparkContext.stop(SparkContext.scala:2211)
		at org.apache.spark.SparkContext.$anonfun$new$34(SparkContext.scala:681)
		at org.apache.spark.SparkContext$$Lambda$734/1709177787.apply$mcV$sp(Unknown Source)
		at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
		at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
		at org.apache.spark.util.SparkShutdownHookManager$$Lambda$4732/1041841685.apply$mcV$sp(Unknown Source)
		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
		at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
		at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
		at org.apache.spark.util.SparkShutdownHookManager$$Lambda$4731/975613321.apply$mcV$sp(Unknown Source)
		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
		at scala.util.Try$.apply(Try.scala:213)
		at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
		at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
		at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
		at java.util.concurrent.FutureTask.run(Unknown Source)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
		... 1 more
Caused by: org.apache.spark.SparkException: Job 54 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1251)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$4766/903179147.apply(Unknown Source)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1251)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:3087)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:2973)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$4765/1916375042.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2973)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2258)
	at org.apache.spark.SparkContext$$Lambda$4762/500915728.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2258)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2211)
	at org.apache.spark.SparkContext.$anonfun$new$34(SparkContext.scala:681)
	at org.apache.spark.SparkContext$$Lambda$734/1709177787.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$Lambda$4732/1041841685.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$Lambda$4731/975613321.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
tmdb_movies.count()

#### Schema Checks

In [None]:
imdb_basics_filtered.printSchema()
imdb_akas_filtered.printSchema()
imdb_ratings_filtered.printSchema()
imdb_principals_filtered.printSchema()
imdb_crew_filtered.printSchema()
imdb_names.printSchema()
imdb_episode.printSchema()

tmdb_movies.printSchema()

IMDb Basics Schema:


NameError: name 'imdb_basics' is not defined

### 1. Highest Rated Movies in 2023

**Balancing Ratings and Number of Votes:**



### 2. Most Popular Actors/Actresses in 2023

### 3. User-Movie Trends

### 4. Metric of a 'Hit Movie'

<h2 style="color:#de4a48; background-color:#fce19a; padding: 10px; text-align:left; border: 1px solid #fce19a;">Predictive Modelling</h2>