In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Root of the S3 bucket; every TSV dump and Delta path in this notebook is built from it.
s3_bucket = "s3a://imdb-mvp/"

# Spark session configured for S3A access; the AWS credentials are read
# through dbutils.secrets rather than being hardcoded.
spark = (
    SparkSession.builder
    .appName("Imdb-MVP")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config(
        "spark.hadoop.fs.s3a.access.key",
        dbutils.secrets.get(scope="imdb-mvp", key="AWS_ACCESS_KEY"),
    )
    .config(
        "spark.hadoop.fs.s3a.secret.key",
        dbutils.secrets.get(scope="imdb-mvp", key="AWS_SECRET_ACCESS_KEY"),
    )
    .getOrCreate()
)

In [0]:
# Read the tab-separated name.basics dump with a header row and inferred column types.
name_df = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}name.basics.tsv")
)

name_df.show(10)

In [0]:
# Display every person row whose primaryName is exactly "Christopher Nolan".
nolan_name_matches = name_df.where(name_df["primaryName"] == "Christopher Nolan")
nolan_name_matches.show()

In [0]:
# Read the tab-separated title.basics dump with a header row and inferred column types.
title_df = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}title.basics.tsv")
)

title_df.show(10)

In [0]:
# tconst ids of the movies whose original title starts with "Tron:".
tron_movies = (
    title_df
    .where(title_df["originalTitle"].like("Tron:%"))
    .where(title_df["titleType"] == "movie")
    .select("tconst")
)
tronIds = [row["tconst"] for row in tron_movies.collect()]

In [0]:
# Read the tab-separated title.ratings dump with a header row and inferred column types.
rating_df = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}title.ratings.tsv")
)

rating_df.show(10)

In [0]:
from pyspark.sql import functions as F

# Ratings of the Tron movies, enriched with their original titles.
title_filtered = title_df.select(["tconst", "originalTitle"])
ratingFiltered = rating_df.where(rating_df["tconst"].isin(tronIds))

# Left join keeps every selected rating row even if no title row matches.
titleWithRatings = ratingFiltered.join(title_filtered, "tconst", "left")

titleWithRatings.show()



In [0]:
# Resolve Christopher Nolan's person row: the entry whose profession list
# contains "director" and whose birthYear is not the "\N" placeholder.
# The filter chain is executed once and both columns are taken from the same
# rows, instead of running two identical Spark jobs.
cn_rows = (
    name_df
    .filter(name_df.primaryName == "Christopher Nolan")
    .filter(name_df.primaryProfession.like("%director%"))
    .filter(name_df.birthYear != "\\N")
    .select("nconst", "knownForTitles")
    .collect()
)

# Fail with an explicit message rather than an opaque IndexError on `[0]`.
if not cn_rows:
    raise ValueError(
        "no 'Christopher Nolan' director row with a birth year was found in name_df"
    )

# nconst of the first matching row; used later to look up directed titles.
idCn = cn_rows[0]["nconst"]

print(idCn)

# knownForTitles of every matching row (comma-separated tconst ids per row).
idCnTitles = [row["knownForTitles"] for row in cn_rows]

splittedIds = idCnTitles[0].split(",")

print(splittedIds)

In [0]:
# Read the tab-separated title.principals dump with a header row and inferred column types.
principals_df = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}title.principals.tsv")
)

principals_df.show(10)

In [0]:
# tconst ids of every title on which the person idCn is credited as director.
directed_rows = (
    principals_df
    .where(principals_df["nconst"] == idCn)
    .where(principals_df["category"] == "director")
    .select("tconst")
    .collect()
)
cnDirectedTitlesId = [row["tconst"] for row in directed_rows]

print(cnDirectedTitlesId)

In [0]:
# Basic attributes of the titles directed by idCn.
cnTitles_df = (
    title_df
    .where(title_df["tconst"].isin(cnDirectedTitlesId))
    .select("tconst", "originalTitle", "startYear", "genres")
)

In [0]:
# Average rating of each directed title.
filteredCnTitlesRatings = (
    rating_df
    .where(rating_df["tconst"].isin(cnDirectedTitlesId))
    .select("tconst", "averageRating")
)

# Attach the ratings to the title attributes; titles without a rating get a null.
cnTitlesRatings_df = cnTitles_df.join(filteredCnTitlesRatings, "tconst", "left")

# Keep only rated titles, best rated first.
cnTitlesRatings_df = (
    cnTitlesRatings_df
    .where(cnTitlesRatings_df["averageRating"].isNotNull())
    .orderBy(cnTitlesRatings_df["averageRating"].desc())
)

cnTitlesRatings_df.show()


In [0]:
# First row of cnTitlesRatings_df; the frame was sorted by averageRating
# descending when it was built, so this is the top-rated title.
cnTitlesRatings_df.first()

In [0]:
# Bronze layer: parse title.basics from TSV and store it unchanged as a Delta table.
titleBasicsBronze = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}title.basics.tsv")
)

# Overwrite mode replaces whatever is already stored at the target path.
titleBasicsBronze.write.save(
    f"{s3_bucket}imdb_bronze/title/basics",
    format="delta",
    mode="overwrite",
)


In [0]:
# Bronze layer: parse title.principals from TSV and store it unchanged as a Delta table.
titlePrincipalsBronze = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}title.principals.tsv")
)

# Overwrite mode replaces whatever is already stored at the target path.
titlePrincipalsBronze.write.save(
    f"{s3_bucket}imdb_bronze/title/principals",
    format="delta",
    mode="overwrite",
)

In [0]:
# Bronze layer: parse title.ratings from TSV and store it unchanged as a Delta table.
titleRatingsBronze = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}title.ratings.tsv")
)

# Overwrite mode replaces whatever is already stored at the target path.
titleRatingsBronze.write.save(
    f"{s3_bucket}imdb_bronze/title/ratings",
    format="delta",
    mode="overwrite",
)

In [0]:
# Bronze layer: parse name.basics from TSV and store it unchanged as a Delta table.
nameBasicsBronze = (
    spark.read
    .options(header="true", delimiter="\t", inferSchema="true")
    .csv(f"{s3_bucket}name.basics.tsv")
)

# Overwrite mode replaces whatever is already stored at the target path.
nameBasicsBronze.write.save(
    f"{s3_bucket}imdb_bronze/name/basics",
    format="delta",
    mode="overwrite",
)

In [0]:
# Reload the bronze title table from Delta.
bronzeTitlesBasicsRaw = spark.read.load(
    f"{s3_bucket}imdb_bronze/title/basics",
    format="delta",
)

# Turn the literal "\N" placeholder into real nulls.
bronzeTitlesBasics = bronzeTitlesBasicsRaw.na.replace("\\N", None)

In [0]:
from pyspark.sql import functions as F

# Null profile of bronzeTitlesBasics: number and percentage of nulls per column.
total = bronzeTitlesBasics.count()

# One aggregate per column: count() only counts the rows where when() produced
# a non-null value, i.e. the rows in which the column itself is null.
exprs = [
    F.count(F.when(F.col(c).isNull(), c)).alias(c + "_nulls")
    for c in bronzeTitlesBasics.columns
]

result = bronzeTitlesBasics.select(exprs)

# Run the aggregation once; collecting inside the loop would trigger a new
# Spark job over the whole table for every column.
null_counts = result.collect()[0]

# also show the percentage
# The loop variable is deliberately not named `col`, so it does not shadow
# pyspark.sql.functions.col imported earlier in the notebook.
for column_name in result.columns:
    nulls = null_counts[column_name]
    # Guard against an empty table, where the percentage is undefined.
    percentage = nulls / total * 100 if total else 0.0
    print(f"{column_name}: {nulls} ({percentage:.2f}%)")

In [0]:
# Print the schema of bronzeTitlesBasics to check the column types.
bronzeTitlesBasics.printSchema()

In [0]:
# Spot-check a single title by its tconst id.
tbtest = bronzeTitlesBasics.where(bronzeTitlesBasics["tconst"] == "tt25616472")

tbtest.show()

In [0]:
from pyspark.sql.functions import col, when

# Integer versions of the year and runtime columns; a literal "\N" is mapped to null.
year_as_int = (
    when(col("startYear") == "\\N", None)
    .otherwise(col("startYear").cast("int"))
)
runtime_as_int = (
    when(col("runtimeMinutes") == "\\N", None)
    .otherwise(col("runtimeMinutes").cast("int"))
    .cast("int")
)

# Silver layer: movies only, with typed year / runtime and a fixed column set.
silverMoviesBasics = (
    bronzeTitlesBasics
    .where(bronzeTitlesBasics["titleType"] == "movie")
    .withColumn("year", year_as_int)
    .withColumn("runtimeMinutes", runtime_as_int)
    .select(
        "tconst",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "year",
        "runtimeMinutes",
        "genres",
    )
)

silverMoviesBasics.show()

In [0]:
from pyspark.sql import functions as F

# Null profile of silverMoviesBasics: number and percentage of nulls per column.
total = silverMoviesBasics.count()

# One aggregate per column: count() only counts the rows where when() produced
# a non-null value, i.e. the rows in which the column itself is null.
exprs = [
    F.count(F.when(F.col(c).isNull(), c)).alias(c + "_nulls")
    for c in silverMoviesBasics.columns
]

result = silverMoviesBasics.select(exprs)

# Run the aggregation once; collecting inside the loop would trigger a new
# Spark job over the whole table for every column.
null_counts = result.collect()[0]

# also show the percentage
# The loop variable is deliberately not named `col`, so it does not shadow
# pyspark.sql.functions.col imported earlier in the notebook.
for column_name in result.columns:
    nulls = null_counts[column_name]
    # Guard against an empty table, where the percentage is undefined.
    percentage = nulls / total * 100 if total else 0.0
    print(f"{column_name}: {nulls} ({percentage:.2f}%)")

In [0]:
# Print the schema of silverMoviesBasics to check the column types after the int casts.
silverMoviesBasics.printSchema()

In [0]:
# Persist the silver movie table as Delta, replacing any previous version.
silverMoviesBasics.write.save(
    f"{s3_bucket}imdb_silver/title/basics",
    format="delta",
    mode="overwrite",
)

In [0]:
# Reload the bronze principals table from Delta and preview it.
bronzeTitlesPrincipalsRaw = spark.read.load(
    f"{s3_bucket}imdb_bronze/title/principals",
    format="delta",
)

bronzeTitlesPrincipalsRaw.show()

In [0]:
# Print the schema of bronzeTitlesPrincipalsRaw to check the column types.
bronzeTitlesPrincipalsRaw.printSchema()

In [0]:
# Silver layer: "\N" placeholders become nulls and only the listed columns are kept.
silverTitlesPrincipals = (
    bronzeTitlesPrincipalsRaw
    .na.replace("\\N", None)
    .select("tconst", "ordering", "nconst", "category", "characters")
)

silverTitlesPrincipals.show()

In [0]:
# Persist the silver principals table as Delta, replacing any previous version.
silverTitlesPrincipals.write.save(
    f"{s3_bucket}imdb_silver/title/principals",
    format="delta",
    mode="overwrite",
)

In [0]:
# Reload the bronze ratings table from Delta and preview it.
bronzeTitlesRatingsRaw = spark.read.load(
    f"{s3_bucket}imdb_bronze/title/ratings",
    format="delta",
)

bronzeTitlesRatingsRaw.show()

In [0]:
from pyspark.sql import functions as F

# Null profile of bronzeTitlesRatingsRaw: number and percentage of nulls per column.
total = bronzeTitlesRatingsRaw.count()

# One aggregate per column: count() only counts the rows where when() produced
# a non-null value, i.e. the rows in which the column itself is null.
exprs = [
    F.count(F.when(F.col(c).isNull(), c)).alias(c + "_nulls")
    for c in bronzeTitlesRatingsRaw.columns
]

result = bronzeTitlesRatingsRaw.select(exprs)

# Run the aggregation once; collecting inside the loop would trigger a new
# Spark job over the whole table for every column.
null_counts = result.collect()[0]

# also show the percentage
# The loop variable is deliberately not named `col`, so it does not shadow
# pyspark.sql.functions.col imported earlier in the notebook.
for column_name in result.columns:
    nulls = null_counts[column_name]
    # Guard against an empty table, where the percentage is undefined.
    percentage = nulls / total * 100 if total else 0.0
    print(f"{column_name}: {nulls} ({percentage:.2f}%)")

In [0]:
# Silver layer for ratings: any literal "\N" placeholder becomes a real null.
silverTitlesRatings = bronzeTitlesRatingsRaw.na.replace("\\N", None)

In [0]:
# Print the schema of silverTitlesRatings to check the column types.
silverTitlesRatings.printSchema()

In [0]:
# Persist the silver ratings table as Delta, replacing any previous version.
silverTitlesRatings.write.save(
    f"{s3_bucket}imdb_silver/title/ratings",
    format="delta",
    mode="overwrite",
)

In [0]:
# Reload the bronze name table from Delta and preview it.
bronzeNameBasicsRaw = spark.read.load(
    f"{s3_bucket}imdb_bronze/name/basics",
    format="delta",
)

bronzeNameBasicsRaw.show()

In [0]:
# Silver layer for names: the literal "\N" placeholder becomes a real null.
silverNameBasics = bronzeNameBasicsRaw.na.replace("\\N", None)

In [0]:
# Preview silverNameBasics after the "\N" placeholders were replaced with nulls.
silverNameBasics.show()

In [0]:
# Cast both year columns to int, replacing each column in place.
silverNameBasics = (
    silverNameBasics
    .withColumn("birthYear", F.col("birthYear").cast("int"))
    .withColumn("deathYear", F.col("deathYear").cast("int"))
)

In [0]:
# Print the schema of silverNameBasics to check the column types after the int casts.
silverNameBasics.printSchema()

In [0]:
# Persist the silver name table as Delta, replacing any previous version.
silverNameBasics.write.save(
    f"{s3_bucket}imdb_silver/name/basics",
    format="delta",
    mode="overwrite",
)

In [0]:
# Reload the persisted silver tables that feed the ratings fact table.
silverRatings = spark.read.load(
    f"{s3_bucket}imdb_silver/title/ratings",
    format="delta",
)
silverMoviesBasics = spark.read.load(
    f"{s3_bucket}imdb_silver/title/basics",
    format="delta",
)

In [0]:
# Movie attributes carried into the fact table.
movieAttributes = silverMoviesBasics.select(
    "tconst",
    "primaryTitle",
    "originalTitle",
    "isAdult",
    "year",
    "runtimeMinutes",
    "genres",
)

# Inner join: only ratings whose tconst exists in the movie table are kept.
factMoviesRatings = silverRatings.join(movieAttributes, "tconst", "inner")

In [0]:
# Preview the joined ratings / movie rows.
factMoviesRatings.show()

In [0]:
# Person dimension: one row per nconst, built from the persisted silver name table.
silverNameBasics = spark.read.load(
    f"{s3_bucket}imdb_silver/name/basics",
    format="delta",
)

personColumns = silverNameBasics.select("nconst", "primaryName", "birthYear", "deathYear")
dimPerson = personColumns.dropDuplicates(["nconst"])

dimPerson.write.save(
    f"{s3_bucket}imdb_gold/dim/person",
    format="delta",
    mode="overwrite",
)

In [0]:
# Movie dimension: one row per tconst, built from the persisted silver title table.
silverTitleBasics = spark.read.load(
    f"{s3_bucket}imdb_silver/title/basics",
    format="delta",
)

movieColumns = silverTitleBasics.select(
    "tconst",
    "primaryTitle",
    "originalTitle",
    "isAdult",
    "year",
    "runtimeMinutes",
    "genres",
)
dimMovie = movieColumns.dropDuplicates(["tconst"])

dimMovie.write.save(
    f"{s3_bucket}imdb_gold/dim/movie",
    format="delta",
    mode="overwrite",
)

In [0]:
# Reload the persisted silver principals table.
silverTitlePrincipals = spark.read.load(
    f"{s3_bucket}imdb_silver/title/principals",
    format="delta",
)

# Keep one row per (tconst, nconst) pair.
# NOTE(review): when a pair has several rows with different categories, only
# one of them is kept by this dedupe — confirm that is intended.
silverTitlePrincipals = (
    silverTitlePrincipals
    .select("tconst", "nconst", "category")
    .dropDuplicates(["tconst", "nconst"])
)

In [0]:
# Bridge table: principals restricted to movies in dimMovie and people in dimPerson.
bridgeMoviePerson = (
    silverTitlePrincipals
    .join(dimMovie.select("tconst"), "tconst", "inner")
    .join(dimPerson.select("nconst"), "nconst", "inner")
)

In [0]:
# Preview the movie-person bridge rows.
bridgeMoviePerson.show()

In [0]:
# Persist the movie-person bridge as Delta, replacing any previous version.
bridgeMoviePerson.write.save(
    f"{s3_bucket}imdb_gold/bridge/movie-person",
    format="delta",
    mode="overwrite",
)