In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## SILVER LAYER TRANSFORMATION SCRIPT

### DATA ACCESS USING APP

In [0]:
spark.conf.set("fs.azure.account.auth.type.awstorageadls.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.awstorageadls.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.awstorageadls.dfs.core.windows.net", "6372c749-0645-4965-af56-6d618d5f5a6a")
spark.conf.set("fs.azure.account.oauth2.client.secret.awstorageadls.dfs.core.windows.net", "NJB8Q~4UjZa3YPn~S7kqQu0cSUVp9xnbmqBL5bnt")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.awstorageadls.dfs.core.windows.net", "https://login.microsoftonline.com/1e00239c-e27c-4528-b2a6-b47ccccdb693/oauth2/token")

In [0]:
# To use mounting method

# dbutils.fs.mount(
#   source = "wasbs://bronze@awstorageadls.blob.core.windows.net/",
#   mount_point = "/mnt/bronze",
#   extra_configs = {
#     "fs.azure.account.key.awstorageadls.blob.core.windows.net": "/<Access Key>"
#   }
# )

# # Trying out with calendar
# df = spark.read.csv("/mnt/bronze/Calendar", header=True, inferSchema=True)
# display(df)


### DATA LOADING


#### Read Data

In [0]:
# # For one folder
# df_Customers = spark.read.format("csv")\
#                 .option("header", True)\
#                 .option("inferSchema", True)\
#                 .load("abfss://bronze@awstorageadls.dfs.core.windows.net/Customers")
# display(df_Customers)

# Example to load all data like Sales
# df_sales = spark.read.format("csv")\
#                 .option("header", True)\
#                 .option("inferSchema", True)\
#                 .load("abfss://bronze@awstorageadls.dfs.core.windows.net/Sales*")
# display(df)

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


In [0]:
from pyspark.sql import SparkSession
import os

# Define the root path
root_path = "abfss://bronze@awstorageadls.dfs.core.windows.net/"

# List folders to load separately
individual_folders = [
    "Calendar",
    "Customers",
    "Products",
    "Products_Categories",
    "Products_Subcategories",
    "Returns",
    "Territories"
]

# Load individual folders into named dataframes
for folder in individual_folders:
    full_path = os.path.join(root_path, folder)
    df = spark.read.format("csv")\
        .option("header", True)\
        .option("inferSchema", True)\
        .load(full_path)
    
    globals()[f"df_{folder}"] = df

# Combine sales data
sales_folders = ["Sales_2015", "Sales_2016", "Sales_2017"]
df_sales = None

for folder in sales_folders:
    full_path = os.path.join(root_path, folder)
    df_temp = spark.read.format("csv")\
        .option("header", True)\
        .option("inferSchema", True)\
        .load(full_path)
    
    df_sales = df_temp if df_sales is None else df_sales.unionByName(df_temp)


In [0]:
display(df_sales)

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2015-01-01,2001-09-21,SO45080,332,14657,1,1,1
2015-01-01,2001-12-05,SO45079,312,29255,4,1,1
2015-01-01,2001-10-29,SO45082,350,11455,9,1,1
2015-01-01,2001-11-16,SO45081,338,26782,6,1,1
2015-01-02,2001-12-15,SO45083,312,14947,10,1,1
2015-01-02,2001-10-12,SO45084,310,29143,4,1,1
2015-01-02,2001-12-18,SO45086,314,18747,9,1,1
2015-01-02,2001-10-09,SO45085,312,18746,9,1,1
2015-01-03,2001-10-03,SO45093,312,18906,9,1,1
2015-01-03,2001-09-29,SO45090,310,29170,4,1,1


### TRANSFORMATIONS

#### Calendar

In [0]:
df_Calendar = df_Calendar.withColumn('Month', month(col('Date')))\
                .withColumn('Year', year(col('Date')))\
                .withColumn('Quarter', quarter(col('Date')))
df_Calendar.display()

Date,Month,Year,Quarter
2015-01-01,1,2015,1
2015-01-02,1,2015,1
2015-01-03,1,2015,1
2015-01-04,1,2015,1
2015-01-05,1,2015,1
2015-01-06,1,2015,1
2015-01-07,1,2015,1
2015-01-08,1,2015,1
2015-01-09,1,2015,1
2015-01-10,1,2015,1


In [0]:
df_Calendar.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Calendar")\
            .save()

#### Customers

In [0]:
df_Customers = df_Customers.withColumn('FullName', concat_ws(' ', col('Prefix'), col('FirstName'), col('LastName')))


In [0]:
df_Customers.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Customers")\
            .save()


#### Product Subcategories

In [0]:
df_Products_Subcategories.display()

ProductSubcategoryKey,SubcategoryName,ProductCategoryKey
1,Mountain Bikes,1
2,Road Bikes,1
3,Touring Bikes,1
4,Handlebars,2
5,Bottom Brackets,2
6,Brakes,2
7,Chains,2
8,Cranksets,2
9,Derailleurs,2
10,Forks,2


In [0]:
df_Products_Subcategories.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Products_Subcategories")\
            .save()


#### Product


In [0]:
df_Products.display()

ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
214,31,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.0863,34.99
215,31,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.0278,33.6442
218,23,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.3963,9.5
219,23,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.3963,9.5
220,31,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.0278,33.6442
223,19,CA-1098,AWC Logo Cap,Cycling Cap,Traditional style with a flip-up brim; one-size fits all.,Multi,0,U,5.7052,8.6442
226,21,LJ-0192-S,"Long-Sleeve Logo Jersey, S",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,S,U,31.7244,48.0673
229,21,LJ-0192-M,"Long-Sleeve Logo Jersey, M",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,M,U,31.7244,48.0673
232,21,LJ-0192-L,"Long-Sleeve Logo Jersey, L",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,L,U,31.7244,48.0673
235,21,LJ-0192-X,"Long-Sleeve Logo Jersey, XL",Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,XL,U,31.7244,48.0673


In [0]:
df_Products = df_Products.withColumn('ProductSKU', split(col('ProductSKU'), '-')[0])\
                        .withColumn('ProductName', split(col('ProductName'), ' ')[0])

In [0]:
display(df_Products)

ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
214,31,HL,Sport-100,Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Red,0,0,13.0863,34.99
215,31,HL,Sport-100,Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Black,0,0,12.0278,33.6442
218,23,SO,Mountain,Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,M,U,3.3963,9.5
219,23,SO,Mountain,Mountain Bike Socks,Combination of natural and synthetic fibers stays dry and provides just the right cushioning.,White,L,U,3.3963,9.5
220,31,HL,Sport-100,Sport-100,"Universal fit, well-vented, lightweight , snap-on visor.",Blue,0,0,12.0278,33.6442
223,19,CA,AWC,Cycling Cap,Traditional style with a flip-up brim; one-size fits all.,Multi,0,U,5.7052,8.6442
226,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,S,U,31.7244,48.0673
229,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,M,U,31.7244,48.0673
232,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,L,U,31.7244,48.0673
235,21,LJ,Long-Sleeve,Long-Sleeve Logo Jersey,Unisex long-sleeve AWC logo microfiber cycling jersey,Multi,XL,U,31.7244,48.0673


In [0]:
df_Products.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Products")\
            .save()


#### Returns

In [0]:
df_Returns.display()

ReturnDate,TerritoryKey,ProductKey,ReturnQuantity
2015-01-18,9,312,1
2015-01-18,10,310,1
2015-01-21,8,346,1
2015-01-22,4,311,1
2015-02-02,6,312,1
2015-02-15,1,312,1
2015-02-19,9,311,1
2015-02-24,8,314,1
2015-03-08,8,350,1
2015-03-13,9,350,1


In [0]:
df_Returns.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Returns")\
            .save()


#### Territories

In [0]:
df_Territories.display()

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


In [0]:
df_Territories.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Territories")\
            .save()


#### Sales

In [0]:
df_sales.display()

OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity
2017-01-01,2003-12-13,SO61285,529,23791,1,2,2
2017-01-01,2003-09-24,SO61285,214,23791,1,3,1
2017-01-01,2003-09-04,SO61285,540,23791,1,1,1
2017-01-01,2003-09-28,SO61301,529,16747,1,2,2
2017-01-01,2003-10-21,SO61301,377,16747,1,1,1
2017-01-01,2003-10-23,SO61301,540,16747,1,3,1
2017-01-01,2003-09-04,SO61269,215,11792,4,1,1
2017-01-01,2003-10-21,SO61269,229,11792,4,2,1
2017-01-01,2003-10-24,SO61286,528,11530,6,2,2
2017-01-01,2003-09-27,SO61286,536,11530,6,1,2


In [0]:
df_sales = df_sales.withColumn('StockDate', to_timestamp(col('StockDate') ))\
                    .withColumn('OrderNumber', regexp_replace(col('OrderNumber'), 'S', 'T'))\
                    .withColumn('Multiply', col('OrderLineItem') * col('OrderQuantity'))

In [0]:
df_sales.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awstorageadls.dfs.core.windows.net/Sales")\
            .save()

#### Sales Aggregation

In [0]:
df_sales.groupBy('OrderDate').agg(count('OrderNumber').alias('TotalOrders')).display()


OrderDate,TotalOrders
2017-01-06,151
2017-01-27,142
2017-02-26,119
2017-01-24,173
2017-06-29,172
2017-02-16,124
2017-04-09,140
2017-02-28,162
2017-03-28,149
2017-06-30,136


Databricks visualization. Run in Databricks to view.

In [0]:
df_Products_Categories.display()


ProductCategoryKey,CategoryName
1,Bikes
2,Components
3,Clothing
4,Accessories


Databricks visualization. Run in Databricks to view.

In [0]:
df_Territories.display()

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


Databricks visualization. Run in Databricks to view.