In [2]:
import os
from getpass import getpass

import psycopg2
from pyspark.sql import SparkSession

In [3]:
# Local Spark session named "ShopEasy" with Delta Lake's SQL extension and
# catalog registered, so tables can be read/written with format("delta").
# NOTE(review): assumes the delta-spark jars are already on the classpath
# (e.g. spark-submit --packages or delta's pip helper) — confirm for new setups.
_DELTA_CONFIGS = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

_builder = SparkSession.builder.appName("ShopEasy")
for _key, _value in _DELTA_CONFIGS.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

In [3]:
# Connect to the ShopEasy Postgres warehouse.
# No credential lives in the notebook: the password comes from PGPASSWORD
# (the standard libpq variable) or is prompted for interactively.
# Database, user and host keep their previous values as defaults but can be
# overridden through environment variables.
pg_conn = psycopg2.connect(
    dbname=os.environ.get("SHOPEASY_PG_DBNAME", "shopeasy_dw"),
    user=os.environ.get("SHOPEASY_PG_USER", "postgres"),
    host=os.environ.get("SHOPEASY_PG_HOST", "localhost"),
    password=os.environ.get("PGPASSWORD") or getpass("Postgres password: "),
)
pg_cursor = pg_conn.cursor()

In [4]:
# Pull each warehouse table into driver memory as a list of row tuples.
# NOTE(review): fetchall() materialises whole tables on the driver — fine for this
# dataset, but it will not scale to a large fact_sales.
# NOTE(review): SELECT * relies on each table's physical column order matching the
# positional column-name lists used to build the Spark DataFrames — confirm.
# Table names are fixed literals below (not user input), so the f-string SQL is safe.
_SOURCE_TABLES = ("dim_users", "dim_products", "dim_categories", "dim_dates", "fact_sales")
_rows_by_table = {}
try:
    for _table in _SOURCE_TABLES:
        pg_cursor.execute(f"SELECT * FROM {_table}")
        _rows_by_table[_table] = pg_cursor.fetchall()
finally:
    # Release the cursor and connection even if one of the queries fails.
    pg_cursor.close()
    pg_conn.close()

users_data = _rows_by_table["dim_users"]
products_data = _rows_by_table["dim_products"]
categories_data = _rows_by_table["dim_categories"]
dates_data = _rows_by_table["dim_dates"]
sales_data = _rows_by_table["fact_sales"]



In [5]:
# Column names for each warehouse table. They are assigned positionally to the
# row tuples fetched with `SELECT *` when the Spark DataFrames are created.
# NOTE(review): assumes the Postgres tables' column order matches these lists — confirm.
users_columns = "user_id name email address".split()
products_columns = "product_id product_name category_id price".split()
categories_columns = "category_id category".split()
dates_columns = "date_id order_date day_of_week day_type".split()
sales_columns = "sale_id user_id product_id category_id date_id quantity total_price".split()

In [6]:
# Wrap the fetched row tuples in Spark DataFrames, naming columns positionally.
# Column types are inferred from the Python values psycopg2 returned.
# NOTE(review): NUMERIC columns presumably arrive as decimal.Decimal and are inferred
# as decimal(38,18) (see the total_price output further down) — consider an explicit
# schema if a tighter precision is wanted.
users_df = spark.createDataFrame(data=users_data, schema=users_columns)
products_df = spark.createDataFrame(data=products_data, schema=products_columns)
categories_df = spark.createDataFrame(data=categories_data, schema=categories_columns)
dates_df = spark.createDataFrame(data=dates_data, schema=dates_columns)
sales_df = spark.createDataFrame(data=sales_data, schema=sales_columns)

In [7]:
# Bronze (raw) layer location. Set SHOPEASY_LAKE_ROOT to relocate the lakehouse;
# the default keeps the original local Windows path with an explicit file:// scheme.
bronze_path = os.environ.get("SHOPEASY_LAKE_ROOT", "file:///C:/Users/User/Desktop/project2").rstrip("/") + "/bronze"

In [8]:
# Land every raw table as its own Delta table under the bronze layer,
# replacing whatever a previous run wrote there.
for _name, _frame in (
    ("users", users_df),
    ("products", products_df),
    ("categories", categories_df),
    ("dates", dates_df),
    ("sales", sales_df),
):
    _frame.write.format("delta").mode("overwrite").save(f"{bronze_path}/{_name}")


In [9]:
# Sanity check: read the bronze users table back and print its first rows.
df_users = spark.read.load(f"{bronze_path}/users", format="delta")
df_users.show()


+-------+-------------------+--------------------+--------------------+
|user_id|               name|               email|             address|
+-------+-------------------+--------------------+--------------------+
|    126|    Jeremy Castillo|sandrareynolds@ex...|61052 Mckinney Gl...|
|    127|       Calvin Silva| david75@example.org|Unit 0447 Box 813...|
|    128|         Jamie Soto|  cody26@example.org|3644 Trujillo Roa...|
|    129|     Vicki Martinez| vmoreno@example.org|741 James Run Apt...|
|    130|     Patricia Reyes|debbie54@example.net|1700 Nathan Hollo...|
|    131|     Lacey Richards|ssullivan@example...|5645 Thompson Roa...|
|    132|Jeffrey Crawford MD|goodwincrystal@ex...|8142 Erik Motorwa...|
|    133|        Linda Cline|herringalan@examp...|1937 Lauren Plaza...|
|    134|   Melissa Anderson|  mark38@example.org|PSC 2433, Box 677...|
|    135|    Sharon Davidson|melanie73@example...|0837 Sarah Island...|
|    136|    Margaret Taylor|tracyfreeman@exam...|575 Carr Turnp

In [5]:
from pyspark.sql.functions import col
# Silver (cleaned) layer location. Uses an explicit file:// scheme, like the bronze
# path, so it is never resolved against a different default filesystem.
# Set SHOPEASY_LAKE_ROOT to relocate the lakehouse.
silver_path = os.environ.get("SHOPEASY_LAKE_ROOT", "file:///C:/Users/User/Desktop/project2").rstrip("/") + "/silver"

In [16]:
def _read_bronze_deduped(table, key):
    """Load the bronze Delta table ``table`` and keep one row per ``key``.

    NOTE(review): dropDuplicates does not specify which duplicate row survives.
    """
    return (
        spark.read.format("delta")
        .load(f"{bronze_path}/{table}")
        .dropDuplicates([key])
    )


df_users_silver = _read_bronze_deduped("users", "user_id")
df_products_silver = _read_bronze_deduped("products", "product_id")
df_categories_silver = _read_bronze_deduped("categories", "category_id")
df_dates_silver = _read_bronze_deduped("dates", "date_id")
df_sales_silver = _read_bronze_deduped("sales", "sale_id")


In [17]:
# Attach the category name to each product. Products whose category_id has no
# match keep a null `category` (left join).
# Dropping any existing `category` column first makes the cell idempotent:
# without it, re-running the cell would join a second `category` column onto the
# already-enriched frame. On the first run the drop is a no-op.
df_products_silver = (
    df_products_silver
    .drop("category")
    .join(df_categories_silver, "category_id", "left")
)


In [18]:
# Denormalise the fact table: resolve customer, product/category and date
# attributes. Inner joins keep only sales whose keys match in every dimension.
# NOTE: this rebinds df_sales_silver to the projected frame. Re-running this cell
# on its own fails because that frame no longer has the product_id/date_id join
# keys — re-run the preceding cells first.
df_sales_silver = (
    df_sales_silver
    .join(df_users_silver, "user_id", "inner")
    .join(df_products_silver, "product_id", "inner")
    .join(df_dates_silver, "date_id", "inner")
    .select(
        "sale_id", "user_id", "name", "email",
        "product_name", "category",
        "quantity", "total_price", "order_date",
    )
)

# Persist the cleaned tables to the silver layer, overwriting earlier runs.
for _name, _frame in (
    ("users", df_users_silver),
    ("products", df_products_silver),
    ("sales", df_sales_silver),
):
    _frame.write.format("delta").mode("overwrite").save(f"{silver_path}/{_name}")

In [19]:
# Preview the silver sales table that was just written.
# Named for what it holds (denormalised sales rows), not users.
silver_sales_preview = spark.read.format("delta").load(f"{silver_path}/sales")
silver_sales_preview.show()

+-------+-------+------------------+--------------------+--------------+--------------------+--------+--------------------+----------+
|sale_id|user_id|              name|               email|  product_name|            category|quantity|         total_price|order_date|
+-------+-------+------------------+--------------------+--------------+--------------------+--------+--------------------+----------+
|   3091|    887| Christopher Young|rrobbins@example.net|     Crime Max|            Clothing|       3|125.9000000000000...|2024-04-18|
|   3506|    896|     Jordan Turner|gonzaleslisa@exam...|       Any Max|              makeup|       1|360.9900000000000...|2024-05-25|
|   3764|    839|       Erik Bailey|sierra80@example.com|    Relate Max|    Sports & Fitness|       1|36.44000000000000...|2024-06-02|
|   4590|     38|      Hannah Davis| aaron64@example.org|       Set Pro|      Grocery & Food|       4|187.6900000000000...|2024-08-21|
|   4823|    597|     Alan Marshall| karen24@example.co

In [20]:
from pyspark.sql import functions as F
# Gold (aggregated) layer location. Uses an explicit file:// scheme, like the bronze
# path, so it is never resolved against a different default filesystem.
# Set SHOPEASY_LAKE_ROOT to relocate the lakehouse.
gold_path = os.environ.get("SHOPEASY_LAKE_ROOT", "file:///C:/Users/User/Desktop/project2").rstrip("/") + "/gold"

In [21]:

# Start the gold stage from the persisted silver sales table on disk.
df_sales_silver = spark.read.load(f"{silver_path}/sales", format="delta")

In [24]:
# Gold: per product (and its category) — total revenue, units sold and
# number of distinct buyers.
df_sales_summary = (
    df_sales_silver
    .groupBy("product_name", "category")
    .agg(
        F.sum("total_price").alias("total_revenue"),
        F.sum("quantity").alias("total_quantity"),
        F.countDistinct("user_id").alias("unique_customers"),
    )
)
df_sales_summary.write.save(
    f"{gold_path}/sales_summary",
    format="delta",
    mode="overwrite",
)

In [25]:
# Gold: total spend and number of purchases per customer.
_customer_totals = (
    df_sales_silver
    .groupBy("user_id", "name", "email")
    .agg(
        F.sum("total_price").alias("total_spent"),
        F.count("sale_id").alias("total_purchases"),
    )
)

# Persist the unsorted aggregate. A Delta table carries no row order for readers,
# so a global orderBy before the write only adds a shuffle nobody can rely on.
# Consumers should sort by total_spent when they read the table.
_customer_totals.write.format("delta").mode("overwrite").save(f"{gold_path}/top_customers")

# Ranked view for use inside this notebook (biggest spenders first).
df_top_customers = _customer_totals.orderBy(F.desc("total_spent"))

In [75]:
# Preview the gold per-product sales summary written above.
# Named for what it holds, not users.
gold_sales_summary_preview = spark.read.format("delta").load(f"{gold_path}/sales_summary")
gold_sales_summary_preview.show()

+--------------+--------------------+--------------------+--------------+----------------+
|  product_name|            category|       total_revenue|total_quantity|unique_customers|
+--------------+--------------------+--------------------+--------------+----------------+
|   Thought Pro|   Health & Wellness|4701.320000000000...|            33|              15|
|        Draw X|                Toys|2230.240000000000...|            26|               7|
|   Station Max|      Grocery & Food|1309.890000000000...|            17|               6|
|Individual Max|    Sports & Fitness|2220.940000000000...|            29|               8|
|Population Pro|         Electronics|3300.410000000000...|            30|              11|
|   Billion Pro|      Home & Kitchen|1445.860000000000...|            26|               8|
|      Want Pro|               Books|2228.800000000000...|            34|              10|
|     Again Pro|   Health & Wellness|1486.820000000000...|            21|               8|

In [26]:
# merging
from delta.tables import DeltaTable

# Upsert the latest bronze users into the silver users Delta table.
users_silver_path = f"{silver_path}/users"
delta_users = DeltaTable.forPath(spark, users_silver_path)

# Incoming rows from the bronze layer, deduplicated on the merge key the same way
# the silver build does. Delta's MERGE fails when more than one source row matches
# the same target row, so duplicate user_ids in bronze would abort the upsert.
df_new_users = (
    spark.read.format("delta")
    .load(f"{bronze_path}/users")
    .dropDuplicates(["user_id"])
)

# Existing user_ids get name/email/address refreshed; unseen user_ids are inserted.
(
    delta_users.alias("target")
    .merge(df_new_users.alias("source"), "target.user_id = source.user_id")
    .whenMatchedUpdate(set={
        "name": "source.name",
        "email": "source.email",
        "address": "source.address",
    })
    .whenNotMatchedInsert(values={
        "user_id": "source.user_id",
        "name": "source.name",
        "email": "source.email",
        "address": "source.address",
    })
    .execute()
)


In [6]:
from delta.tables import DeltaTable

# Open the silver sales Delta table by path to inspect its commit log.
sales_silver_table = DeltaTable.forPath(spark, f"{silver_path}/sales")

# history() with no argument returns the full retained history (one row per
# version, newest first), not just the last 10 versions. Pass a limit, e.g.
# history(10), to cap it.
df_history = sales_silver_table.history()
# truncate=False prints full cell values; show() still prints at most 20 rows.
df_history.show(truncate=False)


+-------+----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp             |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                 |userMetadata|engineInfo                         |
+-------+----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
|0      |2025-03-30 17:45:35.66|null  |null    |WRITE    |{mode -> Overwrite, partitionBy -> []}|null|null    |null     |null       |Serializable  |false        |{num