### 0. Import Libraries

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

### 1. Extract Data

In [None]:
# Load the three IMDb Delta tables (titles, ratings, people) as DataFrames.
df_title_basics = spark.read.load("Tables/IMDb_Titles", format="delta")
df_title_ratings = spark.read.load("Tables/IMDb_Titles_Ratings", format="delta")
df_name_basics = spark.read.load("Tables/IMDb_People", format="delta")

In [None]:
# Preview the DataFrame read from Tables/IMDb_Titles.
display(df_title_basics)

In [None]:
# Preview the DataFrame read from Tables/IMDb_Titles_Ratings.
display(df_title_ratings)

In [None]:
# Preview the DataFrame read from Tables/IMDb_People.
display(df_name_basics)

In [None]:
# Report the row count of each source DataFrame before any cleaning.
for table_label, source_df in (
    ("IMDb_Titles", df_title_basics),
    ("IMDb_Titles_Ratings", df_title_ratings),
    ("IMDb_People", df_name_basics),
):
    print(f'There are {source_df.count()} records for {table_label}')

### 2. Transform Data

In [None]:
# Cast Columns to correct DataTypes using PySpark
#
# Keeps only the title columns listed below and converts isAdult to boolean and
# startYear / runtimeMinutes to int.
# primaryTitle is additionally normalized: the literal "\N" placeholder (IMDb's
# marker for a missing value, already converted to null for the people data in
# this notebook) becomes a real null, so that null-based cleaning on
# primaryTitle can actually drop those rows. This is a no-op when the table
# already stores real nulls.
# NOTE(review): originalTitle and genres may still carry "\N" - left untouched
# here; confirm whether downstream consumers want them nulled as well.

df_cast_title_basics = df_title_basics.select(
    'tconst',
    'titleType',
    'primaryTitle',
    'originalTitle',
    F.col('isAdult').cast('boolean'),
    F.col('startYear').cast('int'),
    F.col('runtimeMinutes').cast('int'),
    'genres'
).replace("\\N", None, subset=['primaryTitle'])

# Ratings: keep tconst as-is, cast averageRating to float and numVotes to int.
df_cast_title_ratings = df_title_ratings.select(
    F.col('tconst'),
    F.col('averageRating').cast(T.FloatType()),
    F.col('numVotes').cast(T.IntegerType()),
)


In [None]:
# Same casting step as for the titles, but expressed in Spark SQL:
# the people DataFrame is registered as the temp view Name_Basics and queried.
df_name_basics.createOrReplaceTempView('Name_Basics')

# birthYear / deathYear are cast to INT; the remaining columns pass through.
name_basics_query = """
    SELECT
        nconst
        ,primaryName
        ,CAST(birthYear AS INT) birthYear
        ,CAST(deathYear AS INT) deathYear
        ,primaryProfession
        ,knownForTitles 
    FROM Name_Basics
"""
df_typed_name_basics = spark.sql(name_basics_query)

# Turn the literal "\N" placeholder into a real null in the two columns
# that are null-checked during cleaning.
df_cast_name_basics = df_typed_name_basics.replace(
    to_replace="\\N",
    value=None,
    subset=["knownForTitles", 'primaryName'],
)

In [None]:
# Clean and filter data
# Titles: discard rows where primaryTitle, startYear or runtimeMinutes is null,
# then keep only rows whose titleType is "movie".
df_cleaned_title_basics = (
    df_cast_title_basics
    .na.drop(how='any', subset=["primaryTitle", "startYear", "runtimeMinutes"])
    .where(F.col("titleType") == "movie")
)

# Ratings: keep a row only when neither averageRating nor numVotes is null.
rating_or_votes_missing = F.col("averageRating").isNull() | F.col("numVotes").isNull()
df_cleaned_title_ratings = df_cast_title_ratings.where(~rating_or_votes_missing)

# People: require both a name and at least a known-for-titles value.
df_cleaned_name_basics = (
    df_cast_name_basics
    .where(F.col("primaryName").isNotNull())
    .where(F.col("knownForTitles").isNotNull())
)

In [None]:
# Report the row count of each DataFrame after cleaning.
for table_label, cleaned_df in (
    ("IMDb_Titles", df_cleaned_title_basics),
    ("IMDb_Titles_Ratings", df_cleaned_title_ratings),
    ("IMDb_People", df_cleaned_name_basics),
):
    print(f'There are {cleaned_df.count()} records for {table_label}')

In [None]:
# Attach ratings to the cleaned movies.
# Inner join on tconst: only titles present in both DataFrames are kept.
df_movies_with_ratings = df_cleaned_title_basics.join(
    df_cleaned_title_ratings, "tconst", "inner"
)

display(df_movies_with_ratings)

In [None]:
# Split the comma-separated knownForTitles string and emit one row per entry,
# stored in the new column knownForTitle (the original column is kept).
known_for_title = F.explode(F.split("knownForTitles", ","))
df_name_movie_association = df_cleaned_name_basics.withColumn(
    "knownForTitle", known_for_title
)

display(df_name_movie_association)

In [None]:
# Link each rated movie to the people known for it.
# Left join: movies without any matching person are kept, with null person columns.
movie_matches_known_title = (
    df_movies_with_ratings["tconst"] == df_name_movie_association["knownForTitle"]
)
df_silver_data = df_movies_with_ratings.join(
    df_name_movie_association,
    on=movie_matches_known_title,
    how="left",
)

display(df_silver_data)

### 3. Load Data

In [None]:
# Persist the joined result as a Delta table, replacing any existing data at that path.
df_silver_data.write.save(
    "Tables/IMDb_Movies_with_People",
    format="delta",
    mode="overwrite",
)