## game.csv
Reads NHL game data from a CSV file and performs data cleaning and validation. It standardizes column types, removes duplicates, reports missing values and malformed game_id values, flags rows whose type column disagrees with the game_id, and drops rows with negative scores. Finally, it saves the cleaned DataFrame to the Silver layer in the Lakehouse.


In [None]:
from pyspark.sql.functions import col, count, when, lit, to_timestamp, length, substring, trim, upper
from pyspark.sql.types import IntegerType, StringType

# Load the raw Bronze CSV into game_df. Only the header option is set, so no
# schema inference is requested here; types are standardized in Step 2.
game_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/game.csv")
)
display(game_df)

# Step 1: Inspect the raw schema and a handful of rows
print("Initial Schema:")
game_df.printSchema()
game_df.show(5, truncate=False)

# Step 2: Standardize column types
# Maps each column to the expression that replaces it. Identifiers stay strings,
# goal counts and the time-zone offset become integers, `type` is trimmed and
# upper-cased, and date_time_GMT is parsed with an explicit pattern.
game_column_exprs = {
    "game_id": col("game_id").cast(StringType()),
    "season": col("season").cast(StringType()),
    "type": upper(trim(col("type"))).cast(StringType()),
    "date_time_GMT": to_timestamp(col("date_time_GMT"), "yyyy-MM-dd'T'HH:mm:ss'Z'"),
    "away_team_id": col("away_team_id").cast(StringType()),
    "home_team_id": col("home_team_id").cast(StringType()),
    "away_goals": col("away_goals").cast(IntegerType()),
    "home_goals": col("home_goals").cast(IntegerType()),
    "outcome": col("outcome").cast(StringType()),
    "home_rink_side_start": col("home_rink_side_start").cast(StringType()),
    "venue": col("venue").cast(StringType()),
    "venue_link": col("venue_link").cast(StringType()),
    "venue_time_zone_id": col("venue_time_zone_id").cast(StringType()),
    "venue_time_zone_offset": col("venue_time_zone_offset").cast(IntegerType()),
    "venue_time_zone_tz": col("venue_time_zone_tz").cast(StringType()),
}
for column_name, column_expr in game_column_exprs.items():
    # withColumn on an existing name replaces that column in place.
    game_df = game_df.withColumn(column_name, column_expr)

print("Schema after type casting:")
game_df.printSchema()

# Step 3: Check for duplicates
# 3.1: Exact duplicates - groups of rows that agree on every column
exact_duplicates = (
    game_df
    .groupBy(game_df.columns)
    .count()
    .filter(col("count") > 1)
)
exact_duplicate_count = exact_duplicates.count()
print(f"Number of exact duplicate groups: {exact_duplicate_count}")

# The sample is printed unconditionally (an empty table when there are none)
print("Sample of exact duplicate groups:")
exact_duplicates.show(5, truncate=False)

# Decision: exact replicas are treated as load errors and collapsed to one row
if exact_duplicate_count > 0:
    print("Dropping exact duplicate rows...")
    game_df = game_df.dropDuplicates()
    rows_after_exact_dedupe = game_df.count()
    print(f"Rows after dropping exact duplicates: {rows_after_exact_dedupe}")

# 3.2: game_id duplicates - rows sharing a game_id but differing elsewhere.
# game.csv is expected to hold one row per game_id.
rows_per_game_id = game_df.groupBy("game_id").count()
game_id_duplicates = rows_per_game_id.filter(col("count") > 1)
game_id_duplicate_count = game_id_duplicates.count()
print(f"Number of game_id duplicate groups: {game_id_duplicate_count}")

if game_id_duplicate_count > 0:
    print("Sample of game_id duplicate groups:")
    game_id_duplicates.show(5, truncate=False)

    # Decision: keep a single row per game_id.
    # NOTE(review): no ordering is applied before dropDuplicates, so which row
    # of each group survives is presumably arbitrary - confirm "first" is not
    # relied upon.
    print("Dropping duplicate game_ids, keeping first occurrence...")
    game_df = game_df.dropDuplicates(["game_id"])
    rows_after_id_dedupe = game_df.count()
    print(f"Rows after dropping game_id duplicates: {rows_after_id_dedupe}")

# Step 4: Check for missing values
# One aggregate per column: count() only counts the rows where when() produced
# a value, i.e. the rows in which that column is NULL.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in game_df.columns
]
missing_summary = game_df.select(null_count_exprs)
missing_summary.show()

# Decision: leave some nulls (like home_rink_side_start) as-is for Silver

# Step 5 (preview): summary statistics for the goal columns, so out-of-range
# values (e.g. a negative minimum) are visible before the validity checks below.
stats = game_df.describe("away_goals", "home_goals")
stats.show()

# Step 5: Check for invalid records
# A game_id is expected to be 10 digits: a 4-digit season (2000-2019), a 2-digit
# game-type code (01-04) and a 4-digit game number.
game_id_col = col("game_id")
season_year = substring(game_id_col, 1, 4).cast(IntegerType())
game_type_code = substring(game_id_col, 5, 2)

# Rows violating any single rule are collected; they are reported, not dropped.
invalid_game_id = game_df.filter(
    (length(game_id_col) != 10)
    | (season_year < 2000)
    | (season_year > 2019)
    | (~game_id_col.rlike(r'^\d{10}$'))
    | (~game_type_code.isin(["01", "02", "03", "04"]))
)
invalid_game_id_count = invalid_game_id.count()
print(f"Invalid game_id (length, year, or format): {invalid_game_id_count}")

if invalid_game_id_count > 0:
    print("Sample invalid game_id rows (format issues):")
    invalid_game_id.show(5, truncate=False)

# Cross-check the `type` column against the code embedded in game_id
# (A=04, R=02, P=03). Mismatching rows are flagged rather than dropped; any
# other `type` value, including NULL, falls through to a flag of 0.
type_mismatch = None
for type_letter, expected_code in (("A", "04"), ("R", "02"), ("P", "03")):
    clause = (col("type") == type_letter) & (game_type_code != expected_code)
    type_mismatch = clause if type_mismatch is None else (type_mismatch | clause)

game_df = game_df.withColumn(
    "type_mismatch_flag",
    when(type_mismatch, lit(1)).otherwise(lit(0)),
)

flagged_rows = game_df.filter(col("type_mismatch_flag") == 1)
mismatch_count = flagged_rows.count()
print(f"Number of type mismatches flagged: {mismatch_count}")
flagged_rows.show(5, truncate=False)

# Check for invalid goals (negative values)
away_goals = col("away_goals")
home_goals = col("home_goals")

invalid_goals = game_df.filter((away_goals < 0) | (home_goals < 0))
invalid_goals_count = invalid_goals.count()
print(f"Invalid goals (negative): {invalid_goals_count}")
if invalid_goals_count > 0:
    print("Dropping rows with negative goals...")
    # Only negatives are removed. A plain `>= 0` filter evaluates to NULL for a
    # NULL goal and would silently drop those rows too - and only on runs where
    # at least one negative value happened to exist. Missing goals are a
    # missing-value question, not an invalid-score one, so they are kept here.
    game_df = game_df.filter(
        (away_goals.isNull() | (away_goals >= 0))
        & (home_goals.isNull() | (home_goals >= 0))
    )
    print(f"Rows after dropping invalid goals: {game_df.count()}")

# Step 6: Final cleaned DataFrame
print("Final row count after cleaning: " + str(game_df.count()))
game_df.show(5, truncate=False)

# Step 7: Save cleaned DataFrame to Lakehouse (Silver layer)
(
    game_df.write
    .mode("overwrite")
    .saveAsTable("silver_game")
)

## game_plays_players.csv
Loads player game data from a CSV file and performs comprehensive data cleaning. It standardizes column types, removes duplicates, and filters out invalid game_id, player_id, and play_id values based on strict format and consistency rules. Finally, the cleaned dataset is saved to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col, count, when, lit, length, substring, regexp_extract
from pyspark.sql.types import IntegerType, StringType

# Load the raw Bronze CSV into players_df (header row supplies column names).
players_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/game_plays_players.csv")
)
display(players_df)

# Step 1: Inspect the raw schema and a handful of rows
print("Initial Schema:")
players_df.printSchema()
players_df.show(5, truncate=False)

# Step 2: Standardize column types
# All four columns are kept as strings so the identifiers can be validated
# with length and regex checks later on.
for string_column in ("play_id", "game_id", "player_id", "playerType"):
    players_df = players_df.withColumn(
        string_column, col(string_column).cast(StringType())
    )

print("Schema after type casting:")
players_df.printSchema()

# Step 3: Check for duplicates
# Groups of rows that agree on every column
exact_duplicates = (
    players_df
    .groupBy(players_df.columns)
    .count()
    .filter(col("count") > 1)
)
exact_duplicate_count = exact_duplicates.count()
print(f"Number of exact duplicate groups: {exact_duplicate_count}")

# The sample is printed unconditionally (an empty table when there are none)
print("Sample of exact duplicate groups:")
exact_duplicates.show(5, truncate=False)

if exact_duplicate_count > 0:
    # Join back on every column to list the duplicated rows themselves
    exact_duplicate_keys = exact_duplicates.drop("count")
    exact_duplicate_rows = players_df.join(
        exact_duplicate_keys, on=players_df.columns, how="inner"
    )
    print("Full rows of exact duplicates:")
    exact_duplicate_rows.show(10, truncate=False)

    # Decision: exact replicas are treated as load errors and collapsed to one row
    print("Dropping exact duplicate rows...")
    players_df = players_df.dropDuplicates()
    rows_after_dedupe = players_df.count()
    print(f"Rows after dropping exact duplicates: {rows_after_dedupe}")

# Step 4: Check for missing values
# One aggregate per column counting the rows in which that column is NULL.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in players_df.columns
]
missing_summary = players_df.select(null_count_exprs)
missing_summary.show()

# Step 5: Check for invalid records
# Summary statistics for player_id as a first look at its range
stats = players_df.describe("player_id")
stats.show()

# game_id rule: 10 digits = 4-digit season (2000-2019) + 2-digit type code
# (01-04) + 4-digit game number. The rule is written once as a validity
# predicate; its negation selects the offending rows.
game_id_col = col("game_id")
season_year = substring(game_id_col, 1, 4).cast(IntegerType())
game_id_is_valid = (
    (length(game_id_col) == 10)
    & (season_year >= 2000)
    & (season_year <= 2019)
    & game_id_col.rlike(r'^\d{10}$')
    & substring(game_id_col, 5, 2).isin(["01", "02", "03", "04"])
)

invalid_game_id = players_df.filter(~game_id_is_valid)
invalid_game_id_count = invalid_game_id.count()
print(f"Invalid game_id (format, season, or type): {invalid_game_id_count}")
if invalid_game_id_count > 0:
    print("Invalid game_id rows:")
    invalid_game_id.show(5, truncate=False)
    print("Dropping rows with invalid game_id...")
    players_df = players_df.filter(game_id_is_valid)
    print(f"Rows after dropping invalid game_id: {players_df.count()}")

# player_id rule: a string of exactly 7 digits
player_id_is_valid = col("player_id").rlike(r'^\d{7}$')

invalid_player_id = players_df.filter(~player_id_is_valid)
invalid_player_id_count = invalid_player_id.count()
print(f"Invalid player_id count (not 7 digits): {invalid_player_id_count}")
if invalid_player_id_count > 0:
    print("Invalid player_id rows:")
    invalid_player_id.show(5, truncate=False)
    print("Dropping rows with invalid player_id...")
    players_df = players_df.filter(player_id_is_valid)
    print(f"Rows after dropping invalid player_id: {players_df.count()}")

# play_id rule: "<game_id>_<number>" - ten digits, an underscore, then digits,
# where the ten-digit prefix must equal the row's own game_id.
play_id_col = col("play_id")
play_id_prefix = regexp_extract(play_id_col, r'^(\d{10})_', 1)
play_id_is_valid = (
    play_id_col.rlike(r'^\d{10}_\d+$')
    & (play_id_prefix == col("game_id"))
)

play_id_invalid = players_df.filter(~play_id_is_valid)
invalid_play_id_count = play_id_invalid.count()
print(f"Invalid play_id count (format not game_id + _number): {invalid_play_id_count}")

if invalid_play_id_count > 0:
    print("Invalid play_id rows:")
    play_id_invalid.show(5, truncate=False)
    print("Dropping rows with invalid play_id...")
    players_df = players_df.filter(play_id_is_valid)
    print(f"Rows after dropping invalid play_id: {players_df.count()}")

# Step 6: Final cleaned DataFrame
final_player_rows = players_df.count()
print("Final row count after cleaning: " + str(final_player_rows))
players_df.show(5, truncate=False)

# Step 7: Persist the cleaned DataFrame as a Silver-layer table
silver_writer = players_df.write.mode("overwrite")
silver_writer.saveAsTable("silver_game_plays_players")

## game_scratches.csv
Loads player scratch data from a CSV file and performs data cleaning and validation. It standardizes column types, removes duplicates, checks for missing values, and filters out invalid game_id, player_id, and team_id entries. Finally, the cleaned dataset is saved to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col, count, when, lit, length, substring, regexp_extract
from pyspark.sql.types import IntegerType, StringType

# Load the raw Bronze CSV into scratches_df (header row supplies column names).
scratches_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/game_scratches.csv")
)
display(scratches_df)

# Step 1: Inspect the raw schema and a handful of rows
print("Initial Schema:")
scratches_df.printSchema()
scratches_df.show(5, truncate=False)

# Step 2: Standardize column types
# game_id, team_id and player_id are all kept as strings; the format checks
# further down rely on regex matching against the string form.
for id_column in ("game_id", "team_id", "player_id"):
    scratches_df = scratches_df.withColumn(
        id_column, col(id_column).cast(StringType())
    )

print("Schema after type casting:")
scratches_df.printSchema()

# Step 3: Check for duplicates
# Groups of rows that agree on every column
exact_duplicates = (
    scratches_df
    .groupBy(scratches_df.columns)
    .count()
    .filter(col("count") > 1)
)
exact_duplicate_count = exact_duplicates.count()
print(f"Number of exact duplicate groups: {exact_duplicate_count}")

# The sample is printed unconditionally (an empty table when there are none)
print("Sample of exact duplicate groups:")
exact_duplicates.show(5, truncate=False)

# Decision: exact replicas are treated as load errors and collapsed to one row
if exact_duplicate_count > 0:
    print("Dropping exact duplicate rows...")
    scratches_df = scratches_df.dropDuplicates()
    rows_after_dedupe = scratches_df.count()
    print(f"Rows after dropping exact duplicates: {rows_after_dedupe}")

# Step 4: Check for missing values
# One aggregate per column counting the rows in which that column is NULL.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in scratches_df.columns
]
missing_summary = scratches_df.select(null_count_exprs)
missing_summary.show()

# Step 5: Check for invalid records
# Summary statistics for the two id columns as a first look at their range
stats = scratches_df.describe("team_id", "player_id")
stats.show()

# Validate game_id format: 10 digits (4-digit season 2000-2019, 2-digit type,
# 4-digit game number). Unlike the other cells, the 2-digit type code is not
# restricted here.
# The rule is written once as a validity predicate and negated to find the
# offending rows, so the "report" and "keep" filters cannot drift apart.
game_id_col = col("game_id")
season_year = substring(game_id_col, 1, 4).cast(IntegerType())
game_id_is_valid = (
    (length(game_id_col) == 10)
    & (season_year >= 2000)
    & (season_year <= 2019)
    # rlike is the Column regex-match method; the previous `regexp_match`
    # attribute does not exist on Column and failed as soon as this branch ran.
    & game_id_col.rlike(r'^\d{10}$')
)

invalid_game_id = scratches_df.filter(~game_id_is_valid)

invalid_game_id_count = invalid_game_id.count()
print(f"Invalid game_id (not 10 digits or invalid season): {invalid_game_id_count}")
if invalid_game_id_count > 0:
    print("Invalid game_id rows:")
    invalid_game_id.show(5, truncate=False)
    print("Dropping rows with invalid game_id...")
    scratches_df = scratches_df.filter(game_id_is_valid)
    print(f"Rows after dropping invalid game_id: {scratches_df.count()}")

# player_id rule: a string of exactly 7 digits
player_id_is_valid = col("player_id").rlike(r'^\d{7}$')

invalid_player_id = scratches_df.filter(~player_id_is_valid)
invalid_player_id_count = invalid_player_id.count()
print(f"Invalid player_id (not exactly 7 digits): {invalid_player_id_count}")
if invalid_player_id_count > 0:
    print("Invalid player_id rows:")
    invalid_player_id.show(5, truncate=False)
    print("Dropping rows with invalid player_id...")
    scratches_df = scratches_df.filter(player_id_is_valid)
    print(f"Rows after dropping invalid player_id: {scratches_df.count()}")

# team_id rule: digits only, and at least 1 once read as an integer
team_id_col = col("team_id")
team_id_is_valid = (
    team_id_col.rlike(r'^\d+$')
    & (team_id_col.cast(IntegerType()) >= 1)
)

invalid_team_id = scratches_df.filter(~team_id_is_valid)
invalid_team_id_count = invalid_team_id.count()
print(f"Invalid team_id (not numeric or less than 1): {invalid_team_id_count}")
if invalid_team_id_count > 0:
    print("Invalid team_id rows:")
    invalid_team_id.show(5, truncate=False)
    print("Dropping rows with invalid team_id...")
    scratches_df = scratches_df.filter(team_id_is_valid)
    print(f"Rows after dropping invalid team_id: {scratches_df.count()}")

# Step 6: Final cleaned DataFrame
final_scratch_rows = scratches_df.count()
print("Final row count after cleaning: " + str(final_scratch_rows))
scratches_df.show(5, truncate=False)

# Step 7: Persist the cleaned DataFrame as a Silver-layer table
silver_writer = scratches_df.write.mode("overwrite")
silver_writer.saveAsTable("silver_scratches")

## team_info.csv
Loads team information data from a CSV file and performs cleaning and validation. It standardizes column types, removes duplicates and invalid records such as empty fields, negative IDs, or incorrect link formats. Finally, it saves the cleaned dataset to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col, count, when, length, trim
from pyspark.sql.types import IntegerType, StringType

# Load the raw Bronze CSV into team_df (header row supplies column names).
team_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/team_info.csv")
)
display(team_df)

# Step 1: Inspect the raw schema and a handful of rows
print("Initial Schema:")
team_df.printSchema()
team_df.show(5, truncate=False)

# Step 2: Standardize column types
# Every column of team_info, including the two id columns, is kept as a string.
team_string_columns = (
    "team_id",
    "franchiseId",
    "shortName",
    "teamName",
    "abbreviation",
    "link",
)
for string_column in team_string_columns:
    team_df = team_df.withColumn(
        string_column, col(string_column).cast(StringType())
    )

print("Schema after type casting:")
team_df.printSchema()

# Step 3: Check for duplicates
# Exact duplicates: groups of rows that agree on every column
exact_duplicates = team_df.groupBy(team_df.columns).count().filter(col("count") > 1)
exact_duplicate_count = exact_duplicates.count()
print(f"Number of exact duplicate groups: {exact_duplicate_count}")

if exact_duplicate_count > 0:
    # Show a sample for inspection before removing anything
    print("Sample of exact duplicate groups:")
    exact_duplicates.show(5, truncate=False)

    # Decision: exact replicas are treated as load errors and collapsed to one row
    print("Dropping exact duplicate rows...")
    team_df = team_df.dropDuplicates()
    print(f"Rows after dropping exact duplicates: {team_df.count()}")

# team_id duplicates: rows sharing a team_id but differing in other columns
team_id_duplicates = team_df.groupBy("team_id").count().filter(col("count") > 1)
team_id_duplicate_count = team_id_duplicates.count()
print(f"Number of team_id duplicate groups: {team_id_duplicate_count}")

if team_id_duplicate_count > 0:
    # These messages previously said "game_id" (copied from the game.csv cell)
    # although the key being de-duplicated here is team_id.
    print("Sample of team_id duplicate groups:")
    team_id_duplicates.show(5, truncate=False)

    # Decision: keep a single row per team_id. No ordering is applied before
    # dropDuplicates, so the message does not promise which row is retained.
    print("Dropping duplicate team_ids, keeping one row per team_id...")
    team_df = team_df.dropDuplicates(["team_id"])
    print(f"Rows after dropping team_id duplicates: {team_df.count()}")

# Step 4: Check for missing values
# One aggregate per column counting the rows in which that column is NULL.
missing_summary = team_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in team_df.columns]
)
missing_summary.show()

# Step 5: Check for invalid records
# Rules applied below:
#   - team_id and franchiseId must be >= 1
#   - shortName, teamName and abbreviation must be non-empty
#   - link must start with "/api/v1/teams/"
stats = team_df.describe("team_id", "franchiseId")
stats.show()

# ID rule. NOTE(review): both columns were cast to StringType in Step 2, so the
# comparison with the integer 1 presumably relies on Spark's implicit
# string-to-number coercion - confirm this is intended.
ids_are_valid = (col("team_id") >= 1) & (col("franchiseId") >= 1)

invalid_ids = team_df.filter(~ids_are_valid)
invalid_ids_count = invalid_ids.count()
print(f"Invalid IDs (negative or zero): {invalid_ids_count}")
if invalid_ids_count > 0:
    print("Dropping rows with invalid IDs...")
    team_df = team_df.filter(ids_are_valid)
    print(f"Rows after dropping invalid IDs: {team_df.count()}")

# Name rule: none of the three text fields may be the empty string
names_are_present = (
    (length(col("shortName")) > 0)
    & (length(col("teamName")) > 0)
    & (length(col("abbreviation")) > 0)
)

invalid_strings = team_df.filter(~names_are_present)
invalid_strings_count = invalid_strings.count()
print(f"Empty string fields: {invalid_strings_count}")
if invalid_strings_count > 0:
    print("Dropping rows with empty string fields...")
    team_df = team_df.filter(names_are_present)
    print(f"Rows after dropping empty strings: {team_df.count()}")

# Link rule: must begin with the teams API prefix
link_is_valid = col("link").startswith("/api/v1/teams/")

invalid_links = team_df.filter(~link_is_valid)
invalid_links_count = invalid_links.count()
print(f"Invalid links (not starting with '/api/v1/teams/'): {invalid_links_count}")
if invalid_links_count > 0:
    print("Invalid link rows:")
    invalid_links.show(5, truncate=False)
    print("Dropping rows with invalid links...")
    team_df = team_df.filter(link_is_valid)
    print(f"Rows after dropping invalid links: {team_df.count()}")

# Step 6: Final cleaned DataFrame
print("Final row count after cleaning: " + str(team_df.count()))

# Print the schema and a sample of the cleaned data. A pasted fragment used to
# label this output "Initial Schema:" and showed the same five rows twice.
print("Final Schema:")
team_df.printSchema()
team_df.show(5, truncate=False)

# Step 7: Save cleaned DataFrame to Lakehouse (Silver layer)
(
    team_df.write
    .mode("overwrite")
    .saveAsTable("silver_team_info")
)


## game_teams_stats.csv
Loads game team statistics data from a CSV file, performs type casting, removes duplicate rows, and replaces negative numeric values with zero. It then generates a summary table showing row counts across cleaning steps. Finally, it saves the cleaned dataset to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col, lower, trim, when, count as count_
from pyspark.sql.types import IntegerType, FloatType, BooleanType, StringType

# ======================================================
# STEP 0: Load raw dataset
# ======================================================
raw_game_team_df = spark.read.format("csv").option("header", "true").load(
    "Files/Bronze(raw datasets)/game_teams_stats.csv"
)
initial_count = raw_game_team_df.count()
print(f"Initial row count: {initial_count}")

# Snapshot (column name, type) pairs before casting, for the comparison below
original_schema = [
    (field.name, field.dataType.simpleString())
    for field in raw_game_team_df.schema.fields
]

# ======================================================
# STEP 1: Type Casting
# ======================================================
# Columns are grouped by target treatment; each withColumn replaces the column
# in place, so column order in the DataFrame is unchanged.
string_columns = ("game_id", "head_coach")
integer_columns = (
    "team_id", "goals", "shots", "hits", "pim",
    "powerPlayOpportunities", "powerPlayGoals",
    "giveaways", "takeaways", "blocked",
)
normalized_text_columns = ("HoA", "settled_in", "startRinkSide")

game_team_df = raw_game_team_df
for column_name in string_columns:
    game_team_df = game_team_df.withColumn(
        column_name, col(column_name).cast(StringType())
    )
for column_name in integer_columns:
    game_team_df = game_team_df.withColumn(
        column_name, col(column_name).cast(IntegerType())
    )
for column_name in normalized_text_columns:
    # Categorical text is trimmed and lower-cased rather than re-typed
    game_team_df = game_team_df.withColumn(
        column_name, lower(trim(col(column_name)))
    )
game_team_df = game_team_df.withColumn("won", col("won").cast(BooleanType()))
game_team_df = game_team_df.withColumn(
    "faceOffWinPercentage", col("faceOffWinPercentage").cast(FloatType())
)

count_after_type_cast = game_team_df.count()

# Record every column whose type differs after casting as
# (name, original type, new type). Both lists follow the same column order.
new_schema = [
    (field.name, field.dataType.simpleString())
    for field in game_team_df.schema.fields
]
schema_changes = [
    (name_o, type_o, type_n)
    for (name_o, type_o), (name_n, type_n) in zip(original_schema, new_schema)
    if type_o != type_n
]

# ======================================================
# STEP 2: Drop exact duplicates
# ======================================================
count_before = count_after_type_cast
game_team_df = game_team_df.dropDuplicates()
count_after = game_team_df.count()
exact_dupes_dropped = count_before - count_after

# ======================================================
# STEP 3: Fix negative numeric values
# ======================================================
numeric_cols = [
    "goals",
    "shots",
    "hits",
    "pim",
    "powerPlayOpportunities",
    "powerPlayGoals",
    "giveaways",
    "takeaways",
    "blocked",
]
for stat_column in numeric_cols:
    # Negative values become 0; everything else (including NULL, for which the
    # comparison is not true) passes through unchanged.
    clamped = when(col(stat_column) < 0, 0).otherwise(col(stat_column))
    game_team_df = game_team_df.withColumn(stat_column, clamped)
count_after_neg_fix = game_team_df.count()

# ======================================================
# STEP 4: Placeholder cleaning checks (for future logic)
# ======================================================
# None of these checks is implemented yet; the counters are defined as zero so
# the summary table built afterwards can still reference them.
pair_dupes_dropped = faceoff_dropped = hoa_dropped = 0
settled_in_dropped = startRinkSide_dropped = 0
logical_ppg_gt_ppo_dropped = logical_goals_lt_ppg_dropped = 0
multi_winners_dropped = 0

final_count = game_team_df.count()

# ======================================================
# STEP 5: Summary Table
# ======================================================
# Each tuple is (step label, number, optional detail). "After ..." rows carry
# the row count remaining at that point; "... dropped/removed" rows carry the
# number of rows removed by that step.
summary_data = [
    ("Initial row count (raw)", initial_count, None),
    ("After type casting", count_after_type_cast, None),
    ("Exact duplicates dropped", exact_dupes_dropped, f"{exact_dupes_dropped} rows dropped"),
    ("After dropping exact duplicates", count_after, None),
    ("Duplicate (game_id, team_id) dropped", pair_dupes_dropped, f"{pair_dupes_dropped} rows dropped"),
    # count_after_neg_fix is the row count after the fix, not the number of
    # values changed, so the label says "After ..." to match what is reported.
    ("After fixing negative values", count_after_neg_fix, "Negative values replaced by zero (no rows dropped)"),
    ("Invalid faceOffWinPercentage removed", faceoff_dropped, f"{faceoff_dropped} rows dropped"),
    ("Invalid HoA removed", hoa_dropped, f"{hoa_dropped} rows dropped"),
    ("Invalid settled_in removed", settled_in_dropped, f"{settled_in_dropped} rows dropped"),
    ("Invalid startRinkSide removed", startRinkSide_dropped, f"{startRinkSide_dropped} rows dropped"),
    ("powerPlayGoals > powerPlayOpportunities removed", logical_ppg_gt_ppo_dropped, f"{logical_ppg_gt_ppo_dropped} rows dropped"),
    ("goals < powerPlayGoals removed", logical_goals_lt_ppg_dropped, f"{logical_goals_lt_ppg_dropped} rows dropped"),
    ("Multiple winners removed", multi_winners_dropped, f"{multi_winners_dropped} rows dropped"),
    ("Final cleaned row count", final_count, None),
]

summary_df = spark.createDataFrame(summary_data, ["Step", "Row Count", "Details"])

print("📊 Cleaning Process Summary (step-by-step):")
display(summary_df)

# ======================================================
# STEP 6: Preview Data
# ======================================================
print("🧐 Original raw data preview (5 rows):")
display(raw_game_team_df.limit(5))

print("✅ Final cleaned data preview (5 rows):")
display(game_team_df.limit(5))

# ======================================================
# STEP 7: Save to Silver Layer
# ======================================================
game_team_df.write.mode("overwrite").saveAsTable("silver_game_team_stats")

print("✅ Successfully saved cleaned dataset to Silver layer: silver_game_team_stats")


In [None]:
from pyspark.sql.functions import col, lower, trim, when, count as count_
from pyspark.sql.types import IntegerType, FloatType, BooleanType, StringType

# --- Step 0: Load the raw game_teams_stats CSV from the Bronze folder ---
# The first line of the file is used as the header; no inferSchema option is
# set, so column types are assigned explicitly in Step 1.
raw_game_team_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/game_teams_stats.csv")
)
initial_count = raw_game_team_df.count()
print(f"Initial row count: {initial_count}")

# Snapshot (column, type) pairs before casting so the changes can be reported
original_schema = [
    (field.name, field.dataType.simpleString())
    for field in raw_game_team_df.schema.fields
]

# --- Step 1: Type casting ---
# Columns that receive an explicit type
cast_targets = {
    "game_id": StringType(),
    "team_id": IntegerType(),
    "won": BooleanType(),
    "head_coach": StringType(),
    "goals": IntegerType(),
    "shots": IntegerType(),
    "hits": IntegerType(),
    "pim": IntegerType(),
    "powerPlayOpportunities": IntegerType(),
    "powerPlayGoals": IntegerType(),
    "faceOffWinPercentage": FloatType(),
    "giveaways": IntegerType(),
    "takeaways": IntegerType(),
    "blocked": IntegerType(),
}
# Categorical text columns are trimmed and lower-cased so later validity
# checks can compare against lower-case literals
normalized_text_cols = ["HoA", "settled_in", "startRinkSide"]

game_team_df = raw_game_team_df
for column_name, target_type in cast_targets.items():
    game_team_df = game_team_df.withColumn(column_name, col(column_name).cast(target_type))
for column_name in normalized_text_cols:
    game_team_df = game_team_df.withColumn(column_name, lower(trim(col(column_name))))

count_after_type_cast = game_team_df.count()

# Capture schema after casting for comparison
new_schema = [
    (field.name, field.dataType.simpleString())
    for field in game_team_df.schema.fields
]

# Keep only the columns whose type differs before vs. after casting
# (the two schemas are paired positionally)
schema_changes = [
    (before_name, before_type, after_type)
    for (before_name, before_type), (_, after_type) in zip(original_schema, new_schema)
    if before_type != after_type
]

# Samples (up to 5 rows) of what each cleaning step removed, keyed by step label
dropped_samples = {}

# --- Step 2: Drop exact duplicates ---
count_before = count_after_type_cast
distinct_game_team_df = game_team_df.dropDuplicates()
# exceptAll (EXCEPT ALL) preserves multiplicity, so this yields the surplus
# copies that dropDuplicates() removes. subtract() is EXCEPT DISTINCT: every
# row of the DataFrame is also present in its de-duplicated version, so
# subtract() would always return an empty sample here.
exact_duplicates_df = game_team_df.exceptAll(distinct_game_team_df)
dropped_samples['Exact duplicates dropped'] = exact_duplicates_df.limit(5)
game_team_df = distinct_game_team_df
count_after = game_team_df.count()
exact_dupes_dropped = count_before - count_after

# --- Step 3: Drop duplicate (game_id, team_id) pairs ---
count_before = count_after
unique_pairs_df = game_team_df.dropDuplicates(["game_id", "team_id"])
# Rows are fully distinct after Step 2, so the set difference is the rows that
# lost out to another row sharing the same (game_id, team_id).
# NOTE(review): nothing here controls which row of a duplicated pair survives
# dropDuplicates — confirm an arbitrary survivor is acceptable.
pair_duplicates_df = game_team_df.subtract(unique_pairs_df)
dropped_samples['Duplicate (game_id, team_id) dropped'] = pair_duplicates_df.limit(5)
game_team_df = unique_pairs_df
count_after = game_team_df.count()
pair_dupes_dropped = count_before - count_after

# --- Step 4: Replace negative values with zero (no rows dropped) ---
# Counting stats that are clamped at zero; NULLs are left untouched because
# `NULL < 0` is not true and falls through to otherwise()
numeric_cols = [
    "goals",
    "shots",
    "hits",
    "pim",
    "powerPlayOpportunities",
    "powerPlayGoals",
    "giveaways",
    "takeaways",
    "blocked",
]
for c in numeric_cols:
    clamped_value = when(col(c) < 0, 0).otherwise(col(c))
    game_team_df = game_team_df.withColumn(c, clamped_value)

# Row count is recorded for the summary table; this step drops nothing,
# so there is no dropped-row sample
count_after_neg_fix = game_team_df.count()

# --- Steps 5-10: remove rows that fail validation ---
# Every step stores up to 5 offending rows in `dropped_samples` and records how
# many rows it removed.


def _split_flagged(df, flagged):
    """Split `df` by the boolean Column `flagged`.

    Replaces the `df.subtract(df.filter(flagged))` pattern: subtract() is an
    EXCEPT DISTINCT over every column, while a filter only re-evaluates the
    predicate. The rows are already distinct after Steps 2-3, so both forms
    keep the same rows.

    Args:
        df: DataFrame to split.
        flagged: boolean Column marking the rows to remove.

    Returns:
        (kept_df, flagged_df). `flagged_df` holds rows where `flagged` is
        true; `kept_df` holds rows where it is false or NULL (a comparison
        against a NULL value does not remove the row).
    """
    return df.filter(flagged.isNull() | ~flagged), df.filter(flagged)


# --- Step 5: Filter invalid faceOffWinPercentage (NULL or outside 0-100) ---
# Step 4 drops no rows, so its count is the starting point here
count_before = count_after_neg_fix
faceoff_pct = col("faceOffWinPercentage")
game_team_df, invalid_faceoff = _split_flagged(
    game_team_df,
    (faceoff_pct < 0) | (faceoff_pct > 100) | faceoff_pct.isNull(),
)
dropped_samples['Invalid faceOffWinPercentage removed'] = invalid_faceoff.limit(5)
count_after = game_team_df.count()
faceoff_dropped = count_before - count_after

# --- Step 6: Filter invalid HoA (must be "home" or "away") ---
count_before = count_after
game_team_df, invalid_hoa = _split_flagged(
    game_team_df,
    (~col("HoA").isin("home", "away")) | col("HoA").isNull(),
)
dropped_samples['Invalid HoA removed'] = invalid_hoa.limit(5)
count_after = game_team_df.count()
hoa_dropped = count_before - count_after

# --- Step 7: Filter invalid settled_in (must be "reg", "ot" or "so") ---
count_before = count_after
game_team_df, invalid_settled_in = _split_flagged(
    game_team_df,
    (~col("settled_in").isin("reg", "ot", "so")) | col("settled_in").isNull(),
)
dropped_samples['Invalid settled_in removed'] = invalid_settled_in.limit(5)
count_after = game_team_df.count()
settled_in_dropped = count_before - count_after

# --- Step 8: Filter invalid startRinkSide (must be "left" or "right") ---
count_before = count_after
game_team_df, invalid_startRinkSide = _split_flagged(
    game_team_df,
    (~col("startRinkSide").isin("left", "right")) | col("startRinkSide").isNull(),
)
dropped_samples['Invalid startRinkSide removed'] = invalid_startRinkSide.limit(5)
count_after = game_team_df.count()
startRinkSide_dropped = count_before - count_after

# --- Step 9a: Logical check powerPlayGoals > powerPlayOpportunities ---
count_before = count_after
game_team_df, logical_ppg_gt_ppo = _split_flagged(
    game_team_df,
    col("powerPlayGoals") > col("powerPlayOpportunities"),
)
dropped_samples['powerPlayGoals > powerPlayOpportunities removed'] = logical_ppg_gt_ppo.limit(5)
count_after = game_team_df.count()
logical_ppg_gt_ppo_dropped = count_before - count_after

# --- Step 9b: Logical check goals < powerPlayGoals ---
count_before = count_after
game_team_df, logical_goals_lt_ppg = _split_flagged(
    game_team_df,
    col("goals") < col("powerPlayGoals"),
)
dropped_samples['goals < powerPlayGoals removed'] = logical_goals_lt_ppg.limit(5)
count_after = game_team_df.count()
logical_goals_lt_ppg_dropped = count_before - count_after

# --- Step 10: Check for multiple winners per game ---
# Games in which more than one row has won == True
multi_winners_df = (
    game_team_df.filter(col("won"))
    .groupBy("game_id")
    .agg(count_("won").alias("winner_count"))
    .filter(col("winner_count") > 1)
)
# The offending game ids are collected to the driver and every row of those
# games is removed, not just the surplus winner
multi_winners_list = [row["game_id"] for row in multi_winners_df.collect()]
count_before = count_after
game_team_df, multi_winners_rows = _split_flagged(
    game_team_df,
    col("game_id").isin(multi_winners_list),
)
dropped_samples['Multiple winners removed'] = multi_winners_rows.limit(5)
count_after = game_team_df.count()
multi_winners_dropped = count_before - count_after

# Final cleaned count
final_count = game_team_df.count()

# --- Schema changes summary ---
# `schema_changes` already holds (column, original type, new type) tuples
schema_changes_df = spark.createDataFrame(
    schema_changes,
    schema=["Column", "Original Type", "New Type"]
)

print("🛠 Schema changes summary:")
display(schema_changes_df)

# --- Cleaning steps summary ---
# One row per step: (label, row count or rows dropped, free-text details).
# The "After dropping ..." counts are derived from the casted row count minus
# the rows dropped so far.
summary_data = [
    ("Initial row count (raw)", initial_count, None),
    ("After type casting", count_after_type_cast, None),
    ("Exact duplicates dropped", exact_dupes_dropped, f"{exact_dupes_dropped} rows dropped"),
    ("After dropping exact duplicates", count_after_type_cast - exact_dupes_dropped, None),
    ("Duplicate (game_id, team_id) dropped", pair_dupes_dropped, f"{pair_dupes_dropped} rows dropped"),
    ("After dropping (game_id, team_id) duplicates", count_after_type_cast - exact_dupes_dropped - pair_dupes_dropped, None),
    ("Negative values fixed", count_after_neg_fix, "Negative values replaced by zero (no rows dropped)"),
    ("Invalid faceOffWinPercentage removed", faceoff_dropped, f"{faceoff_dropped} rows dropped (null or outside 0-100)"),
    ("Invalid HoA removed", hoa_dropped, f"{hoa_dropped} rows dropped (invalid or null HoA)"),
    ("Invalid settled_in removed", settled_in_dropped, f"{settled_in_dropped} rows dropped (invalid or null settled_in)"),
    ("Invalid startRinkSide removed", startRinkSide_dropped, f"{startRinkSide_dropped} rows dropped (invalid or null startRinkSide)"),
    ("powerPlayGoals > powerPlayOpportunities removed", logical_ppg_gt_ppo_dropped, f"{logical_ppg_gt_ppo_dropped} rows dropped"),
    ("goals < powerPlayGoals removed", logical_goals_lt_ppg_dropped, f"{logical_goals_lt_ppg_dropped} rows dropped"),
    ("Multiple winners removed", multi_winners_dropped, f"{multi_winners_dropped} rows dropped"),
    ("Final cleaned row count", final_count, None)
]

summary_df = spark.createDataFrame(summary_data, ["Step", "Row Count", "Details"])

print("📊 Cleaning Process Summary (step-by-step):")
display(summary_df)

# --- Show original data preview ---
# The label now states the number of rows actually shown by limit(5)
print("🧐 Original raw data preview (5 rows):")
display(raw_game_team_df.limit(5))

# --- Show final cleaned data preview ---
print("✅ Final cleaned data preview (5 rows):")
display(game_team_df.limit(5))

# --- Show dropped examples per step ---
print("🔍 Examples of dropped rows at each cleaning step:")
for step, sample_df in dropped_samples.items():
    print(f"\nStep: {step} — Showing up to 5 rows dropped")
    display(sample_df)

# --- Save the cleaned data, replacing any existing table contents ---
game_team_df.write.mode("overwrite").saveAsTable("silver_game_team_stats")



## game_shifts.csv
Loads raw game shift data, performs data type casting, and removes invalid or duplicate records. It filters out rows with nulls, negative times, or invalid period logic, then summarizes each cleaning step. Finally, it previews and saves the cleaned dataset to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType

# Helper: side-by-side comparison of column types before and after casting
def full_schema_comparison_df(original_schema: StructType, new_schema: StructType):
    """Return a DataFrame listing every column with its original and new type.

    Columns are listed in sorted name order. A column present in only one of
    the two schemas is shown with "N/A (added)" or "N/A (removed)" on the
    side where it is missing.

    Args:
        original_schema: schema before the transformation.
        new_schema: schema after the transformation.

    Returns:
        DataFrame with columns "Column", "Original Type", "New Type".
    """
    before = {field.name: field.dataType.simpleString() for field in original_schema.fields}
    after = {field.name: field.dataType.simpleString() for field in new_schema.fields}
    return spark.createDataFrame(
        [
            (name, before.get(name, "N/A (added)"), after.get(name, "N/A (removed)"))
            for name in sorted(before.keys() | after.keys())
        ],
        ["Column", "Original Type", "New Type"],
    )

# --- Step 0: Load raw data ---
# Unlike the CSV-based cells, this one reads the `game_shifts` table
raw_shift_df = spark.read.table("game_shifts")
initial_count = raw_shift_df.count()
print(f"Initial row count: {initial_count}")

# --- Keep the original schema for the before/after comparison ---
original_schema = raw_shift_df.schema

# --- Step 1: Type casting ---
# Identifiers become strings, the period an integer, shift times floats
shift_cast_targets = (
    ("game_id", StringType()),
    ("player_id", StringType()),
    ("period", IntegerType()),
    ("shift_start", FloatType()),
    ("shift_end", FloatType()),
)
shift_df = raw_shift_df
for column_name, target_type in shift_cast_targets:
    shift_df = shift_df.withColumn(column_name, col(column_name).cast(target_type))

count_after_type_cast = shift_df.count()

# --- Show full schema comparison ---
new_schema = shift_df.schema
schema_comparison_df = full_schema_comparison_df(original_schema, new_schema)
print("📋 Full Schema Comparison (all columns):")
display(schema_comparison_df)

# --- Steps 2-7: de-duplicate, then remove rows that fail validation ---


def _split_flagged(df, flagged):
    """Split `df` by the boolean Column `flagged`.

    Replaces the `df.subtract(df.filter(flagged))` pattern: subtract() is an
    EXCEPT DISTINCT over every column, while a filter only re-evaluates the
    predicate. The rows are already distinct after Step 3, so both forms keep
    the same rows.

    Returns:
        (kept_df, flagged_df). `flagged_df` holds rows where `flagged` is
        true; `kept_df` holds rows where it is false or NULL.
    """
    return df.filter(flagged.isNull() | ~flagged), df.filter(flagged)


# --- Step 2: Drop exact duplicates ---
count_before = count_after_type_cast
distinct_shift_df = shift_df.dropDuplicates()
# exceptAll (EXCEPT ALL) preserves multiplicity, so this is exactly the set of
# surplus copies removed by dropDuplicates(); kept for the sample display below
exact_duplicates_df = shift_df.exceptAll(distinct_shift_df)
shift_df = distinct_shift_df
count_after = shift_df.count()
exact_dupes_dropped = count_before - count_after

# --- Step 3: Drop duplicates on shift keys ---
count_before = count_after
shift_keys = ["game_id", "player_id", "period", "shift_start", "shift_end"]
unique_shift_df = shift_df.dropDuplicates(shift_keys)
# Rows with the same shift keys that lost out to another row
key_duplicates_df = shift_df.exceptAll(unique_shift_df)
shift_df = unique_shift_df
count_after = shift_df.count()
pair_dupes_dropped = count_before - count_after

# --- Step 4: Remove rows with nulls in mandatory columns ---
count_before = count_after
shift_df, nulls_df = _split_flagged(
    shift_df,
    col("game_id").isNull() | col("player_id").isNull() | col("period").isNull(),
)
count_after = shift_df.count()
nulls_dropped = count_before - count_after

# --- Step 5: Filter invalid periods (period <= 0) ---
count_before = count_after
shift_df, invalid_period_df = _split_flagged(shift_df, col("period") <= 0)
count_after = shift_df.count()
period_dropped = count_before - count_after

# --- Step 6: Filter invalid shift times (shift_start < 0 or shift_end < 0) ---
# Rows whose shift times are NULL are not flagged by this comparison
count_before = count_after
shift_df, invalid_shift_time_df = _split_flagged(
    shift_df,
    (col("shift_start") < 0) | (col("shift_end") < 0),
)
count_after = shift_df.count()
shift_time_dropped = count_before - count_after

# --- Step 7: Filter shifts where shift_end <= shift_start ---
count_before = count_after
shift_df, invalid_shift_order_df = _split_flagged(
    shift_df,
    col("shift_end") <= col("shift_start"),
)
count_after = shift_df.count()
shift_order_dropped = count_before - count_after


# --- Final cleaned count ---
final_count = shift_df.count()

# --- Summary table ---
summary_data = [
    ("Initial row count (raw)", initial_count, None),
    ("After type casting", count_after_type_cast, None),
    ("Exact duplicates dropped", exact_dupes_dropped, f"{exact_dupes_dropped} rows dropped"),
    ("Duplicate shift rows dropped", pair_dupes_dropped, f"{pair_dupes_dropped} rows dropped"),
    ("Rows with null mandatory fields dropped", nulls_dropped, f"{nulls_dropped} rows dropped"),
    ("Invalid periods (<=0) dropped", period_dropped, f"{period_dropped} rows dropped"),
    ("Invalid shift times (negative) dropped", shift_time_dropped, f"{shift_time_dropped} rows dropped"),
    ("Shifts with shift_end <= shift_start dropped", shift_order_dropped, f"{shift_order_dropped} rows dropped"),
    ("Final cleaned row count", final_count, None)
]

summary_df = spark.createDataFrame(summary_data, ["Step", "Row Count", "Details"])

print("📊 Cleaning Process Summary (step-by-step):")
display(summary_df)

# --- Show samples of dropped rows for validation ---
print("🚨 Sample dropped rows for validation:")

def show_dropped_sample(name, df):
    """Print a heading, then display up to 5 rows of `df` or a note if it is empty."""
    print(f"--- {name} ---")
    sample_df = df.limit(5)
    # head(1) needs a single row, whereas count() would scan all of df
    if sample_df.head(1):
        display(sample_df)
    else:
        print("No rows dropped here.")

# Each sample comes from the DataFrame captured at the step that removed the
# rows. Diffing the raw table against the final result would instead show
# everything removed by the whole pipeline, and never the duplicate copies.
show_dropped_sample("Exact duplicates", exact_duplicates_df)
show_dropped_sample("Duplicate shifts", key_duplicates_df)
show_dropped_sample("Null mandatory fields", nulls_df)
show_dropped_sample("Invalid periods", invalid_period_df)
show_dropped_sample("Invalid shift times", invalid_shift_time_df)
show_dropped_sample("Shifts with shift_end <= shift_start", invalid_shift_order_df)

# --- Preview cleaned data ---
print("✅ Cleaned Data Preview:")
display(shift_df.limit(10))

# --- Save the cleaned data, replacing any existing table contents ---
shift_df.write.mode("overwrite").saveAsTable("silver_game_shifts")

print("✅ Cleaned data successfully saved to table: silver_game_shifts")


## game_skater_stats.csv
Loads per-game skater statistics from a CSV file and performs step-by-step data cleaning and validation. It casts column types, removes duplicates, drops rows with missing mandatory fields or negative values, and removes rows with logical inconsistencies (e.g., face-off wins exceeding face-offs taken, or total time on ice smaller than the sum of its even-strength, short-handed and power-play components). Finally, it saves the cleaned dataset to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col, trim, lower, lit, udf
from pyspark.sql.types import IntegerType, FloatType, StringType
from functools import reduce
from operator import add

# --- Helper function: Compare schema before & after ---
def full_schema_comparison_df(original_schema, new_schema):
    """Return a DataFrame listing every column with its original and new type.

    Columns are listed in sorted name order; a column found in only one schema
    gets "N/A (added)" / "N/A (removed)" on the side where it is missing.
    """
    types_before = {}
    for field in original_schema.fields:
        types_before[field.name] = field.dataType.simpleString()

    types_after = {}
    for field in new_schema.fields:
        types_after[field.name] = field.dataType.simpleString()

    comparison_rows = []
    for column_name in sorted(set(types_before) | set(types_after)):
        comparison_rows.append(
            (
                column_name,
                types_before.get(column_name, "N/A (added)"),
                types_after.get(column_name, "N/A (removed)"),
            )
        )
    return spark.createDataFrame(comparison_rows, ["Column", "Original Type", "New Type"])

# --- Step 0: Load raw data ---
# Header row supplies the column names; types are assigned in Step 1
raw_stats_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/game_skater_stats.csv")
)
initial_count = raw_stats_df.count()
print(f"Initial row count: {initial_count}")

# Keep the original schema for the before/after comparison
original_schema = raw_stats_df.schema

# --- Step 1: Type Casting ---
# Columns grouped by the type they are cast to
string_cols = ["game_id", "player_id", "team_id"]
integer_cols = [
    "timeOnIce", "assists", "goals", "shots",
    "powerPlayGoals", "powerPlayAssists", "penaltyMinutes",
    "faceOffWins", "faceoffTaken",
    "shortHandedGoals", "shortHandedAssists", "blocked", "plusMinus",
    "evenTimeOnIce", "shortHandedTimeOnIce", "powerPlayTimeOnIce",
]
float_cols = ["hits", "takeaways", "giveaways"]

stats_df = raw_stats_df
for target_type, column_names in (
    (StringType(), string_cols),
    (IntegerType(), integer_cols),
    (FloatType(), float_cols),
):
    for column_name in column_names:
        stats_df = stats_df.withColumn(column_name, col(column_name).cast(target_type))

count_after_type_cast = stats_df.count()

# --- Schema Comparison ---
new_schema = stats_df.schema
schema_comparison_df = full_schema_comparison_df(original_schema, new_schema)
print("📋 Full Schema Comparison (Original vs After Type Casting):")
display(schema_comparison_df)

# --- Dropped rows collector ---
# Up to 5 sample rows per cleaning step, keyed by step label
dropped_samples = {}


def _split_flagged(df, flagged):
    """Split `df` by the boolean Column `flagged`.

    Replaces the `df.subtract(df.filter(flagged))` pattern: subtract() is an
    EXCEPT DISTINCT over every column, while a filter only re-evaluates the
    predicate. The rows are already distinct after Steps 2-3, so both forms
    keep the same rows.

    Returns:
        (kept_df, flagged_df). `flagged_df` holds rows where `flagged` is
        true; `kept_df` holds rows where it is false or NULL.
    """
    return df.filter(flagged.isNull() | ~flagged), df.filter(flagged)


# --- Step 2: Drop exact duplicates ---
count_before = count_after_type_cast
distinct_stats_df = stats_df.dropDuplicates()
# exceptAll (EXCEPT ALL) preserves multiplicity, so this yields the surplus
# copies removed by dropDuplicates(). subtract() is EXCEPT DISTINCT and would
# always return an empty sample here.
exact_duplicates_df = stats_df.exceptAll(distinct_stats_df)
dropped_samples["Exact duplicates dropped"] = exact_duplicates_df.limit(5)
stats_df = distinct_stats_df
count_after = stats_df.count()
exact_dupes_dropped = count_before - count_after

# --- Step 3: Drop duplicates on game_id, player_id, team_id ---
count_before = count_after
unique_key_stats_df = stats_df.dropDuplicates(["game_id", "player_id", "team_id"])
# Rows are fully distinct after Step 2, so the set difference is the rows that
# lost out to another row with the same key
key_duplicates_df = stats_df.subtract(unique_key_stats_df)
dropped_samples["Duplicate keys dropped"] = key_duplicates_df.limit(5)
stats_df = unique_key_stats_df
count_after = stats_df.count()
key_dupes_dropped = count_before - count_after

# --- Step 4: Drop nulls in mandatory fields ---
mandatory_cols = ["game_id", "player_id", "team_id", "timeOnIce", "assists", "goals", "shots"]
count_before = count_after
# isNull() is never NULL itself, so summing the 0/1 flags is a safe "any" test
null_condition = reduce(add, [col(c).isNull().cast("int") for c in mandatory_cols]) > 0
stats_df, nulls_df = _split_flagged(stats_df, null_condition)
dropped_samples["Rows with null mandatory fields dropped"] = nulls_df.limit(5)
count_after = stats_df.count()
nulls_dropped = count_before - count_after

# --- Step 5: Remove rows with negative values (except plusMinus) ---
non_negative_cols = [
    "timeOnIce", "assists", "goals", "shots", "hits",
    "powerPlayGoals", "powerPlayAssists", "penaltyMinutes",
    "faceOffWins", "faceoffTaken", "takeaways", "giveaways",
    "shortHandedGoals", "shortHandedAssists", "blocked",
    "evenTimeOnIce", "shortHandedTimeOnIce", "powerPlayTimeOnIce"
]

count_before = count_after
# Combine with OR rather than adding 0/1 flags: `NULL < 0` is NULL, and one
# NULL term turns the whole sum NULL, which let a row with a negative value
# slip through whenever any other listed column was NULL. With OR, a single
# true comparison flags the row regardless of NULLs elsewhere.
neg_condition = reduce(lambda left, right: left | right, [col(c) < 0 for c in non_negative_cols])
stats_df, negatives_df = _split_flagged(stats_df, neg_condition)
dropped_samples["Negative numeric stats dropped"] = negatives_df.limit(5)
count_after = stats_df.count()
negatives_dropped = count_before - count_after

# --- Step 6: faceOffWins > faceoffTaken ---
count_before = count_after
stats_df, invalid_faceoff_df = _split_flagged(
    stats_df,
    col("faceOffWins") > col("faceoffTaken"),
)
dropped_samples["faceOffWins > faceoffTaken dropped"] = invalid_faceoff_df.limit(5)
count_after = stats_df.count()
faceoff_dropped = count_before - count_after

# --- Step 7: timeOnIce < sum of even/short/power play ---
count_before = count_after
stats_df, inconsistent_time_df = _split_flagged(
    stats_df,
    col("timeOnIce") < (col("evenTimeOnIce") + col("shortHandedTimeOnIce") + col("powerPlayTimeOnIce")),
)
dropped_samples["timeOnIce inconsistency dropped"] = inconsistent_time_df.limit(5)
count_after = stats_df.count()
time_inconsistent_dropped = count_before - count_after

# --- Step 8: Overwrite the table with full schema ---
# Optional: drop existing table
spark.sql("DROP TABLE IF EXISTS silver_game_skater_stats")

# Write the cleaned DataFrame as a new table. This is the only write of
# stats_df in this cell: a second, identical saveAsTable at the end of the
# cell re-ran the whole cleaning lineage just to rewrite the same table.
stats_df.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver_game_skater_stats")

print("✅ Table 'silver_game_skater_stats' created/overwritten successfully with full schema.")

# --- Final Count ---
final_count = stats_df.count()

# --- Summary Table ---
# One row per step: (label, row count or rows dropped, details)
summary_data = [
    ("Initial row count (raw)", initial_count, None),
    ("After type casting", count_after_type_cast, None),
    ("Exact duplicates dropped", exact_dupes_dropped, f"{exact_dupes_dropped} rows dropped"),
    ("Duplicate keys dropped", key_dupes_dropped, f"{key_dupes_dropped} rows dropped"),
    ("Rows with null mandatory fields dropped", nulls_dropped, f"{nulls_dropped} rows dropped"),
    ("Negative numeric stats dropped", negatives_dropped, f"{negatives_dropped} rows dropped"),
    ("faceOffWins > faceoffTaken dropped", faceoff_dropped, f"{faceoff_dropped} rows dropped"),
    ("timeOnIce inconsistency dropped", time_inconsistent_dropped, f"{time_inconsistent_dropped} rows dropped"),
    ("Final cleaned row count", final_count, None)
]

summary_df = spark.createDataFrame(summary_data, ["Step", "Row Count", "Details"])
print("📊 Cleaning Process Summary (step-by-step):")
display(summary_df)

# --- Dropped rows preview ---
print("🚨 Sample dropped rows for verification:")
for step, sample_df in dropped_samples.items():
    print(f"\n--- {step} ---")
    display(sample_df)

# --- Final Cleaned Data Preview ---
print("✅ Cleaned Data Preview:")
display(stats_df.limit(10))




## player_info.csv
Loads player information from a CSV file and performs extensive cleaning and validation. Key operations include type casting, handling nulls, removing duplicates, converting height and weight units, and validating height, weight, primary position, and shoots/catches fields. Finally, the cleaned dataset is saved to the Silver layer in the Lakehouse.

In [None]:
from pyspark.sql.functions import col, lower, trim, when, lit, udf
from pyspark.sql.types import IntegerType, FloatType, StringType, DateType, DoubleType, StructType
from functools import reduce
from operator import add

# --- Helper: Full schema comparison ---
def full_schema_comparison_df(original_schema: StructType, new_schema: StructType):
    """Return a DataFrame comparing column types between two schemas.

    Every column of either schema is listed once, in sorted name order, with
    "N/A (added)" / "N/A (removed)" standing in where a column exists in only
    one of the schemas.

    Args:
        original_schema: schema before the transformation.
        new_schema: schema after the transformation.

    Returns:
        DataFrame with columns "Column", "Original Type", "New Type".
    """
    def type_names(schema):
        # Map column name -> short type string for one schema
        return {field.name: field.dataType.simpleString() for field in schema.fields}

    before = type_names(original_schema)
    after = type_names(new_schema)

    rows = []
    for col_name in sorted({*before, *after}):
        rows.append((col_name, before.get(col_name, "N/A (added)"), after.get(col_name, "N/A (removed)")))
    return spark.createDataFrame(rows, ["Column", "Original Type", "New Type"])

# --- Step 0: Load raw data ---
# Header row supplies the column names
raw_player_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/Bronze(raw datasets)/player_info.csv")
)

# Keep the original schema for the before/after comparison
original_schema = raw_player_df.schema

initial_count = raw_player_df.count()
print(f"Initial row count: {initial_count}")

# --- Step 1: Type casting & cleaning ---

# UDF to convert height string (e.g., "6' 2\"") to cm
def height_to_cm(height_str):
    """Convert a feet/inches height string to centimetres.

    Accepts the canonical form `6' 2"` and also tolerates surrounding
    whitespace, a missing or repeated space around the foot mark, and a
    missing trailing inch mark (e.g. `6'2"`, ` 6'  2 `). Such variants
    previously failed to match and silently produced None.

    Args:
        height_str: height text, or None.

    Returns:
        Height in cm rounded to 2 decimals, or None when the input is None,
        not a string, or not in a recognised feet/inches form.
    """
    # Imported inside the function so the function is self-contained when it
    # is wrapped as a UDF
    import re
    if not isinstance(height_str, str):
        return None
    match = re.match(r"\s*(\d+)\s*'\s*(\d+)\s*\"?", height_str)
    if match is None:
        return None
    feet = int(match.group(1))
    inches = int(match.group(2))
    # 1 ft = 30.48 cm, 1 in = 2.54 cm
    return round(feet * 30.48 + inches * 2.54, 2)

height_to_cm_udf = udf(height_to_cm, DoubleType())

# Use the supplied height_cm when present; otherwise derive it from the
# feet/inches text in `height`
resolved_height_cm = when(
    col("height_cm").isNull(), height_to_cm_udf(col("height"))
).otherwise(col("height_cm"))

# Text columns are trimmed (and lower-cased, except birthCity); birthDate,
# height_cm, weight and player_id are cast to explicit types
player_df = (
    raw_player_df
    .withColumn("nationality", lower(trim(col("nationality"))))
    .withColumn("birthCity", trim(col("birthCity")))
    .withColumn("primaryPosition", lower(trim(col("primaryPosition"))))
    .withColumn("birthDate", col("birthDate").cast(DateType()))
    .withColumn("birthStateProvince", lower(trim(col("birthStateProvince"))))
    .withColumn("height_cm", resolved_height_cm)
    .withColumn("height_cm", col("height_cm").cast(FloatType()))
    .withColumn("weight", col("weight").cast(FloatType()))
    .withColumn("shootsCatches", lower(trim(col("shootsCatches"))))
    .withColumn("player_id", col("player_id").cast(StringType()))
    # --- New Step: Convert weight lbs to kg in new column weight_kg ---
    .withColumn("weight_kg", (col("weight") * 0.453592).cast(FloatType()))
)

count_after_type_cast = player_df.count()

# Show full schema comparison
new_schema = player_df.schema
schema_comparison_df = full_schema_comparison_df(original_schema, new_schema)
print("📋 Full Schema Comparison (Original vs After Type Casting):")
display(schema_comparison_df)

# --- Step 2+: Cleaning steps ---
# Up to 5 sample rows per cleaning step, keyed by step label
dropped_samples = {}


def _split_flagged(df, flagged):
    """Split `df` by the boolean Column `flagged`.

    Replaces the `df.subtract(df.filter(flagged))` pattern: subtract() is an
    EXCEPT DISTINCT over every column, while a filter only re-evaluates the
    predicate. The rows are already distinct after Step 2, so both forms keep
    the same rows.

    Returns:
        (kept_df, flagged_df). `flagged_df` holds rows where `flagged` is
        true; `kept_df` holds rows where it is false or NULL.
    """
    return df.filter(flagged.isNull() | ~flagged), df.filter(flagged)


# Step 2: Drop exact duplicates
count_before = count_after_type_cast
distinct_player_df = player_df.dropDuplicates()
# exceptAll (EXCEPT ALL) preserves multiplicity, so this yields the surplus
# copies removed by dropDuplicates(). subtract() is EXCEPT DISTINCT and would
# always return an empty sample here.
exact_duplicates_df = player_df.exceptAll(distinct_player_df)
dropped_samples['Exact duplicates dropped'] = exact_duplicates_df.limit(5)
player_df = distinct_player_df
count_after = player_df.count()
exact_dupes_dropped = count_before - count_after

# Step 3: Drop duplicates on player_id
if "player_id" in player_df.columns:
    count_before = count_after
    unique_player_df = player_df.dropDuplicates(["player_id"])
    # Rows are fully distinct after Step 2, so the set difference is the rows
    # that lost out to another row with the same player_id
    player_id_dupes_df = player_df.subtract(unique_player_df)
    dropped_samples['Duplicate player_id dropped'] = player_id_dupes_df.limit(5)
    player_df = unique_player_df
    count_after = player_df.count()
    player_id_dupes_dropped = count_before - count_after
else:
    player_id_dupes_dropped = 0

# Step 4: Drop rows with nulls in mandatory fields
mandatory_cols = ["player_id", "nationality", "primaryPosition", "birthDate", "height_cm", "weight_kg", "shootsCatches"]
count_before = count_after
# isNull() is never NULL itself, so summing the 0/1 flags is a safe "any" test
null_condition = reduce(add, [col(c).isNull().cast("int") for c in mandatory_cols]) > 0
player_df, nulls_df = _split_flagged(player_df, null_condition)
dropped_samples['Rows with null mandatory fields dropped'] = nulls_df.limit(5)
count_after = player_df.count()
nulls_dropped = count_before - count_after

# Step 5: Validate height_cm (140–250 cm)
count_before = count_after
player_df, invalid_height_df = _split_flagged(
    player_df,
    (col("height_cm") < 140) | (col("height_cm") > 250) | col("height_cm").isNull(),
)
dropped_samples['Invalid height_cm removed'] = invalid_height_df.limit(5)
count_after = player_df.count()
height_dropped = count_before - count_after

# Step 6: Validate weight_kg (40–200 kg)
count_before = count_after
player_df, invalid_weight_df = _split_flagged(
    player_df,
    (col("weight_kg") < 40) | (col("weight_kg") > 200) | col("weight_kg").isNull(),
)
dropped_samples['Invalid weight removed'] = invalid_weight_df.limit(5)
count_after = player_df.count()
weight_dropped = count_before - count_after

# Step 7: Validate primaryPosition
valid_positions = ["c", "lw", "rw", "d", "g"]
count_before = count_after
player_df, invalid_pos_df = _split_flagged(
    player_df,
    (~col("primaryPosition").isin(valid_positions)) | col("primaryPosition").isNull(),
)
dropped_samples['Invalid primaryPosition removed'] = invalid_pos_df.limit(5)
count_after = player_df.count()
pos_dropped = count_before - count_after

# Step 8: Validate shootsCatches
count_before = count_after
player_df, invalid_shoots_df = _split_flagged(
    player_df,
    (~col("shootsCatches").isin("l", "r")) | col("shootsCatches").isNull(),
)
dropped_samples['Invalid shootsCatches removed'] = invalid_shoots_df.limit(5)
count_after = player_df.count()
shoots_dropped = count_before - count_after

# --- Final count ---
final_count = player_df.count()

# --- Summary ---
# (label, rows dropped) for every step that removes rows, in execution order
drop_steps = [
    ("Exact duplicates dropped", exact_dupes_dropped),
    ("Duplicate player_id dropped", player_id_dupes_dropped),
    ("Rows with null mandatory fields dropped", nulls_dropped),
    ("Invalid height_cm removed", height_dropped),
    ("Invalid weight removed", weight_dropped),
    ("Invalid primaryPosition removed", pos_dropped),
    ("Invalid shootsCatches removed", shoots_dropped),
]

# Row-count milestones bracket the per-step drop counts
summary_data = [
    ("Initial row count (raw)", initial_count, None),
    ("After type casting", count_after_type_cast, None),
]
summary_data.extend(
    (label, dropped, f"{dropped} rows dropped") for label, dropped in drop_steps
)
summary_data.append(("Final cleaned row count", final_count, None))

summary_df = spark.createDataFrame(summary_data, ["Step", "Row Count", "Details"])
print("📊 Cleaning Process Summary (step-by-step):")
display(summary_df)

# --- Preview raw vs cleaned data ---
for heading, preview_source in (
    ("🧐 Original raw data preview (5 rows):", raw_player_df),
    ("✅ Final cleaned data preview (5 rows):", player_df),
):
    print(heading)
    display(preview_source.limit(5))

# --- Dropped row samples ---
print("🔍 Examples of dropped rows at each cleaning step:")
for step in dropped_samples:
    print(f"\nStep: {step} — Showing up to 5 rows dropped")
    display(dropped_samples[step])


# --- Save the cleaned data, replacing any existing table contents ---
player_df.write.mode("overwrite").saveAsTable("silver_player_info")


## game_goalie_stats.csv
Reads the game_goalie_stats.csv file and casts each column to the appropriate data type. It cleans the data by removing exact duplicates, checking for duplicate game_id and player_id combinations, filling missing values, and handling negative values. Finally, it saves the cleaned DataFrame to the Lakehouse Silver layer table.

In [None]:
from pyspark.sql.functions import col, count, when, isnull, sum as pyspark_sum
from pyspark.sql.types import StringType, IntegerType, FloatType

# Load the raw goalie stats from the Bronze folder of the Lakehouse.
# No schema is supplied, so the CSV reader returns every column as a string.
df = spark.read.format("csv").option("header","true").load("Files/Bronze(raw datasets)/game_goalie_stats.csv")

# Sanity-check the load: size, column names, schema and a small sample
print(f"Row count: {df.count()}")
print("Column names:")
print(df.columns)
print("\nSchema:")
df.printSchema()

print("\nFirst 5 rows:")
df.show(5, truncate=False)

# Cleaning pipeline
if df is not None:
    print("\n=== Starting Data Cleaning ===")

    # 1. Cast every column to the type given in the data dictionary.
    #    The list order is the order in which the casts are applied.
    column_types = [
        ("game_id", StringType()),
        ("player_id", StringType()),
        ("team_id", StringType()),
        ("timeOnIce", IntegerType()),
        ("assists", IntegerType()),
        ("goals", IntegerType()),
        ("pim", IntegerType()),
        ("shots", IntegerType()),
        ("saves", IntegerType()),
        ("powerPlaySaves", IntegerType()),
        ("shortHandedSaves", IntegerType()),
        ("evenSaves", IntegerType()),
        ("shortHandedShotsAgainst", IntegerType()),
        ("evenShotsAgainst", IntegerType()),
        ("powerPlayShotsAgainst", IntegerType()),
        ("decision", StringType()),
        ("savePercentage", FloatType()),
        ("powerPlaySavePercentage", FloatType()),
        ("evenStrengthSavePercentage", FloatType()),
    ]
    df_cleaned = df
    for column_name, target_type in column_types:
        df_cleaned = df_cleaned.withColumn(column_name, col(column_name).cast(target_type))
    print("Schema after type casting:")
    df_cleaned.printSchema()

    # 2.1 Exact duplicates: group on all columns and keep groups seen more than once
    print("\n=== Checking for Exact Duplicates ===")
    duplicate_counts = (
        df_cleaned.groupBy(df_cleaned.columns)
        .count()
        .filter(col("count") > 1)
    )
    duplicate_count = duplicate_counts.count()
    if duplicate_count == 0:
        print("No exact duplicate rows found")
    else:
        print(f"Found {duplicate_count} groups of exact duplicate rows")
        print("Sample of exact duplicate rows:")
        duplicate_counts.show(5, truncate=False)
        # Decision: exact replicas are treated as errors and removed
        print("Dropping exact duplicate rows...")
        df_cleaned = df_cleaned.dropDuplicates()
        print(f"Rows after dropping exact duplicates: {df_cleaned.count()}")

    # 2.2 Report (game_id, player_id) pairs that occur more than once.
    #     Decision: these rows are only reported here, not dropped.
    print("\nChecking for duplicates in game_id and player_id groups...")
    game_player_duplicates = (
        df_cleaned.groupBy("game_id", "player_id")
        .count()
        .filter(col("count") > 1)
    )
    game_player_duplicate_count = game_player_duplicates.count()
    if game_player_duplicate_count == 0:
        print("No game_id and player_id duplicates found")
    else:
        print(f"Found {game_player_duplicate_count} groups of game_id and player_id duplicates")
        print("Sample of game_id and player_id duplicates (top 5 groups):")
        game_player_duplicates.show(5, truncate=False)

    # 3. Missing values
    print("\n=== Handling Missing Values ===")
    print("Missing value summary before cleaning:")
    missing_summary_before = df_cleaned.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns]
    )
    missing_summary_before.show(truncate=False)

    # Null decision -> "ND" (No Decision);
    # null powerPlaySavePercentage -> 0.0 (no power play shots faced)
    null_replacements = [("decision", "ND"), ("powerPlaySavePercentage", 0.0)]
    for column_name, replacement in null_replacements:
        df_cleaned = df_cleaned.withColumn(
            column_name,
            when(col(column_name).isNull(), replacement).otherwise(col(column_name)),
        )

    print("Missing value summary after cleaning:")
    missing_summary_after = df_cleaned.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns]
    )
    missing_summary_after.show(truncate=False)

    # 4. Negative shortHandedSaves values are replaced with 0
    non_negative_sh_saves = when(col("shortHandedSaves") < 0, 0).otherwise(col("shortHandedSaves"))
    df_cleaned = df_cleaned.withColumn("shortHandedSaves", non_negative_sh_saves)

    # 5. Columns dominated by zeros (e.g. pim, shortHandedSaves) are left untouched:
    #    zeros are meaningful values in this context.

    # Report on the cleaned result
    print("\nCleaned DataFrame:")
    print(f"Row count: {df_cleaned.count()}")
    print("\nCleaned sample data:")
    df_cleaned.show(5, truncate=False)

    # Per-column null counts after cleaning
    print("\nNull value check after cleaning:")
    null_counts = df_cleaned.select(
        [pyspark_sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns]
    )
    null_counts.show()

    # Persist to the Lakehouse Silver layer, replacing any previous table contents
    df_cleaned.write.mode("overwrite").saveAsTable("silver_goalie_stats")

## game_goals.csv
Reads the game_goals.csv file and casts columns to appropriate types, filling missing Boolean values with False. It cleans the data by removing exact duplicates and standardizing the strength column to lowercase. Finally, the cleaned DataFrame is saved to the Lakehouse Silver layer table.

In [None]:
from pyspark.sql.functions import col, when, isnull, sum as pyspark_sum, lower
from pyspark.sql.types import StringType, BooleanType

# Load the raw goals CSV from the Bronze folder of the Lakehouse
df = spark.read.format("csv").option("header","true").load("Files/Bronze(raw datasets)/game_goals.csv")

# Sanity-check the load: size, column names, schema and a small sample
print("\nOriginal DataFrame loaded successfully!")
print(f"Row count: {df.count()}")
print("Column names:")
print(df.columns)
print("\nSchema:")
df.printSchema()

print("\nFirst 5 rows:")
df.show(5, truncate=False)

# Cleaning pipeline
if df is not None:
    print("\n=== Starting Data Cleaning ===")

    # 1. Convert the flag columns to Boolean first; anything that does not
    #    cast to a Boolean (including nulls) becomes False.
    df_cleaned = df
    for flag_name in ("gameWinningGoal", "emptyNet"):
        as_boolean = col(flag_name).cast(BooleanType())
        df_cleaned = df_cleaned.withColumn(
            flag_name,
            when(as_boolean.isNull(), False).otherwise(as_boolean),
        )

    # 2. Make sure every cleaned column has its final type
    final_types = [
        ("play_id", StringType()),
        ("strength", StringType()),
        ("gameWinningGoal", BooleanType()),
        ("emptyNet", BooleanType()),
    ]
    for column_name, target_type in final_types:
        df_cleaned = df_cleaned.withColumn(column_name, col(column_name).cast(target_type))
    print("Schema after type casting:")
    df_cleaned.printSchema()

    # 3. Exact duplicates: group on all columns and keep groups seen more than once
    print("\n=== Checking for Exact Duplicates ===")
    duplicate_counts = (
        df_cleaned.groupBy(df_cleaned.columns)
        .count()
        .filter(col("count") > 1)
    )
    duplicate_count = duplicate_counts.count()
    if duplicate_count == 0:
        print("No exact duplicate rows found")
    else:
        print(f"Found {duplicate_count} groups of exact duplicate rows")
        print("Sample of exact duplicate rows:")
        duplicate_counts.show(5, truncate=False)
    # Decision: exact replicas are treated as errors and removed.
    # dropDuplicates runs unconditionally; the row counts before/after give the number removed.
    original_count = df_cleaned.count()
    df_cleaned = df_cleaned.dropDuplicates()
    print(f"Removed {original_count - df_cleaned.count()} exact duplicate rows")

    # 4. emptyNet is imbalanced; all values are kept and only the distribution is shown
    print("\nDistribution of emptyNet values:")
    df_cleaned.groupBy("emptyNet").count().show()

    # 5. Standardize strength to lowercase (lower() leaves nulls as nulls)
    df_cleaned = df_cleaned.withColumn("strength", lower(col("strength")))

    # Report on the cleaned result
    print("\nCleaned DataFrame:")
    print(f"Row count: {df_cleaned.count()}")
    print("\nCleaned sample data:")
    df_cleaned.show(5, truncate=False)

    # Per-column null counts after cleaning
    print("\nNull value check after cleaning:")
    null_counts = df_cleaned.select(
        [pyspark_sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns]
    )
    null_counts.show()

    # Persist to the Lakehouse Silver layer, replacing any previous table contents
    df_cleaned.write.mode("overwrite").saveAsTable("silver_goals")

## game_officials.csv
Reads the game_officials.csv file, casts columns to appropriate types, and standardizes text (e.g., title-casing official names and normalizing official_type to either Referee or Linesman). It cleans the data by removing exact duplicates and filtering out invalid game_id or official_type values. Finally, the cleaned DataFrame is saved to the Lakehouse Silver layer table.

In [None]:
from pyspark.sql.functions import col, when, trim, lower, initcap, sum as pyspark_sum, count
from pyspark.sql.types import IntegerType, StringType

# Load the raw officials CSV from the Bronze folder of the Lakehouse
df = spark.read.format("csv").option("header","true").load("Files/Bronze(raw datasets)/game_officials.csv")

# Verify we successfully read the data
print(f"\nOriginal DataFrame loaded successfully!")
print(f"Row count: {df.count()}")
print("Column names:")
print(df.columns)
print("\nSchema:")
df.printSchema()

print("\nFirst 5 rows:")
df.show(5, truncate=False)

# Basic Data Cleaning Steps
if df is not None:
    print("\n=== Starting Data Cleaning ===")
    
    # 1. Ensure correct data types
    df_cleaned = df.withColumn("game_id", col("game_id").cast(StringType())) \
                   .withColumn("official_name", col("official_name").cast(StringType())) \
                   .withColumn("official_type", col("official_type").cast(StringType()))
    
    # 2. Standardize text columns
    # - Trim whitespace and convert official_name to title case
    # - Map official_type to 'Referee' or 'Linesman' (case-insensitive match);
    #   every other value, including null, becomes 'Unknown' and is removed in step 5
    df_cleaned = df_cleaned.withColumn("official_name", trim(col("official_name"))) \
                          .withColumn("official_name", 
                                      when(col("official_name").isNotNull(),
                                           initcap(lower(col("official_name")))).otherwise(col("official_name"))) \
                          .withColumn("official_type",
                                      when(lower(col("official_type")).isin("referee", "linesman"),
                                           initcap(lower(col("official_type")))).otherwise("Unknown"))
    print("Schema after type casting:")
    df_cleaned.printSchema()
    
    # 3: Check for exact duplicates (identical game_id, official_name and official_type)
    print("\n=== Checking for Exact Duplicates ===")
    # Count occurrences of each row
    duplicate_counts = df_cleaned.groupBy(["game_id", "official_name", "official_type"]).agg(count("*").alias("row_count")).filter(col("row_count") > 1)
    # Compute the number of duplicate groups in this cell. Previously the name was
    # never assigned here, so the check below either raised NameError or silently
    # reused a stale value left behind by another cell.
    duplicate_count = duplicate_counts.count()
    if duplicate_count > 0:
        print(f"Found {duplicate_count} groups of exact duplicate rows")
        print("Sample of exact duplicate rows:")
        duplicate_counts.show(5, truncate=False)
    else:
        print("No exact duplicate rows found")
    
    # Decision: Drop exact duplicates (exact replicas likely errors)
    original_count = df_cleaned.count()
    df_cleaned = df_cleaned.dropDuplicates()
    print(f"Removed {original_count - df_cleaned.count()} exact duplicate rows")
    
    # 4. Validate game_id (ensure no negative values, as per profile report)
    # NOTE(review): game_id was cast to string in step 1, so this comparison relies on
    # Spark's implicit string-to-number coercion; rows whose game_id is null or
    # non-numeric are presumably dropped by this filter as well - confirm this is intended.
    if "game_id" in df_cleaned.columns:
        df_cleaned = df_cleaned.filter(col("game_id") > 0)
        print(f"Filtered out any rows with invalid (negative) game_id")
    
    # 5. Validate official_type (ensure only 'Referee' or 'Linesman');
    #    this removes the 'Unknown' rows produced in step 2
    df_cleaned = df_cleaned.filter(col("official_type").isin("Referee", "Linesman"))
    print(f"Filtered rows to ensure official_type is either 'Referee' or 'Linesman'")
    
    # Print cleaned DataFrame info
    print(f"\nCleaned DataFrame:")
    print(f"Row count: {df_cleaned.count()}")
    print("\nCleaned sample data:")
    df_cleaned.show(5, truncate=False)
    
    # Check for null values
    print("\nNull value check after cleaning:")
    null_counts = df_cleaned.select([pyspark_sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns])
    null_counts.show()
    
    # Save cleaned DataFrame to Lakehouse (Silver layer), replacing any previous table contents
    (
        df_cleaned.write
        .mode("overwrite")
        .saveAsTable("silver_officials")
    )

## game_penalties.csv
Reads the game_penalties.csv file, casts columns to appropriate types, and handles missing values (e.g., filling null penaltySeverity with "Unknown"). It cleans the data by removing exact duplicates, validating play_id format, and logging distributions of key columns. Finally, the cleaned DataFrame is saved to the Lakehouse Silver layer table.

In [None]:
from pyspark.sql.functions import col, when, isnan, isnull, sum as pyspark_sum
from pyspark.sql.types import StringType, IntegerType

# Load the raw penalties CSV from the Bronze folder of the Lakehouse
df = spark.read.format("csv").option("header","true").load("Files/Bronze(raw datasets)/game_penalties.csv")

# Sanity-check the load: size, column names, schema and a small sample
print("\nOriginal DataFrame loaded successfully!")
print(f"Row count: {df.count()}")
print("Column names:")
print(df.columns)
print("\nSchema:")
df.printSchema()

print("\nFirst 5 rows:")
df.show(5, truncate=False)

# Missing penaltySeverity (null, or flagged by isnan) is replaced with "Unknown".
# NOTE(review): isnan is applied to a column read as string, so it depends on
# Spark's implicit string-to-double coercion - confirm this behaves as intended.
severity_is_missing = col("penaltySeverity").isNull() | isnan(col("penaltySeverity"))
df_cleaned = df.withColumn(
    "penaltySeverity",
    when(severity_is_missing, "Unknown").otherwise(col("penaltySeverity")),
)

if df is not None:
    print("\n=== Starting Data Cleaning ===")

    # 1. Cast the known columns (when present) to their target types
    column_types = [
        ("penaltyMinutes", IntegerType()),
        ("penaltySeverity", StringType()),
        ("play_id", StringType()),
    ]
    for column_name, target_type in column_types:
        if column_name in df_cleaned.columns:
            df_cleaned = df_cleaned.withColumn(column_name, col(column_name).cast(target_type))
    print("Schema after type casting:")
    df_cleaned.printSchema()

    # 2. Exact duplicates: group on all columns and keep groups seen more than once
    print("\n=== Checking for Exact Duplicates ===")
    duplicate_counts = (
        df_cleaned.groupBy(df_cleaned.columns)
        .count()
        .filter(col("count") > 1)
    )
    duplicate_count = duplicate_counts.count()
    if duplicate_count == 0:
        print("No exact duplicate rows found")
    else:
        print(f"Found {duplicate_count} groups of exact duplicate rows")
        print("Sample of exact duplicate rows:")
        duplicate_counts.show(5, truncate=False)
    # Decision: exact replicas are treated as errors and removed (always executed)
    original_count = df_cleaned.count()
    df_cleaned = df_cleaned.dropDuplicates()
    print(f"Removed {original_count - df_cleaned.count()} exact duplicate rows")

    # 3. Imbalance in penaltySeverity / penaltyMinutes: only the distributions are logged
    distribution_reports = [
        ("penaltySeverity", "\nPenalty Severity distribution:"),
        ("penaltyMinutes", "\nPenalty Minutes distribution:"),
    ]
    for column_name, heading in distribution_reports:
        if column_name in df_cleaned.columns:
            print(heading)
            df_cleaned.groupBy(column_name).count().show()

    # 4. Count play_id values that do not match "<10 digits>_<one or more digits>".
    #    Offending rows are only counted, not removed.
    if "play_id" in df_cleaned.columns:
        matches_pattern = col("play_id").cast(StringType()).rlike("^[0-9]{10}_[0-9]+$")
        df_cleaned = df_cleaned.withColumn(
            "play_id_valid",
            when(matches_pattern, True).otherwise(False),
        )
        invalid_play_ids = df_cleaned.filter(~col("play_id_valid")).count()
        print(f"Found {invalid_play_ids} rows with invalid play_id format")
        # The helper flag is not part of the cleaned output
        df_cleaned = df_cleaned.drop("play_id_valid")

    # Report on the cleaned result
    print("\nCleaned DataFrame:")
    print(f"Row count: {df_cleaned.count()}")
    print("\nCleaned sample data:")
    df_cleaned.show(5, truncate=False)

    # Per-column null counts after cleaning
    print("\nNull value check after cleaning:")
    null_counts = df_cleaned.select(
        [pyspark_sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns]
    )
    null_counts.show()

    # Persist to the Lakehouse Silver layer, replacing any previous table contents
    df_cleaned.write.mode("overwrite").saveAsTable("silver_penalties")

## game_plays.csv
Reads the game_plays.csv file, fills missing values (e.g., coordinates with 0, team_id with -1, secondaryType with "Unknown"), and casts columns to appropriate types. It cleans the data by removing exact duplicates, deduplicating play_id, and validating play_id format while logging distributions and invalid entries. Finally, the cleaned DataFrame is saved to the Lakehouse Silver layer table.

In [None]:
from pyspark.sql.functions import col, when, isnull, sum as pyspark_sum
from pyspark.sql.types import StringType, IntegerType, FloatType, TimestampType

# Load the raw plays CSV from the Bronze folder of the Lakehouse
df = spark.read.format("csv").option("header","true").load("Files/Bronze(raw datasets)/game_plays.csv")

# Sanity-check the load: size, column names, schema and a small sample
print("\nOriginal DataFrame loaded successfully!")
print(f"Row count: {df.count()}")
print("Column names:")
print(df.columns)
print("\nSchema:")
df.printSchema()

print("\nFirst 5 rows:")
df.show(5, truncate=False)

# Cleaning pipeline
if df is not None:
    print("\n=== Starting Data Cleaning ===")

    # 1. Missing values
    # 1.1 secondaryType: null -> "Unknown"
    secondary_type_filled = when(isnull(col("secondaryType")), "Unknown").otherwise(col("secondaryType"))
    df_cleaned = df.withColumn("secondaryType", secondary_type_filled)

    # 1.2-1.4 Remaining fills, applied in this order and only to columns that exist:
    #   coordinates          -> 0  (assumed rink center / neutral)
    #   team ids             -> -1 (unknown team)
    #   periodTimeRemaining  -> 0
    coord_cols = ["x", "y", "st_x", "st_y"]
    team_cols = ["team_id_for", "team_id_against"]
    fill_plan = (
        [(name, 0) for name in coord_cols]
        + [(name, -1) for name in team_cols]
        + [("periodTimeRemaining", 0)]
    )
    for col_name, fill_value in fill_plan:
        if col_name not in df_cleaned.columns:
            continue
        df_cleaned = df_cleaned.withColumn(
            col_name,
            when(isnull(col(col_name)), fill_value).otherwise(col(col_name)),
        )

    # 2. Type casts, again only for columns that are present
    cast_plan = [
        ("play_id", StringType()),
        ("game_id", StringType()),
        ("period", IntegerType()),
        ("periodTime", IntegerType()),
        ("dateTime", TimestampType()),
    ]
    for col_name, target_type in cast_plan:
        if col_name in df_cleaned.columns:
            df_cleaned = df_cleaned.withColumn(col_name, col(col_name).cast(target_type))
    # The coordinate columns plus the two running goal totals are cast to float
    coord_cols.extend(["goals_away", "goals_home"])
    for col_name in coord_cols:
        if col_name in df_cleaned.columns:
            df_cleaned = df_cleaned.withColumn(col_name, col(col_name).cast(FloatType()))
    print("Schema after type casting:")
    df_cleaned.printSchema()

    # 3.1 Exact duplicates: group on all columns and keep groups seen more than once
    print("\n=== Checking for Exact Duplicates ===")
    duplicate_counts = (
        df_cleaned.groupBy(df_cleaned.columns)
        .count()
        .filter(col("count") > 1)
    )
    duplicate_count = duplicate_counts.count()
    if duplicate_count == 0:
        print("No exact duplicate rows found")
    else:
        print(f"Found {duplicate_count} groups of exact duplicate rows")
        print("Sample of exact duplicate rows:")
        duplicate_counts.show(5, truncate=False)
    # Decision: exact replicas are treated as errors and removed (always executed)
    original_count = df_cleaned.count()
    df_cleaned = df_cleaned.dropDuplicates()
    print(f"Removed {original_count - df_cleaned.count()} exact duplicate rows")

    # 3.2 play_id values shared by rows that are not exact copies of each other
    print("\n=== Checking for play_id Duplicates ===")
    play_id_counts = df_cleaned.groupBy("play_id").count().filter(col("count") > 1)
    play_id_duplicate_count = play_id_counts.count()
    if play_id_duplicate_count == 0:
        print("No play_id duplicate rows found")
    else:
        print(f"Found {play_id_duplicate_count} play_id duplicates")
        print("Sample of play_id duplicate rows:")
        # Join back to the data to display the full rows behind each duplicated play_id
        duplicate_play_ids = play_id_counts.select("play_id")
        duplicate_rows = df_cleaned.join(duplicate_play_ids, "play_id", "inner")
        duplicate_rows.show(5, truncate=False)
    # Decision: keep one row per play_id.
    # NOTE(review): dropDuplicates gives no ordering guarantee, so which row survives
    # per play_id is up to Spark even though the message says "first occurrence".
    original_count = df_cleaned.count()
    df_cleaned = df_cleaned.dropDuplicates(["play_id"])
    new_count = df_cleaned.count()
    print(f"Removed {original_count - new_count} play_id duplicate rows (keeping first occurrence)")
    original_count = new_count  # carry the current row count forward

    # 4. periodType is imbalanced; only its distribution is logged
    if "periodType" in df_cleaned.columns:
        print("\nPeriod Type distribution:")
        df_cleaned.groupBy("periodType").count().show()

    # 5. Count play_id values that do not match "<10 digits>_<one or more digits>".
    #    Offending rows are only counted, not removed.
    if "play_id" in df_cleaned.columns:
        matches_pattern = col("play_id").rlike("^[0-9]{10}_[0-9]+$")
        df_cleaned = df_cleaned.withColumn(
            "play_id_valid",
            when(matches_pattern, True).otherwise(False),
        )
        invalid_play_ids = df_cleaned.filter(~col("play_id_valid")).count()
        print(f"Found {invalid_play_ids} rows with invalid play_id format")
        # The helper flag is not part of the cleaned output
        df_cleaned = df_cleaned.drop("play_id_valid")

    # Report on the cleaned result
    print("\nCleaned DataFrame:")
    print(f"Row count: {df_cleaned.count()}")
    print("\nCleaned sample data:")
    df_cleaned.show(5, truncate=False)

    # Per-column null counts after cleaning
    print("\nNull value check after cleaning:")
    null_counts = df_cleaned.select(
        [pyspark_sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns]
    )
    null_counts.show()

    # Persist to the Lakehouse Silver layer, replacing any previous table contents
    df_cleaned.write.mode("overwrite").saveAsTable("silver_plays")