In [8]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import glob

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 3, Finished, Available, Finished)

Reading data from Excel using glob and pandas

In [10]:
# 1) Find the Excel files under the mounted Lakehouse path.
#    glob returns matches in a filesystem-dependent order, so sort them to keep
#    the row order of the concatenated frame stable from run to run.
sales_pattern = "/lakehouse/default/Files/Current/Sales_*.xlsx"
files = sorted(glob.glob(sales_pattern))  # list of matching file paths
if not files:
    # Fail fast with the exact pattern that produced no matches
    raise FileNotFoundError(f"No files matching {sales_pattern}")

print("Found files:\n", "\n".join(files))

# 2) Read only the 'Sales' sheet from each file and stack them into one frame
dfs_sales = [pd.read_excel(f, sheet_name="Sales", engine="openpyxl") for f in files]
df_sales = pd.concat(dfs_sales, ignore_index=True)
print("df_sales shape:", df_sales.shape)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 4, Finished, Available, Finished)

Found files:
 /lakehouse/default/Files/Current/Sales_01012023.xlsx
/lakehouse/default/Files/Current/Sales_02012023.xlsx


df_sales shape: (8242, 20)


Loading the data into Spark DataFrames

In [None]:
# Convert the concatenated pandas sales frame into a Spark DataFrame
df1 = spark.createDataFrame(data=df_sales)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 5, Finished, Available, Finished)

In [None]:
# Preview the first 10 rows of the sales DataFrame
sales_preview = df1.head(10)
display(sales_preview)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d56454b9-b8db-4c03-98e7-26ad7429958c)

In [None]:
# 3) Read the 'Returns' sheet from every file in `files` and stack the results
dfs_returns = []
for xlsx_path in files:
    dfs_returns.append(
        pd.read_excel(xlsx_path, sheet_name="Returns", engine="openpyxl")
    )
df_returns = pd.concat(dfs_returns, ignore_index=True)
print("df_returns shape:", df_returns.shape)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 7, Finished, Available, Finished)

df_returns shape: (123, 4)


In [None]:
# Convert the concatenated pandas returns frame into a Spark DataFrame
df2 = spark.createDataFrame(data=df_returns)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 8, Finished, Available, Finished)

In [None]:
# Preview the first 10 rows of the returns DataFrame
returns_preview = df2.head(10)
display(returns_preview)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f5a8c1e2-9133-4df6-ae8b-2786df0ba914)

Now we will join both DataFrames on Order_ID and pick up the return flag from the returns table.

In [None]:
# Left-join sales (df1) to returns (df2) on Order_ID so every sales row is kept,
# then drop the returns-side Order_ID, Customer_Name and Sales_Amount columns.
same_order = df1.Order_ID == df2.Order_ID
df_final = (
    df1.join(df2, on=same_order, how="left")
    .drop(df2.Order_ID, df2.Customer_Name, df2.Sales_Amount)
)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 10, Finished, Available, Finished)

Adding two more columns to the DataFrame, Order_Year and Order_Month; based on these columns we can partition the bronze table. The same step also adds the Created_TS and Modified_TS timestamp columns.

In [None]:
# Derive Order_Year / Order_Month from Order_Date and stamp each row with
# creation and modification timestamps, all in a single withColumns call.
derived_columns = {
    "Order_Year": year("Order_Date"),
    "Order_Month": month("Order_Date"),
    "Created_TS": current_timestamp(),
    "Modified_TS": current_timestamp(),
}
df_mod = df_final.withColumns(derived_columns)

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 11, Finished, Available, Finished)

Over this modified DataFrame we will create a temporary view, which is used to load data into the bronze table.

In [None]:
# Register df_mod as the temporary view "ViewSales" so it can be referenced from SQL
df_mod.createOrReplaceTempView(name="ViewSales")

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 12, Finished, Available, Finished)

Now we will create the bronze table.

In [None]:
%%sql
CREATE TABLE IF NOT EXISTS Bronze_Sales (
    -- Order and shipping attributes
    Order_ID          STRING,
    Order_Date        DATE,
    Shipping_Date     DATE,
    Aging             INT,
    Ship_Mode         STRING,
    -- Product and financial measures
    Product_Category  STRING,
    Product           STRING,
    Sales             FLOAT,
    Quantity          FLOAT,
    Discount          FLOAT,
    Profit            FLOAT,
    Shipping_Cost     FLOAT,
    Order_Priority    STRING,
    -- Customer and geography
    Customer_ID       STRING,
    Customer_Name     STRING,
    Segment           STRING,
    City              STRING,
    State             STRING,
    Country           STRING,
    Region            STRING,
    -- Return flag
    Return            STRING,
    -- Partition columns
    Order_Year        INT,
    Order_Month       INT,
    -- Row timestamps
    Created_TS        TIMESTAMP,
    Modified_TS       TIMESTAMP
)
USING DELTA
PARTITIONED BY (Order_Year, Order_Month)


StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 13, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

We will write a MERGE query to upsert data into the bronze table so that upcoming runs capture both updated and new records.

In [None]:
%%sql
MERGE INTO Bronze_Sales AS tgt
USING ViewSales AS src
-- Match on the partition columns plus the order key
ON  tgt.Order_Year  = src.Order_Year
AND tgt.Order_Month = src.Order_Month
AND tgt.Order_ID    = src.Order_ID

-- Matched row: refresh the attributes and Modified_TS;
-- Order_ID, Order_Year, Order_Month and Created_TS are not updated.
WHEN MATCHED THEN UPDATE SET
  tgt.Order_Date       = src.Order_Date,
  tgt.Shipping_Date    = src.Shipping_Date,
  tgt.Aging            = src.Aging,
  tgt.Ship_Mode        = src.Ship_Mode,
  tgt.Product_Category = src.Product_Category,
  tgt.Product          = src.Product,
  tgt.Sales            = src.Sales,
  tgt.Quantity         = src.Quantity,
  tgt.Discount         = src.Discount,
  tgt.Profit           = src.Profit,
  tgt.Shipping_Cost    = src.Shipping_Cost,
  tgt.Order_Priority   = src.Order_Priority,
  tgt.Customer_ID      = src.Customer_ID,
  tgt.Customer_Name    = src.Customer_Name,
  tgt.Segment          = src.Segment,
  tgt.City             = src.City,
  tgt.State            = src.State,
  tgt.Country          = src.Country,
  tgt.Region           = src.Region,
  tgt.Return           = src.Return,
  tgt.Modified_TS      = src.Modified_TS

-- Unmatched row: insert every column from the view
WHEN NOT MATCHED THEN INSERT (
  Order_ID,
  Order_Date,
  Shipping_Date,
  Aging,
  Ship_Mode,
  Product_Category,
  Product,
  Sales,
  Quantity,
  Discount,
  Profit,
  Shipping_Cost,
  Order_Priority,
  Customer_ID,
  Customer_Name,
  Segment,
  City,
  State,
  Country,
  Region,
  Return,
  Order_Year,
  Order_Month,
  Created_TS,
  Modified_TS
) VALUES (
  src.Order_ID,
  src.Order_Date,
  src.Shipping_Date,
  src.Aging,
  src.Ship_Mode,
  src.Product_Category,
  src.Product,
  src.Sales,
  src.Quantity,
  src.Discount,
  src.Profit,
  src.Shipping_Cost,
  src.Order_Priority,
  src.Customer_ID,
  src.Customer_Name,
  src.Segment,
  src.City,
  src.State,
  src.Country,
  src.Region,
  src.Return,
  src.Order_Year,
  src.Order_Month,
  src.Created_TS,
  src.Modified_TS
);


StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 14, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [None]:
%%sql
-- Peek at a few rows of the bronze table
SELECT *
FROM Bronze_Sales
LIMIT 10

StatementMeta(, 0eefe091-dc8e-4e5d-9031-9a876d9ebe9e, 15, Finished, Available, Finished)

<Spark SQL result set with 10 rows and 25 fields>

After creating the bronze table, we will now create a dimensional model, building fact and dimension tables based on the columns we have in the bronze table.