In [4]:
# Parameters
# Both values are plain strings consumed by later cells: project_name is
# interpolated into the raw-file paths, table_prefix into the Delta table names.

# Project name (folder reference): the sub-folder under Files/Raw to ingest
project_name = "nhl"

# Prefix used for Delta table names; the ingestion cell joins it to each
# CSV file's name with an underscore
table_prefix = "nhl"


StatementMeta(, f255e5b4-9b62-48e2-bdb7-7ec3f66bbd36, 6, Finished, Available, Finished)

In [5]:
# Environment setup
import os
import re
import logging

# Root logging configuration: INFO and above, one "time | level | message" line per record.
logging.basicConfig(
    format='%(asctime)s | %(levelname)s | %(message)s',
    level=logging.INFO,
)
# Named logger shared by the cells below.
logger = logging.getLogger("ingest")

StatementMeta(, f255e5b4-9b62-48e2-bdb7-7ec3f66bbd36, 7, Finished, Available, Finished)

In [6]:
# Relative path handed to spark.read when loading each file
raw_spark_path = f"Files/Raw/{project_name}"

# Absolute filesystem path handed to os.listdir when enumerating files
raw_fs_path = f"/lakehouse/default/Files/Raw/{project_name}"

StatementMeta(, f255e5b4-9b62-48e2-bdb7-7ec3f66bbd36, 8, Finished, Available, Finished)

In [7]:
# Column Name Cleaning Function
def clean_column(col_name: str) -> str:
    """
    Normalise a raw column header into a lowercase, underscore-separated name.

    The name is lowercased, then two substitutions are applied in order:
    - every character other than a-z, 0-9 or underscore becomes an underscore
    - each run of underscores is collapsed into a single underscore
    Finally, leading and trailing underscores are stripped.

    Returns the cleaned name, which is an empty string when the input
    contains no ASCII letters or digits.
    """
    substitutions = (
        (r"[^a-z0-9_]", "_"),  # anything outside the allowed set
        (r"_+", "_"),          # runs of underscores
    )

    cleaned = col_name.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned.strip("_")

StatementMeta(, f255e5b4-9b62-48e2-bdb7-7ec3f66bbd36, 9, Finished, Available, Finished)

In [8]:
# Read CSVs and Write Delta Tables
# Ingests every "*.csv" entry found directly inside raw_fs_path into its own
# Delta table named "<table_prefix>_<file stem>", overwriting an existing table.
# Processing stops at the first file that fails: the error is logged with its
# traceback and then re-raised.

# Get list of CSV files in the raw folder. os.listdir() returns entries in
# arbitrary order, so sort them to keep ingestion order and log output stable
# across runs.
csv_files = sorted(f for f in os.listdir(raw_fs_path) if f.endswith(".csv"))

# Fail early if no data found
if not csv_files:
    # FileNotFoundError is a subclass of Exception, so handlers that caught the
    # previous generic Exception still catch this.
    raise FileNotFoundError("No CSV files found for ingestion")

logger.info(f"Found {len(csv_files)} CSV files")

# Loop through each CSV file
for file in csv_files:
    # Generate table name using prefix. Only the trailing extension is removed
    # (str.replace would also delete a ".csv" appearing earlier in the name),
    # and characters outside a-z, 0-9 and "_" (spaces, hyphens, dots, ...) are
    # replaced with "_". Names that were already made of those characters are
    # left exactly as before.
    file_stem = os.path.splitext(file)[0].lower()
    safe_stem = re.sub(r"[^a-z0-9_]", "_", file_stem)
    table_name = f"{table_prefix}_{safe_stem}"

    # Full Spark path to the CSV file
    file_path = f"{raw_spark_path}/{file}"

    try:
        logger.info(f"Ingesting file: {file}")

        # Read CSV into Spark DataFrame
        df = (
            spark.read
                 .format("csv")
                 .option("header", True)
                 .option("inferSchema", True)
                 .load(file_path)
        )

        # Clean column names
        cleaned_columns = [clean_column(name) for name in df.columns]

        # Different headers can clean to the same name (e.g. "Game ID" and
        # "game_id"), and a header made only of special characters cleans to
        # an empty string. Report either case clearly before attempting the write.
        problem_columns = sorted(
            {
                name
                for name in cleaned_columns
                if not name or cleaned_columns.count(name) > 1
            }
        )
        if problem_columns:
            raise ValueError(
                f"Column names in {file} are empty or duplicated after cleaning: "
                f"{problem_columns}"
            )

        # Rename all columns positionally in a single projection instead of
        # one withColumnRenamed call per column.
        df = df.toDF(*cleaned_columns)

        # Write DataFrame as Delta table (overwrite mode)
        (
            df.write
              .format("delta")
              .mode("overwrite")
              .saveAsTable(table_name)
        )

        logger.info(f"Delta table created/overwritten: {table_name}")

    except Exception:
        # Capture detailed error info per file
        logger.exception(f"Failed ingesting file: {file}")
        raise

StatementMeta(, f255e5b4-9b62-48e2-bdb7-7ec3f66bbd36, 10, Finished, Available, Finished)

2026-01-13 08:55:15,401 | INFO | Found 13 CSV files
2026-01-13 08:55:15,402 | INFO | Ingesting file: game.csv
2026-01-13 08:56:04,768 | INFO | Delta table created/overwritten: nhl_game
2026-01-13 08:56:04,768 | INFO | Ingesting file: game_goalie_stats.csv
2026-01-13 08:56:14,270 | INFO | Delta table created/overwritten: nhl_game_goalie_stats
2026-01-13 08:56:14,271 | INFO | Ingesting file: game_goals.csv
2026-01-13 08:56:22,774 | INFO | Delta table created/overwritten: nhl_game_goals
2026-01-13 08:56:22,775 | INFO | Ingesting file: game_officials.csv
2026-01-13 08:56:29,892 | INFO | Delta table created/overwritten: nhl_game_officials
2026-01-13 08:56:29,893 | INFO | Ingesting file: game_penalties.csv
2026-01-13 08:56:37,087 | INFO | Delta table created/overwritten: nhl_game_penalties
2026-01-13 08:56:37,088 | INFO | Ingesting file: game_plays.csv
2026-01-13 08:56:56,659 | INFO | Delta table created/overwritten: nhl_game_plays
2026-01-13 08:56:56,660 | INFO | Ingesting file: game_plays_