In [25]:

# Download every file in the GitHub repo's `files/` folder into the lakehouse "Files/Current" folder.
import requests
import os

url_api = 'https://api.github.com/repos/felipe-de-castro/fabric-end-to-end/contents/files'
url_git = 'https://github.com/felipe-de-castro/fabric-end-to-end/raw/main/files/'
target_dir = "/lakehouse/default/Files/Current"
request_timeout = 30  # seconds; without a timeout a stalled connection blocks the cell indefinitely

# open() does not create parent folders, so make sure the destination exists first.
os.makedirs(target_dir, exist_ok=True)

response = requests.get(url_api, timeout=request_timeout)

if response.status_code == 200:
    files = response.json()
    for file_info in files:
        # The contents API also lists sub-folders (type "dir"); only plain files can be downloaded.
        if file_info.get('type') != 'file':
            continue
        # Prefer the API-provided raw URL (correct branch and URL-encoding); fall back to the fixed pattern.
        file_url = file_info.get('download_url') or f"{url_git}{file_info['name']}"
        file_name = os.path.basename(file_info['name'])
        print(f"Downloading {file_name}...")
        file_response = requests.get(file_url, timeout=request_timeout)
        if file_response.status_code == 200:
            with open(os.path.join(target_dir, file_name), "wb") as f:
                f.write(file_response.content)
            print(f"{file_name} downloaded successfully.")
        else:
            print(f"Failed to download {file_name}. Status code: {file_response.status_code}")
else:
    print(f"Failed to retrieve file list from GitHub. Status code: {response.status_code}")

StatementMeta(, , , Waiting, )

Downloading Sales_01012023.xlsx...
Sales_01012023.xlsx downloaded successfully.
Downloading Sales_02012023.xlsx...


Sales_02012023.xlsx downloaded successfully.


In [None]:
# List the lakehouse "Files" root, addressed by the lakehouse GUID path.
# (The download cell writes into the local mount /lakehouse/default/Files/Current.)
lakehouse_files_root = '/bd04dad0-be37-4094-8292-4fb6d4e5fb18/Files/'
mssparkutils.fs.ls(lakehouse_files_root)

StatementMeta(, , , Waiting, )

[FileInfo(path=abfss://6f193087-1feb-49e4-a817-330a261f218e@onelake.dfs.fabric.microsoft.com/bd04dad0-be37-4094-8292-4fb6d4e5fb18/Files/Archive, name=Archive, size=557795),
 FileInfo(path=abfss://6f193087-1feb-49e4-a817-330a261f218e@onelake.dfs.fabric.microsoft.com/bd04dad0-be37-4094-8292-4fb6d4e5fb18/Files/Archive_new, name=Archive_new, size=557795),
 FileInfo(path=abfss://6f193087-1feb-49e4-a817-330a261f218e@onelake.dfs.fabric.microsoft.com/bd04dad0-be37-4094-8292-4fb6d4e5fb18/Files/Archives, name=Archives, size=0),
 FileInfo(path=abfss://6f193087-1feb-49e4-a817-330a261f218e@onelake.dfs.fabric.microsoft.com/bd04dad0-be37-4094-8292-4fb6d4e5fb18/Files/Current, name=Current, size=0),
 FileInfo(path=abfss://6f193087-1feb-49e4-a817-330a261f218e@onelake.dfs.fabric.microsoft.com/bd04dad0-be37-4094-8292-4fb6d4e5fb18/Files/files_datafactory, name=files_datafactory, size=0)]

In [50]:
#Reading Files for each sheet (sales and return) and creating a dataframe for each
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import glob

# Collect every Sales_*.xlsx workbook; sorted() makes the row order deterministic
# (glob returns entries in filesystem order).
file_paths = sorted(glob.glob('/lakehouse/default/Files/Current/Sales_*.xlsx'))

if not file_paths:
    # Without this guard pd.concat([]) fails with an opaque "No objects to concatenate".
    raise FileNotFoundError("No Sales_*.xlsx files found in /lakehouse/default/Files/Current")

# Open each workbook once and read both sheets; a list for sheet_name returns a dict keyed by sheet name.
workbooks = [pd.read_excel(file_path, sheet_name=["Sales", "Returns"]) for file_path in file_paths]

#Sales Dataframe - df1
df1 = pd.concat([sheets["Sales"] for sheets in workbooks], ignore_index=True)

#Returns dataframe - df2
df2 = pd.concat([sheets["Returns"] for sheets in workbooks], ignore_index=True)

# Print shape of both dataframes
print(f'Dataframe Sales: {df1.shape}, Dataframe returns: {df2.shape}')

StatementMeta(, , , Waiting, )

Dataframe Sales: (8242, 20), Dataframe returns: (123, 4)


In [None]:
# Turn the pandas frames into Spark DataFrames (schema inferred from the pandas dtypes).
df_sales, df_returns = map(spark.createDataFrame, (df1, df2))

StatementMeta(, , , Waiting, )

In [4]:
display(df_sales.head(n=10))  # preview the first 10 sales rows

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 62f02fe4-c1a9-4ee3-8e79-3e39f181f388)

In [28]:
display(df_returns.head(n=10))  # preview the first 10 return rows

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 81e0c549-f332-4c18-b9c1-ddf089bd60d1)

In [29]:
# Attach the return flag to every sales row.
# Only Order_ID and Return are taken from the returns data; its Customer_Name and Sales_Amount
# columns duplicate sales columns (the earlier version dropped them after the join).
# Deduplicate on Order_ID so an order listed more than once in Returns cannot multiply its
# sales rows in the left join (duplicate source rows would also break the later MERGE).
returns_by_order = df_returns.select("Order_ID", "Return").dropDuplicates(["Order_ID"])
# Joining on the column name keeps a single Order_ID column in the result.
df_final = df_sales.join(returns_by_order, on="Order_ID", how="left")


StatementMeta(, , , Waiting, )

In [None]:
display(df_final.head(n=10))  # preview the first 10 joined rows


StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 0d7a6509-0c55-4ce6-9d16-a81ba69c32c3)

In [31]:
# Derive the partition columns from Order_Date and stamp the audit timestamps.
# current_timestamp() returns the same value for every call within one query,
# so Create_TS and Modified_TS start out identical.
df_mod = (
    df_final
    .withColumn("Order_Year", year("Order_Date"))
    .withColumn("Order_Month", month("Order_Date"))
    .withColumn("Create_TS", current_timestamp())
    .withColumn("Modified_TS", current_timestamp())
)

StatementMeta(, , , Waiting, )

In [32]:
# Expose df_mod to the %%sql cells under the session-scoped temp view name "ViewSales".
df_mod.createOrReplaceTempView(name="ViewSales")

StatementMeta(, , , Waiting, )

In [43]:
%%sql
-- Preview the ViewSales temp view.
-- Note: a temp view lives only in the Spark session; nothing is written to the lakehouse "Tables" folder.
SELECT *
FROM ViewSales
LIMIT 10;

StatementMeta(, , , Waiting, )

<Spark SQL result set with 10 rows and 25 fields>

In [44]:
%%sql
-- Create the bronze_sales Delta table that the MERGE cell upserts ViewSales into,
-- partitioned by the derived Order_Year / Order_Month columns.
-- Measures are DOUBLE, not FLOAT: pandas reads the numeric Excel columns as 64-bit values,
-- and a 32-bit FLOAT keeps only ~7 significant digits, silently rounding larger amounts.
-- NOTE: IF NOT EXISTS leaves an already-created table unchanged; an existing FLOAT table must be migrated separately.
create table if not exists project_lakehouse.bronze_sales
    (
    Order_ID string,
    Order_Date Date,
    Shipping_Date Date,
    Aging int,
    Ship_Mode string,
    Product_Category string,
    Product string,
    Sales double,
    Quantity double,
    Discount double,
    Profit double,
    Shipping_Cost double,
    Order_Priority string,
    Customer_ID string,
    Customer_Name string,
    Segment string,
    City string,
    State string,
    Country string,
    Region string,
    Return string,
    Order_Year int,
    Order_Month int,
    Create_TS TIMESTAMP,
    Modified_TS TIMESTAMP
    ) 
using delta
PARTITIONED by (Order_Year, Order_Month)

StatementMeta(, , , Waiting, )

<Spark SQL result set with 0 rows and 0 fields>

In [45]:
%%sql
-- Upsert the ViewSales temp view into bronze_sales.
-- Rows are matched on the partition columns (Order_Year, Order_Month) plus Order_ID.
-- Matched rows: every business column and Modified_TS is overwritten; Create_TS is left as stored.
-- Unmatched rows: inserted with all 25 columns, including both audit timestamps.
-- NOTE(review): Delta MERGE errors if several ViewSales rows match the same target row,
-- so Order_ID is assumed unique per (Order_Year, Order_Month) in the source — confirm.
MERGE INTO project_lakehouse.bronze_sales AS BS
USING ViewSales AS VS
    ON  BS.Order_Year  = VS.Order_Year
    AND BS.Order_Month = VS.Order_Month
    AND BS.Order_ID    = VS.Order_ID
WHEN MATCHED THEN UPDATE SET
    BS.Order_Date       = VS.Order_Date,
    BS.Shipping_Date    = VS.Shipping_Date,
    BS.Aging            = VS.Aging,
    BS.Ship_Mode        = VS.Ship_Mode,
    BS.Product_Category = VS.Product_Category,
    BS.Product          = VS.Product,
    BS.Sales            = VS.Sales,
    BS.Quantity         = VS.Quantity,
    BS.Discount         = VS.Discount,
    BS.Profit           = VS.Profit,
    BS.Shipping_Cost    = VS.Shipping_Cost,
    BS.Order_Priority   = VS.Order_Priority,
    BS.Customer_ID      = VS.Customer_ID,
    BS.Customer_Name    = VS.Customer_Name,
    BS.Segment          = VS.Segment,
    BS.City             = VS.City,
    BS.State            = VS.State,
    BS.Country          = VS.Country,
    BS.Region           = VS.Region,
    BS.Return           = VS.Return,
    BS.Modified_TS      = VS.Modified_TS
WHEN NOT MATCHED THEN INSERT (
    BS.Order_ID, BS.Order_Date, BS.Shipping_Date, BS.Aging, BS.Ship_Mode,
    BS.Product_Category, BS.Product, BS.Sales, BS.Quantity, BS.Discount,
    BS.Profit, BS.Shipping_Cost, BS.Order_Priority, BS.Customer_ID, BS.Customer_Name,
    BS.Segment, BS.City, BS.State, BS.Country, BS.Region,
    BS.Return, BS.Order_Year, BS.Order_Month, BS.Create_TS, BS.Modified_TS
) VALUES (
    VS.Order_ID, VS.Order_Date, VS.Shipping_Date, VS.Aging, VS.Ship_Mode,
    VS.Product_Category, VS.Product, VS.Sales, VS.Quantity, VS.Discount,
    VS.Profit, VS.Shipping_Cost, VS.Order_Priority, VS.Customer_ID, VS.Customer_Name,
    VS.Segment, VS.City, VS.State, VS.Country, VS.Region,
    VS.Return, VS.Order_Year, VS.Order_Month, VS.Create_TS, VS.Modified_TS
);

StatementMeta(, , , Waiting, )

<Spark SQL result set with 1 rows and 4 fields>

In [1]:
%%sql
-- Spot-check the persisted bronze table after the MERGE.
SELECT *
FROM project_lakehouse.bronze_sales
LIMIT 10;

StatementMeta(, , , Waiting, )

<Spark SQL result set with 10 rows and 25 fields>