In [19]:
def _read_bronze(table_path):
    """Load the parquet files under `table_path`, searching sub-folders recursively."""
    bronze_reader = spark.read.option("recursiveFileLookup", "true")
    return bronze_reader.parquet(table_path)


# One DataFrame per Bronze source folder.
df_cust = _read_bronze("Files/Bronze/customerTable")
df_prod = _read_bronze("Files/Bronze/productTable")
df_trans = _read_bronze("Files/Bronze/transactionTable")
df_store = _read_bronze("Files/Bronze/storeTable")


StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 21, Finished, Available, Finished)

In [20]:
# Preview the leading rows of every Bronze DataFrame, in the order
# customers, products, stores, transactions.
for bronze_df in (df_cust, df_prod, df_store, df_trans):
    bronze_df.show()

StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 22, Finished, Available, Finished)

+-----------+----------+---------+-------------------+----------+---------+-----------------+
|customer_id|first_name|last_name|              email|     phone|     city|registration_date|
+-----------+----------+---------+-------------------+----------+---------+-----------------+
|        101|      Ravi|    Yadav|user101@example.com|9887654321|    Delhi|       2023-09-14|
|        102|      Nina|    Joshi|user102@example.com|9876543210|   Mumbai|       2024-01-21|
|        103|     Sonal|   Sharma|user103@example.com|9865432109|Bangalore|       2023-07-10|
|        104|     Karan|    Patel|user104@example.com|9854321098|Hyderabad|       2024-02-05|
|        105|      Riya|    Singh|user105@example.com|9843210987|  Chennai|       2023-06-28|
|        106|      Ajay|   Mishra|user106@example.com|9832109876|     Pune|       2024-03-10|
|        107|     Priya|   Kapoor|user107@example.com|9821098765|Ahmedabad|       2023-05-12|
|        108|     Rahul|    Verma|user108@example.com|981098

In [22]:
# Print column names, data types and nullability of every Bronze DataFrame,
# in the order customers, products, stores, transactions.
for bronze_df in (df_cust, df_prod, df_store, df_trans):
    bronze_df.printSchema()

StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 24, Finished, Available, Finished)

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- city: string (nullable = true)
 |-- registration_date: string (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: decimal(10,2) (nullable = true)

root
 |-- store_id: integer (nullable = true)
 |-- store_name: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- transaction_date: date (nullable = true)



In [24]:
from pyspark.sql.functions import col

StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 26, Finished, Available, Finished)

In [27]:
# Convert types and clean data.
#
# Every source is projected onto an explicit column list with explicit types.
# Duplicates are removed from all four inputs (previously only customers were
# de-duplicated): a repeated product_id or store_id would multiply the
# matching transactions in the joins performed later, and a repeated
# transaction row would inflate the summed quantities and amounts.

# Transactions: enforce integer ids/quantity and a date column.
# Only rows identical in every column are dropped, so distinct rows that
# happen to share a transaction_id are kept.
df_trans = df_trans.select(
    col("customer_id").cast("int"),
    col("transaction_id").cast("int"),
    col("store_id").cast("int"),
    col("product_id").cast("int"),
    col("quantity").cast("int"),
    col("transaction_date").cast("date"),
).dropDuplicates()

# Products: integer key, price as double; one row per product_id.
# NOTE(review): when several rows share a key, dropDuplicates keeps an
# arbitrary one of them - the same behaviour customers already had.
df_prod = df_prod.select(
    col("product_id").cast("int"),
    col("product_name"),
    col("category"),
    col("price").cast("double"),
).dropDuplicates(["product_id"])

# Stores: integer key; one row per store_id.
df_store = df_store.select(
    col("store_id").cast("int"),
    col("store_name"),
    col("location"),
).dropDuplicates(["store_id"])

# Customers: keep the listed columns as-is; one row per customer_id.
df_cust = df_cust.select(
    "customer_id",
    "first_name",
    "last_name",
    "email",
    "phone",
    "city",
    "registration_date",
).dropDuplicates(["customer_id"])

StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 29, Finished, Available, Finished)

In [29]:
# Build the Silver dataset: attach customer, product and store attributes to
# each transaction, then derive the amount of the transaction.
# Every join is an inner equi-join on the named key column, so transactions
# without a matching customer, product or store are not part of the result.
line_total = col("quantity") * col("price")

df_silver = (
    df_trans
    .join(df_cust, on="customer_id", how="inner")
    .join(df_prod, on="product_id", how="inner")
    .join(df_store, on="store_id", how="inner")
    .withColumn("total_amount", line_total)
)

StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 31, Finished, Available, Finished)

In [31]:
# Persist the Silver dataset as parquet files, replacing any previous output
# at the same location.
silver_path = "Files/silver/cleaned_transactions"

(
    df_silver.write
    .format("parquet")
    .mode("overwrite")
    .save(silver_path)
)



StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 33, Finished, Available, Finished)

In [32]:
# Save the Silver dataset as the table `cleaned_transactions`, overwriting
# whatever the table held before.
(
    df_silver.write
    .mode("overwrite")
    .saveAsTable("cleaned_transactions")
)


StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 34, Finished, Available, Finished)

In [33]:
%%sql
-- Inspect every column and row of the Silver table written above.
SELECT *
FROM cleaned_transactions

StatementMeta(, 1a6713cf-aa81-4bca-a786-b658270b27ce, 35, Finished, Available, Finished)

<Spark SQL result set with 30 rows and 18 fields>

In [1]:
# Namespaced import on purpose: `from pyspark.sql.functions import sum` would
# replace Python's built-in sum() for the rest of the session, so the Spark
# column functions are reached through the `F.` prefix instead.
from pyspark.sql import functions as F

# Step 1: Read source Lakehouse table
df = spark.read.table("cleaned_transactions")

# Step 2: Perform aggregation
# One output row per distinct combination of the grouping columns below.
# NOTE(review): the descriptive product/store columns are presumably in the
# grouping key only so that they are carried into the summary - confirm.
df_summary = df.groupBy(
    "transaction_date",
    "product_id",
    "store_id",
    "product_name",
    "category",
    "store_name",
    "location",
).agg(
    F.sum("quantity").alias("total_quantity_sold"),
    F.sum("total_amount").alias("total_sales_amount"),
    F.countDistinct("transaction_id").alias("number_of_transactions"),
    F.avg("total_amount").alias("average_transaction_value"),
)




StatementMeta(, b5010438-0a5e-4a9b-bdef-38155feb3069, 3, Finished, Available, Finished)

In [2]:
# Persist the aggregated summary as parquet files, replacing any previous
# output at the same location.
gold_path = "Files/gold/cleaned_transactions_summary"

(
    df_summary.write
    .format("parquet")
    .mode("overwrite")
    .save(gold_path)
)

StatementMeta(, b5010438-0a5e-4a9b-bdef-38155feb3069, 4, Finished, Available, Finished)

In [3]:
# Save the aggregated summary as the table `cleaned_transactions_summary`,
# overwriting whatever the table held before.
(
    df_summary.write
    .mode("overwrite")
    .saveAsTable("cleaned_transactions_summary")
)

StatementMeta(, b5010438-0a5e-4a9b-bdef-38155feb3069, 5, Finished, Available, Finished)