# UFC Spark ETL Pipeline - Google Colab

This notebook runs the Spark ETL pipeline for UFC data processing on Google Colab.

## Setup Steps:
1. Install PySpark
2. Upload your `data` folder to Google Drive and mount the drive
3. Run the ETL pipeline


In [None]:
# Step 1: Install PySpark
# NOTE(review): no version is pinned, so the release that gets installed depends
# on when this cell is run - consider pinning one for reproducible runs.
%pip install pyspark


In [None]:
# Step 2: Mount Google Drive
# Upload your 'data' folder to Google Drive first, then run this cell.
# Mounting is interactive: Colab asks for permission to access the Drive account.
from google.colab import drive
drive.mount('/content/drive')

# Set your data directory path - adjust this to match your Google Drive folder structure.
# Later cells read from the 'raw', 'external' and 'features/graph_features'
# sub-folders of DATA_DIR and write a 'processed' sub-folder under OUTPUT_DIR.
# Both constants currently point at the same folder.
DATA_DIR = '/content/drive/MyDrive/UFC_SellThrough_Project/data'
OUTPUT_DIR = '/content/drive/MyDrive/UFC_SellThrough_Project/data'

print(f"Data directory: {DATA_DIR}")
print(f"Output directory: {OUTPUT_DIR}")


In [None]:
# Step 3: Verify data files exist
import os

raw_path = os.path.join(DATA_DIR, 'raw')
ext_path = os.path.join(DATA_DIR, 'external')

# (heading, folder) pairs are reported in order; the leading "\n" on the second
# heading keeps a blank line between the two sections of the output.
for heading, folder in (("Raw data files:", raw_path), ("\nExternal data files:", ext_path)):
    print(heading)
    # isdir rather than exists: a plain file at this path would make listdir raise.
    if os.path.isdir(folder):
        # sorted() gives a stable listing; os.listdir returns entries in arbitrary order.
        for f in sorted(os.listdir(folder)):
            if f.endswith('.csv'):
                print(f"  - {f}")
    else:
        print(f"  Directory not found: {folder}")


In [None]:
# Step 4: Import libraries and setup Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType, FloatType, BooleanType
import os

def make_spark(app_name="UFC-ETL", local=True):
    """Build (or reuse) the SparkSession used by the ETL cells.

    Args:
        app_name: Application name given to the session builder.
        local: When true, the session uses the ``local[*]`` master.

    Returns:
        The session returned by ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName(app_name)

    if local:
        builder = builder.master("local[*]")

    # Colab-optimized settings (reduced memory for Colab's limited RAM)
    builder = (builder
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.parquet.compression.codec", "snappy")
        .config("spark.driver.memory", "2g")
        .config("spark.executor.memory", "2g")
        # The cleaning functions cast regexp_extract() results to integers,
        # parse dates with fixed patterns and index split() arrays, all
        # expecting NULL when the input does not fit. Spark 4 enables ANSI mode
        # by default, where those operations raise instead, so the lenient
        # behaviour is requested explicitly.
        .config("spark.sql.ansi.enabled", "false")
    )

    return builder.getOrCreate()

# Create Spark session
# `spark` is the kernel-wide session: the loading cell passes it to the
# load_* functions and the final cell stops it.
spark = make_spark()
print(f"Spark version: {spark.version}")


In [None]:
# Step 5: Define data loading functions

def load_data(spark, data_dir):
    """Read the raw UFC CSV files found under ``<data_dir>/raw``.

    Each file is read with a header row and schema inference, and all of its
    column names are lower-cased. Missing files are reported and skipped.

    Args:
        spark: Session used for reading.
        data_dir: Directory that contains the ``raw`` sub-folder.

    Returns:
        dict mapping dataset name (for example ``"events"``) to its DataFrame;
        only datasets whose file exists are present.
    """
    print("Loading CSV files...")
    raw_path = os.path.join(data_dir, "raw")

    file_map = {
        "events": "events.csv",
        "fight_results": "fight_results.csv",
        "fight_stats": "fight_stats.csv",
        "fight_details": "fight_details.csv",
        "fighter_details": "fighter_details.csv",
        "fighter_tott": "fighter_tott.csv",
    }

    data = {}
    for name, fname in file_map.items():
        full_path = os.path.join(raw_path, fname)
        if not os.path.exists(full_path):
            print(f"  Warning: {full_path} not found")
            continue

        df = spark.read.csv(full_path, header=True, inferSchema=True)
        # Rename every column in one projection instead of stacking one
        # withColumnRenamed step per column onto the query plan.
        df = df.toDF(*[c.lower() for c in df.columns])
        # count() forces a read of the file; it is kept for the progress log.
        print(f"  Loaded {name}: {df.count()} rows")
        data[name] = df
    return data

def load_external_data(spark, data_dir):
    """Read the optional external CSV files found under ``<data_dir>/external``.

    Each file is read with a header row and schema inference, and all of its
    column names are lower-cased. Missing files are reported and skipped.

    Args:
        spark: Session used for reading.
        data_dir: Directory that contains the ``external`` sub-folder.

    Returns:
        dict mapping source name (for example ``"betting_odds"``) to its
        DataFrame; only sources whose file exists are present.
    """
    print("Loading external data sources...")
    external_path = os.path.join(data_dir, "external")
    external_data = {}

    external_files = {
        "betting_odds": "betting_odds.csv",
        "google_trends": "google_trends.csv",
        "fighter_buzz": "fighter_buzz.csv",
        "event_sentiment": "event_sentiment.csv",
        "reddit_comments": "reddit_comments.csv",
    }

    for name, fname in external_files.items():
        full_path = os.path.join(external_path, fname)
        if not os.path.exists(full_path):
            print(f"  Note: {fname} not found")
            continue

        df = spark.read.csv(full_path, header=True, inferSchema=True)
        # Rename every column in one projection instead of stacking one
        # withColumnRenamed step per column onto the query plan.
        df = df.toDF(*[c.lower() for c in df.columns])
        # count() forces a read of the file; it is kept for the progress log.
        print(f"  Loaded {name}: {df.count()} rows")
        external_data[name] = df
    return external_data

def load_graph_features(spark, data_dir):
    """Read the graph-feature Parquet dataset if it has been generated.

    Args:
        spark: Session used for reading.
        data_dir: Directory that contains ``features/graph_features``.

    Returns:
        The DataFrame read from ``<data_dir>/features/graph_features``, or
        ``None`` when that path does not exist.
    """
    print("Loading graph features...")
    graph_path = os.path.join(data_dir, "features", "graph_features")

    # Guard clause: the graph features are optional.
    if not os.path.exists(graph_path):
        print("  Note: Graph features not found")
        return None

    graph_df = spark.read.parquet(graph_path)
    print(f"  Loaded graph features: {graph_df.count()} fighters")
    return graph_df

print("Data loading functions defined.")


In [None]:
# Step 6: Define data cleaning functions

def clean_events(df):
    """Normalise the raw events table to one row per event.

    Reads the ``date``, ``event``, ``location`` and ``url`` columns and derives:

    * ``event_date`` - ``date`` parsed with the pattern ``MMMM d, yyyy``.
    * ``event_type`` - PPV / Fight Night / TUF / DWCS / Other, from the name.
    * ``city`` - first comma-separated component of ``location``.
    * ``country`` - last component of ``location`` (NULL when the location has
      a single component), with a few spellings folded to USA / UK / Canada.
    * ``event_id`` - the path segment after ``event/`` in ``url``.
    * ``year`` - year of ``event_date``.

    NOTE: regexp_extract yields an empty string when ``url`` does not match, so
    all such rows share one ``event_id`` and only one of them survives the
    final dropDuplicates.

    Args:
        df: Raw events DataFrame with lower-cased column names.

    Returns:
        DataFrame with columns event_id, event_name, event_date, event_type,
        city, country, location, year, de-duplicated on ``event_id``.
    """
    print("Cleaning events...")
    df = df.withColumn("event_date", F.to_date(F.col("date"), "MMMM d, yyyy"))
    df = df.withColumn(
        "event_type",
        F.when(F.col("event").rlike("^UFC [0-9]+"), "PPV")
         .when(F.col("event").contains("Fight Night"), "Fight Night")
         .when(F.col("event").contains("TUF"), "TUF")
         .when(F.col("event").contains("Contender"), "DWCS")
         .otherwise("Other")
    )

    location_parts = F.split(F.col("location"), ", ")
    df = df.withColumn("city", F.element_at(location_parts, 1))

    # The country is the LAST component: "City, Country" has it in position 2,
    # but "City, State, Country" has the state there. A location with only one
    # component carries no country, so it stays NULL rather than repeating the
    # city.
    trailing_part = F.when(F.size(location_parts) >= 2, F.element_at(location_parts, -1))
    df = df.withColumn(
        "country",
        F.when(trailing_part.isin(["USA", "United States"]), "USA")
         .when(trailing_part.isin(["UK", "United Kingdom", "England", "Scotland"]), "UK")
         .when(trailing_part.isin(["Canada"]), "Canada")
         .otherwise(trailing_part)
    )

    df = df.withColumn("event_id", F.regexp_extract(F.col("url"), r"event/([^/]+)", 1))
    df = df.withColumn("year", F.year(F.col("event_date")))
    df = df.select(
        F.col("event_id"), F.col("event").alias("event_name"),
        F.col("event_date"), F.col("event_type"), F.col("city"),
        F.col("country"), F.col("location"), F.col("year")
    )
    return df.dropDuplicates(["event_id"])

def clean_fighters(df):
    """Normalise a raw fighter table to one row per fighter.

    ``url`` is required. Every other source column is optional and yields NULL
    (or ``"Unknown"`` for the name) when the input does not have it:
    ``height``, ``reach``, ``weight``, ``dob``, ``record``, ``nickname``,
    ``stance`` and either ``fighter`` or ``first`` + ``last``.

    Args:
        df: Raw fighter DataFrame with lower-cased column names.

    Returns:
        DataFrame with columns fighter_id, fighter_name, nickname,
        height_inches, weight_lbs, reach_inches, stance, dob, wins, losses,
        draws, de-duplicated on ``fighter_id``.
    """
    print("Cleaning fighters...")
    source_columns = set(df.columns)

    def text_or_null(name):
        # Column `name` if the input has it, otherwise a NULL string column.
        return F.col(name) if name in source_columns else F.lit(None).cast("string")

    df = df.withColumn("fighter_id", F.regexp_extract(F.col("url"), r"fighter/([^/]+)", 1))

    # Height is split into the number before ' (feet) and the number before
    # " (inches); the sum is NULL when either part is missing.
    height = text_or_null("height")
    df = df.withColumn(
        "height_inches",
        F.regexp_extract(height, r"(\d+)'", 1).cast(IntegerType()) * 12 +
        F.regexp_extract(height, r'(\d+)"', 1).cast(IntegerType())
    )
    df = df.withColumn("reach_inches", F.regexp_extract(text_or_null("reach"), r"(\d+)", 1).cast(IntegerType()))
    df = df.withColumn("weight_lbs", F.regexp_extract(text_or_null("weight"), r"(\d+)", 1).cast(IntegerType()))
    # A single "d" accepts both one- and two-digit days; "dd" accepts only the latter.
    df = df.withColumn("dob", F.to_date(text_or_null("dob"), "MMM d, yyyy"))

    if "record" in [c.lower() for c in df.columns]:
        # The record is read as "wins-losses-draws", taken from the start of the string.
        df = df.withColumn("wins", F.regexp_extract(F.col("record"), r"^(\d+)", 1).cast(IntegerType()))
        df = df.withColumn("losses", F.regexp_extract(F.col("record"), r"^(\d+)-(\d+)", 2).cast(IntegerType()))
        df = df.withColumn("draws", F.regexp_extract(F.col("record"), r"^(\d+)-(\d+)-(\d+)", 3).cast(IntegerType()))
    else:
        df = df.withColumn("wins", F.lit(None).cast(IntegerType()))
        df = df.withColumn("losses", F.lit(None).cast(IntegerType()))
        df = df.withColumn("draws", F.lit(None).cast(IntegerType()))

    if "fighter" in df.columns:
        df = df.withColumn("fighter_name", F.col("fighter"))
    elif "first" in df.columns and "last" in df.columns:
        df = df.withColumn("fighter_name", F.concat_ws(" ", F.col("first"), F.col("last")))
    else:
        df = df.withColumn("fighter_name", F.lit("Unknown"))

    df = df.withColumn("nickname", text_or_null("nickname"))
    df = df.withColumn("stance", text_or_null("stance"))

    df = df.select(
        "fighter_id", "fighter_name", "nickname", "height_inches",
        "weight_lbs", "reach_inches", "stance", "dob", "wins", "losses", "draws"
    )
    return df.dropDuplicates(["fighter_id"])

print("Event and fighter cleaning functions defined.")


In [None]:
# Step 7: Define fight cleaning functions

def clean_fights(df):
    """Normalise a raw fights table to one row per fight.

    ``url`` is required. The other inputs are optional and replaced by NULL /
    ``"Unknown"`` / ``False`` defaults when absent: ``event_url``,
    ``weightclass`` (or ``weight_class``), ``method``, ``round``, ``time``,
    ``bout`` (or ``fighter1`` + ``fighter2``) and ``winner`` (or ``outcome``).

    Args:
        df: Raw fights DataFrame with lower-cased column names.

    Returns:
        DataFrame with columns fight_id, event_id, fighter1_name,
        fighter2_name, winner_name, weight_class, is_title_fight,
        method_category, method, round, finish_time, de-duplicated on
        ``fight_id``.
    """
    print("Cleaning fights...")
    df = df.withColumn("fight_id", F.regexp_extract(F.col("url"), r"fight/([^/]+)", 1))

    if "event_url" in df.columns:
        df = df.withColumn("event_id", F.regexp_extract(F.col("event_url"), r"event/([^/]+)", 1))
    else:
        df = df.withColumn("event_id", F.lit(None).cast("string"))

    wc_col = "weightclass" if "weightclass" in df.columns else "weight_class"
    if wc_col in df.columns:
        df = df.withColumn("is_title_fight", F.col(wc_col).contains("Title").cast(BooleanType()))
        df = df.withColumn("weight_class_clean", F.regexp_replace(F.col(wc_col), " Title Bout", ""))
    else:
        df = df.withColumn("is_title_fight", F.lit(False))
        df = df.withColumn("weight_class_clean", F.lit("Unknown"))

    if "method" in df.columns:
        # Compare against the upper-cased text so spelled-out values such as
        # "Submission" or "Decision - Unanimous" are categorised the same way
        # as the abbreviated "SUB" / "U-DEC" forms.
        method_upper = F.upper(F.col("method"))
        df = df.withColumn(
            "method_category",
            F.when(method_upper.contains("KO"), "KO/TKO")
             .when(method_upper.contains("SUB"), "Submission")
             .when(method_upper.contains("DEC"), "Decision")
             .otherwise("Other")
        )
    else:
        df = df.withColumn("method", F.lit(None).cast("string"))
        df = df.withColumn("method_category", F.lit("Unknown"))

    if "round" in df.columns:
        df = df.withColumn("round_num", F.regexp_extract(F.col("round"), r"(\d+)", 1).cast(IntegerType()))
    else:
        df = df.withColumn("round_num", F.lit(None).cast(IntegerType()))

    if "bout" in df.columns:
        # "A vs. B" / "A vs B": the text before the separator is fighter 1,
        # the text after it is fighter 2.
        df = df.withColumn("fighter1_name", F.trim(F.regexp_extract(F.col("bout"), r"^(.+?)\s+vs\.?\s+", 1)))
        df = df.withColumn("fighter2_name", F.trim(F.regexp_extract(F.col("bout"), r"\s+vs\.?\s+(.+)$", 1)))
    elif "fighter1" in df.columns and "fighter2" in df.columns:
        df = df.withColumn("fighter1_name", F.col("fighter1"))
        df = df.withColumn("fighter2_name", F.col("fighter2"))
    else:
        df = df.withColumn("fighter1_name", F.lit("Unknown"))
        df = df.withColumn("fighter2_name", F.lit("Unknown"))

    if "winner" in df.columns:
        df = df.withColumn("winner_name", F.col("winner"))
    elif "outcome" in df.columns:
        # An outcome starting with "W" credits fighter 1, one starting with "L"
        # credits fighter 2; anything else leaves the winner NULL.
        df = df.withColumn(
            "winner_name",
            F.when(F.col("outcome").startswith("W"), F.col("fighter1_name"))
             .when(F.col("outcome").startswith("L"), F.col("fighter2_name"))
             .otherwise(F.lit(None).cast("string"))
        )
    else:
        df = df.withColumn("winner_name", F.lit(None).cast("string"))

    df = df.withColumn("finish_time", F.col("time") if "time" in df.columns else F.lit(None).cast("string"))

    df = df.select(
        "fight_id", "event_id", "fighter1_name", "fighter2_name", "winner_name",
        F.col("weight_class_clean").alias("weight_class"), "is_title_fight",
        "method_category", "method", F.col("round_num").alias("round"), "finish_time"
    )
    return df.dropDuplicates(["fight_id"])

def clean_fight_stats(df):
    """Add parsed per-fighter statistics columns to the raw fight-stats table.

    All original columns are kept. Added columns: ``fighter_name``,
    ``fight_key`` (``event`` and ``bout`` joined with ``_``),
    ``sig_strikes_landed`` / ``sig_strikes_attempted``,
    ``takedowns_landed`` / ``takedowns_attempted`` and ``knockdowns``. The
    landed/attempted pairs are parsed from ``"<n> of <m>"`` text. Each added
    column is NULL (``"Unknown"`` for the name) when its source is absent.

    Args:
        df: Raw fight-stats DataFrame.

    Returns:
        The input DataFrame with the columns above appended.
    """
    print("Cleaning fight stats...")
    cols = [c.lower() for c in df.columns]

    def quoted(name):
        # Backtick-quote the name so that one containing "." is resolved as a
        # single column instead of being parsed as a struct-field path.
        return F.col("`" + name.replace("`", "``") + "`")

    df = df.withColumn("fighter_name", F.col("fighter") if "fighter" in cols else F.lit("Unknown"))

    if "event" in cols and "bout" in cols:
        df = df.withColumn("fight_key", F.concat_ws("_", F.col("event"), F.col("bout")))
    else:
        df = df.withColumn("fight_key", F.lit(None).cast("string"))

    # First column whose name mentions both "sig" and "str", in column order.
    sig_str_col = next(
        (c for c in df.columns if "sig" in c.lower() and "str" in c.lower()),
        None,
    )

    if sig_str_col:
        sig_strikes = quoted(sig_str_col)
        df = df.withColumn("sig_strikes_landed", F.regexp_extract(sig_strikes, r"(\d+) of", 1).cast(IntegerType()))
        df = df.withColumn("sig_strikes_attempted", F.regexp_extract(sig_strikes, r"of (\d+)", 1).cast(IntegerType()))
    else:
        df = df.withColumn("sig_strikes_landed", F.lit(None).cast(IntegerType()))
        df = df.withColumn("sig_strikes_attempted", F.lit(None).cast(IntegerType()))

    if "td" in cols:
        df = df.withColumn("takedowns_landed", F.regexp_extract(F.col("td"), r"(\d+) of", 1).cast(IntegerType()))
        df = df.withColumn("takedowns_attempted", F.regexp_extract(F.col("td"), r"of (\d+)", 1).cast(IntegerType()))
    else:
        df = df.withColumn("takedowns_landed", F.lit(None).cast(IntegerType()))
        df = df.withColumn("takedowns_attempted", F.lit(None).cast(IntegerType()))

    df = df.withColumn("knockdowns", F.col("kd").cast(IntegerType()) if "kd" in cols else F.lit(None).cast(IntegerType()))

    return df

print("Fight cleaning functions defined.")


In [None]:
# Step 8: Define feature processing functions

def build_event_features(events_df, fights_df):
    """Attach per-event fight aggregates and calendar/location flags to events.

    Args:
        events_df: Events with ``event_id``, ``event_date``, ``city`` and
            ``country`` columns.
        fights_df: Fights with ``event_id``, ``is_title_fight`` and
            ``weight_class`` columns.

    Returns:
        ``events_df`` left-joined with ``num_fights``, ``num_title_fights`` and
        ``title_weight_class``, plus ``day_of_week``, ``month``,
        ``is_saturday``, ``is_las_vegas`` and ``is_usa``. Events without any
        fight keep NULL aggregates.
    """
    print("Building event features...")
    fight_agg = fights_df.groupBy("event_id").agg(
        F.count("*").alias("num_fights"),
        F.sum(F.col("is_title_fight").cast(IntegerType())).alias("num_title_fights"),
        # The when() is NULL for non-title fights; ignorenulls makes first()
        # return a title fight's weight class wherever it sits in the group,
        # instead of NULL whenever the first row is a non-title fight.
        F.first(
            F.when(F.col("is_title_fight"), F.col("weight_class")),
            ignorenulls=True,
        ).alias("title_weight_class")
    )
    df = events_df.join(fight_agg, on="event_id", how="left")
    df = df.withColumn("day_of_week", F.dayofweek("event_date"))
    df = df.withColumn("month", F.month("event_date"))
    # dayofweek numbers Sunday as 1, so Saturday is 7.
    df = df.withColumn("is_saturday", (F.col("day_of_week") == 7).cast(IntegerType()))
    df = df.withColumn("is_las_vegas", F.col("city").contains("Las Vegas").cast(IntegerType()))
    df = df.withColumn("is_usa", (F.col("country") == "USA").cast(IntegerType()))
    return df

def process_betting_odds(betting_df, fights_df):
    """Reduce the betting-odds table to join keys plus renamed feature columns.

    NOTE: the join keys strip every character outside ``[a-z0-9]`` BEFORE
    lower-casing, so upper-case letters are removed rather than lowered.
    enrich_fights_with_external builds its keys the same way; change both
    together or the join stops matching.

    Args:
        betting_df: Betting odds DataFrame, or ``None``.
        fights_df: Accepted for call-site symmetry; not used here.

    Returns:
        ``None`` when ``betting_df`` is ``None``; otherwise a DataFrame with
        f1_name_norm, f2_name_norm, f1_closing_odds, f2_closing_odds,
        f1_implied_prob, f2_implied_prob, betting_spread, is_competitive and
        has_heavy_favorite.
    """
    if betting_df is None:
        return None
    print("Processing betting odds...")

    keyed = betting_df
    for key_name, source_name in (("f1_name_norm", "fighter1_name"), ("f2_name_norm", "fighter2_name")):
        keyed = keyed.withColumn(
            key_name,
            F.lower(F.regexp_replace(F.col(source_name), r"[^a-z0-9]", "")),
        )

    # (source column, output column) pairs that are carried over as-is.
    renamed = [
        F.col(source).alias(target)
        for source, target in (
            ("fighter1_odds", "f1_closing_odds"),
            ("fighter2_odds", "f2_closing_odds"),
            ("fighter1_implied_prob", "f1_implied_prob"),
            ("fighter2_implied_prob", "f2_implied_prob"),
            ("odds_spread", "betting_spread"),
        )
    ]
    # 0/1 indicator columns turned into booleans (anything other than 1 is False).
    flags = [
        F.when(F.col(source) == 1, True).otherwise(False).alias(target)
        for source, target in (
            ("is_competitive_matchup", "is_competitive"),
            ("has_heavy_favorite", "has_heavy_favorite"),
        )
    ]

    betting_features = keyed.select("f1_name_norm", "f2_name_norm", *renamed, *flags)
    print(f"  Processed {betting_features.count()} betting records")
    return betting_features

def process_sentiment(sentiment_df, events_df):
    """Reduce the event-sentiment table to a join key plus renamed features.

    NOTE: the join key strips every character outside ``[a-z0-9]`` BEFORE
    lower-casing, so upper-case letters are removed rather than lowered.
    enrich_events_with_sentiment builds its key the same way; change both
    together or the join stops matching.

    Args:
        sentiment_df: Event sentiment DataFrame, or ``None``.
        events_df: Accepted for call-site symmetry; not used here.

    Returns:
        ``None`` when ``sentiment_df`` is ``None``; otherwise a DataFrame with
        event_name_norm, reddit_sentiment, sentiment_variance,
        reddit_engagement, reddit_comments and reddit_hype.
    """
    if sentiment_df is None:
        return None
    print("Processing event sentiment...")

    name_key = F.lower(F.regexp_replace(F.col("event_name"), r"[^a-z0-9]", ""))
    keyed = sentiment_df.withColumn("event_name_norm", name_key)

    # (source column, output column) pairs.
    renamed = [
        F.col(source).alias(target)
        for source, target in (
            ("avg_sentiment", "reddit_sentiment"),
            ("sentiment_std", "sentiment_variance"),
            ("total_engagement", "reddit_engagement"),
            ("comment_count", "reddit_comments"),
            ("hype_score", "reddit_hype"),
        )
    ]

    sentiment_features = keyed.select("event_name_norm", *renamed)
    print(f"  Processed {sentiment_features.count()} sentiment records")
    return sentiment_features

def process_trends(trends_df, buzz_df):
    """Build search-interest features from fighter buzz or raw Google Trends.

    The per-matchup buzz table wins when it is available; the raw trends table
    is only aggregated (per fighter) when there is no buzz table.

    NOTE: the name keys strip every character outside ``[a-z0-9]`` BEFORE
    lower-casing, so upper-case letters are removed rather than lowered.
    enrich_fights_with_external builds its keys the same way.

    Args:
        trends_df: Google Trends DataFrame, or ``None``.
        buzz_df: Fighter buzz DataFrame, or ``None``.

    Returns:
        ``None`` when both inputs are ``None``. With a buzz table: a DataFrame
        with f1_name_norm, f2_name_norm, pre_event_buzz_7d,
        pre_event_buzz_30d, peak_search_interest and buzz_differential.
        Otherwise: one row per ``fighter_name`` with avg_search_interest,
        max_search_interest, search_volatility and fighter_name_norm.
    """
    if trends_df is None and buzz_df is None:
        return None
    print("Processing Google Trends...")

    def normalised(column_name):
        # Same key construction as the original inline expression.
        return F.lower(F.regexp_replace(F.col(column_name), r"[^a-z0-9]", ""))

    if buzz_df is None:
        # Only raw trends are available: aggregate them per fighter.
        per_fighter = trends_df.groupBy("fighter_name").agg(
            F.avg("search_interest").alias("avg_search_interest"),
            F.max("search_interest").alias("max_search_interest"),
            F.stddev("search_interest").alias("search_volatility"),
        )
        per_fighter = per_fighter.withColumn("fighter_name_norm", normalised("fighter_name"))
        print(f"  Processed {per_fighter.count()} trend records")
        return per_fighter

    keyed = buzz_df.withColumn("f1_name_norm", normalised("fighter1_name"))
    keyed = keyed.withColumn("f2_name_norm", normalised("fighter2_name"))
    matchup_features = keyed.select(
        "f1_name_norm",
        "f2_name_norm",
        F.col("combined_buzz_7d").alias("pre_event_buzz_7d"),
        F.col("combined_buzz_30d").alias("pre_event_buzz_30d"),
        F.col("max_peak_search").alias("peak_search_interest"),
        F.col("buzz_differential").alias("buzz_differential"),
    )
    print(f"  Processed {matchup_features.count()} buzz records")
    return matchup_features

print("Feature processing functions defined.")


In [None]:
# Step 9: Define enrichment and save functions

def enrich_fights_with_external(fights_df, betting_df, trends_df, graph_df):
    """Left-join the optional external feature tables onto the fights table.

    NOTE: the name keys strip every character outside ``[a-z0-9]`` BEFORE
    lower-casing, so upper-case letters are removed rather than lowered. The
    keys produced by process_betting_odds and process_trends are built the
    same way; change them together or the joins stop matching.

    Args:
        fights_df: Cleaned fights with ``fighter1_name`` / ``fighter2_name``.
        betting_df: Output of process_betting_odds, or ``None``.
        trends_df: Output of process_trends, or ``None``. It is joined only
            when it carries an ``f1_name_norm`` column (the per-matchup buzz
            shape); the per-fighter trends shape is skipped.
        graph_df: Graph features with ``name``, ``pagerank_score``,
            ``num_opponents`` and ``community_id`` columns, or ``None``.

    Returns:
        The fights DataFrame with the join keys and every available feature
        column added, and NULLs in the known feature columns replaced by
        defaults.
    """
    print("Enriching fights with external data...")
    df = fights_df
    df = df.withColumn("f1_name_norm", F.lower(F.regexp_replace(F.col("fighter1_name"), r"[^a-z0-9]", "")))
    df = df.withColumn("f2_name_norm", F.lower(F.regexp_replace(F.col("fighter2_name"), r"[^a-z0-9]", "")))

    if betting_df is not None:
        df = df.join(betting_df, on=["f1_name_norm", "f2_name_norm"], how="left")
        print("  Added betting features")

    if trends_df is not None and "f1_name_norm" in trends_df.columns:
        df = df.join(trends_df, on=["f1_name_norm", "f2_name_norm"], how="left")
        print("  Added trends features")

    if graph_df is not None:
        graph_df = graph_df.withColumn("name_norm", F.lower(F.regexp_replace(F.col("name"), r"[^a-z0-9]", "")))
        # The same per-fighter table is joined twice, once per corner.
        graph_f1 = graph_df.select(
            F.col("name_norm").alias("f1_name_norm"),
            F.col("pagerank_score").alias("f1_pagerank"),
            F.col("num_opponents").alias("f1_network_size"),
            F.col("community_id").alias("f1_community")
        )
        df = df.join(graph_f1, on="f1_name_norm", how="left")
        graph_f2 = graph_df.select(
            F.col("name_norm").alias("f2_name_norm"),
            F.col("pagerank_score").alias("f2_pagerank"),
            F.col("num_opponents").alias("f2_network_size"),
            F.col("community_id").alias("f2_community")
        )
        df = df.join(graph_f2, on="f2_name_norm", how="left")
        df = df.withColumn("combined_pagerank", F.coalesce(F.col("f1_pagerank"), F.lit(0.0)) + F.coalesce(F.col("f2_pagerank"), F.lit(0.0)))
        df = df.withColumn("combined_network_size", F.coalesce(F.col("f1_network_size"), F.lit(0)) + F.coalesce(F.col("f2_network_size"), F.lit(0)))
        df = df.withColumn("same_community", (F.col("f1_community") == F.col("f2_community")).cast(IntegerType()))
        print("  Added graph features")

    defaults = {
        "betting_spread": 0.1, "is_competitive": True, "has_heavy_favorite": False,
        "pre_event_buzz_7d": 50.0, "reddit_sentiment": 0.0, "combined_pagerank": 0.5, "combined_network_size": 10
    }
    # Only fill columns that are actually on the frame: most of these exist
    # only when the matching optional source was joined above, and nothing in
    # this function adds `reddit_sentiment`.
    present = set(df.columns)
    applicable = {name: value for name, value in defaults.items() if name in present}
    if applicable:
        df = df.fillna(applicable)

    print(f"  Enriched {df.count()} fights")
    return df

def enrich_events_with_sentiment(events_df, sentiment_df):
    """Left-join sentiment features onto events by normalised event name.

    NOTE: the join key strips every character outside ``[a-z0-9]`` BEFORE
    lower-casing, so upper-case letters are removed rather than lowered.
    process_sentiment builds its key the same way; change both together or the
    join stops matching.

    Args:
        events_df: Cleaned events with an ``event_name`` column.
        sentiment_df: Output of process_sentiment, or ``None``.

    Returns:
        ``events_df`` unchanged when ``sentiment_df`` is ``None``; otherwise
        the events with ``event_name_norm`` and the sentiment columns added,
        and NULLs in reddit_sentiment / reddit_engagement / reddit_hype
        replaced by defaults.
    """
    if sentiment_df is None:
        return events_df
    print("Enriching events with sentiment...")

    name_key = F.lower(F.regexp_replace(F.col("event_name"), r"[^a-z0-9]", ""))
    enriched = (
        events_df
        .withColumn("event_name_norm", name_key)
        .join(sentiment_df, on="event_name_norm", how="left")
        # Defaults for events that have no sentiment row.
        .fillna({"reddit_sentiment": 0.0, "reddit_engagement": 100, "reddit_hype": 0.5})
    )
    print(f"  Enriched {enriched.count()} events")
    return enriched

def save_parquet(df, output_path, partition_cols=None):
    """Write a DataFrame to Parquet, replacing anything already at the path.

    Args:
        df: DataFrame to write. ``None`` (a dataset that was never built) is
            reported and skipped instead of raising.
        output_path: Destination directory for the Parquet dataset.
        partition_cols: Optional list of column names to partition by.

    Returns:
        None.
    """
    if df is None:
        print(f"Skipping {output_path}: nothing to save")
        return

    print(f"Saving to {output_path}...")
    writer = df.write.mode("overwrite")
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.parquet(output_path)
    # count() is a separate action from the write above; it is kept for the log line.
    print(f"  Saved {df.count()} rows")

print("Enrichment and save functions defined.")


In [None]:
# Step 10: Load all data
# raw_dfs / external_dfs are dicts keyed by dataset name and contain only the
# files that were found; graph_features is a DataFrame, or None when the graph
# feature dataset is absent. The later cells branch on these.
raw_dfs = load_data(spark, DATA_DIR)
print(f"\nLoaded datasets: {list(raw_dfs.keys())}")

external_dfs = load_external_data(spark, DATA_DIR)
print(f"\nLoaded external datasets: {list(external_dfs.keys())}")

graph_features = load_graph_features(spark, DATA_DIR)


In [None]:
# Step 11: Clean the data
# Each *_clean variable stays None when its source dataset was not loaded.
# Presence is tested with `is not None` rather than DataFrame truthiness.

# Clean events
events_clean = None
if "events" in raw_dfs:
    events_clean = clean_events(raw_dfs["events"])
    print(f"Events cleaned: {events_clean.count()} rows")
    events_clean.show(5)
else:
    print("ERROR: No events data!")

# Clean fighters (first available source wins)
fighters_clean = None
if "fighter_details" in raw_dfs:
    fighters_clean = clean_fighters(raw_dfs["fighter_details"])
elif "fighters" in raw_dfs:
    fighters_clean = clean_fighters(raw_dfs["fighters"])

if fighters_clean is not None:
    print(f"Fighters cleaned: {fighters_clean.count()} rows")
    fighters_clean.show(5)
else:
    print("Warning: No fighter data found")

# Clean fights (first available source wins)
fights_clean = None
if "fight_details" in raw_dfs:
    fights_clean = clean_fights(raw_dfs["fight_details"])
elif "fight_results" in raw_dfs:
    fights_clean = clean_fights(raw_dfs["fight_results"])

if fights_clean is not None:
    print(f"Fights cleaned: {fights_clean.count()} rows")
    fights_clean.show(5)
else:
    print("Warning: No fight data found")

# Clean fight stats
fight_stats_clean = None
if "fight_stats" in raw_dfs:
    fight_stats_clean = clean_fight_stats(raw_dfs["fight_stats"])
    print(f"Fight stats cleaned: {fight_stats_clean.count()} rows")
else:
    print("Warning: No fight stats data found")


In [None]:
# Step 12: Process external data

# Every process_* function returns None straight away when its input frame is
# None, so a dataset that was not loaded is passed through with dict.get().
betting_features = process_betting_odds(external_dfs.get("betting_odds"), fights_clean)

sentiment_features = process_sentiment(external_dfs.get("event_sentiment"), events_clean)

# process_trends prefers the buzz table and falls back to raw trends on its own.
trends_features = process_trends(
    external_dfs.get("google_trends"),
    external_dfs.get("fighter_buzz")
)

print("\nExternal data processing complete.")


In [None]:
# Step 13: Enrich data and build features
# Every step is skipped when the frame it needs is None, so a missing events
# or fights dataset leaves the corresponding result as None instead of raising.

# Enrich fights with external data
fights_enriched = fights_clean
if fights_clean is not None:
    fights_enriched = enrich_fights_with_external(
        fights_clean,
        betting_features,
        trends_features,
        graph_features
    )

# Enrich events with sentiment
events_enriched = events_clean
if events_clean is not None and sentiment_features is not None:
    events_enriched = enrich_events_with_sentiment(events_clean, sentiment_features)

# Build event features (needs both events and fights)
if events_enriched is not None and fights_enriched is not None:
    event_features = build_event_features(events_enriched, fights_enriched)
else:
    event_features = events_enriched

print("\nEnrichment complete.")


In [None]:
# Step 14: Save to Parquet
# A dataset is written only when it was actually built; the earlier cells
# leave a variable as None when its source data was missing.

processed_dir = os.path.join(OUTPUT_DIR, "processed")
os.makedirs(processed_dir, exist_ok=True)

if events_enriched is not None:
    save_parquet(
        events_enriched,
        os.path.join(processed_dir, "events"),
        partition_cols=["year"]
    )
else:
    print("Skipping events: no events data to save")

if fighters_clean is not None:
    save_parquet(
        fighters_clean,
        os.path.join(processed_dir, "fighters")
    )

if fights_enriched is not None:
    save_parquet(
        fights_enriched,
        os.path.join(processed_dir, "fights")
    )

if fight_stats_clean is not None:
    save_parquet(
        fight_stats_clean,
        os.path.join(processed_dir, "fight_stats")
    )

if event_features is not None:
    save_parquet(
        event_features,
        os.path.join(processed_dir, "event_features"),
        partition_cols=["year"]
    )
else:
    print("Skipping event_features: no events data to save")

print(f"\nAll data saved to: {processed_dir}")


In [None]:
# Step 15: Summary and cleanup
# Reports which optional sources contributed (a feature frame is None when its
# source was not loaded) and where the output was written.

print("=" * 50)
print("ETL PIPELINE COMPLETE")
print("=" * 50)
print(f"  Betting data: {'Yes' if betting_features is not None else 'No'}")
print(f"  Trends data: {'Yes' if trends_features is not None else 'No'}")
print(f"  Sentiment data: {'Yes' if sentiment_features is not None else 'No'}")
print(f"  Graph data: {'Yes' if graph_features is not None else 'No'}")
print(f"\nOutput saved to: {processed_dir}")

# Stop Spark session
# After this the DataFrames created above can no longer be evaluated; re-run
# the session-creation cell before re-running any Spark cell.
spark.stop()
print("\nSpark session stopped.")


In [None]:
# Optional: Download processed files to local machine
# Uncomment the lines below if you want to download the results

# import shutil
# from google.colab import files
# 
# # Zip the processed directory
# shutil.make_archive('/content/processed_data', 'zip', processed_dir)
# 
# # Download the zip file
# files.download('/content/processed_data.zip')
