In [0]:
from datetime import datetime

# Run date used to locate today's CSV drops in the landing zone.
today = datetime.today().strftime('%Y-%m-%d')
base_url = "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata"

# Logical table name -> dated CSV file for the current run date.
datasets = {
    "products": f"{base_url}/Products/{today}/Products_{today}.csv",
    "sales": f"{base_url}/Sales/{today}/Sales_{today}.csv",
    "inventory": f"{base_url}/Inventory/{today}/Inventory_{today}.csv",
    "status": f"{base_url}/Status/{today}/Status_{today}.csv",
}

# Unity Catalog destination for the raw tables.
catalog = "my_sales_catalog"
schema = "sales_schema"

# Load each CSV (header row + inferred column types) and replace the
# matching Delta table, allowing the table schema to change between runs.
for table_name, file_path in datasets.items():
    full_table_name = f"{catalog}.{schema}.{table_name}"

    df = spark.read.options(header=True, inferSchema=True).csv(file_path)

    writer = df.write.format("delta").mode("overwrite").option("overwriteSchema", "true")
    writer.saveAsTable(full_table_name)

    print(f"Table {table_name} updated for current {today} date")

Table products updated for current 2025-08-26 date
Table sales updated for current 2025-08-26 date
Table inventory updated for current 2025-08-26 date
Table status updated for current 2025-08-26 date


In [0]:
# Load the day's CSVs into DataFrames for the analysis below.
#
# `datasets` (and therefore `today`) is reused from the ingestion cell above
# rather than recomputed here. A second datetime.today() call can resolve to a
# different date (a run that crosses midnight, or this cell re-run on a later
# day), which would make these DataFrames silently diverge from the Delta
# tables that cell just wrote.
dataframes = {}

for table_name, file_path in datasets.items():
    # Same reader options as the ingestion cell so the inferred schemas match.
    df = (spark.read
            .option("header", True)
            .option("inferSchema", True)
            .csv(file_path))

    dataframes[f"{table_name}_df"] = df

    print(f" DataFrame for {table_name} created.")

# Convenience names used by the cells below. `dataframes` keeps the raw,
# untransformed frames so later cells can re-derive from them idempotently.
products_df = dataframes['products_df']
sales_df = dataframes['sales_df']
inventory_df = dataframes['inventory_df']
status_df = dataframes['status_df']


 DataFrame for products created.
 DataFrame for sales created.
 DataFrame for inventory created.
 DataFrame for status created.


In [0]:
# Peek at the first rows of the raw sales data.
sales_df.show(n=8)

+-------+---------+--------+-------+-------+--------+-------+-------------------+------+
|OrderID|FirstName|LastName|Country|Product|Quantity|  Price|          Timestamp|Status|
+-------+---------+--------+-------+-------+--------+-------+-------------------+------+
|   1001|Charlotte|  Lorenz|    USA|      1|       2|  41.33|2025-08-20 15:06:36|     2|
|   1002|   Olivia|  Garcia| Canada|      6|       3| 198.88|2025-08-20 21:55:00|     2|
|   1003|     Noah|Williams| Mexico|      8|       1|1027.44|2025-08-20 06:15:38|     2|
|   1004| Isabella|  Miller|Germany|      6|       4| 655.33|2025-08-20 07:40:00|     3|
|   1005|    Corey|  Miller| Canada|      8|       1| 970.68|2025-08-20 00:15:11|     1|
|   1006|     Alex|   Lopez|    USA|      3|       3| 781.07|2025-08-20 03:32:48|     3|
|   1007|      Ian|  Garcia|Austria|      8|       3| 437.76|2025-08-20 00:18:56|     1|
|   1008|    James|   Smith|  Spain|      4|       2| 820.53|2025-08-20 15:41:30|     2|
+-------+---------+--

In [0]:
from pyspark.sql.functions import col, to_date

# Replace the "Timestamp" column with a calendar-date column named "Date",
# keeping its position in the schema.
#
# Derived from the raw frame kept in `dataframes` rather than from `sales_df`
# itself, so re-running this cell is safe. Previously a second run failed
# because "Timestamp" had already been renamed away.
# The column is renamed first, with its exact case, then converted, so nothing
# relies on Spark's case-insensitive name resolution. to_date() is called
# without a pattern: a timestamp column is cast directly, and a string like
# "2025-08-20 15:06:36" is also cast to its date part. An explicit
# "yyyy-MM-dd" pattern would not fully match such a string.
sales_df = (
    dataframes["sales_df"]
    .withColumnRenamed("Timestamp", "Date")
    .withColumn("Date", to_date(col("Date")))
)
sales_df.show(8)

+-------+---------+--------+-------+-------+--------+-------+----------+------+
|OrderID|FirstName|LastName|Country|Product|Quantity|  Price|      Date|Status|
+-------+---------+--------+-------+-------+--------+-------+----------+------+
|   1001|Charlotte|  Lorenz|    USA|      1|       2|  41.33|2025-08-20|     2|
|   1002|   Olivia|  Garcia| Canada|      6|       3| 198.88|2025-08-20|     2|
|   1003|     Noah|Williams| Mexico|      8|       1|1027.44|2025-08-20|     2|
|   1004| Isabella|  Miller|Germany|      6|       4| 655.33|2025-08-20|     3|
|   1005|    Corey|  Miller| Canada|      8|       1| 970.68|2025-08-20|     1|
|   1006|     Alex|   Lopez|    USA|      3|       3| 781.07|2025-08-20|     3|
|   1007|      Ian|  Garcia|Austria|      8|       3| 437.76|2025-08-20|     1|
|   1008|    James|   Smith|  Spain|      4|       2| 820.53|2025-08-20|     2|
+-------+---------+--------+-------+-------+--------+-------+----------+------+
only showing top 8 rows


In [0]:
# Print each DataFrame's repr, which lists its column names and types.
for frame in (sales_df, products_df, inventory_df, status_df):
    print(frame)

DataFrame[OrderID: int, FirstName: string, LastName: string, Country: string, Product: int, Quantity: int, Price: double, Date: date, Status: int]
DataFrame[ProductID: int, ProductName: string]
DataFrame[ProductID: int, InStock: int, Sold: string, UpdatedStock: string]
DataFrame[StatusID: int, Status: string]


In [0]:
# Alias each input so identically named columns (e.g. Status) can be
# referenced unambiguously after the joins.
s, p, st = sales_df.alias("s"), products_df.alias("p"), status_df.alias("st")

# Resolve product IDs to names and status IDs to labels (inner joins), then
# keep one readable column per attribute.
joined_sales_df = (
    s.join(p, s.Product == p.ProductID)
     .join(st, s.Status == st.StatusID)
     .select(
         "s.OrderID", "s.FirstName", "s.LastName", "s.Country",
         "s.Quantity", "p.ProductName", "s.Price", "s.Date", "st.Status",
     )
)

joined_sales_df.show(8)


+-------+---------+--------+-------+--------+-----------+-------+----------+---------+
|OrderID|FirstName|LastName|Country|Quantity|ProductName|  Price|      Date|   Status|
+-------+---------+--------+-------+--------+-----------+-------+----------+---------+
|   1001|Charlotte|  Lorenz|    USA|       2|     Laptop|  41.33|2025-08-20|  Shipped|
|   1002|   Olivia|  Garcia| Canada|       3|         TV| 198.88|2025-08-20|  Shipped|
|   1003|     Noah|Williams| Mexico|       1|    Charger|1027.44|2025-08-20|  Shipped|
|   1004| Isabella|  Miller|Germany|       4|         TV| 655.33|2025-08-20|Delivered|
|   1005|    Corey|  Miller| Canada|       1|    Charger| 970.68|2025-08-20|     Open|
|   1006|     Alex|   Lopez|    USA|       3|   Keyboard| 781.07|2025-08-20|Delivered|
|   1007|      Ian|  Garcia|Austria|       3|    Charger| 437.76|2025-08-20|     Open|
|   1008|    James|   Smith|  Spain|       2|    Monitor| 820.53|2025-08-20|  Shipped|
+-------+---------+--------+-------+-------

In [0]:
from pyspark.sql.functions import col, count

# Count orders whose status label is "Open" (a single-row result).
open_orders_df = (
    joined_sales_df
    .filter(col("Status") == "Open")
    .groupby("Status")
    .agg(count("Status").alias("TotalOpenOrders"))
)

open_orders_df.show(8)

+------+---------------+
|Status|TotalOpenOrders|
+------+---------------+
|  Open|             76|
+------+---------------+



In [0]:
from pyspark.sql import functions as F

# Per-product sales summary.
#
# The functions module is used under an alias instead of
# `from pyspark.sql.functions import sum`: that import rebinds `sum` at
# notebook scope and shadows Python's built-in `sum` for every later cell.
sales_summary = (
    sales_df
    .groupBy("Product")
    .agg(
        # Number of sales rows per product (rows with a non-null Quantity).
        F.count("Quantity").alias("NumberofSales"),
        # Total units sold per product.
        F.sum("Quantity").alias("TotalQuantity"),
    )
)
sales_summary.show(10)

+-------+-------------+-------------+
|Product|NumberofSales|TotalQuantity|
+-------+-------------+-------------+
|      1|           43|          106|
|      4|           31|           79|
|      8|           50|          119|
|      6|           31|           84|
|      3|           30|           70|
|      2|           39|           87|
|      7|           27|           70|
|      5|           28|           59|
+-------+-------------+-------------+



In [0]:
from pyspark.sql.functions import coalesce, col, lit

# Compute post-sale stock per product: InStock minus units sold.
#
# The join is driven from the inventory table with LEFT joins. The previous
# inner join from the sales summary silently dropped every product with zero
# sales, and because this frame later overwrites the cleaned inventory
# output, those products would disappear from it. Products with no sales
# now get TotalQuantity = 0, so UpdatedStock = InStock.
ss = sales_summary.alias("ss")
p = products_df.alias("p")
i = inventory_df.alias("i")

upd_inv_df = (
    i.join(ss, col("i.ProductID") == col("ss.Product"), "left")
     .join(p, col("i.ProductID") == col("p.ProductID"), "left")
     .select(
         # Take the key from inventory: ss.Product is null for unsold products.
         col("i.ProductID").alias("Product"),
         coalesce(col("ss.TotalQuantity"), lit(0)).alias("TotalQuantity"),
         col("i.InStock"),
         col("p.ProductName"),
     )
     .withColumn("UpdatedStock", col("InStock") - col("TotalQuantity"))
     .orderBy("Product")
)
upd_inv_df.show(8)


+-------+-------------+-------+-----------+------------+
|Product|TotalQuantity|InStock|ProductName|UpdatedStock|
+-------+-------------+-------+-----------+------------+
|      1|          106|   1000|     Laptop|         894|
|      2|           87|   1000|      Mouse|         913|
|      3|           70|   1000|   Keyboard|         930|
|      4|           79|   1000|    Monitor|         921|
|      5|           59|   1000|     Webcam|         941|
|      6|           84|   1000|         TV|         916|
|      7|           70|   1000|    Speaker|         930|
|      8|          119|   1000|    Charger|         881|
+-------+-------------+-------+-----------+------------+



In [0]:
# Persist the enriched order-level sales as a managed Delta table, replacing
# any previous contents.
(joined_sales_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("my_sales_catalog.sales_schema.joined_sales"))

In [0]:
# Write the enriched sales rows as parquet to the cleaned zone.
# NOTE(review): mode("append") adds new files on every run, so re-running this
# cell against the same input writes the same rows again (duplicates).
# Consider an overwrite or partition-overwrite strategy if that is not intended.
# NOTE(review): the folder is named "sales_summary", but the frame written is
# the order-level joined_sales_df, not `sales_summary`. Confirm which was meant.
cleaned_sales_path = "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata/cleaned/sales_summary"
joined_sales_df.write.mode("append").parquet(cleaned_sales_path)

In [0]:
# Export the updated inventory as CSV (with a header row) to the cleaned zone,
# replacing the previous export.
inventory_out_path = "abfss://mycontainer@adlsgen2salesdata2025.dfs.core.windows.net/salesdata/cleaned/inventory"
(upd_inv_df.write
    .mode("overwrite")
    .option("header", "true")
    .csv(inventory_out_path))