In [0]:
# OAuth (service principal) settings used to mount the ADLS Gen2 container.
# NOTE(review): the client id, client secret, the tenant segment of the token
# endpoint and the container/storage-account part of `source` are blank in this
# notebook — presumably redacted. Supply them at run time, preferably from a
# Databricks secret scope rather than as literals in the notebook.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "",
    "fs.azure.account.oauth2.client.secret": "",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com//oauth2/token",
}

MOUNT_POINT = "/mnt/shop"

# Unmount only when the mount point currently exists: unmounting a path that is
# not mounted raises, which would abort a Run All on a fresh workspace.
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(MOUNT_POINT)

# (Re)mount the container with the OAuth settings above.
dbutils.fs.mount(
    source="abfss://.dfs.core.windows.net",  # container@storageacc
    mount_point=MOUNT_POINT,
    extra_configs=configs,
)

/mnt/shop has been unmounted.


True

In [0]:
# List the top-level contents of the mounted container.
display(dbutils.fs.ls("/mnt/shop"))

path,name,size,modificationTime
dbfs:/mnt/shop/gold-layer/,gold-layer/,0,1727800356000
dbfs:/mnt/shop/raw-data/,raw-data/,0,1727792291000
dbfs:/mnt/shop/silver-layer/,silver-layer/,0,1727799947000


In [0]:
def _read_csv_with_header(path):
    """Read a CSV file into a Spark DataFrame, using the first row as the header
    and letting Spark infer the column types."""
    return spark.read.format("csv").options(header="true", inferSchema="true").load(path)


# Load every source table from the bronze layer.
# NOTE(review): the directory listing captured in this notebook shows raw-data/,
# silver-layer/ and gold-layer/ under /mnt/shop but no bronze-layer/ — confirm
# that these paths exist in the target container.
Customer = _read_csv_with_header("/mnt/shop/bronze-layer/Customer.csv")
Generations = _read_csv_with_header("/mnt/shop/bronze-layer/Generations.csv")
Inventory = _read_csv_with_header("/mnt/shop/bronze-layer/Inventory.csv")
Product = _read_csv_with_header("/mnt/shop/bronze-layer/Product.csv")
SalesOrders = _read_csv_with_header("/mnt/shop/bronze-layer/SalesOrders.csv")
SalesOutlet = _read_csv_with_header("/mnt/shop/bronze-layer/SalesOutlet.csv")
SalesTarget = _read_csv_with_header("/mnt/shop/bronze-layer/SalesTarget.csv")
staff = _read_csv_with_header("/mnt/shop/bronze-layer/staff.csv")

In [0]:
# Print the inferred schema of each loaded table, in load order.
loaded_frames = (
    Customer,
    Generations,
    Inventory,
    Product,
    SalesOrders,
    SalesOutlet,
    SalesTarget,
    staff,
)
for frame in loaded_frames:
    frame.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- home_store: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- customer_since: date (nullable = true)
 |-- loyalty_cardNumber: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: integer (nullable = true)

root
 |-- birth_year: integer (nullable = true)
 |-- generation: string (nullable = true)

root
 |-- outlet_id: integer (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- start_of_day: integer (nullable = true)
 |-- quantity_sold: integer (nullable = true)
 |-- waste: integer (nullable = true)
 |-- waste_percentage: string (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- name: string (nullable = tru

In [0]:
# Null handling for every table: rows missing a required column are dropped,
# then remaining nulls in the listed columns are replaced with default values.

# Customer: first_name and email are required.
customer_defaults = {
    "home_store": 3,
    "loyalty_cardNumber": "Unknown",
    "birthdate": "2000-01-01",
    "gender": "Unknown",
    "birth_year": 2000,
}
Customer = (
    Customer
    .dropna(subset=["first_name", "email"])
    .fillna(customer_defaults)
)

# Generations: both columns are required, nothing to fill.
Generations = Generations.dropna(subset=["birth_year", "generation"])

# Inventory: the outlet / date / product columns are required.
inventory_defaults = {
    "start_of_day": 0,
    "quantity_sold": 0,
    "waste": 0,
    "waste_percentage": "0%",
}
Inventory = (
    Inventory
    .dropna(subset=["outlet_id", "transaction_date", "product_id"])
    .fillna(inventory_defaults)
)

# Product: name and both category columns are required.
product_defaults = {
    "product_description": "Unknown",
    "product_weight": "0",
    "wholesale_price": 0.0,
    "retail_price": 0.0,
    "tax_exempt_yn": False,
    "promo_yn": False,
    "new_product_yn": False,
}
Product = (
    Product
    .dropna(subset=["name", "category", "sub_category"])
    .fillna(product_defaults)
)

# SalesOutlet: store_address is required.
SalesOutlet = SalesOutlet.dropna(subset=["store_address"])

# SalesOrders: transaction_id is required.
SalesOrders = SalesOrders.dropna(subset=["transaction_id"])

# SalesTarget: year_month is required; missing goals default to 0.
sales_target_defaults = {
    "beans_goal": 0,
    "beverage_goal": 0,
    "food_goal": 0,
    "merchandise_goal": 0,
    "total_goal": 0,
}
SalesTarget = (
    SalesTarget
    .dropna(subset=["year_month"])
    .fillna(sales_target_defaults)
)

# staff: no rows are dropped; missing attributes get placeholder values.
staff_defaults = {
    "last_name": "Unknown",
    "position": "Unknown",
    "location": "Unknown",
    "start_date": "2018-01-01",
}
staff = staff.fillna(staff_defaults)
                         

In [0]:
# De-duplicate each table on the columns listed in `subset`.

# Customer: one row per (customer_id, email).
Customer = Customer.dropDuplicates(subset=["customer_id", "email"])

# Generations: one row per birth_year.
Generations = Generations.dropDuplicates(subset=["birth_year"])

# Inventory: one row per (outlet_id, transaction_date, product_id).
Inventory = Inventory.dropDuplicates(
    subset=["outlet_id", "transaction_date", "product_id"]
)

# Product: one row per product_id.
Product = Product.dropDuplicates(subset=["product_id"])

# SalesOrders: one row per (transaction_id, transaction_time).
# NOTE(review): if a transaction can contain several line items, this key would
# collapse them into a single row — confirm against the SalesOrders schema.
SalesOrders = SalesOrders.dropDuplicates(
    subset=["transaction_id", "transaction_time"]
)

# SalesOutlet: one row per outlet_id.
SalesOutlet = SalesOutlet.dropDuplicates(subset=["outlet_id"])

# SalesTarget: one row per (outlet_id, year_month).
SalesTarget = SalesTarget.dropDuplicates(subset=["outlet_id", "year_month"])

# staff: one row per staff_id.
staff = staff.dropDuplicates(subset=["staff_id"])

In [0]:
# Persist each cleaned table to the silver layer as a single CSV file.
# Each frame is converted to pandas first, which pulls the whole table into
# driver memory before it is written through the /dbfs mount path.
silver_exports = [
    (Customer, "/dbfs/mnt/shop/silver-layer/Customer.csv"),
    (Generations, "/dbfs/mnt/shop/silver-layer/Generations.csv"),
    (Inventory, "/dbfs/mnt/shop/silver-layer/Inventory.csv"),
    (Product, "/dbfs/mnt/shop/silver-layer/Product.csv"),
    (SalesOrders, "/dbfs/mnt/shop/silver-layer/SalesOrders.csv"),
    (SalesOutlet, "/dbfs/mnt/shop/silver-layer/SalesOutlet.csv"),
    (SalesTarget, "/dbfs/mnt/shop/silver-layer/SalesTarget.csv"),
    (staff, "/dbfs/mnt/shop/silver-layer/staff.csv"),
]
for cleaned_frame, csv_path in silver_exports:
    cleaned_frame.toPandas().to_csv(csv_path, index=False)