In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
print("Starting aggregation and loading data to gold layer")

Starting aggregation and loading data to gold layer


#### Product Data

In [0]:
product_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/Product")

In [0]:
product_df.printSchema()
product_df.display()

root
 |-- ProductKey: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- StandardCost: double (nullable = true)
 |-- Color: string (nullable = true)
 |-- Subcategory: string (nullable = true)
 |-- Category: string (nullable = true)



ProductKey,Product,StandardCost,Color,Subcategory,Category
210,"HL Road Frame - Black, 58",868.63,Black,Road Frames,Components
215,"Sport-100 Helmet, Black",12.03,Black,Helmets,Accessories
216,"Sport-100 Helmet, Black",13.88,Black,Helmets,Accessories
217,"Sport-100 Helmet, Black",13.09,Black,Helmets,Accessories
253,"LL Road Frame - Black, 58",176.2,Black,Road Frames,Components
254,"LL Road Frame - Black, 58",170.14,Black,Road Frames,Components
255,"LL Road Frame - Black, 58",204.63,Black,Road Frames,Components
256,"LL Road Frame - Black, 60",176.2,Black,Road Frames,Components
257,"LL Road Frame - Black, 60",170.14,Black,Road Frames,Components
258,"LL Road Frame - Black, 60",204.63,Black,Road Frames,Components


In [0]:
product_df_aggregated = product_df.groupBy(["Category", "Subcategory"])\
    .agg(
        mean(col("StandardCost")).alias("AverageCost"),
        sum(col("StandardCost")).alias("TotalCost")
    )\
    .orderBy(["Category", "Subcategory"])
product_df_aggregated.display()

Category,Subcategory,AverageCost,TotalCost
Accessories,Bike Racks,44.88,44.88
Accessories,Bike Stands,59.47,59.47
Accessories,Bottles and Cages,2.99,8.97
Accessories,Cleaners,2.97,2.97
Accessories,Fenders,8.22,8.22
Accessories,Helmets,13.0,117.0
Accessories,Hydration Packs,20.57,20.57
Accessories,Lights,12.92,38.76
Accessories,Locks,10.31,10.31
Accessories,Panniers,51.56,51.56


In [0]:
product_df.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/Product")\
    .save()

product_df_aggregated.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/ProductAggregated")\
    .save()

### Region Data

In [0]:
region_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/Region")

In [0]:
region_df.write.format("delta").mode("overwrite")\
    .option('overwriteSchema','true')\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/Region")\
    .save()

### Reseller Data

In [0]:
reseller_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/Reseller")

In [0]:
reseller_df.printSchema()
reseller_df.display()

root
 |-- ResellerKey: integer (nullable = true)
 |-- BusinessType: string (nullable = true)
 |-- Reseller: string (nullable = true)
 |-- City: string (nullable = true)
 |-- StateProvince: string (nullable = true)
 |-- CountryRegion: string (nullable = true)



ResellerKey,BusinessType,Reseller,City,StateProvince,CountryRegion
277,Specialty Bike Shop,The Bicycle Accessories Company,Alhambra,California,United States
455,Value Added Reseller,Timely Shipping Service,Alpine,California,United States
609,Value Added Reseller,Good Toys,Auburn,California,United States
492,Specialty Bike Shop,Basic Sports Equipment,Baldwin Park,California,United States
365,Specialty Bike Shop,Distinctive Store,Barstow,California,United States
168,Specialty Bike Shop,Economy Bikes Company,Bell Gardens,California,United States
6,Warehouse,Aerobic Exercise Company,Camarillo,California,United States
402,Warehouse,Pro Sporting Goods,Camarillo,California,United States
529,Warehouse,Big-Time Bike Store,Camarillo,California,United States
241,Specialty Bike Shop,Vale Riding Supplies,Canoga Park,California,United States


In [0]:
reseller_df_aggregated = reseller_df.groupBy(col("CountryRegion"))\
    .agg(
        count(col("city")).alias("CityCount")
    )\
    .orderBy(col("CountryRegion"))
reseller_df_aggregated.display()

CountryRegion,CityCount
Australia,40
Canada,114
France,40
Germany,40
United Kingdom,40
United States,427


In [0]:
reseller_df.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/Reseller")\
    .save()

reseller_df_aggregated.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/ResellerAggregated")\
    .save()

### Sales Data

In [0]:
sales_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/Sales")

In [0]:
sales_df.printSchema()
sales_df.display()

root
 |-- SalesOrderNumber: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ProductKey: integer (nullable = true)
 |-- ResellerKey: integer (nullable = true)
 |-- EmployeeKey: integer (nullable = true)
 |-- SalesTerritoryKey: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Cost: double (nullable = true)



SalesOrderNumber,OrderDate,ProductKey,ResellerKey,EmployeeKey,SalesTerritoryKey,Quantity,UnitPrice,Sales,Cost
SO43897,2017-08-25,235,312,282,4,2,28.84,57.68,63.45
SO43897,2017-08-25,351,312,282,4,2,2024.99,4049.98,3796.19
SO43897,2017-08-25,348,312,282,4,2,2024.99,4049.98,3796.19
SO43897,2017-08-25,232,312,282,4,2,28.84,57.68,63.45
SO44544,2017-11-18,292,312,282,4,2,818.7,1637.4,1413.62
SO44544,2017-11-18,220,312,282,4,2,20.19,40.38,24.06
SO44544,2017-11-18,351,312,282,4,2,2024.99,4049.98,3796.19
SO44544,2017-11-18,349,312,282,4,2,2024.99,4049.98,3796.19
SO44544,2017-11-18,344,312,282,4,2,2039.99,4079.98,3824.31
SO45321,2018-02-18,346,312,282,4,2,2039.99,4079.98,3824.31


In [0]:
sales_df_aggregated = sales_df.groupBy(col('SalesOrderNumber'))\
    .agg(
        sum(col("Sales")).alias("TotalSales")
    )\
    .orderBy(col("TotalSales").desc())
sales_df_aggregated.display()

SalesOrderNumber,TotalSales
SO51131,175753.58000000002
SO55282,166854.16
SO46616,153431.37
SO46981,149533.15
SO47395,148014.93
SO47369,141523.24
SO51858,131627.88999999998
SO51822,130409.95
SO47355,130185.05
SO44518,127099.79


In [0]:
sales_df.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/Sales")\
    .save()

sales_df_aggregated.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/SalesAggregated")\
    .save()


### Salesperson Data

In [0]:
salesperson_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/Salesperson")

In [0]:
salesperson_df.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/Salesperson")\
    .save()

### SalespersonRegion Data

In [0]:
salespersonregion_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/SalespersonRegion")

In [0]:
salespersonregion_df.write.format("delta").mode("overwrite")\
    .option("overwriteSchema", "true")\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/SalespersonRegion")\
    .save()

### Targets Data

In [0]:
targets_df = spark.read.format("delta")\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .load(f"abfss://silver@azureprojectdatalakegen2.dfs.core.windows.net/Targets")

In [0]:
targets_df.write.format("delta").mode("overwrite")\
    .option('overwriteSchema', 'true')\
    .option("path", f"abfss://gold@azureprojectdatalakegen2.dfs.core.windows.net/Targets")\
    .save()

In [0]:
print("Loading completed.")
print("ETL Pipeline completed. Data is ready for production use.")

Loading completed.
ETL Pipeline completed. Data is ready for production use.
