In [None]:
import os, logging
from typing import Optional, Union
from pyspark.sql import SparkSession, DataFrame
from concurrent.futures import Future, as_completed
from pyspark.sql.types import * 
import pyspark.sql.functions as sf
import pyspark.errors as pe


In [2]:
# Filesystem locations used by this notebook, listed in pipeline order.
RAW_PATH = "../raw"              # not referenced by the cells visible in this notebook
BRONZE_PATH = "../data/bronze/"  # where load_bronze looks for `<table>_clean.parquet`
SILVER_PATH = "../data/silver/"  # where save_as_silver writes `<name>.parquet`
LOG_DIR = "../log"               # directory that holds silver.log

In [3]:
# Notebook-wide logger: DEBUG and above go to LOG_DIR/silver.log.
logger = logging.getLogger("Silver_Layer_Log")
logger.setLevel(logging.DEBUG)

# FileHandler opens the log file as soon as it is constructed, so the
# directory has to exist first; exist_ok makes this safe on re-runs.
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR,"silver.log")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
sh = logging.StreamHandler()  # console handler, created but intentionally not attached
# sh.setFormatter(formatter)

# logging.getLogger returns the same logger object on every call, so
# re-running this cell must not stack handlers. The file handler is built
# only when it will actually be attached, which avoids opening an extra
# file descriptor on each re-run.
if not logger.handlers:
    fh = logging.FileHandler(log_path)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
# logger.addHandler(sh)


In [None]:
# Start (or reuse) the Spark session for this pipeline.
# The shuffle partition count is pinned to 4 through the session config.
spark = (
    SparkSession.builder
    .appName("Retail_ETL_Pipeline")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

In [5]:
# Read bronze files
def load_bronze(table:str) -> DataFrame:
    """Read the cleaned bronze Parquet data for ``table`` into a DataFrame.

    The data is looked up at ``BRONZE_PATH/<table>_clean.parquet``.

    Args:
        table: Bronze table name used to build the file name.

    Returns:
        The DataFrame returned by ``spark.read.parquet`` for that path.

    Raises:
        FileNotFoundError: If the path does not exist. The error is logged
            with its traceback and then re-raised.
    """
    file_path = os.path.join(BRONZE_PATH, f"{table}_clean.parquet")
    logger.info(f"========== Loading {table} data from {file_path} ==========")
    try:
        if os.path.exists(file_path):
            bronze_df = spark.read.parquet(file_path)
            logger.info(f"Successfully loaded data from {file_path} to DataFrame")
            return bronze_df
        raise FileNotFoundError(f"No files found in {file_path}")
    except FileNotFoundError as err:
        # logger.exception attaches the active traceback by default.
        logger.exception(f"Loading {table} data from {file_path} failed: {err}")
        raise


In [28]:
# Clean bronze files
def extend_customers(df:DataFrame) -> DataFrame:
    """Rename customer columns, upper-case ``state`` and add ``full_name``.

    Args:
        df: Customers DataFrame carrying the ``customer_*`` prefixed columns.

    Returns:
        A DataFrame with the prefix removed from the name/address columns,
        ``state`` upper-cased, and a ``full_name`` column built as
        ``first_name + ' ' + last_name``.
    """
    rename_map = {
        "customer_fname": "first_name",
        "customer_lname": "last_name",
        "customer_street": "street",
        "customer_city": "city",
        "customer_state": "state",
        "customer_zipcode": "zipcode",
    }
    renamed = df.withColumnsRenamed(rename_map)
    logger.debug("Renamed columns in customers for uniformity")

    upper_state = renamed.withColumn("state", sf.ucase("state"))
    # NOTE(review): per the Spark docs, concat returns NULL when any input is
    # NULL, so full_name would be NULL for a missing first or last name.
    # Confirm that bronze data has no such rows.
    full_name = sf.concat("first_name", sf.lit(" "), "last_name")
    extended = upper_state.withColumn("full_name", full_name)
    logger.debug("Added customer's full_name column")
    return extended

def extend_orders(df:DataFrame) -> DataFrame:
    """Rename the customer key and derive calendar columns from ``order_date``.

    Args:
        df: Orders DataFrame with ``order_customer_id`` and ``order_date``.

    Returns:
        A DataFrame where ``order_customer_id`` is renamed to ``customer_id``
        and ``order_year``, ``order_month``, ``order_monthname``,
        ``order_day`` and ``order_weekday`` are added.
    """
    renamed = df.withColumnRenamed("order_customer_id", "customer_id")
    logger.debug("Renamed columns in orders for uniformity")

    # Month and weekday names come from date_format patterns ("MMMM" / "EEEE").
    # sf.monthname and sf.dayname were tried earlier but give abbreviated names.
    date_parts = {
        "order_year": sf.year("order_date"),
        "order_month": sf.month("order_date"),
        "order_monthname": sf.date_format("order_date", "MMMM"),
        "order_day": sf.day("order_date"),
        "order_weekday": sf.date_format("order_date", "EEEE"),
    }
    extended = renamed.withColumns(date_parts)
    logger.debug("Added columns for year, month, day, and weekday from order_date timestamp")
    return extended

def extend_order_items(df:DataFrame) -> DataFrame:
    """Rename order_items columns and repair inconsistent subtotals.

    After renaming, every row is checked against
    ``ROUND(quantity * product_price, 2) == subtotal``. If at least one row
    fails the check, the whole ``subtotal`` column is recomputed from
    ``quantity`` and ``product_price``.

    The check is written with DataFrame expressions, so no session-wide temp
    view is registered. Counting the mismatches triggers a Spark job.

    Args:
        df: Order items DataFrame with the ``order_item_*`` prefixed columns.

    Returns:
        The renamed DataFrame, with ``subtotal`` recomputed when any
        mismatching row was found.
    """
    df = df.withColumnsRenamed({
        "order_item_order_id": "order_id",
        "order_item_product_id": "product_id",
        "order_item_quantity": "quantity",
        "order_item_subtotal": "subtotal",
        "order_item_product_price": "product_price",
    })
    logger.debug("Renamed columns in order_items for uniformity")

    logger.debug("Validating the order items subtotal logic")
    expected_subtotal = sf.round(sf.col("quantity") * sf.col("product_price"), 2)
    # `!=` evaluates to NULL when either side is NULL, and filter drops those
    # rows, so NULL subtotals are not counted as mismatches.
    num_invalid = df.filter(expected_subtotal != sf.col("subtotal")).count()

    if num_invalid > 0:
        # Warn only when something is actually wrong; a zero count is not a warning.
        logger.warning(f"Number of records where subtotal calculation is invalid: {num_invalid}")
        df = df.withColumn("subtotal", expected_subtotal)
        logger.debug("Replaced subtotal values: quantity * product price")
    else:
        logger.debug("All order_items subtotals match quantity * product price")
    return df

def extend_categories(df:DataFrame) -> DataFrame:
    """Rename ``category_department_id`` to ``department_id``.

    Args:
        df: Categories DataFrame.

    Returns:
        The DataFrame with the department key renamed.
    """
    renamed = df.withColumnRenamed("category_department_id", "department_id")
    logger.debug("Renamed columns in categories for uniformity")
    return renamed

def extend_products(df:DataFrame) -> DataFrame:
    """Rename product columns so they line up with the other silver tables.

    ``product_price`` becomes ``price`` and the category key becomes
    ``category_id``, which is the column get_sales_fact joins on.

    Args:
        df: Products DataFrame.

    Returns:
        The DataFrame with the category key and price columns renamed.
    """
    df = df.withColumnsRenamed({
        "product_category_id": "category_id",
        # The original code only listed this misspelled key. The rename is a
        # no-op for columns that are absent, so a correctly spelled bronze
        # column was silently left untouched. Both spellings are accepted
        # because the bronze schema is not visible here.
        # TODO: confirm the real column name and drop the other entry.
        "product_cateogry_id": "category_id",
        "product_price": "price",
    })
    logger.debug("Renamed columns in Products for uniformity")
    return df

In [58]:
def get_date_dim(orders:DataFrame) -> DataFrame:
    """Build a date dimension covering the range of ``order_date`` values.

    Registers ``orders`` as the temp view ``temp_orders`` and generates one
    row per day between the minimum and maximum ``order_date`` (both cast to
    date), then adds calendar attributes.

    Args:
        orders: Orders DataFrame with an ``order_date`` column.

    Returns:
        A DataFrame with ``date_seq`` plus ``date_id`` (``yyyyMMdd``),
        ``year``, ``quarter``, ``month``, ``month_name``, ``day``,
        ``day_of_week`` and ``day_name``.
    """
    orders.createOrReplaceTempView("temp_orders")
    date_range_sql = """
    SELECT EXPLODE(
        SEQUENCE(
        CAST(MIN(order_date) AS date),
        CAST(MAX(order_date) AS date),
        INTERVAL 1 DAY)) AS date_seq
    FROM temp_orders
    """
    calendar = spark.sql(date_range_sql)
    logger.debug("Generate series of dates from min to max order_dates")

    # date_id uses the same yyyyMMdd pattern as the date_id in get_sales_fact.
    # day_of_week follows sf.weekday numbering (Monday = 0 per the Spark docs).
    attributes = {
        "date_id": sf.date_format("date_seq", "yyyyMMdd"),
        "year": sf.year("date_seq"),
        "quarter": sf.quarter("date_seq"),
        "month": sf.month("date_seq"),
        "month_name": sf.date_format("date_seq", "MMMM"),
        "day": sf.day("date_seq"),
        "day_of_week": sf.weekday("date_seq"),
        "day_name": sf.date_format("date_seq", "EEEE"),
    }
    date_dim = calendar.withColumns(attributes)
    logger.debug("Added columns for year, quarter, month, and date")
    return date_dim

In [None]:
def get_sales_fact(order_items:DataFrame, orders:DataFrame,
                   products:DataFrame, categories:DataFrame) -> DataFrame:
    """Assemble the sales fact table from the extended silver tables.

    ``order_items`` is left-joined to ``orders`` on ``order_id``, then to
    ``products`` on ``product_id`` and to ``categories`` on ``category_id``.
    The result is trimmed to the fact columns, sorted by ``order_id`` and
    ``order_item_id``, and given a ``date_id`` (``yyyyMMdd``) column.

    Args:
        order_items: Extended order items.
        orders: Extended orders.
        products: Extended products.
        categories: Extended categories.

    Returns:
        The sales fact DataFrame.

    Raises:
        Exception: Any error raised while building the table is logged with
            its traceback and re-raised unchanged.
    """
    columns = [
        "order_id", "order_item_id", "customer_id", "product_id", "category_id",
        "department_id", "quantity", "subtotal", "product_price", "order_date",
    ]
    try:
        with_orders = order_items.join(orders, on="order_id", how="left")
        logger.debug("Left joined order_items and orders")

        with_products = with_orders.join(products, on="product_id", how="left")
        joined = with_products.join(categories, on="category_id", how="left")
        logger.debug("Left joined products and categories")

        ordered = joined.select(columns).sort(
            joined.order_id.asc(), joined.order_item_id.asc())
        sales_fact = ordered.withColumn(
            "date_id", sf.date_format("order_date", "yyyyMMdd"))
        logger.info("Successfully built sales_fact table")
        return sales_fact
    except Exception as err:
        logger.exception(f"An error occured while building sales_fact table: {err}")
        raise

In [None]:
def save_as_silver(df:DataFrame, name:str) -> None:
    """Write ``df`` to ``SILVER_PATH/<name>.parquet``, overwriting existing data.

    Args:
        df: DataFrame to persist.
        name: Table name used to build the output path.

    Raises:
        pyspark.errors.PySparkException, OSError: If the write fails. The
            error is logged with its traceback and re-raised, so callers do
            not go on to report a failed save as a success.
    """
    logger.info(f"============ Saving {name} as Parquet ==========")
    save_path = os.path.join(SILVER_PATH,f"{name}.parquet")
    logger.debug(f"Created save path for {name}: '{save_path}'")
    try:
        df.write.parquet(path=save_path,mode='overwrite')
    except (pe.PySparkException, OSError) as e:
        # Log and propagate, matching load_bronze and get_sales_fact.
        # Swallowing the error here let _process_silver_layer log
        # "Finished processing" after a failed write.
        logger.exception(f"Error saving {name}: {e}")
        raise
    logger.info(f"Successfully saved {name} to '{save_path}'")

In [None]:
def _process_silver_layer(table:str):
    """Load one bronze table, apply its silver transform, and save it.

    Tables without a registered transform are saved as loaded.

    Args:
        table: Name of the bronze table to process.

    Returns:
        The ``table`` name once processing has finished.

    Raises:
        Exception: Any error from loading, transforming or saving is logged
            with its traceback and re-raised.
    """
    logger.info(f"========== Started processing {table} ==========")
    try:
        bronze_df = load_bronze(table)
        transforms = {
            "orders": extend_orders,
            "order_items": extend_order_items,
            "products": extend_products,
            "categories": extend_categories,
            "customers": extend_customers,
        }
        transform = transforms.get(table)
        # No registered transform means the table passes through unchanged.
        silver_df = bronze_df if transform is None else transform(bronze_df)
        save_as_silver(silver_df, table)
        logger.info(f"========== Finished processing {table} ==========")
        return table
    except Exception as err:
        logger.exception(f"Error processing {table}: {err}")
        raise


In [None]:
# Load and extend the bronze tables needed for the fact and dimension tables.
# Exceptions propagate unchanged and stop the cell.
df = extend_order_items(load_bronze("order_items"))  # extended order_items
orders = extend_orders(load_bronze("orders"))
products = extend_products(load_bronze("products"))
categories = extend_categories(load_bronze("categories"))

# Build and persist the sales fact first, then the date dimension.
sales_fact = get_sales_fact(df, orders, products, categories)
save_as_silver(sales_fact, "sales_fact")
date_dim = get_date_dim(orders)
save_as_silver(date_dim, "date_dim")

NameError: name 'df' is not defined

In [None]:
# Enrich tables (placeholder): for now this cell only displays the columns of `df`.
# NOTE(review): the previous cell assigns `df` to the extended order_items frame,
# but the recorded output lists raw orders columns. That looks like stale kernel
# state; confirm with Restart Kernel -> Run All.
df.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [None]:
# Save as silver.parquet