In [0]:
%run ../config/setup

# Bronze Layer ETL Process

## Why I Made This Notebook
I want a simple and smooth way to move raw energy demand data into Delta tables for my ETL pipeline. This step helps me keep the data organized and ready for later transformations. By doing things in this notebook, I avoid messy manual tasks and make everything reproducible.

## Getting the Raw Data
I use a function to download the raw household power consumption archive and extract it into the landing folder of the volume. When I'm testing in 'dev', I skip the download if the landing folder already exists from an earlier run. This saves time and keeps things efficient in development.

## Reading the Raw File
I load the raw data from a semicolon-delimited text file using Spark's CSV reader. I keep all columns as strings in this step so that inferring the schema too early cannot cause unexpected load errors; typing and cleaning are left to later transformations.

## Adding Metadata
I add columns to track when the data arrived and the file it came from. This makes it easier to debug and trace data issues later on.

## Saving to Bronze
I write the raw data, together with its metadata columns, to a Delta table in overwrite mode, replacing the table's previous contents. This way, I know the bronze layer always holds the latest raw extract. I also print the table name so I can see exactly where things were written.

By doing all these steps, I know my raw data is safe, organized, and ready for the next part of the pipeline.

In [0]:
import requests
import zipfile
import io
import os
from pyspark.sql.functions import current_timestamp, input_file_name, col

def download_data_to_volume(url, target_dir, timeout=60):
    """Download a ZIP archive from ``url`` and extract it into ``target_dir``.

    When the global ``env`` (not defined in this notebook; expected from the
    setup notebook) is ``'dev'`` and ``target_dir`` already exists and is not
    empty, the download is skipped to save time.

    Args:
        url: HTTP(S) location of a ZIP archive.
        target_dir: Directory to extract into (here, a Volume path). Created
            if it does not exist.
        timeout: Seconds ``requests`` waits for the server (connect and
            between reads) before giving up. Without a timeout the request
            can block indefinitely.

    Raises:
        requests.RequestException: on network errors, timeouts, or an HTTP
            error status (4xx/5xx).
        zipfile.BadZipFile: if the response body is not a valid ZIP archive.
    """
    # In 'dev', skip download if data exists to save time. An existing but
    # empty directory (e.g. left behind by a failed extraction) does not count
    # as existing data, so the download is retried in that case.
    if env == 'dev' and os.path.isdir(target_dir) and os.listdir(target_dir):
        print(f"Skipping download in {env}. Data already exists.")
        return

    print(f"Downloading from {url}...")
    try:
        response = requests.get(url, timeout=timeout)
        # Fail on any 4xx/5xx status; the raised error names the status and URL.
        response.raise_for_status()

        # Open the archive before touching the Volume, so an invalid payload
        # fails without creating the target directory. The context manager
        # closes the archive even if extraction fails part-way.
        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            # Ensure target directory exists in the Volume
            os.makedirs(target_dir, exist_ok=True)
            # Extract directly to the Volume path
            archive.extractall(target_dir)
        print(f"Download and extraction complete at {target_dir}")
    except Exception as e:
        print(f"Error during ingestion: {e}")
        # Bare `raise` re-raises the active exception unchanged.
        raise

# raw_data_url and landing_path are not defined in this notebook; they are
# expected to come from the `%run ../config/setup` cell.
download_data_to_volume(url=raw_data_url, target_dir=landing_path)



### Read Raw Data from txt file

In [0]:
# Text file expected to be extracted into landing_path by the download cell.
txt_file_path = f"{landing_path}/household_power_consumption.txt"

# The file is semicolon-delimited text, so Spark's CSV reader handles it.
# Schema inference is off: every column stays a string, so no type-cast
# failure can break the load; typing is left to later layers.
df_raw = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=";", inferSchema="false")
    .load(txt_file_path)
)

### Add metadata

In [0]:
# Lineage columns: when this batch was ingested and which file each row came
# from. `_metadata.file_path` is Spark's hidden file-metadata column, available
# here because df_raw is read directly from a file source. current_timestamp()
# is fixed per query, so every row in the batch gets the same value.
df_bronze = (
    df_raw
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source_file", col("_metadata.file_path"))
)

### Write to bronze layer

In [0]:
# Replace the bronze table's contents with this batch.
# NOTE(review): Delta overwrite does not replace an existing table's schema by
# default; if the source columns ever change, this write will likely be rejected
# unless the `overwriteSchema` option is enabled — confirm desired behavior.
df_bronze.write.saveAsTable(full_path_bronze, format="delta", mode="overwrite")
print(f"Table saved: {full_path_bronze}")