In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    avg,
    col,
    count,
    sum as Fsum,
    to_date,
    to_timestamp,
    when,
)

# GCP project, BigQuery dataset and Cloud Storage buckets used by this job.
project_id = "de2025-471807"
bq_dataset = "netflix"
temp_bucket = "netflix-group5-temp"
gcs_data_bucket = "netflix_data_25"

# Spark configuration: standalone master URL, application name and resource
# sizing. SparkConf setters return the conf itself, so they can be chained.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("BigQueryNetflixIngest")
    .set("spark.driver.memory", "4g")
    .set("spark.executor.cores", "2")
    .set("spark.driver.cores", "1")
)

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Session-level options consumed by the BigQuery connector: the GCS bucket it
# may use for temporary data, and enabling reads from views.
for connector_option, option_value in (
    ("temporaryGcsBucket", temp_bucket),
    ("viewsEnabled", "true"),
):
    spark.conf.set(connector_option, option_value)

# Logical table name -> CSV object in the data bucket. Every dataset sits at
# the bucket root and is named "<logical name>.csv".
table_configs = {
    logical_name: f"{logical_name}.csv"
    for logical_name in (
        "users",
        "movies",
        "watch_history",
        "search_logs",
        "reviews",
        "recommendation_logs",
    )
}


def load_table(table_name: str) -> DataFrame:
    """Load one configured CSV object from the GCS data bucket.

    The first line of the file is used as the header, and Spark infers the
    column types.

    Args:
        table_name: Key into ``table_configs``; an unknown key raises KeyError.

    Returns:
        A Spark DataFrame over ``gs://<gcs_data_bucket>/<configured object>``.
    """
    object_name = table_configs[table_name]
    return spark.read.csv(
        f"gs://{gcs_data_bucket}/{object_name}",
        header=True,
        inferSchema=True,
    )


def nullify_blanks(df: DataFrame, columns) -> DataFrame:
    """Convert blank strings in the specified columns to nulls for safe casting.

    Only string-typed columns are rewritten. A column that schema inference
    already typed as numeric/boolean/date cannot contain an empty string, so
    rewriting it is pointless. Comparing such a column with ``""`` also forces
    an implicit cast of ``""`` to the column's type. With ANSI mode off, that
    comparison is a silent no-op. With ``spark.sql.ansi.enabled`` on (the
    default from Spark 4.0), it can fail with a cast error.

    Args:
        df: DataFrame to clean.
        columns: Iterable of column names to process. A name that is not in
            ``df``'s schema is still passed to ``withColumn``, so Spark reports
            it exactly as before.

    Returns:
        A new DataFrame in which ``""`` values of the listed string columns are
        replaced by null; all other columns are left unchanged.
    """
    from pyspark.sql.types import StringType

    field_types = {field.name: field.dataType for field in df.schema.fields}
    for c in columns:
        field_type = field_types.get(c)
        # Non-string columns cannot hold "" — skip them rather than emit a
        # comparison that needs an implicit (and possibly failing) cast.
        if field_type is not None and not isinstance(field_type, StringType):
            continue
        df = df.withColumn(c, when(col(c) == "", None).otherwise(col(c)))
    return df


# Read every configured CSV into a DataFrame keyed by its logical name.
raw_tables = {
    logical_name: load_table(logical_name) for logical_name in table_configs
}

# Eyeball the first five rows of each raw dataset (values not truncated).
for logical_name, preview_df in raw_tables.items():
    print(f"Previewing '{logical_name}' dataset")
    preview_df.show(n=5, truncate=False)

# Clean and cast each dataset to align with BigQuery schemas.
# Users: blank optional attributes become null, one row is kept per user_id,
# then numeric/boolean casts and date/timestamp parsing are applied.
users_df = nullify_blanks(
    raw_tables["users"],
    ["age", "gender", "monthly_spend", "household_size"],
).dropDuplicates(["user_id"])
users_df = (
    users_df.withColumn(
        "created_at",
        to_timestamp(col("created_at"), "yyyy-MM-dd HH:mm:ss.SSSSSS"),
    )
    .withColumn(
        "subscription_start_date",
        to_date(col("subscription_start_date"), "yyyy-MM-dd"),
    )
    .withColumn("is_active", col("is_active").cast("boolean"))
    .withColumn("household_size", col("household_size").cast("int"))
    .withColumn("monthly_spend", col("monthly_spend").cast("double"))
    .withColumn("age", col("age").cast("double"))
)

# Movies: blank optional metadata becomes null, one row is kept per movie_id,
# then date, boolean, integer and floating-point coercions are applied.
movies_df = nullify_blanks(
    raw_tables["movies"],
    [
        "genre_secondary",
        "imdb_rating",
        "production_budget",
        "box_office_revenue",
        "number_of_seasons",
        "number_of_episodes",
    ],
).dropDuplicates(["movie_id"])
movies_df = (
    movies_df.withColumn(
        "added_to_platform", to_date(col("added_to_platform"), "yyyy-MM-dd")
    )
    .withColumn("is_netflix_original", col("is_netflix_original").cast("boolean"))
    .withColumn("content_warning", col("content_warning").cast("boolean"))
    .withColumn("release_year", col("release_year").cast("int"))
    .withColumn("number_of_seasons", col("number_of_seasons").cast("int"))
    .withColumn("number_of_episodes", col("number_of_episodes").cast("int"))
    .withColumn("duration_minutes", col("duration_minutes").cast("double"))
    .withColumn("imdb_rating", col("imdb_rating").cast("double"))
    .withColumn("production_budget", col("production_budget").cast("double"))
    .withColumn("box_office_revenue", col("box_office_revenue").cast("double"))
)

# Watch history: blank measurements become null, one row is kept per
# session_id, then boolean/double casts and watch_date parsing are applied.
watch_history_df = nullify_blanks(
    raw_tables["watch_history"],
    ["watch_duration_minutes", "progress_percentage", "user_rating"],
).dropDuplicates(["session_id"])
watch_history_df = (
    watch_history_df.withColumn("is_download", col("is_download").cast("boolean"))
    .withColumn("user_rating", col("user_rating").cast("double"))
    .withColumn("progress_percentage", col("progress_percentage").cast("double"))
    .withColumn(
        "watch_duration_minutes", col("watch_duration_minutes").cast("double")
    )
    .withColumn("watch_date", to_date(col("watch_date"), "yyyy-MM-dd"))
)

# Search logs: a blank click position becomes null, one row is kept per
# search_id, then boolean flags, numeric fields and search_date are typed.
search_logs_df = nullify_blanks(
    raw_tables["search_logs"], ["clicked_result_position"]
).dropDuplicates(["search_id"])
search_logs_df = (
    search_logs_df.withColumn("used_filters", col("used_filters").cast("boolean"))
    .withColumn("had_typo", col("had_typo").cast("boolean"))
    .withColumn(
        "search_duration_seconds", col("search_duration_seconds").cast("double")
    )
    .withColumn(
        "clicked_result_position", col("clicked_result_position").cast("int")
    )
    .withColumn("results_returned", col("results_returned").cast("int"))
    .withColumn("search_date", to_date(col("search_date"), "yyyy-MM-dd"))
)

# Reviews: blank sentiment/text becomes null, one row is kept per review_id,
# then numeric, boolean and review_date coercions are applied.
reviews_df = nullify_blanks(
    raw_tables["reviews"], ["sentiment_score", "review_text"]
).dropDuplicates(["review_id"])
reviews_df = (
    reviews_df.withColumn("sentiment_score", col("sentiment_score").cast("double"))
    .withColumn("total_votes", col("total_votes").cast("double"))
    .withColumn("helpful_votes", col("helpful_votes").cast("double"))
    .withColumn("is_verified_watch", col("is_verified_watch").cast("boolean"))
    .withColumn("rating", col("rating").cast("int"))
    .withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))
)

# Recommendation logs: a blank score becomes null, one row is kept per
# recommendation_id, then position, click flag, score and date are typed.
recommendation_logs_df = nullify_blanks(
    raw_tables["recommendation_logs"], ["recommendation_score"]
).dropDuplicates(["recommendation_id"])
recommendation_logs_df = (
    recommendation_logs_df.withColumn(
        "position_in_list", col("position_in_list").cast("int")
    )
    .withColumn("was_clicked", col("was_clicked").cast("boolean"))
    .withColumn("recommendation_score", col("recommendation_score").cast("double"))
    .withColumn(
        "recommendation_date", to_date(col("recommendation_date"), "yyyy-MM-dd")
    )
)

# Logical table name -> cleaned DataFrame, keyed by the BigQuery table name
# each dataset is written to.
#
# Every cleaned DataFrame is cached as it is registered here. Several of them
# feed more than one Spark action (their own BigQuery write plus the curated
# analytics aggregations). Without caching, each action re-runs the whole
# lineage: CSV read with schema inference, blank nulling, casts and
# dropDuplicates.
#
# Spark does not guarantee which row dropDuplicates keeps for a duplicated
# key, so a recomputation could keep different rows than the ones already
# written. Caching lets the writes and the analytics share the same data.
# This is best effort: partitions lost from the cache are recomputed.
#
# DataFrame.cache() returns the same DataFrame object, so users_df, movies_df,
# etc. refer to the cached data as well.
cleaned_tables = {
    "users": users_df.cache(),
    "movies": movies_df.cache(),
    "watch_history": watch_history_df.cache(),
    "search_logs": search_logs_df.cache(),
    "reviews": reviews_df.cache(),
    "recommendation_logs": recommendation_logs_df.cache(),
}


def write_to_bigquery(df: DataFrame, table_name: str, mode: str = "overwrite") -> None:
    """Save ``df`` as ``<project_id>.<bq_dataset>.<table_name>`` via the BigQuery connector.

    Args:
        df: DataFrame to persist.
        table_name: Destination table inside ``bq_dataset``.
        mode: Spark save mode passed to the writer; defaults to ``"overwrite"``.
    """
    writer = df.write.format("bigquery").mode(mode)
    writer.options(project=project_id, dataset=bq_dataset, table=table_name).save()


# Land every cleaned dataset in BigQuery, using the helper's default save mode.
for bq_table, cleaned_df in cleaned_tables.items():
    print(f"Writing '{bq_table}' to BigQuery dataset {project_id}.{bq_dataset}")
    write_to_bigquery(df=cleaned_df, table_name=bq_table)

# Curated analytics tables
# User engagement: tag each watch session with its user's plan and country
# (the left join keeps sessions whose user_id has no users row), then roll up
# one row per (user_id, subscription_plan, country).
sessions_with_profile = watch_history_df.join(
    users_df.select("user_id", "subscription_plan", "country"),
    "user_id",
    "left",
)
user_engagement_df = sessions_with_profile.groupBy(
    "user_id", "subscription_plan", "country"
).agg(
    Fsum("watch_duration_minutes").alias("total_watch_minutes"),
    avg("progress_percentage").alias("avg_progress_percentage"),
    # 1 for a session whose action is "completed", 0 otherwise, summed.
    Fsum(when(col("action") == "completed", 1).otherwise(0)).alias(
        "completed_sessions"
    ),
    count("*").alias("total_sessions"),
)
print("Writing 'user_engagement_metrics' analytics table")
write_to_bigquery(user_engagement_df, "user_engagement_metrics")

# Content performance: roll watch sessions up per movie_id first, then attach
# catalogue attributes. The left join keeps movie_ids absent from movies_df.
per_movie_sessions = watch_history_df.groupBy("movie_id").agg(
    Fsum("watch_duration_minutes").alias("total_watch_minutes"),
    avg("progress_percentage").alias("avg_progress_percentage"),
    count("*").alias("total_sessions"),
    Fsum(when(col("action") == "completed", 1).otherwise(0)).alias(
        "completed_sessions"
    ),
)
content_performance_df = per_movie_sessions.join(
    movies_df.select("movie_id", "title", "genre_primary", "content_type"),
    "movie_id",
    "left",
)
print("Writing 'content_performance_metrics' analytics table")
write_to_bigquery(content_performance_df, "content_performance_metrics")

# Search interest: one row per (query, device, typo flag, filter flag) with
# search volume and average duration/click position (averages skip nulls).
search_dimensions = ["search_query", "device_type", "had_typo", "used_filters"]
search_interest_df = search_logs_df.groupBy(*search_dimensions).agg(
    count("*").alias("search_count"),
    avg("search_duration_seconds").alias("avg_search_duration"),
    avg("clicked_result_position").alias("avg_clicked_position"),
)
print("Writing 'search_interest_metrics' analytics table")
write_to_bigquery(search_interest_df, "search_interest_metrics")



In [None]:
# Stop the SparkSession created at the top of the notebook. Run this cell last,
# after the BigQuery writes above have returned.
spark.stop()