# 1. Bronze Layer Ingestion (Alberta Economic Monitor)

**Objective:** Ingest all 5 clean, raw data files from the secure Volume (`/Volumes/mycatalog/default/bronze_data/`) and create our official Bronze tables.

**Process:** The ingestion cells read each CSV, infer the schema, and save it as a Delta table. The `.mode("overwrite")` write mode, together with the preceding `DROP TABLE IF EXISTS`, ensures any old, broken tables are replaced.

## 1.1 Ingest Wages

- **Source:** `Wages_raw.csv`
- **Output:** `mycatalog.default.wages_bronze`

In [0]:
# --- 1. WAGES INGESTION ---
# Load the raw wages CSV from the Volume, tidy the header names, and
# (re)create the wages Bronze Delta table from it.

file_path = "/Volumes/mycatalog/default/bronze_data/Wages_raw.csv"
table_name = "mycatalog.default.Wages_bronze"

print(f"Reading wages data from: {file_path}")


# Read the CSV: first row is the header, column types are inferred
raw_df = (
    spark.read.format("csv")
    .options(**{"header": "true", "inferSchema": "true"})
    .load(file_path)
)

# Sanitize column names in a single pass per name:
#   space , ; =        -> underscore
#   ( ) { } \n \t      -> removed
header_fixes = str.maketrans(" ,;=", "____", "(){}\n\t")
for original_name in raw_df.columns:
    raw_df = raw_df.withColumnRenamed(
        original_name, original_name.translate(header_fixes)
    )

# Drop the table if it exists
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Write to Delta
raw_df.write.saveAsTable(table_name, format="delta", mode="overwrite")

# Preview the first rows of the freshly written table
display(spark.table(table_name).limit(5))

## 1.2 Ingest CPI (Inflation)

- **Source:** `CPI.csv`
- **Output:** `mycatalog.default.cpi_bronze`

In [0]:
# --- 2. CPI INGESTION ---

table_name = "mycatalog.default.cpi_bronze"
file_path = "/Volumes/mycatalog/default/bronze_data/CPI.csv"

print(f"Reading CPI data from: {file_path}")

# Read the CSV (header row gives the column names; types are inferred)
raw_df = spark.read.load(
    file_path,
    format="csv",
    **{"header": "true", "inferSchema": "true"},
)

# Sanitize column names: replace invalid characters with underscores
def sanitize_col(col):
    """Return ``col`` with each invalid column-name character replaced by ``_``.

    The characters replaced are: space, comma, semicolon, curly braces,
    parentheses, newline, tab and the equals sign. All other characters are
    left untouched.
    """
    replacements = str.maketrans(dict.fromkeys(" ,;{}()\n\t=", "_"))
    return col.translate(replacements)

# Cleaned header names, in the same order as the DataFrame's columns
sanitized_cols = list(map(sanitize_col, raw_df.columns))

# Ensure column names are unique
from collections import Counter

def make_unique(cols):
    """Return ``cols`` with repeated names disambiguated by a numeric suffix.

    The first occurrence of a name is kept as-is; later occurrences become
    ``name_2``, ``name_3`` and so on. Order and length are preserved.

    Two cases are handled beyond simple counting:

    * A generated name is never allowed to clash with a name that has
      already been emitted (e.g. ``["a", "a", "a_2"]`` yields
      ``["a", "a_2", "a_2_2"]`` rather than two ``a_2`` columns).
    * Names are compared case-insensitively, because Delta does not allow
      two columns that differ only by case.

    Args:
        cols: Iterable of column-name strings.

    Returns:
        A list of column names that are unique when compared
        case-insensitively.
    """
    counts = Counter()   # occurrences seen so far, keyed by lower-cased name
    taken = set()        # lower-cased names already emitted
    new_cols = []
    for col in cols:
        key = col.lower()
        counts[key] += 1
        candidate = col if counts[key] == 1 else f"{col}_{counts[key]}"
        # Keep bumping the suffix until the candidate is genuinely unused.
        while candidate.lower() in taken:
            counts[key] += 1
            candidate = f"{col}_{counts[key]}"
        taken.add(candidate.lower())
        new_cols.append(candidate)
    return new_cols

# Apply the sanitized, de-duplicated names positionally
unique_cols = make_unique(sanitized_cols)
raw_df = raw_df.toDF(*unique_cols)

# Drop the table if it exists
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Write to Delta
(
    raw_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(table_name)
)

# Preview the first rows of the new table
display(spark.table(table_name).limit(5))

## 1.3 Ingest Rent (Housing)

- **Source:** `download.csv`
- **Output:** `mycatalog.default.rent_bronze`

In [0]:
%python
# Drop the existing table if it exists
spark.sql(
    """
    DROP TABLE IF EXISTS mycatalog.default.rent_bronze
    """
)

file_path = "/Volumes/mycatalog/default/bronze_data/download.csv"

rent_bronze = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(file_path)
)

rent_bronze.write.format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .mode("overwrite") \
    .saveAsTable("mycatalog.default.rent_bronze")

display(spark.table("mycatalog.default.rent_bronze").limit(10))

## 1.4 Ingest GDP

- **Source:** `gdp.csv`
- **Output:** `mycatalog.default.gdp_bronze`

In [0]:
# --- 4. GDP INGESTION ---

table_name = "mycatalog.default.gdp_bronze"
file_path = "/Volumes/mycatalog/default/bronze_data/gdp.csv"

print(f"Reading GDP data from: {file_path}")

# Read the CSV with a header row and inferred column types
read_options = {"header": "true", "inferSchema": "true"}
raw_df = spark.read.format("csv").options(**read_options).load(file_path)

# Sanitize and make column names unique
from collections import Counter

def sanitize_col(col):
    """Replace every invalid column-name character in ``col`` with ``_``.

    Invalid characters are: space, ``,``, ``;``, ``{``, ``}``, ``(``, ``)``,
    newline, tab and ``=``. The cleaned name is returned; valid characters
    pass through unchanged.
    """
    invalid = frozenset(" ,;{}()\n\t=")
    return "".join("_" if ch in invalid else ch for ch in col)

# Cleaned header names, in the same order as the DataFrame's columns
sanitized_cols = list(map(sanitize_col, raw_df.columns))

def make_unique(cols):
    """Return ``cols`` with repeated names disambiguated by a numeric suffix.

    The first occurrence of a name is kept as-is; later occurrences become
    ``name_2``, ``name_3`` and so on. Order and length are preserved.

    Two cases are handled beyond simple counting:

    * A generated name is never allowed to clash with a name that has
      already been emitted (e.g. ``["a", "a", "a_2"]`` yields
      ``["a", "a_2", "a_2_2"]`` rather than two ``a_2`` columns).
    * Names are compared case-insensitively, because Delta does not allow
      two columns that differ only by case.

    Args:
        cols: Iterable of column-name strings.

    Returns:
        A list of column names that are unique when compared
        case-insensitively.
    """
    counts = Counter()   # occurrences seen so far, keyed by lower-cased name
    taken = set()        # lower-cased names already emitted
    new_cols = []
    for col in cols:
        key = col.lower()
        counts[key] += 1
        candidate = col if counts[key] == 1 else f"{col}_{counts[key]}"
        # Keep bumping the suffix until the candidate is genuinely unused.
        while candidate.lower() in taken:
            counts[key] += 1
            candidate = f"{col}_{counts[key]}"
        taken.add(candidate.lower())
        new_cols.append(candidate)
    return new_cols

# Apply the sanitized, de-duplicated names positionally
unique_cols = make_unique(sanitized_cols)
raw_df = raw_df.toDF(*unique_cols)

# Drop the table if it exists
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Write to Delta
raw_df.write.saveAsTable(table_name, format="delta", mode="overwrite")

print(f"✅ Successfully created table: {table_name}")

# Preview the first rows of the new table
display(spark.table(table_name).limit(5))

## 1.5 Ingest Unemployment

- **Source:** `Unemployment.csv`
- **Output:** `mycatalog.default.unemployment_bronze`

In [0]:
# --- 5. UNEMPLOYMENT INGESTION ---

table_name = "mycatalog.default.unemployment_bronze"
file_path = "/Volumes/mycatalog/default/bronze_data/Unemployment.csv"

print(f"Reading unemployment data from: {file_path}")

# Read the CSV with a header row and inferred column types
raw_df = (
    spark.read
    .options(**{"header": "true", "inferSchema": "true"})
    .format("csv")
    .load(file_path)
)

# Sanitize column names to remove invalid characters
def sanitize_col(col):
    """Return ``col`` with invalid column-name characters turned into ``_``.

    Space, comma, semicolon, braces, parentheses, newline, tab and ``=`` are
    each replaced by a single underscore; everything else is kept.
    """
    invalid_chars = [' ', ',', ';', '{', '}', '(', ')', '\n', '\t', '=']
    underscore_map = {ord(ch): '_' for ch in invalid_chars}
    return col.translate(underscore_map)

# Rename every column positionally to its sanitized form
sanitized_cols = list(map(sanitize_col, raw_df.columns))
raw_df = raw_df.toDF(*sanitized_cols)

# Drop the table if it exists
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Write to Delta
raw_df.write.saveAsTable(table_name, format="delta", mode="overwrite")

print(f"✅ Successfully created table: {table_name}")

# Preview the first rows of the new table
display(spark.table(table_name).limit(5))

## 1.6 Re-ingest Wages (without schema inference)

- **Source:** `Wages_raw.csv`
- **Output:** `mycatalog.default.wages_bronze`

In [0]:
# COMMAND ----------
# Show the contents of the bronze_data Volume so the wages file name/path
# can be checked before reading it
bronze_volume_files = dbutils.fs.ls("/Volumes/mycatalog/default/bronze_data")
display(bronze_volume_files)


In [0]:
# COMMAND ----------
from pyspark.sql import functions as F

raw_path = "/Volumes/mycatalog/default/bronze_data/Wages_raw.csv"

# Schema inference is NOT enabled for this read; only the header row is used.
# Spark's CSV reader leaves every column as a string in that case.
wages_raw_df = spark.read.load(raw_path, format="csv", **{"header": "true"})

# Eyeball a few rows ...
print("Wages_raw sample:")
display(wages_raw_df.limit(10))

# ... and the column names/types Spark assigned
print("Wages_raw schema:")
wages_raw_df.printSchema()


In [0]:
%python
import re

def sanitize_column(col):
    """Return ``col`` with each invalid column-name character replaced by ``_``.

    The pattern matches a single space, comma, semicolon, curly brace,
    parenthesis, newline, tab or equals sign; every match becomes one
    underscore.
    """
    invalid_char = re.compile(r'[ ,;{}()\n\t=]')
    return invalid_char.sub('_', col)

# Rename every column positionally to its sanitized form
clean_names = list(map(sanitize_column, wages_raw_df.columns))
wages_raw_df = wages_raw_df.toDF(*clean_names)

wages_bronze = "mycatalog.default.wages_bronze"

# Overwrite the wages Bronze table with this DataFrame
wages_raw_df.write.saveAsTable(wages_bronze, format="delta", mode="overwrite")

print(f"✅ Recreated Bronze table: {wages_bronze}")

display(spark.table(wages_bronze).limit(10))

## 1.7 Final Verification

Run this cell to confirm all 5 Bronze tables exist in our catalog.

In [0]:
%sql
-- List the tables in mycatalog.default whose names match the pattern '*_bronze'
SHOW TABLES IN mycatalog.default LIKE '*_bronze';