## Parameters

In [1]:
# Fabric workspace and lakehouse this notebook reads from and writes to.
workspace_name = "PF_002_Fabric-DEV"
lakehouse_name = "MainStorage"

# OneLake ABFSS root of the lakehouse, assembled from the two names above.
lakehouse_abfss = (
    f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com"
    f"/{lakehouse_name}.Lakehouse"
)

# Raw parquet inputs are read from Files/Raw; Delta outputs go under Tables.
files_path = lakehouse_abfss + "/Files/Raw"
tables_path = lakehouse_abfss + "/Tables"


StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 3, Finished, Available, Finished)

## Imports

In [2]:
# Spark SQL column helpers; the DimCustomer cell uses col and concat_ws from here.
from pyspark.sql.functions import *

# Turn on Spark's spark.sql.caseSensitive setting for this session.
spark.conf.set("spark.sql.caseSensitive", True)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 4, Finished, Available, Finished)

## FactInternetSales

In [3]:
# Location of the raw FactInternetSales parquet extract.
sales_path = f"{files_path}/dbo.FactInternetSales.parquet"

# Load the extract and keep only the columns wanted in the fact table.
df_sales = (
    spark.read.format("parquet")
    .load(sales_path)
    .select(
        "ProductKey",
        "OrderDateKey",
        "DueDateKey",
        "ShipDateKey",
        "CustomerKey",
        "SalesOrderNumber",
        "OrderQuantity",
        "UnitPrice",
        "UnitPriceDiscountPct",
        "ProductStandardCost",
    )
)

# Save as a Delta table under Tables/, overwriting whatever is already there.
(
    df_sales.write
    .format("delta")
    .mode("overwrite")
    .save(f"{tables_path}/FactInternetSales")
)


StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 5, Finished, Available, Finished)

## DimCustomer

In [4]:
# Location of the raw DimCustomer parquet extract.
customer_path = f"{files_path}/dbo.DimCustomer.parquet"

df_customer = (
    spark.read.format("parquet")
    .load(customer_path)
    # Keep only the customer attributes used for the dimension; GeographyKey
    # stays for now because the geography join in a later cell needs it.
    .select(
        "CustomerKey",
        "GeographyKey",
        "CustomerAlternateKey",
        "FirstName",
        "MiddleName",
        "LastName",
        "BirthDate",
        "Gender",
        "MaritalStatus",
        "TotalChildren",
        "EnglishEducation",
        "EnglishOccupation",
        "HouseOwnerFlag",
        "NumberCarsOwned",
    )
    # Collapse the three name parts into one space-separated FullName column,
    # then remove the individual parts.
    .withColumn(
        "FullName",
        concat_ws(" ", col("FirstName"), col("MiddleName"), col("LastName")),
    )
    .drop("FirstName", "MiddleName", "LastName")
    # Strip the "English" prefix from the remaining descriptive columns.
    .withColumnsRenamed(
        {
            "EnglishEducation": "Education",
            "EnglishOccupation": "Occupation",
        }
    )
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 6, Finished, Available, Finished)

In [5]:
# Location of the raw DimGeography parquet extract (joined onto customers later).
geography_path = f"{files_path}/dbo.DimGeography.parquet"

df_geography = (
    spark.read.format("parquet")
    .load(geography_path)
    # Only the join key plus the two location attributes are kept.
    .select("GeographyKey", "City", "EnglishCountryRegionName")
    # Strip the "English" prefix from the country/region column.
    .withColumnRenamed("EnglishCountryRegionName", "CountryRegion")
)


StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 7, Finished, Available, Finished)

In [6]:
# Attach the geography columns to each customer. The left join keeps customers
# whose GeographyKey has no match in df_geography; the key itself is dropped
# afterwards because it is not wanted in the output table.
#
# The result is bound to a NEW name rather than reassigned to df_customer.
# Previously this cell overwrote df_customer after dropping GeographyKey, so
# running the cell a second time failed: the join key no longer existed on the
# left side. With a separate name the cell can be re-run on its own.
df_customer_geo = df_customer.join(
    df_geography,
    on="GeographyKey",
    how="left",
).drop("GeographyKey")

# Save as a Delta table under Tables/, overwriting whatever is already there.
(
    df_customer_geo.write
    .format("delta")
    .mode("overwrite")
    .save(f"{tables_path}/DimCustomer")
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 8, Finished, Available, Finished)

## DimProduct

In [7]:
# Location of the raw DimProduct parquet extract.
product_path = f"{files_path}/dbo.DimProduct.parquet"

# Load the extract and keep the product attributes plus ProductSubcategoryKey,
# which the subcategory join in a later cell needs.
df_product = (
    spark.read.format("parquet")
    .load(product_path)
    .select(
        "ProductKey",
        "ProductSubcategoryKey",
        "EnglishProductName",
        "Color",
        "Size",
        "ModelName",
        "LargePhoto",
        "EnglishDescription",
    )
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 9, Finished, Available, Finished)

In [8]:
# Location of the raw DimProductCategory parquet extract.
product_category_path = f"{files_path}/dbo.DimProductCategory.parquet"

# Only the key and the English category name are kept.
df_product_category = (
    spark.read.format("parquet")
    .load(product_category_path)
    .select("ProductCategoryKey", "EnglishProductCategoryName")
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 10, Finished, Available, Finished)

In [9]:
# Location of the raw DimProductSubcategory parquet extract.
product_subcategory_path = f"{files_path}/dbo.DimProductSubcategory.parquet"

# Keep the subcategory key and name, plus ProductCategoryKey so the category
# can be joined on afterwards.
df_product_subcategory = (
    spark.read.format("parquet")
    .load(product_subcategory_path)
    .select(
        "ProductSubcategoryKey",
        "EnglishProductSubcategoryName",
        "ProductCategoryKey",
    )
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 11, Finished, Available, Finished)

In [10]:
# Flatten the product -> subcategory -> category hierarchy into one product
# dimension and write it out.
#
# Changes compared with the previous version of this cell:
#   * The renamed subcategory column was misspelled "ProductSubcategoyName";
#     it is now "ProductSubcategoryName".
#     NOTE(review): anything downstream that referenced the misspelled column
#     has to be switched to the corrected name.
#   * The result is bound to a NEW name instead of being reassigned to
#     df_product. The old code dropped the join keys from df_product itself,
#     so running the cell a second time failed on the missing
#     ProductSubcategoryKey column.
df_product_enriched = (
    df_product
    # Subcategory first: it brings in ProductCategoryKey, which the category
    # join below needs. Left joins keep products without a match.
    .join(df_product_subcategory, on="ProductSubcategoryKey", how="left")
    .drop("ProductSubcategoryKey")
    .join(df_product_category, on="ProductCategoryKey", how="left")
    .drop("ProductCategoryKey")
    # Strip the "English" prefix from the descriptive columns.
    .withColumnsRenamed(
        {
            "EnglishProductName": "ProductName",
            "EnglishDescription": "Description",
            "EnglishProductSubcategoryName": "ProductSubcategoryName",
            "EnglishProductCategoryName": "ProductCategoryName",
        }
    )
)

# Save as a Delta table under Tables/, overwriting whatever is already there.
# overwriteSchema lets the overwrite replace a table that was written earlier
# with the misspelled column name; without it Delta rejects the write because
# the schemas differ.
(
    df_product_enriched.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{tables_path}/DimProduct")
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 12, Finished, Available, Finished)

## DimDate

In [11]:
# Location of the raw DimDate parquet extract.
date_path = f"{files_path}/dbo.DimDate.parquet"

df_date = (
    spark.read.format("parquet")
    .load(date_path)
    # Remove the Spanish and French day/month name columns; every other
    # column of the extract is kept as-is.
    .drop(
        "SpanishDayNameOfWeek",
        "FrenchDayNameOfWeek",
        "SpanishMonthName",
        "FrenchMonthName",
    )
    # Strip the "English" prefix from the remaining day/month name columns.
    .withColumnsRenamed(
        {
            "EnglishDayNameOfWeek": "DayNameOfWeek",
            "EnglishMonthName": "MonthName",
        }
    )
)

# Save as a Delta table under Tables/, overwriting whatever is already there.
(
    df_date.write
    .format("delta")
    .mode("overwrite")
    .save(f"{tables_path}/DimDate")
)

StatementMeta(, 5459e36a-09fe-472b-b68c-ef97cdedb214, 13, Finished, Available, Finished)