# Final Project
## By Sarah SHAHIN, Najlaa ALLIOUI, Hafsa REDOUANE

In [0]:
# Notebook shell magic: installs the pyspark and findspark packages via pip.
!pip install pyspark findspark

# --- CELL 1: IMPORTS, CONFIGURATION, LOADING, AND UNIVERSAL CLEANING ---

In [0]:
# --- CELL 1: IMPORTS, CONFIGURATION, LOADING, AND UNIVERSAL CLEANING ---

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when, regexp_replace
from pyspark.sql.types import IntegerType, FloatType, StringType, LongType 
import re
import time
from datetime import datetime
from pyspark.sql.utils import AnalysisException 

# Obtain the Spark session shared by every cell (created on first use).
spark = (
    SparkSession.builder
    .appName("IMDB_Final_Project")
    .getOrCreate()
)

# Token passed to the CSV reader as `nullValue` (see the loaders below).
NA_VALUE = r'\N'
# Reference year used when computing "years ago" figures.
CURRENT_YEAR = 2025

# Volume that holds the raw IMDb files, and the storage root derived from it.
UC_VOLUME_NAME = "raw_imdb_files"
STORAGE_ROOT = f"dbfs:/Volumes/workspace/imdb_project/{UC_VOLUME_NAME}/"

# Alias of STORAGE_ROOT used when building the individual file paths.
DBFS_PATH = STORAGE_ROOT

# --- IMDB File Dictionary ---
# Maps a short dataset key to the full path of its TSV file.
DATA_FILES = {
    dataset_key: DBFS_PATH + tsv_name
    for dataset_key, tsv_name in (
        ('name_basics', "name.basics.tsv"),
        ('title_basics', "title.basics.tsv"),
        ('title_ratings', "title.ratings.tsv"),
        ('title_crew', "title.crew.tsv"),
        ('title_akas', "title.akas.tsv"),
        ('title_episode', "title.episode.tsv"),
        ('title_principals', "title.principals.tsv"),
    )
}

# --- CORE LOADING FUNCTION ---
def load_and_clean_imdb_data(file_name, file_path, schema_override=None):
    """Read one tab-separated IMDb file into a DataFrame, printing progress.

    Args:
        file_name: Label used only in the progress and error messages.
        file_path: Location handed to ``spark.read.csv``.
        schema_override: Optional schema for the reader. Column types are
            inferred by Spark only when this is ``None``.

    Returns:
        The loaded DataFrame, or ``None`` when reading or counting raised
        (the error is printed rather than propagated).
    """
    print(f"Loading {file_name}...")
    try:
        # Infer column types only if the caller did not provide a schema.
        infer_types = schema_override is None
        reader_options = dict(
            sep='\t',
            header=True,
            nullValue=NA_VALUE,
            inferSchema=infer_types,
            schema=schema_override,
        )
        loaded = spark.read.csv(file_path, **reader_options)
        # count() forces the read, so load problems surface inside this try.
        row_total = loaded.count()
        print(f"Success! {file_name} loaded. {row_total:,} rows.")
        return loaded
    except Exception as e:
        print(f" SPARK LOADING FAILED for {file_name}. (Error: {e})")
        return None

# --- SAFE CAST FUNCTION (Compatible with older Spark, handles '') ---
def safe_cast_pyspark(col_name, target_type, pattern=r'[^\d]'):
    """Build a Column that strips text matching ``pattern`` and casts the rest.

    Args:
        col_name: Name of the source column.
        target_type: Spark data type the stripped text is cast to.
        pattern: Regular expression whose matches are removed before casting
            (by default, every non-digit character).

    Returns:
        A Column that is NULL when the source value is NULL, is the empty
        string, or becomes the empty string after stripping; otherwise the
        stripped text cast to ``target_type``.
    """
    source = col(col_name)
    stripped = regexp_replace(source, pattern, '')
    empty_text = F.lit("")
    nothing_to_cast = source.isNull() | (source == empty_text) | (stripped == empty_text)
    return F.when(nothing_to_cast, F.lit(None)).otherwise(stripped.cast(target_type))

# --- LOADING AND OVERRIDE STRATEGY ---
print("\n--- Starting Data Load and Cleaning ---")


def _read_raw_tsv(file_path):
    """Read a TSV with a header row, keeping every column as a string."""
    # NOTE(review): the reader's default quote handling is left enabled here;
    # confirm the IMDb files never contain unbalanced double quotes.
    return spark.read.csv(file_path, sep='\t', header=True, nullValue=NA_VALUE, inferSchema=False)


# A. Frames loaded through the generic loader (types inferred by Spark).
df_crew = load_and_clean_imdb_data('title.crew', DATA_FILES['title_crew'])
df_akas = load_and_clean_imdb_data('title.akas', DATA_FILES['title_akas'])
df_episode = load_and_clean_imdb_data('title.episode', DATA_FILES['title_episode'])

# title.akas keys titles by 'titleId'; align it with the 'tconst' key used
# by the other frames. The loader returns None on failure, hence the guard.
if df_akas is not None:
    akas_columns = df_akas.columns
    if 'titleId' in akas_columns:
        df_akas = df_akas.withColumnRenamed("titleId", "tconst")
        print("df_akas: Renamed 'titleId' to 'tconst'.")
    elif 'tconst' not in akas_columns:
        print("df_akas: Could not find 'titleId' or 'tconst'. Check schema.")


# B. title.basics: numeric cleanup of runtimeMinutes and startYear.
df_basics = (
    _read_raw_tsv(DATA_FILES['title_basics'])
    .withColumn("runtimeMinutes", safe_cast_pyspark("runtimeMinutes", IntegerType()))
    .withColumn("startYear", safe_cast_pyspark("startYear", IntegerType()))
)
print(f"title.basics loaded and cleaned. {df_basics.count():,} rows.")

# C. name.basics: keep the six listed columns, then clean the two year columns.
df_names = (
    _read_raw_tsv(DATA_FILES['name_basics'])
    .select("nconst", "primaryName", "birthYear", "deathYear", "primaryProfession", "knownForTitles")
    .withColumn("birthYear", safe_cast_pyspark("birthYear", IntegerType()))
    .withColumn("deathYear", safe_cast_pyspark("deathYear", IntegerType()))
)
print(f"name.basics loaded and cleaned. {df_names.count():,} rows.")

# D. title.ratings: numeric votes and rating (the rating pattern keeps the
# decimal point), then keep only rows with a positive vote count.
df_ratings = (
    _read_raw_tsv(DATA_FILES['title_ratings'])
    .withColumn("numVotes", safe_cast_pyspark("numVotes", LongType()))
    .withColumn("averageRating", safe_cast_pyspark("averageRating", FloatType(), pattern=r'[^\d\.]'))
)
has_positive_votes = col("numVotes").isNotNull() & (col("numVotes") > 0)
df_ratings = df_ratings.filter(has_positive_votes)
print(f"title.ratings loaded and cleaned. {df_ratings.count():,} rows.")

# E. title.principals: numeric 'ordering'.
df_principals = _read_raw_tsv(DATA_FILES['title_principals']).withColumn(
    "ordering", safe_cast_pyspark("ordering", IntegerType())
)
print(f"title.principals loaded and cleaned. {df_principals.count():,} rows.")

print("\nDataFrame preparation complete. Proceed to analysis.")

# --- CELL 2: BASIC STATISTICS (RUNTIME AND YEARS) ---


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

print("--- A. PEOPLE ANALYSIS (df_names) ---")

# A single aggregation answers every birth-year question below, so df_names
# is scanned once instead of once per statistic.
birth_year_stats = df_names.agg(
    F.count("*").alias("total_people"),
    # count(<column>) skips NULLs, i.e. it counts people with a birth year.
    F.count("birthYear").alias("with_birth_year"),
    F.min("birthYear").alias("min_year"),
    F.max("birthYear").alias("max_year"),
).collect()[0]

# How many total people in data set?
total_people = birth_year_stats["total_people"]
print(f"Total people in the dataset: {total_people:,}")

# What is the earliest year of birth?
earliest_year = birth_year_stats["min_year"]
print(f"Earliest year of birth: {earliest_year}")

# How many years ago was this person born?
# Compare against None explicitly: a truthiness test would also skip year 0.
if earliest_year is not None:
    age_ago = CURRENT_YEAR - earliest_year
    print(f"This person was born approximately {age_ago} years ago (based on {CURRENT_YEAR}).")

# Using only the data in the data set, determine if this date of birth correct.
# Records whose birth year is later than their death year cannot be right.
df_impossible_births = df_names.filter(
    col("birthYear").isNotNull()
    & col("deathYear").isNotNull()
    & (col("birthYear") > col("deathYear"))
)
impossible_count = df_impossible_births.count()
print(f"Number of impossible records (Birth Year > Death Year): {impossible_count:,}")

# What is the most recent date of birth?
most_recent_year = birth_year_stats["max_year"]
print(f"Most recent year of birth: {most_recent_year}")

# What percentage of the people do not have a listed date of birth?
people_with_birth_year = birth_year_stats["with_birth_year"]
people_without_birth_year = total_people - people_with_birth_year
if total_people:
    percentage_without_birth = (people_without_birth_year / total_people) * 100
    print(f"Percentage of people without a listed date of birth: {percentage_without_birth:.2f}%")
else:
    # An empty dataset has no meaningful percentage; avoid dividing by zero.
    percentage_without_birth = None
    print("Percentage of people without a listed date of birth: n/a (dataset is empty)")

print("\n--- B. TITLE ANALYSIS (df_basics) ---")

# Release Year Distribution (Top 10):
print("\nRelease Year Distribution (Top 10):")
df_release_years = (
    df_basics
    .filter(F.col("startYear").isNotNull())
    .groupBy(F.col("startYear"))
    .agg(F.count("*").alias("count"))
    .orderBy(F.col("count").desc())
    .limit(10)
)

df_release_years.show()

# What is the length of the longest "short" after 1900?
# NOTE(review): `>= 1900` includes 1900 itself; switch to `>` if "after 1900"
# is meant strictly.
longest_short_row = df_basics.filter(
    (col("titleType") == "short")
    & (col("startYear") >= 1900)
    & col("runtimeMinutes").isNotNull()
).agg(F.max("runtimeMinutes").alias("longest_runtime")).collect()[0]
longest_short = longest_short_row["longest_runtime"]
print(f"Length of the longest 'short' after 1900 (in minutes): {longest_short}")

# What is the length of the shortest "movie" after 1900?
shortest_movie_row = df_basics.filter(
    (col("titleType") == "movie")
    & (col("startYear") >= 1900)
    & col("runtimeMinutes").isNotNull()
).agg(F.min("runtimeMinutes").alias("shortest_runtime")).collect()[0]
shortest_movie = shortest_movie_row["shortest_runtime"]
print(f"Length of the shortest 'movie' after 1900 (in minutes): {shortest_movie}")


# List of all of the genres represented.
# 'genres' is a comma-separated string, so split and explode it to one genre
# per row. The distinct result is collected so that every genre is printed,
# not only the first 20 rows.
genre_rows = (
    df_basics
    .select(F.explode(F.split(col("genres"), ",")).alias("genre"))
    .distinct()
    .orderBy("genre")
    .collect()
)
print(f"\nList of all genres represented ({len(genre_rows)} total):")
for genre_row in genre_rows:
    print(f"  {genre_row['genre']}")


# --- CELL 3: BATCH ANALYSIS ---


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit
from pyspark.sql.utils import AnalysisException

# Minimum number of votes a title needs before its rating is considered.
MIN_VOTES_THRESHOLD = 10000
print(f"Applying minimum vote threshold: {MIN_VOTES_THRESHOLD:,} votes.")


# 1. Join basics and ratings
df_movie_data = df_basics.join(df_ratings, on="tconst", how="inner")

# 2. Filter for 'movie', 'Comedy', AND minimum votes
df_meaningful_comedy_movies = df_movie_data.filter(
    (col("titleType") == "movie")
    & (col("genres").contains("Comedy"))
    & (col("numVotes") >= MIN_VOTES_THRESHOLD)
)

# 3. Order by rating (desc) then votes (desc) and select the top one
highest_rated_comedy = df_meaningful_comedy_movies.orderBy(
    col("averageRating").desc(),
    col("numVotes").desc()
).select("tconst", "primaryTitle", "averageRating", "numVotes").limit(1)

print("\nHighest Rated Comedy 'Movie' (STATISTICALLY SIGNIFICANT):")
highest_rated_comedy.show(truncate=False)

try:
    # Test for an empty result explicitly instead of relying on an IndexError.
    top_rows = highest_rated_comedy.collect()
    if not top_rows:
        print(f"ANALYTICAL ERROR: No comedy movies found with at least {MIN_VOTES_THRESHOLD} votes. Try lowering the threshold.")
    else:
        best_movie_details = top_rows[0]
        best_movie_tconst = best_movie_details["tconst"]
        best_movie_title = best_movie_details["primaryTitle"]

        print(f"--- RESULTS FOR {best_movie_title} (tconst: {best_movie_tconst}) ---")

        # Who was the director of the movie?
        print(f"\nDirector(s) of '{best_movie_title}':")
        # load_and_clean_imdb_data returns None when a file failed to load,
        # so guard before using the frame.
        if df_crew is None:
            print("title.crew was not loaded; cannot look up directors.")
        else:
            # 'directors' is a comma-separated list of nconst ids: one row
            # per id, then resolve each id to a name.
            df_directors = df_crew.filter(col("tconst") == lit(best_movie_tconst)) \
                                  .select(F.explode(F.split(col("directors"), ",")).alias("nconst")) \
                                  .join(df_names.select("nconst", "primaryName"), on="nconst", how="inner")

            df_directors.select(col("primaryName").alias("Director Name")).distinct().show(truncate=False)

        # List, if any, the alternate titles for the movie.
        print(f"\nAlternate titles for '{best_movie_title}':")
        if df_akas is None:
            print("title.akas was not loaded; cannot list alternate titles.")
        else:
            df_alternate_titles = df_akas.filter(col("tconst") == lit(best_movie_tconst)) \
                                         .select("title", "region", "language") \
                                         .orderBy("region", "language")

            df_alternate_titles.show(truncate=False)

except AnalysisException as e:
    print(f" ANALYSIS ERROR (Check Join/Column Names): {e}")

# --- CELL 4: STRUCTURED STREAMING SIMULATION ---


In [0]:

from pyspark.sql.functions import window, rand, array, size, lit, current_timestamp
from pyspark.sql.types import TimestampType

# 1. Define Checkpoint Paths and UC Table Names

UC_VOLUME_NAME_EXPLICIT = "raw_imdb_files"


CHECKPOINT_ROOT = f"dbfs:/Volumes/workspace/imdb_project/{UC_VOLUME_NAME_EXPLICIT}/"

# Each streaming query gets its own checkpoint directory.
CHECKPOINT_PATH_METRICS = CHECKPOINT_ROOT + "stream_checkpoints_metrics_final/cp"
CHECKPOINT_PATH_ALERTS = CHECKPOINT_ROOT + "stream_checkpoints_alerts_final/cp"


UC_TABLE_METRICS = "workspace.imdb_project.stream_metrics_output"
UC_TABLE_ALERTS = "workspace.imdb_project.stream_alerts_output"

# Define target entities (using results from analysis or placeholders)
try:
    # Retrieve variables from Cell 3 if they exist
    TRACK_TCONST = best_movie_tconst
    TRACK_NCONST = df_directors.select("nconst").limit(1).collect()[0]["nconst"]
except Exception:
    # Placeholders if batch analysis failed or wasn't run (NameError), or no
    # director row was found (IndexError). `Exception` rather than a bare
    # `except` so KeyboardInterrupt/SystemExit are not swallowed.
    TRACK_TCONST = "tt0252487"
    TRACK_NCONST = "nm0250608"

TRACK_ENTITIES = [TRACK_TCONST, TRACK_NCONST, "Genre_Action", "User_Type_Bot"]


# 2. Define the Simulated Stream DataFrame (Source: using 'rate' source)

# Array column holding the candidate entity ids; built once and indexed below.
entity_choices = F.array(*[lit(e) for e in TRACK_ENTITIES])
# rand() is uniform in [0, 1), so floor(rand() * n) is a uniform index in
# 0..n-1. Casting rand() itself to an integer always gives 0, which would
# pin every event to the first entity.
random_entity_index = F.floor(F.rand() * len(TRACK_ENTITIES)).cast(IntegerType())

df_stream_raw = spark.readStream.format("rate") \
    .option("rowsPerSecond", 5) \
    .load() \
    .withColumnRenamed("timestamp", "event_time") \
    .withColumn("tconst", entity_choices[random_entity_index]) \
    .withColumn("byte_change", (F.rand() * 5000).cast(IntegerType())) \
    .withColumn("is_bot", (F.rand() > 0.8))

# Apply Watermark for time-based aggregation
df_stream_watermarked = df_stream_raw.withWatermark("event_time", "5 minutes")


# 3. Define the metric and alert transformations

# METRIC: Edits per Entity in 2-minute Tumbling Window
df_metrics = df_stream_watermarked.groupBy(
    F.window(col("event_time"), "2 minutes"),
    col("tconst")
).agg(
    F.count("*").alias("edit_count"),
    F.sum("byte_change").alias("total_bytes")
)

# ALERT: non-bot events on the tracked title whose byte_change reaches the
# threshold.
ALERT_THRESHOLD_BYTES = 1000
ALERT_TARGET_TCONST = lit(TRACK_TCONST)

df_alerts = df_stream_watermarked.filter(
    (col("tconst") == ALERT_TARGET_TCONST)
    & (col("byte_change") >= ALERT_THRESHOLD_BYTES)
    & (col("is_bot") == lit(False))
).select(
    col("event_time"),
    col("tconst").alias("entity_id"),
    col("byte_change"),
    lit("LARGE_NON_BOT_EDIT").alias("alert_type")
)


# 4. Define Output Sinks (Using AvailableNow trigger to write to UC Tables)

print(f"\nStarting Metrics Stream (Complete Mode) to UC Table: {UC_TABLE_METRICS}...")

# Explicit construction of the metrics query
query_metrics_writer = df_metrics.writeStream
query_metrics_writer = query_metrics_writer.format("delta")
query_metrics_writer = query_metrics_writer.outputMode("complete")
query_metrics_writer = query_metrics_writer.option("checkpointLocation", CHECKPOINT_PATH_METRICS)
query_metrics_writer = query_metrics_writer.queryName("Metrics_Query")
query_metrics_writer = query_metrics_writer.trigger(availableNow=True)
query_metrics = query_metrics_writer.toTable(UC_TABLE_METRICS)


print(f"Starting Alerts Stream (Append Mode) to UC Table: {UC_TABLE_ALERTS}...")

# Explicit construction of the alerts query
query_alerts_writer = df_alerts.writeStream
query_alerts_writer = query_alerts_writer.format("delta")
query_alerts_writer = query_alerts_writer.outputMode("append")
query_alerts_writer = query_alerts_writer.option("checkpointLocation", CHECKPOINT_PATH_ALERTS)
query_alerts_writer = query_alerts_writer.queryName("Alerts_Query")
query_alerts_writer = query_alerts_writer.trigger(availableNow=True)
query_alerts = query_alerts_writer.toTable(UC_TABLE_ALERTS)

# NOTE(review): neither query is awaited here; code that reads the output
# tables right afterwards may need query_metrics.awaitTermination() /
# query_alerts.awaitTermination() first — confirm.
print("\nStreaming queries started. Check Databricks UI for status.")

# --- STREAMING VERIFICATION CELL (READING FROM UC TABLES) ---

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

# Define the Managed Unity Catalog Table NAMES (Must match those in Cell 4)
UC_TABLE_METRICS = "workspace.imdb_project.stream_metrics_output"
UC_TABLE_ALERTS = "workspace.imdb_project.stream_alerts_output"

print("--- VERIFYING AGGREGATED METRICS (UC TABLE) ---")
try:
    # Read directly from the UC table by name
    df_metrics_result = spark.table(UC_TABLE_METRICS)
    print(f"Total processed metric windows: {df_metrics_result.count():,}")

    # Display the top 10 rows, sorted by edit count
    df_metrics_result.orderBy(F.col("edit_count").desc()).show(10, truncate=False)

except AnalysisException as e:
    print(f"❌ METRICS READING FAILED: The table '{UC_TABLE_METRICS}' might not exist yet, or permissions are missing.")
    # Not every AnalysisException implementation exposes `.desc`; fall back
    # to the exception text so this handler cannot itself raise.
    print(f"Error: {getattr(e, 'desc', None) or e}")
except Exception as e:
    print(f"❌ METRICS READING FAILED. Error: {e}")

print("\n--- VERIFYING TRIGGERED ALERTS (UC TABLE) ---")
try:
    # Read directly from the UC table by name
    df_alerts_result = spark.table(UC_TABLE_ALERTS)
    print(f"Total triggered alerts: {df_alerts_result.count():,}")

    # Display the alerts
    df_alerts_result.show(10, truncate=False)

except AnalysisException as e:
    print(f"❌ ALERTS READING FAILED: The table '{UC_TABLE_ALERTS}' might not exist yet, or permissions are missing.")
    # Same fallback as above for implementations without `.desc`.
    print(f"Error: {getattr(e, 'desc', None) or e}")
except Exception as e:
    print(f"❌ ALERTS READING FAILED. Error: {e}")