In [0]:
"""
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="my-scope", key="application-id"),
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="my-scope", key="application-secret"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/" + dbutils.secrets.get(scope="my-scope", key="application-tenant-id") + "/oauth2/token"
}

#mouting storage container
dbutils.fs.mount(
  source = "abfss://shopease-container@shopeasestorage123.dfs.core.windows.net/",
  mount_point = "/mnt/shopease",
  extra_configs = configs)

"""

True

In [0]:
# Load the silver-layer CSV extracts. The first line of each file is used as
# the header and Spark infers the column types from the data.
csv_options = {"header": True, "inferSchema": True}

customers = (
    spark.read.format("csv")
    .options(**csv_options)
    .load("/mnt/shopease/silver/data/customers.csv")
)
sales = (
    spark.read.format("csv")
    .options(**csv_options)
    .load("/mnt/shopease/silver/data/sales.csv")
)

In [0]:
#Show data
# Preview the first rows of the customers frame exactly as loaded.
# NOTE(review): in the rendered output the values look shifted one column to
# the right from `city` onward (state and country both show "Birmingham",
# zip_code shows "United Kingdom", customer_type shows a timestamp), and
# phone is displayed in scientific notation, so it was presumably inferred as
# a numeric type — confirm against the source file.
# NOTE(review): the output contains names, e-mail addresses and phone
# numbers; clear cell outputs before sharing this notebook.

customers.show()


+-----------+------------+----------+--------------------+----------------+-------------------+-----------+--------------------+------------+---------------+--------------+-------------------+
|customer_id|  first_name| last_name|               email|           phone|            address|       city|               state|     country|       zip_code| date_of_birth|      customer_type|
+-----------+------------+----------+--------------------+----------------+-------------------+-----------+--------------------+------------+---------------+--------------+-------------------+
|          1|        Leah|     Olson| leah.olson@iclou...|4.47402064176E11|    10 Jones Estate|    Centers|          Birmingham|  Birmingham| United Kingdom|       954 7MN|2002-10-20 00:00:00|
|          2|      Denise|     Green| denise.green@out...|4.47645225488E11|   52 Mosley Bridge|      Vista|          Birmingham|  Birmingham| United Kingdom|       708 9QR|1957-08-14 00:00:00|
|          3| Christopher|   Johnso

In [0]:
# Preview the first rows of the sales frame exactly as loaded.
# NOTE(review): customer_id and store_id render with a decimal part
# (e.g. 5631.0, 5.0), so they were presumably inferred as floating point
# rather than integer — verify before joining on them.
sales.show()

+-------+------------+-------------------+---------+-------------+----------------+---------------+
|sale_id| customer_id|          sale_date| store_id| total_amount| discount_amount| payment_method|
+-------+------------+-------------------+---------+-------------+----------------+---------------+
|      1|      5631.0|2022-12-08 00:00:00|      5.0|       481.42|            14.8|           Cash|
|      2|      2268.0|2024-03-13 00:00:00|      1.0|       258.69|           16.03|         PayPal|
|      3|      3775.0|2024-09-19 00:00:00|      9.0|       343.14|           41.95|           Cash|
|      4|      3536.0|2023-08-16 00:00:00|      2.0|       279.98|            7.99|         PayPal|
|      5|       957.0|2023-08-06 00:00:00|     10.0|       303.46|           22.51|         PayPal|
|      6|      6949.0|2024-04-09 00:00:00|      3.0|       140.74|            27.2|         PayPal|
|      7|      9489.0|2024-03-02 00:00:00|      9.0|       307.17|           38.54|           Cash|


In [0]:
from pyspark.sql import functions as f
from pyspark.sql.functions import concat, concat_ws, lit, when, col, split


In [0]:
# Inspect the raw header names: the output shows that every name except
# customer_id carries a leading space.
customers.columns

['customer_id',
 ' first_name',
 ' last_name',
 ' email',
 ' phone',
 ' address',
 ' city',
 ' state',
 ' country',
 ' zip_code',
 ' date_of_birth',
 ' customer_type']

In [0]:
# Strip the whitespace surrounding every column name (the loaded header
# leaves a leading space on most of them).
stripped_names = [name.strip() for name in customers.columns]
customers = customers.toDF(*stripped_names)

In [0]:
# The column named customer_type actually holds a "date time" value after
# loading, so split it on the space: the first piece becomes
# customer_date_of_birth and the second piece overwrites customer_type.
type_parts = split(customers["customer_type"], " ")

customers_split = (
    customers
    .withColumn("customer_date_of_birth", type_parts.getItem(0))
    .withColumn("customer_type", type_parts.getItem(1))
)


In [0]:
# Check the split: customer_date_of_birth should show only the date part,
# while customer_type is left holding the time part ("00:00:00" in the
# rendered rows).
customers_split.show()

+-----------+------------+---------+--------------------+----------------+-------------------+-----------+--------------------+------------+---------------+-------------+-------------+----------------------+
|customer_id|  first_name|last_name|               email|           phone|            address|       city|               state|     country|       zip_code|date_of_birth|customer_type|customer_date_of_birth|
+-----------+------------+---------+--------------------+----------------+-------------------+-----------+--------------------+------------+---------------+-------------+-------------+----------------------+
|          1|        Leah|    Olson| leah.olson@iclou...|4.47402064176E11|    10 Jones Estate|    Centers|          Birmingham|  Birmingham| United Kingdom|      954 7MN|     00:00:00|            2002-10-20|
|          2|      Denise|    Green| denise.green@out...|4.47645225488E11|   52 Mosley Bridge|      Vista|          Birmingham|  Birmingham| United Kingdom|      708 9Q

In [0]:
# Remove the columns that carry no usable information: `country` repeats the
# value shown under `state`, and `customer_type` only holds the time part
# left over from the split.
columns_to_drop = ["country", "customer_type"]
customers_split = customers_split.drop(*columns_to_drop)

In [0]:
# Renaming columns so each name matches the values the column actually holds.
# The renames are applied in this order on purpose: each target name must be
# free before it is reused (zip_code is vacated before date_of_birth takes it,
# and date_of_birth before customer_date_of_birth takes it).
column_renames = [
    ("zip_code", "country"),
    ("date_of_birth", "zip_code"),
    ("customer_date_of_birth", "date_of_birth"),
]

customers_renamed = customers_split
for old_name, new_name in column_renames:
    customers_renamed = customers_renamed.withColumnRenamed(old_name, new_name)

In [0]:
# Strip the whitespace surrounding every sales column name.
clean_sales_names = [name.strip() for name in sales.columns]
sales = sales.toDF(*clean_sales_names)

In [0]:
# Persist the cleaned customers and sales frames to the gold layer as CSV
# with a header row, replacing whatever was written there before.
customers_point = "/mnt/shopease/gold/customers"
sales_point = "/mnt/shopease/gold/sales"

customers_renamed.write.csv(customers_point, mode="overwrite", header=True)
sales.write.csv(sales_point, mode="overwrite", header=True)