# game.csv

Performed column standardization to snake_case, removed duplicates, dropped columns with over 85% null values (except key identifiers), validated score columns for non-negative values, and verified row counts before and after loading to ensure consistency.

In [5]:

# ============================
# Game Pipeline
# ============================

import pandas as pd
import re

# Load the raw game table from the Lakehouse Files area into pandas.
df_game = pd.read_csv("/lakehouse/default/Files/dataset/game.csv")

# Data inspection: preview the first rows, then the per-column dtype/null summary.
print(df_game.head())
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in print() would append a stray "None" line.
df_game.info()

# Step 1 — Standardize column names to snake_case
def to_snake_case(name):
    """Return `name` converted from camelCase/PascalCase to lowercase snake_case.

    Two regex passes insert an underscore at word boundaries (before a
    capitalized word, and between a lowercase letter/digit and a capital);
    the result is then lowercased.
    """
    for boundary in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        name = re.sub(boundary, r'\1_\2', name)
    return name.lower()

df_game.columns = [to_snake_case(c) for c in df_game.columns]

# Step 2 — Drop columns with >85% null values but keep required columns
required_cols = ['game_id', 'season', 'home_team_id', 'away_team_id']
null_threshold = 0.85
df_game = df_game.drop(columns=[c for c in df_game.columns
                                if df_game[c].isnull().mean() > null_threshold and c not in required_cols])
print("✅ Dropped high-null columns where applicable.")

# Step 3 — Remove duplicates
df_game = df_game.drop_duplicates()

# Step 4 — Range/value validation
# game.csv stores the scores as home_goals / away_goals. The presence guard and
# the filter must use the same column name, otherwise the check is silently
# skipped (guard never true) or raises KeyError (guard true, column absent).
for score_col in ('home_goals', 'away_goals'):
    if score_col in df_game.columns:
        invalid_scores = df_game[df_game[score_col] < 0]
        if not invalid_scores.empty:
            print(f"⚠️ Invalid {score_col} rows:")
            print(invalid_scores)
    else:
        # Make a skipped check visible instead of passing silently.
        print(f"⚠️ Column {score_col} not found; score validation skipped")

# Step 5 — Save to Silver layer
sdf_game = spark.createDataFrame(df_game)
sdf_game.write.mode("overwrite").format("delta").saveAsTable("silver_game")

# Step 6 — Verify row count
df_loaded_game = spark.read.table("silver_game").toPandas()
print(f"✅ Rows before load: {len(df_game)}, after load: {len(df_loaded_game)}")

StatementMeta(, c290fbeb-1be0-4cc5-aeca-d29437c557f5, 7, Finished, Available, Finished)

      game_id    season type         date_time_GMT  away_team_id  \
0  2016020045  20162017    R  2016-10-19T00:30:00Z             4   
1  2017020812  20172018    R  2018-02-07T00:00:00Z            24   
2  2015020314  20152016    R  2015-11-24T01:00:00Z            21   
3  2015020849  20152016    R  2016-02-17T00:00:00Z            52   
4  2017020586  20172018    R  2017-12-30T03:00:00Z            20   

   home_team_id  away_goals  home_goals       outcome home_rink_side_start  \
0            16           4           7  home win REG                right   
1             7           4           3   away win OT                 left   
2            52           4           1  away win REG                right   
3            12           1           2  home win REG                right   
4            24           1           2  home win REG                 left   

            venue           venue_link   venue_time_zone_id  \
0   United Center  /api/v1/venues/null      America/Chicago

# game_skater_stats.csv

Standardized column names to snake_case, dropped columns exceeding 85% null values, removed duplicates, renamed penaltyMinutes to pim, validated numeric columns such as goals and time_on_ice, and verified schema and row count consistency before and after loading.

In [6]:
# Step 1a — Read raw data
skater_file = "/lakehouse/default/Files/dataset/game_skater_stats.csv"
df_skater_raw = pd.read_csv(skater_file)
print(f"📥 Raw rows loaded: {len(df_skater_raw)}")

# Step 1b — Data inspection
print(df_skater_raw.head())
# DataFrame.info() writes its summary to stdout and returns None, so call it
# directly rather than printing its return value (which would emit "None").
df_skater_raw.info()

StatementMeta(, c290fbeb-1be0-4cc5-aeca-d29437c557f5, 8, Finished, Available, Finished)

📥 Raw rows loaded: 945830
      game_id  player_id  team_id  timeOnIce  assists  goals  shots  hits  \
0  2016020045    8468513        4        955        1      0      0   2.0   
1  2016020045    8476906        4       1396        1      0      4   2.0   
2  2016020045    8474668        4        915        0      0      1   1.0   
3  2016020045    8473512        4       1367        3      0      0   0.0   
4  2016020045    8471762        4        676        0      0      3   2.0   

   powerPlayGoals  powerPlayAssists  ...  faceoffTaken  takeaways  giveaways  \
0               0                 0  ...             0        1.0        1.0   
1               0                 0  ...             0        1.0        2.0   
2               0                 0  ...             0        2.0        0.0   
3               0                 2  ...            27        0.0        0.0   
4               0                 0  ...             0        0.0        1.0   

   shortHandedGoals  shortHand

In [17]:
# ============================
# Game Skater Stats Pipeline
# ============================

import pandas as pd
import re

# Step 1 — Read raw data
skater_file = "/lakehouse/default/Files/dataset/game_skater_stats.csv"
df_skater_raw = pd.read_csv(skater_file)
print(f"📥 Raw rows loaded: {len(df_skater_raw)}")


# Step 2 — Define required columns to never drop
# These are the snake_case names the table has after Step 6; the required-column
# check before load uses them.
required_columns = ['game_id', 'player_id', 'team_id', 'time_on_ice', 'assists', 'goals']
# Steps 3–5 run before the rename, while the frame still carries the raw CSV
# headers, so the raw spelling of time_on_ice must be protected as well.
protected_raw_columns = set(required_columns) | {'timeOnIce'}

# Step 3 — Drop columns with >85% null values but keep required columns
null_threshold = 0.85
null_ratios = df_skater_raw.isnull().mean()
cols_to_drop = [col for col in df_skater_raw.columns
                if null_ratios[col] > null_threshold and col not in protected_raw_columns]

if cols_to_drop:
    df_skater = df_skater_raw.drop(columns=cols_to_drop)
    print(f"🧹 Dropped columns with >{int(null_threshold*100)}% nulls: {cols_to_drop}")
else:
    df_skater = df_skater_raw.copy()
    print(f"✅ No columns exceeded {int(null_threshold*100)}% null threshold.")

# Step 4 — Validate numeric columns
# Keys are raw CSV headers (exact case): the columns are not renamed until Step 6.
skater_checks = {
    'goals': lambda x: x >= 0,
    'timeOnIce': lambda x: x >= 0
}

for col, rule in skater_checks.items():
    if col in df_skater.columns:
        # Each rule is a plain comparison, so it is evaluated on the whole
        # Series at once instead of element by element.
        invalid = df_skater[~rule(df_skater[col])]
        if len(invalid) > 0:
            print(f"⚠️ Invalid values in {col}: {len(invalid)} rows")
        else:
            print(f"✅ All values valid in {col}")
    else:
        # A misspelled or missing column must not pass as "validated".
        print(f"⚠️ Column {col} not found; validation skipped")

# Step 5 — Remove duplicates based on key identifiers
unique_id_cols = ['game_id', 'player_id']
before = len(df_skater)
# Keeps the first row seen for each (game_id, player_id) pair.
df_skater = df_skater.drop_duplicates(subset=unique_id_cols)
print(f"✅ Removed {before - len(df_skater)} duplicate rows")

# Step 6 — Convert only non-snake_case columns to snake_case
def is_snake_case(name):
    """Return True when `name` has no uppercase letters and contains an underscore."""
    all_lowercase = name.lower() == name
    has_separator = "_" in name
    return all_lowercase and has_separator

def to_snake_case(name):
    """Convert a camelCase/PascalCase identifier to lowercase snake_case.

    The first pass separates capitalized words, the second separates a
    lowercase letter or digit from a following capital; the result is lowercased.
    """
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return fully_split.lower()

df_skater.columns = [to_snake_case(col) if not is_snake_case(col) else col for col in df_skater.columns]

# Step 6b — Rename penaltyMinutes (penalty_minutes after Step 6) to pim if present
if 'penalty_minutes' in df_skater.columns:
    df_skater = df_skater.rename(columns={'penalty_minutes': 'pim'})

# Optional: enforce any known column corrections (like face_off_taken)
rename_map = {
    'faceoff_taken': 'face_off_taken'
}
df_skater = df_skater.rename(columns={k: v for k, v in rename_map.items() if k in df_skater.columns})

print("✅ Column names standardized; existing snake_case preserved; penaltyMinutes renamed to pim.")

# Step 7 — Column order
# Renaming relabels columns without moving them, so the frame is still in the
# original CSV order and no re-selection is needed.
print(f"✅ Columns kept for Silver table: {list(df_skater.columns)}")

# Step 8 — Verify required columns exist (for warning, not selection)
missing_cols = [col for col in required_columns if col not in df_skater.columns]
if missing_cols:
    print(f"⚠️ Missing required columns: {missing_cols}")
else:
    print("✅ All required columns present before load.")

print(f"📊 Final row count before load: {len(df_skater)}")

# Step 9 — Load cleaned data to Silver Layer with schema overwrite
sdf_skater = spark.createDataFrame(df_skater)
sdf_skater.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .format("delta") \
    .saveAsTable("silver_game_skater_stats")
print("✅ Data saved to silver_game_skater_stats")

# Step 10 — Verify after load
# Row count and column names are read on the Spark side; pulling the whole
# table to the driver with toPandas() only to call len() is unnecessary.
sdf_loaded = spark.read.table("silver_game_skater_stats")
loaded_row_count = sdf_loaded.count()

# Compare row count
if loaded_row_count == len(df_skater):
    print("✅ Row count matches after load.")
else:
    print(f"❌ Row count mismatch! Before: {len(df_skater)}, After: {loaded_row_count}")

# Compare columns (as sets: names must match, order is not checked)
if set(sdf_loaded.columns) == set(df_skater.columns):
    print("✅ Column structure matches after load.")
else:
    print("❌ Column mismatch detected after load.")





StatementMeta(, 8c4d41ea-f1b4-470d-96bb-8c3981bacd28, 19, Finished, Available, Finished)

📥 Raw rows loaded: 945830
✅ No columns exceeded 85% null threshold.
✅ All values valid in goals
✅ Removed 92426 duplicate rows
✅ Column names standardized; existing snake_case preserved; penaltyMinutes renamed to pim.
✅ Columns kept for Silver table: ['game_id', 'player_id', 'team_id', 'time_on_ice', 'assists', 'goals', 'shots', 'hits', 'power_play_goals', 'power_play_assists', 'pim', 'face_off_wins', 'face_off_taken', 'takeaways', 'giveaways', 'short_handed_goals', 'short_handed_assists', 'blocked', 'plus_minus', 'even_time_on_ice', 'short_handed_time_on_ice', 'power_play_time_on_ice']
✅ All required columns present before load.
📊 Final row count before load: 853404
✅ Data saved to silver_game_skater_stats
✅ Row count matches after load.
✅ Column structure matches after load.


# game_goalie_stats.csv

Cleaned and standardized columns, removed duplicates, dropped sparse columns, validated shot- and save-related metrics, converted column names to consistent snake_case, and confirmed data integrity through schema and row count checks post-load.

In [None]:
import pandas as pd

# Step 1a — Read raw data
goalie_file = "/lakehouse/default/Files/dataset/game_goalie_stats.csv"
df_goalie_raw = pd.read_csv(goalie_file)
print(f"📥 Raw rows loaded: {len(df_goalie_raw)}")

# Step 1b — Data inspection
print(df_goalie_raw.head())
# DataFrame.info() prints its own report and returns None; calling it directly
# avoids the stray "None" that print(...) would add.
df_goalie_raw.info()

In [18]:
# ============================
# Game Goalie Stats Pipeline
# ============================

import pandas as pd
import re

# Step 1 — Read the raw goalie stats CSV
goalie_file = "/lakehouse/default/Files/dataset/game_goalie_stats.csv"
df_goalie_raw = pd.read_csv(goalie_file)
print(f"📥 Raw rows loaded: {len(df_goalie_raw)}")

# Step 2 — Columns that must survive the sparse-column pruning
required_columns = ['game_id', 'player_id', 'team_id', 'shots', 'saves']

# Step 3 — Prune columns whose null share exceeds 85%, sparing required ones
null_threshold = 0.85
null_ratios = df_goalie_raw.isnull().mean()
too_sparse = null_ratios[null_ratios > null_threshold].index
cols_to_drop = [col for col in too_sparse if col not in required_columns]

if not cols_to_drop:
    df_goalie = df_goalie_raw.copy()
    print(f"✅ No columns exceeded {int(null_threshold*100)}% null threshold.")
else:
    df_goalie = df_goalie_raw.drop(columns=cols_to_drop)
    print(f"🧹 Dropped columns with >{int(null_threshold*100)}% nulls: {cols_to_drop}")

# Step 4 — Validate numeric columns (report only; nothing is removed here)
goalie_checks = {
    'saves': lambda x: x >= 0,
    'shots': lambda x: x >= 0
}

for col, rule in goalie_checks.items():
    if col not in df_goalie.columns:
        continue
    passes_rule = df_goalie[col].apply(rule)
    invalid = df_goalie[~passes_rule]
    if len(invalid) > 0:
        print(f"⚠️ Invalid values in {col}: {len(invalid)} rows")
    else:
        print(f"✅ All values valid in {col}")

# Step 5 — De-duplicate on the (game_id, player_id) key, keeping the first row
unique_id_cols = ['game_id', 'player_id']
before = len(df_goalie)
df_goalie = df_goalie.drop_duplicates(subset=unique_id_cols)
print(f"✅ Removed {before - len(df_goalie)} duplicate rows")

# Step 6 — Convert only non-snake_case columns to snake_case
def is_snake_case(name):
    """Report whether `name` is already lowercase and underscore-separated."""
    if name != name.lower():
        return False
    return "_" in name

def to_snake_case(name):
    """Return the lowercase snake_case form of a camelCase/PascalCase name."""
    return re.sub(
        '([a-z0-9])([A-Z])', r'\1_\2',
        # Inner pass: put an underscore in front of each capitalized word.
        re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name),
    ).lower()

df_goalie.columns = [col if is_snake_case(col) else to_snake_case(col) for col in df_goalie.columns]

# Step 6b — Rename penaltyMinutes (penalty_minutes after Step 6) to pim if present
if 'penalty_minutes' in df_goalie.columns:
    df_goalie = df_goalie.rename(columns={'penalty_minutes': 'pim'})

# Optional: known column corrections.
# NOTE(review): every column name is lowercase after Step 6, so the camelCase
# key 'savePct' can never be present and this rename currently does nothing.
rename_map = {
    'savePct': 'save_pct'
}
applicable_renames = {k: v for k, v in rename_map.items() if k in df_goalie.columns}
df_goalie = df_goalie.rename(columns=applicable_renames)

print("✅ Column names standardized; existing snake_case preserved; penaltyMinutes renamed to pim.")

# Step 7 — Warn (without subsetting) if any required column is absent
missing_cols = [col for col in required_columns if col not in df_goalie.columns]
if not missing_cols:
    print("✅ All required columns present before load.")
else:
    print(f"⚠️ Missing required columns: {missing_cols}")

print(f"📊 Final row count before load: {len(df_goalie)}")

# Step 8 — Write the cleaned frame to the Silver Delta table, replacing data and schema
sdf_goalie = spark.createDataFrame(df_goalie)
(
    sdf_goalie.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable("silver_game_goalie_stats")
)
print("✅ Data saved to silver_game_goalie_stats")

# Step 9 — Read the table back and compare it with the frame that was written
df_loaded = spark.read.table("silver_game_goalie_stats").toPandas()

# Row count
if len(df_loaded) != len(df_goalie):
    print(f"❌ Row count mismatch! Before: {len(df_goalie)}, After: {len(df_loaded)}")
else:
    print("✅ Row count matches after load.")

# Column names (compared as sets, so order is not checked)
if set(df_loaded.columns) != set(df_goalie.columns):
    print("❌ Column mismatch detected after load.")
else:
    print("✅ Column structure matches after load.")




StatementMeta(, 8c4d41ea-f1b4-470d-96bb-8c3981bacd28, 20, Finished, Available, Finished)

📥 Raw rows loaded: 56656
✅ No columns exceeded 85% null threshold.
✅ All values valid in saves
✅ All values valid in shots
✅ Removed 5493 duplicate rows
✅ Column names standardized; existing snake_case preserved; penaltyMinutes renamed to pim.
✅ All required columns present before load.
📊 Final row count before load: 51163
✅ Data saved to silver_game_goalie_stats
✅ Row count matches after load.
✅ Column structure matches after load.


# game_teams_stats.csv

Cleaned duplicate entries, standardized column naming, removed sparse columns, validated team-level game statistics (goals, shots, PIM), and verified schema alignment between the raw and Silver layers.

In [None]:
import pandas as pd

# Step 1a — Load raw CSV
team_file = "/lakehouse/default/Files/dataset/game_teams_stats.csv"
df_team_raw = pd.read_csv(team_file)
print(f"📥 Raw rows loaded: {len(df_team_raw)}")

# Step 1b — Data inspection
print(df_team_raw.head())
# DataFrame.info() prints its summary and returns None, so it is not wrapped in
# print() (that would output an extra "None" line).
df_team_raw.info()

In [36]:
# ============================
# Game Team Stats Pipeline
# ============================

import pandas as pd
import re

# Step 1 — Read the raw team-level game stats CSV
team_file = "/lakehouse/default/Files/dataset/game_teams_stats.csv"
df_team_raw = pd.read_csv(team_file)
print(f"📥 Raw rows loaded: {len(df_team_raw)}")

# Step 2 — Columns that are never pruned
required_columns = ['game_id', 'team_id', 'goals', 'shots', 'pim']

# Step 3 — Prune columns that are more than 85% null, sparing required columns
null_threshold = 0.85
null_ratios = df_team_raw.isnull().mean()
too_sparse = null_ratios[null_ratios > null_threshold].index
cols_to_drop = [col for col in too_sparse if col not in required_columns]

if not cols_to_drop:
    df_team = df_team_raw.copy()
    print(f"✅ No columns exceeded {int(null_threshold*100)}% null threshold.")
else:
    df_team = df_team_raw.drop(columns=cols_to_drop)
    print(f"🧹 Dropped columns with >{int(null_threshold*100)}% nulls: {cols_to_drop}")

# Step 4 — Standardize column names to snake_case
def is_snake_case(name):
    """True only for names that contain an underscore and no uppercase letters."""
    return "_" in name and name.lower() == name

def to_snake_case(name):
    """Convert a camelCase/PascalCase column name to lowercase snake_case.

    Applies the two word-boundary patterns in order, each inserting an
    underscore between its two captured groups, then lowercases the result.
    """
    for boundary in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        name = re.sub(boundary, r'\1_\2', name)
    return name.lower()

df_team.columns = [to_snake_case(col) if not is_snake_case(col) else col for col in df_team.columns]

# Step 4b — Rename penaltyMinutes to pim if present
# After Step 4 a raw penaltyMinutes header reads penalty_minutes, so that is
# the name to look for.
if 'penalty_minutes' in df_team.columns:
    df_team = df_team.rename(columns={'penalty_minutes': 'pim'})

print("✅ Column names standardized; existing snake_case preserved; penaltyMinutes renamed to pim.")

# Step 5 — Validate numeric columns and display invalid rows
numeric_checks = ['goals', 'shots', 'pim']
# Only columns that exist can be checked; report the rest instead of raising KeyError.
present_checks = [col for col in numeric_checks if col in df_team.columns]
skipped_checks = [col for col in numeric_checks if col not in df_team.columns]
if skipped_checks:
    print(f"⚠️ Columns not found; validation skipped: {skipped_checks}")

# A row is invalid when any checked column is negative.
invalid_rows = df_team[(df_team[present_checks] < 0).any(axis=1)]

if not invalid_rows.empty:
    print(f"⚠️ Found {len(invalid_rows)} invalid rows:")
    print(invalid_rows.to_string(index=False))

    # Optional: save invalid rows for audit
    invalid_rows.to_csv("/lakehouse/default/Files/invalid_game_team_stats.csv", index=False)

    # Remove invalid rows from main DataFrame
    df_team = df_team.drop(invalid_rows.index)
    print(f"✅ Removed {len(invalid_rows)} invalid rows")
else:
    print("✅ No invalid numeric rows found")

# Step 6 — Remove duplicates based on key identifiers
unique_id_cols = ['game_id', 'team_id']
before = len(df_team)
df_team = df_team.drop_duplicates(subset=unique_id_cols)
print(f"✅ Removed {before - len(df_team)} duplicate rows")

# Step 7 — Verify required columns exist
missing_cols = [col for col in required_columns if col not in df_team.columns]
if missing_cols:
    print(f"⚠️ Missing required columns: {missing_cols}")
else:
    print("✅ All required columns present before load.")

print(f"📊 Final row count before load: {len(df_team)}")

# Step 8 — Load cleaned data to Silver Layer with schema overwrite
sdf_team = spark.createDataFrame(df_team)
sdf_team.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .format("delta") \
    .saveAsTable("silver_game_team_stats")
print("✅ Data saved to silver_game_team_stats")

# Step 9 — Verify after load
df_loaded = spark.read.table("silver_game_team_stats").toPandas()

# Compare row count
if len(df_loaded) == len(df_team):
    print("✅ Row count matches after load.")
else:
    print(f"❌ Row count mismatch! Before: {len(df_team)}, After: {len(df_loaded)}")

# Compare columns (as lists, so both names and order must match)
if list(df_loaded.columns) == list(df_team.columns):
    print("✅ Column order and names match after load.")
else:
    print("❌ Column mismatch detected after load.")







StatementMeta(, 8c4d41ea-f1b4-470d-96bb-8c3981bacd28, 38, Finished, Available, Finished)

📥 Raw rows loaded: 52610
✅ No columns exceeded 85% null threshold.
✅ Column names standardized; existing snake_case preserved; penaltyMinutes renamed to pim.
✅ No invalid numeric rows found
✅ Removed 5140 duplicate rows
✅ All required columns present before load.
📊 Final row count before load: 47470
✅ Data saved to silver_game_team_stats
✅ Row count matches after load.
✅ Column order and names match after load.


# player_info.csv

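Converted weight from pounds to kilograms (rounded to 2 decimals), kept the existing height_cm column (rounded to 2 decimals) and dropped the original height column, range-checked height and weight, handled missing values via median (height, weight) and mode (shooting hand) imputation, validated categorical fields like shooting hand and position, standardized string formatting (names, cities, countries), and removed duplicate player IDs so the data is clean, consistent, and ready for analytical joins.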

In [None]:
import pandas as pd

# Step 1 — Load raw CSV
player_file = "/lakehouse/default/Files/dataset/player_info.csv"
df_player = pd.read_csv(player_file)
print(f"📥 Raw rows loaded: {len(df_player)}")

# Step 1b — Data inspection
print(df_player.head())
# DataFrame.info() prints its report and returns None; call it directly so no
# extra "None" line is printed.
df_player.info()

In [5]:

# ============================
# player_info Pipeline
# ============================

import pandas as pd
import re

# Step 1 — Load raw CSV
# Reads player_info.csv from the Lakehouse Files area into pandas and reports
# how many rows were read, before any cleaning is applied.
player_file = "/lakehouse/default/Files/dataset/player_info.csv"
df_player = pd.read_csv(player_file)
print(f"📥 Raw rows loaded: {len(df_player)}")

# Step 2 — Standardize column names to snake_case
def to_snake_case(name):
    """Return `name` rewritten from camelCase/PascalCase to lowercase snake_case."""
    # Pass 1: underscore before each capitalized word (e.g. "xName" -> "x_Name").
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Pass 2: underscore between a lowercase letter/digit and a following capital.
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return fully_split.lower()

# Names that are already lowercase with an underscore are kept as-is; the rest are converted.
df_player.columns = [to_snake_case(c) if not (c==c.lower() and "_" in c) else c for c in df_player.columns]
print("✅ Column names standardized; existing snake_case preserved.")

# Step 3 — Drop columns with >85% nulls (keep required)
required_columns = ['player_id', 'first_name', 'last_name', 'birth_date', 'nationality']
null_threshold = 0.85
df_player = df_player.drop(columns=[c for c in df_player.columns
                                    if df_player[c].isnull().mean() > null_threshold
                                    and c not in required_columns])
print("✅ Dropped high-null columns where applicable.")

# Step 4 — Convert weight to kg, round to 2 dp, drop height in inches, round height_cm
if 'weight' in df_player.columns:
    # 0.453592 kg per pound
    df_player['weight'] = (df_player['weight'] * 0.453592).round(2)
    print("✅ Converted weight to kg (2 decimals)")
if 'height_cm' in df_player.columns:
    df_player['height_cm'] = df_player['height_cm'].round(2)
if 'height' in df_player.columns:
    df_player = df_player.drop(columns=['height'])
    print("✅ Dropped original height (inches); height_cm preserved")

# Step 5 — Range/value checks
# Nulls are not flagged here (comparisons with NaN are False); they are imputed in Step 8.
numeric_ranges = {'height_cm': (140, 230), 'weight': (40, 150)}  # height in cm, weight in kg
for col, (min_val, max_val) in numeric_ranges.items():
    if col in df_player.columns:
        invalid = df_player[(df_player[col] < min_val) | (df_player[col] > max_val)]
        if not invalid.empty:
            print(f"⚠️ Invalid {col} values:")
            print(invalid[['player_id', col]])
        else:
            print(f"✅ All {col} values valid")

# Step 6 — Categorical validation
if 'shoots_catches' in df_player.columns:
    invalid = df_player[~df_player['shoots_catches'].isin(['L','R']) & df_player['shoots_catches'].notna()]
    if not invalid.empty:
        print(f"⚠️ Invalid shoots_catches values:\n{invalid[['player_id','shoots_catches']]}")
if 'primary_position' in df_player.columns:
    valid_pos = ['C','LW','RW','D','G']
    # Unlike shoots_catches, a null position is reported as invalid here.
    invalid = df_player[~df_player['primary_position'].isin(valid_pos)]
    if not invalid.empty:
        print(f"⚠️ Invalid primary_position values:\n{invalid[['player_id','primary_position']]}")

# Step 7 — Standardize text formatting
# NOTE(review): str.title() lowercases interior capitals ("McDavid" -> "Mcdavid");
# confirm this is acceptable for player names.
for col in ['first_name','last_name','birth_city']:
    if col in df_player.columns:
        df_player[col] = df_player[col].str.title()
for col in ['country','birth_state_province']:
    if col in df_player.columns:
        df_player[col] = df_player[col].str.upper()

# Step 8 — Handle missing values
# The filled Series is assigned back to the frame: an in-place fillna on
# df[col] acts on an intermediate object, is deprecated, and does not update
# the frame when pandas Copy-on-Write is active.
# Numeric: fill with median
for col in ['height_cm','weight']:
    if col in df_player.columns:
        df_player[col] = df_player[col].fillna(df_player[col].median())
# Categorical: fill with mode
for col in ['shoots_catches']:
    if col in df_player.columns:
        modes = df_player[col].mode()
        # mode() is empty when the column is entirely null; there is nothing to impute with then.
        if not modes.empty:
            df_player[col] = df_player[col].fillna(modes.iloc[0])

# Step 9 — Remove duplicates
df_player = df_player.drop_duplicates(subset=['player_id'])
print(f"✅ Removed duplicates; final rows: {len(df_player)}")

# Step 10 — Verification before load
missing_cols = [col for col in required_columns if col not in df_player.columns]
if missing_cols:
    print(f"⚠️ Missing required columns before load: {missing_cols}")
else:
    print("✅ All required columns present before load.")
print(f"📊 Final row count before load: {len(df_player)}")

# Step 11 — Save to Silver Layer
spark.createDataFrame(df_player).write.mode("overwrite") \
      .option("overwriteSchema", "true").format("delta") \
      .saveAsTable("silver_player_info")
print("✅ Data saved to silver_player_info")

# Step 12 — Verification after load
df_loaded = spark.read.table("silver_player_info").toPandas()
missing_cols_after = [col for col in required_columns if col not in df_loaded.columns]
if missing_cols_after:
    print(f"⚠️ Missing required columns after load: {missing_cols_after}")
else:
    print("✅ All required columns present after load.")
if len(df_loaded) == len(df_player):
    print(f"✅ Row count matches after load: {len(df_loaded)}")
else:
    print(f"⚠️ Row count mismatch! Loaded: {len(df_loaded)}, Expected: {len(df_player)}")


StatementMeta(, 57bc7e90-2a29-4db4-9fec-ae5f7b4d21a2, 6, Finished, Available, Finished)

📥 Raw rows loaded: 3925
✅ Column names standardized; existing snake_case preserved.
✅ Dropped high-null columns where applicable.
✅ Converted weight to kg (2 decimals)
✅ Dropped original height (inches); height_cm preserved
✅ All height_cm values valid
✅ All weight values valid
✅ Removed duplicates; final rows: 3925
✅ All required columns present before load.
📊 Final row count before load: 3925
✅ Data saved to silver_player_info
✅ All required columns present after load.
✅ Row count matches after load: 3925


# team_info.csv

Harmonized naming conventions, dropped null-heavy fields (always keeping team_id, team_name, and abbreviation), removed duplicate rows, confirmed that the team ID, team name, and abbreviation columns are present, and compared row counts before and after ingestion.

In [None]:
import pandas as pd

# Step 1 — Load raw CSV
df_team = pd.read_csv("/lakehouse/default/Files/dataset/team_info.csv")

# Step 1b — Data inspection
print(df_team.head())
# DataFrame.info() prints its summary and returns None, so it is called
# directly instead of being passed to print().
df_team.info()

In [9]:
# ============================
# team_info. Pipeline
# ============================ 

import pandas as pd
import re

# Load the raw team_info CSV from the Lakehouse Files area into pandas.
# NOTE: df_team is also the name used by the Game Team Stats pipeline cell, so
# running this cell rebinds that variable to the team_info data.
df_team = pd.read_csv("/lakehouse/default/Files/dataset/team_info.csv")

# Step 1 — Standardize column names to snake_case
def to_snake_case(name):
    """Lowercase `name`, inserting "_" before each uppercase character that
    directly follows a lowercase character."""
    pieces = []
    previous = None
    for ch in name:
        # A word boundary is a lowercase character immediately followed by an uppercase one.
        if previous is not None and ch.isupper() and previous.islower():
            pieces.append("_")
        pieces.append(ch.lower())
        previous = ch
    return "".join(pieces)

df_team.columns = list(map(to_snake_case, df_team.columns))

# Step 2 — Remove columns that are more than 85% null, never touching the required ones
required_cols = ['team_id', 'team_name', 'abbreviation']
null_threshold = 0.85
sparse_team_cols = [
    c for c in df_team.columns
    if c not in required_cols and df_team[c].isnull().mean() > null_threshold
]
df_team = df_team.drop(columns=sparse_team_cols)
print("✅ Dropped high-null columns where applicable.")

# Step 3 — Drop rows that are exact duplicates across all columns
df_team = df_team.drop_duplicates()

# Step 4 — Report whether every required column is still present
missing_cols = [col for col in required_cols if col not in df_team.columns]
if not missing_cols:
    print("✅ All required columns present.")
else:
    print(f"⚠️ Missing required columns: {missing_cols}")

# Step 5 — Overwrite the Silver Delta table with the cleaned frame
sdf_team = spark.createDataFrame(df_team)
(
    sdf_team.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_team_info")
)

# Step 6 — Read the table back and show row counts before and after the load
df_loaded_team = spark.read.table("silver_team_info").toPandas()
print(f"✅ Rows before load: {len(df_team)}, after load: {len(df_loaded_team)}")

StatementMeta(, ce3146a1-b3d8-40ee-b6d9-365a28822113, 11, Finished, Available, Finished)

✅ Dropped high-null columns where applicable.
✅ All required columns present.
✅ Rows before load: 33, after load: 33


# game_goals.csv

Renamed columns to consistent snake_case, removed duplicate records, validated goal-related values, ensured strength values were limited to expected categories (Even, Power Play, Short Handed), and verified load consistency.

In [None]:
import pandas as pd

# Step 1 — Load raw CSV
df_goals = pd.read_csv("/lakehouse/default/Files/dataset/game_goals.csv")

# Step 1b — Data inspection
print(df_goals.head())
# DataFrame.info() prints its own report and returns None; printing the return
# value would add a stray "None" line.
df_goals.info()

In [10]:

# ============================
# game_goal. Pipeline
# ============================

import pandas as pd
import re

# Load the raw game_goals CSV from the Lakehouse Files area into pandas.
df_goals = pd.read_csv("/lakehouse/default/Files/dataset/game_goals.csv")

# Step 1 — Standardize column names to snake_case
def to_snake_case(name):
    """Return the lowercase snake_case form of a camelCase/PascalCase name."""
    return re.sub(
        '([a-z0-9])([A-Z])', r'\1_\2',
        # Inner pass runs first: underscore before each capitalized word.
        re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name),
    ).lower()

df_goals.columns = list(map(to_snake_case, df_goals.columns))

# Step 2 — Remove columns that are more than 85% null, never touching the required ones
required_cols = ['player_id', 'strength', 'game_winning_goal', 'empty_net']
null_threshold = 0.85
sparse_goal_cols = [
    c for c in df_goals.columns
    if c not in required_cols and df_goals[c].isnull().mean() > null_threshold
]
df_goals = df_goals.drop(columns=sparse_goal_cols)
print("✅ Dropped high-null columns where applicable.")

# Step 3 — Drop rows that are exact duplicates across all columns
df_goals = df_goals.drop_duplicates()

# Step 4 — Range/value validation
# Non-null strength values must be one of: 'Even', 'Power Play', 'Short Handed'
valid_strength = ['Even', 'Power Play', 'Short Handed']
if 'strength' in df_goals.columns:
    strength_values = df_goals['strength']
    unexpected = strength_values.notna() & ~strength_values.isin(valid_strength)
    invalid_strength = df_goals[unexpected]
    if len(invalid_strength) > 0:
        print("⚠️ Invalid strength rows:")
        print(invalid_strength)

# Step 5 — Overwrite the Silver Delta table with the cleaned frame
sdf_goals = spark.createDataFrame(df_goals)
(
    sdf_goals.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_game_goals")
)
print("✅ Saved to Lakehouse table: silver_game_goals")

# Step 6 — Read the table back and show row counts before and after the load
df_loaded_goals = spark.read.table("silver_game_goals").toPandas()
print(f"✅ Rows before load: {len(df_goals)}, after load: {len(df_loaded_goals)}")

StatementMeta(, ce3146a1-b3d8-40ee-b6d9-365a28822113, 12, Finished, Available, Finished)

✅ Dropped high-null columns where applicable.
✅ Saved to Lakehouse table: silver_game_goals
✅ Rows before load: 133345, after load: 133345


#

# game_penalties.csv

Standardized columns, dropped high-null columns, renamed penalty_minutes to pim, validated penalty duration values, removed duplicates, and ensured accurate row counts after loading to the Silver layer.

In [None]:
import pandas as pd

# Step 1 — Load raw CSV
df_penalties = pd.read_csv("/lakehouse/default/Files/dataset/game_penalties.csv")

# Step 1b — Data inspection
print(df_penalties.head())
# DataFrame.info() prints its summary and returns None, so it is called
# directly; print(...) around it would output an extra "None".
df_penalties.info()

In [11]:

# ============================
# game_penalties. Pipeline
# ============================

import pandas as pd
import re

# Load the raw game_penalties CSV from the Lakehouse Files area into pandas.
df_penalties = pd.read_csv("/lakehouse/default/Files/dataset/game_penalties.csv")

# Step 1 — Standardize column names to snake_case
def to_snake_case(name):
    """Convert a camelCase/PascalCase column name to lowercase snake_case.

    Each word-boundary pattern inserts an underscore between its two captured
    groups; the patterns run in order and the result is lowercased.
    """
    for boundary in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        name = re.sub(boundary, r'\1_\2', name)
    return name.lower()

df_penalties.columns = [to_snake_case(c) for c in df_penalties.columns]

# Step 2 — Drop columns with >85% null values but keep required columns
required_cols = ['play_id', 'penalty_minutes']
null_threshold = 0.85
df_penalties = df_penalties.drop(columns=[c for c in df_penalties.columns
                                          if df_penalties[c].isnull().mean() > null_threshold and c not in required_cols])
print("✅ Dropped high-null columns where applicable.")

# Step 3 — Remove duplicates
df_penalties = df_penalties.drop_duplicates()

# Step 4 — Rename penalty_minutes to pim
if 'penalty_minutes' in df_penalties.columns:
    df_penalties = df_penalties.rename(columns={'penalty_minutes': 'pim'})
    print("✅ Renamed 'penalty_minutes' to 'pim'")

# Step 5 — Range/value validation
# The rename in Step 4 is conditional, so 'pim' may be absent; check for it
# before indexing instead of failing with KeyError.
if 'pim' in df_penalties.columns:
    invalid_pim = df_penalties[df_penalties['pim'] < 0]
    if not invalid_pim.empty:
        print("⚠️ Invalid pim rows:")
        print(invalid_pim)
else:
    print("⚠️ Column 'pim' not found; penalty-minute validation skipped")

# Step 6 — Save to Silver layer
sdf_penalties = spark.createDataFrame(df_penalties)
sdf_penalties.write.mode("overwrite").format("delta").saveAsTable("silver_game_penalties")
print("✅ Saved to Lakehouse table: silver_game_penalties")

# Step 7 — Verify row count
df_loaded_penalties = spark.read.table("silver_game_penalties").toPandas()
print(f"✅ Rows before load: {len(df_penalties)}, after load: {len(df_loaded_penalties)}")

StatementMeta(, ce3146a1-b3d8-40ee-b6d9-365a28822113, 13, Finished, Available, Finished)

✅ Dropped high-null columns where applicable.
✅ Renamed 'penalty_minutes' to 'pim'
✅ Saved to Lakehouse table: silver_game_penalties
✅ Rows before load: 229228, after load: 229228
