In [1]:
import os
os.environ["PYSPARK_PYTHON"] = "./env/Scripts/python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = "./env/Scripts/python.exe"

# Sets up a PySpark session with Delta Lake support
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .master("local[8]") \
    .enableHiveSupport() \

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
# Create the database if it does not exist

spark.sql("CREATE DATABASE IF NOT EXISTS Ecom_Products_Data_Pipeline")

spark.sql("USE Ecom_Products_Data_Pipeline")

# Define the path for the silver table

silver_path = os.path.abspath("data_lake/silver/products").replace("\\", "/")

# Create the silver table if it does not exist
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS Ecom_Products_Data_Pipeline.products_silver
    USING DELTA
    LOCATION '{silver_path}'
""")

DataFrame[]

In [3]:
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW TABLES IN Ecom_Products_Data_Pipeline").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|ecom_products_dat...|
+--------------------+

+--------------------+---------------+-----------+
|           namespace|      tableName|isTemporary|
+--------------------+---------------+-----------+
|ecom_products_dat...|products_silver|      false|
+--------------------+---------------+-----------+



In [6]:
# Load the new unprocessed data from silver layer

last_processed_timestamp_file = "last_processed_timestamp_gold.txt"
try:
    with open(last_processed_timestamp_file, 'r') as f:
        last_processed_timestamp = f.read().strip()
    print(f"Previous high-watermark loaded: {last_processed_timestamp}")
except FileNotFoundError:
    # Set a very old timestamp for the first run
    last_processed_timestamp = "1970-01-01"
    print(f"No previous high-watermark found. Initializing to: {last_processed_timestamp}")

# Reads the silver data from the Delta Lake
silver_df = spark.sql(f"""
                      SELECT 
                      Product_Name, MRP, Price_After_Discount AS Current_Price, (MRP - Current_Price) AS Discount_Amount, (Discount_Amount / MRP) AS Discount_Percentage,
                      Bought_Last_Month AS Last_Month_Sales, Number_of_Reviews AS Total_Reviews, Rating AS Avg_Rating,
                      5_Star_Percentage AS Five_Star_Percentage, 4_Star_Percentage AS Four_Star_Percentage,
                      3_Star_Percentage AS Three_Star_Percentage, 2_Star_Percentage AS Two_Star_Percentage,
                      1_Star_Percentage AS One_Star_Percentage, category AS Product_Category, Image_URL AS Image,
                      scrape_date AS Scrape_Date
                      FROM ecom_products_data_pipeline.products_silver AS products_silver
                      WHERE scrape_date > DATE('{last_processed_timestamp}')
                      """)

# Show the first 5 rows of the DataFrame
silver_df.show(5)
silver_df.explain(True)

No previous high-watermark found. Initializing to: 1970-01-01
+--------------------+------+-------------+---------------+-------------------+----------------+-------------+----------+--------------------+--------------------+---------------------+-------------------+-------------------+----------------+--------------------+-----------+
|        Product_Name|   MRP|Current_Price|Discount_Amount|Discount_Percentage|Last_Month_Sales|Total_Reviews|Avg_Rating|Five_Star_Percentage|Four_Star_Percentage|Three_Star_Percentage|Two_Star_Percentage|One_Star_Percentage|Product_Category|               Image|Scrape_Date|
+--------------------+------+-------------+---------------+-------------------+----------------+-------------+----------+--------------------+--------------------+---------------------+-------------------+-------------------+----------------+--------------------+-----------+
|soundcore by Anke...|9598.0|       4498.0|         5100.0| 0.5313607001458637|            2000|        32476|

In [8]:
# Function to get null and empty string counts for each column

from pyspark.sql.functions import col, trim, when, sum as _sum

def get_null_empty_counts(df):
    exprs = []
    for c in df.columns:
        exprs.append(_sum(when(col(c).isNull(), 1).otherwise(0)).alias(f"{c}_null_count"))
        exprs.append(_sum(when(col(c) == "null", 1).otherwise(0)).alias(f"{c}_string_null_count"))

    result_df = df.agg(*exprs)
    result_df.show(truncate=False)

get_null_empty_counts(silver_df)

+-----------------------+------------------------------+--------------+---------------------+------------------------+-------------------------------+--------------------------+---------------------------------+------------------------------+-------------------------------------+---------------------------+----------------------------------+------------------------+-------------------------------+---------------------+----------------------------+-------------------------------+--------------------------------------+-------------------------------+--------------------------------------+--------------------------------+---------------------------------------+------------------------------+-------------------------------------+------------------------------+-------------------------------------+---------------------------+----------------------------------+----------------+-----------------------+----------------------+-----------------------------+
|Product_Name_null_count|Product_Name_

In [7]:
silver_df.printSchema()

root
 |-- Product_Name: string (nullable = true)
 |-- MRP: float (nullable = true)
 |-- Current_Price: float (nullable = true)
 |-- Discount_Amount: float (nullable = true)
 |-- Discount_Percentage: double (nullable = true)
 |-- Last_Month_Sales: integer (nullable = true)
 |-- Total_Reviews: integer (nullable = true)
 |-- Avg_Rating: double (nullable = true)
 |-- Five_Star_Percentage: float (nullable = true)
 |-- Four_Star_Percentage: float (nullable = true)
 |-- Three_Star_Percentage: float (nullable = true)
 |-- Two_Star_Percentage: float (nullable = true)
 |-- One_Star_Percentage: float (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Image: string (nullable = true)
 |-- Scrape_Date: date (nullable = true)



In [9]:
# One-time creation of base directories
os.makedirs("data_lake/gold/dashboard_mart", exist_ok=True)

from pyspark.sql.functions import col, lit, max as spark_max
import datetime # For default high-watermark and example update

# Write the cleaned DataFrame to Delta Lake in the silver layer
silver_df.write.format("delta").mode("append").partitionBy("scrape_date").save("data_lake/gold/dashboard_mart/")

new_max_date_obj = silver_df.agg(spark_max("scrape_date")).collect()[0][0] # This is a datetime.date object or None

new_max_date_str = None # Initialize a string variable

if new_max_date_obj is None:
    # If no data was processed, use today's date as a string
    new_max_date_str = datetime.date.today().strftime('%Y-%m-%d')
else:
    # If data was processed, convert the datetime.date object to a string
    new_max_date_str = new_max_date_obj.strftime('%Y-%m-%d')

# Persist the new high-watermark string
with open(last_processed_timestamp_file, 'w') as f:
    f.write(new_max_date_str) # <--- Always a string now
print(f"High-watermark updated to: {new_max_date_str}")

High-watermark updated to: 2025-07-19


In [10]:
# Path to gold mart
gold_path = os.path.abspath("data_lake/gold/dashboard_mart").replace("\\", "/")

# Create the gold table if it does not exist

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS Ecom_Products_Data_Pipeline.products_gold
    USING DELTA
    LOCATION '{gold_path}'
""")

spark.sql("""
    CREATE OR REPLACE VIEW Ecom_Products_Data_Pipeline.products_gold_latest AS
    SELECT *
    FROM Ecom_Products_Data_Pipeline.products_gold
    WHERE scrape_date = (
        SELECT MAX(scrape_date)
        FROM Ecom_Products_Data_Pipeline.products_gold
    )
""")

DataFrame[]

In [13]:
spark.sql("SHOW TABLES IN Ecom_Products_Data_Pipeline").show(truncate=False)

+---------------------------+--------------------+-----------+
|namespace                  |tableName           |isTemporary|
+---------------------------+--------------------+-----------+
|ecom_products_data_pipeline|products_gold       |false      |
|ecom_products_data_pipeline|products_gold_latest|false      |
|ecom_products_data_pipeline|products_silver     |false      |
+---------------------------+--------------------+-----------+



In [14]:
spark.sql("SELECT * FROM ecom_products_data_pipeline.products_gold").show()

+--------------------+------+-------------+---------------+-------------------+----------------+-------------+----------+--------------------+--------------------+---------------------+-------------------+-------------------+----------------+--------------------+-----------+
|        Product_Name|   MRP|Current_Price|Discount_Amount|Discount_Percentage|Last_Month_Sales|Total_Reviews|Avg_Rating|Five_Star_Percentage|Four_Star_Percentage|Three_Star_Percentage|Two_Star_Percentage|One_Star_Percentage|Product_Category|               Image|Scrape_Date|
+--------------------+------+-------------+---------------+-------------------+----------------+-------------+----------+--------------------+--------------------+---------------------+-------------------+-------------------+----------------+--------------------+-----------+
|soundcore by Anke...|9598.0|       4498.0|         5100.0| 0.5313607001458637|            2000|        32476|       4.3|                61.0|                23.0|         

In [15]:
spark.sql("SELECT * FROM ecom_products_data_pipeline.products_gold_latest").show()

+--------------------+------+-------------+---------------+-------------------+----------------+-------------+----------+--------------------+--------------------+---------------------+-------------------+-------------------+----------------+--------------------+-----------+
|        Product_Name|   MRP|Current_Price|Discount_Amount|Discount_Percentage|Last_Month_Sales|Total_Reviews|Avg_Rating|Five_Star_Percentage|Four_Star_Percentage|Three_Star_Percentage|Two_Star_Percentage|One_Star_Percentage|Product_Category|               Image|Scrape_Date|
+--------------------+------+-------------+---------------+-------------------+----------------+-------------+----------+--------------------+--------------------+---------------------+-------------------+-------------------+----------------+--------------------+-----------+
|soundcore by Anke...|9598.0|       4498.0|         5100.0| 0.5313607001458637|            2000|        32476|       4.3|                61.0|                23.0|         

In [16]:
spark.stop()