## KPI Table 1 - content_snapshot

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# KPI Table 1: one row per snapshot month with title counts, average
# durations and the movie/series share of the catalog.
cdf = spark.read.table('ott_catalog.gold.fact_title_snapshot')

# Bucket every snapshot into its 'yyyy-MM' month. date_format() already
# returns a string column, so no extra cast is required.
cdf = cdf.withColumn(
    'snapshot_month',
    date_format(to_date(col('snapshot_date')), 'yyyy-MM')
)

# when() without otherwise() yields NULL for non-matching rows; countDistinct
# and avg ignore NULLs, so each conditional aggregate only sees flagged titles.
# NOTE(review): the flags are compared against the string '1' here, while the
# content_rating KPI further down compares the same flags against the integer
# 1 -- confirm the column type and align the two.
agg_df = cdf.groupBy('snapshot_month').agg(
    countDistinct('show_key').alias('total_titles'),
    countDistinct(when(col('is_movie_flag') == '1', col('show_key'))).alias('total_movies'),
    countDistinct(when(col('is_series_flag') == '1', col('show_key'))).alias('total_series'),
    avg(when(col('is_movie_flag') == '1', col('duration_minutes'))).alias('avg_movie_duration'),
    avg(when(col('is_series_flag') == '1', col('duration_seasons'))).alias('avg_series_seasons')
)

# Final projection with explicit types. Each column is cast first and aliased
# last so the output column name never depends on how the analyzer propagates
# a name through a top-level cast.
new_cdf = agg_df.select(
    col('snapshot_month').cast(StringType()).alias('snapshot_month'),
    col('total_titles').cast("bigint").alias('total_titles'),
    col('total_movies').cast("bigint").alias('total_movies'),
    col('total_series').cast("bigint").alias('total_series'),
    col('avg_movie_duration').cast("decimal(18,2)").alias('avg_movie_duration'),
    col('avg_series_seasons').cast("decimal(18,2)").alias('avg_series_seasons'),
    (col('total_movies') / col('total_titles') * 100).cast("decimal(18,2)").alias('pct_movies'),
    (col('total_series') / col('total_titles') * 100).cast("decimal(18,2)").alias('pct_series')
)

# NOTE(review): append mode adds these rows on every run; confirm that
# re-running this cell cannot duplicate months already in the KPI table.
new_cdf.write.mode('append').saveAsTable('ott_catalog.gold.kpi_content_snapshot')

## KPI Table 2 - content_rating

In [0]:
# Base rows for the rating KPI: snapshot facts inner-joined to the title
# dimension on show_key, restricted to titles that have a rating.
# The null-rating filter runs before the projection, while the "d" alias
# is still directly in scope.
base_df = (
    spark.table("ott_catalog.gold.fact_title_snapshot")
    .alias("f")
    .join(spark.table("ott_catalog.gold.dim_titles").alias("d"), on="show_key", how="inner")
    .where(col("d.rating").isNotNull())
    .select(
        # Month bucket in 'yyyy-MM' form.
        date_format(col("f.snapshot_date"), "yyyy-MM").cast(StringType()).alias("snapshot_month"),
        col("f.show_key"),
        col("d.rating").cast(StringType()).alias("rating"),
        col("f.is_movie_flag"),
        col("f.is_series_flag"),
        col("f.is_new_release_flag"),
    )
)

In [0]:
# Distinct-title counts per (snapshot_month, rating): all titles, movies
# only, and series only. when() without otherwise() yields NULL for rows
# whose flag is not 1, and countDistinct ignores NULLs, so the conditional
# counts only see flagged titles.
rating_agg_df = base_df.groupBy("snapshot_month", "rating").agg(
    countDistinct(col("show_key")).cast("bigint").alias("titles_count"),
    countDistinct(when(col("is_movie_flag") == 1, col("show_key")))
    .cast("bigint")
    .alias("movies_count"),
    countDistinct(when(col("is_series_flag") == 1, col("show_key")))
    .cast("bigint")
    .alias("series_count"),
)


In [0]:
# All rating rows of the same month share one window partition, so the
# windowed sum below is that month's total titles_count across ratings.
month_window = Window.partitionBy("snapshot_month")

# Share of the month's titles_count held by each rating, as a percentage
# rounded to 2 decimals. round() and sum() here are the pyspark.sql.functions
# versions brought in by the wildcard import at the top of the notebook.
final_df = rating_agg_df.withColumn(
    "pct_of_total_titles",
    round(
        col("titles_count") * 100.0 / sum(col("titles_count")).over(month_window),
        2,
    ).cast("decimal(18,2)"),
)




In [0]:
# Persist the rating KPI rows by appending them to the gold KPI table.
# NOTE(review): append mode adds rows on every run; confirm that re-running
# this cell cannot duplicate months already in the table.
final_df.write.saveAsTable('ott_catalog.gold.kpi_content_rating', mode='append')