In [None]:
# Parameter cell: these variables receive the values passed in as pipeline
# parameters. The empty-string defaults apply when the notebook runs without
# them. Each value is used as one folder segment (year / month / day) of the
# input path.
run_year: str = ""
run_month: str = ""
run_day: str = ""

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import col

# Input folder for this run, built from the pipeline parameters,
# e.g. "Files/2024/01/18/".
# Fail fast when a parameter was not supplied: an empty value would otherwise
# yield a path such as "Files////" and only surface later as a confusing
# read error in the next cell.
_missing_params = [
    _name
    for _name, _value in (("run_year", run_year), ("run_month", run_month), ("run_day", run_day))
    if _value is None or not str(_value).strip()
]
if _missing_params:
    raise ValueError("Missing pipeline parameter(s): " + ", ".join(_missing_params))

path = f"Files/{run_year}/{run_month}/{run_day}/"

In [None]:
def _read_and_describe(file_name):
    """Read ``path + file_name`` as Parquet, display its row count, print its schema, and return it."""
    frame = spark.read.parquet(path + file_name)
    display(frame.count())
    frame.printSchema()
    return frame


# Same order as before: basics, then episode, then ratings.
basics_df = _read_and_describe("title.basics.parquet")
episode_df = _read_and_describe("title.episode.parquet")
ratings_df = _read_and_describe("title.ratings.parquet")

In [None]:
# Expose the three DataFrames as temporary views so the Spark SQL cell can
# reference them by name.
for _view_name, _frame in (
    ("basics", basics_df),
    ("episode", episode_df),
    ("ratings", ratings_df),
):
    _frame.createOrReplaceTempView(_view_name)

In [None]:
%%sql

/*
Switched to Spark SQL here for illustration purposes.
The same logic could just as well have been written with PySpark.
*/

CREATE OR REPLACE TEMPORARY VIEW filtered_titles AS (
-- Combine every title with its episode/season information and its rating.
-- LEFT JOINs keep titles without an episode or rating row (those columns are NULL).
WITH all_titles AS
(
          SELECT    b.tconst,
                    e.parentTconst,
                    b.titleType,
                    b.primaryTitle,
                    b.originalTitle,
                    b.startYear,
                    b.endYear,
                    b.runtimeMinutes,
                    b.isAdult,
                    e.seasonNumber,
                    e.episodeNumber,
                    r.averageRating,
                    r.numVotes
          FROM      basics b
          LEFT JOIN episode e
          ON        e.tconst = b.tconst
          LEFT JOIN ratings r
          ON        r.tconst = b.tconst),
-- Titles that pass the content filters.
-- Named kept_titles (not filtered_titles) so it does not shadow the view being created.
-- Titles without a rating row have NULL averageRating/numVotes and are dropped,
-- because a comparison with NULL is never true.
kept_titles AS
(
       SELECT *
       FROM   all_titles a
       WHERE  a.isAdult = false
       AND    a.averageRating > 0
       AND    a.runtimeMinutes > 0
       AND    a.numVotes > 0
       -- Single-quoted string literals: double-quoted text is parsed as an
       -- identifier when ANSI mode with spark.sql.ansi.doubleQuotedIdentifiers is on.
       AND    a.titleType IN ('tvEpisode',
                              'short',
                              'movie',
                              'video',
                              'tvSeries',
                              'tvMovie',
                              'tvMiniSeries',
                              'tvSpecial',
                              'tvShort') ),
-- Re-add parent titles of kept rows (e.g. the series of a kept episode)
-- in case the filter above removed them.
parent_titles AS
(
       SELECT DISTINCT a.*
       FROM            all_titles a
       INNER JOIN      kept_titles f
       ON              f.parentTconst = a.tconst
       -- only parents that are not already among the kept titles
       WHERE           NOT EXISTS
                     (
                            SELECT *
                            FROM   kept_titles sub_f
                            WHERE  sub_f.tconst = a.tconst) ),
-- Kept titles plus re-added parents. The NOT EXISTS above ensures no tconst
-- appears in both branches, so UNION ALL adds no duplicates across them.
union_titles AS
(
       SELECT *
       FROM   kept_titles
       UNION ALL
       SELECT *
       FROM   parent_titles )
SELECT * FROM   union_titles )


In [None]:
# Load the result of the SQL cell (temporary view "filtered_titles") and
# show its row count and schema before it is persisted.
df = spark.read.table("filtered_titles")
filtered_row_count = df.count()
display(filtered_row_count)
df.printSchema()

In [None]:
# Persist the combined titles as the Delta table "gold_imdb_titles".
# Mode "overwrite" replaces the whole table, so refuse to write an empty
# result: that would silently wipe the gold table (e.g. if the source files
# for the run date contain no rows or every row is filtered out).
if not df.take(1):
    raise ValueError("filtered_titles is empty; refusing to overwrite gold_imdb_titles")
df.write.mode("overwrite").format("delta").saveAsTable("gold_imdb_titles")