In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType, FloatType
from pyspark.sql import SparkSession

In [None]:
# Mount the `bike-store-data` ADLS Gen2 container at MOUNT_POINT using OAuth
# client-credentials (service principal) authentication.
# NOTE(review): client id / secret / tenant below are placeholders ("x"). Never
# commit real values in the notebook; read them from a Databricks secret scope,
# e.g. dbutils.secrets.get(scope="<scope>", key="<key>") — TODO confirm the
# scope and key names used in this workspace.
MOUNT_POINT = "/mnt/bikestore"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "x",
    "fs.azure.account.oauth2.client.secret": "x",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/x/oauth2/token",
}

# dbutils.fs.mount fails if the mount point is already in use, so only mount
# when it is not mounted yet; this keeps the cell safe to re-run.
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    print(f"{MOUNT_POINT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
        source="abfss://bike-store-data@x.dfs.core.windows.net",  # container@storageaccname
        mount_point=MOUNT_POINT,
        extra_configs=configs,
    )

In [None]:
# List the top level of the mounted container (same listing as `%fs ls`).
display(dbutils.fs.ls("/mnt/bikestore"))

path,name,size,modificationTime
dbfs:/mnt/bikestore/raw-data/,raw-data/,0,1698053609000
dbfs:/mnt/bikestore/transformed-data/,transformed-data/,0,1698053618000


In [None]:
# Read data
# Each raw table is a CSV file with a header row under RAW_DATA_DIR. Only the
# `header` option is set (no `inferSchema`), so Spark reads every column as a
# string; the cells below cast the columns to their proper types.
RAW_DATA_DIR = "/mnt/bikestore/raw-data"


def read_raw_csv(table_name: str):
    """Load `<RAW_DATA_DIR>/<table_name>.csv` (header row included) as a DataFrame."""
    return spark.read.format("csv").option("header", "true").load(f"{RAW_DATA_DIR}/{table_name}.csv")


brands = read_raw_csv("brands")
categories = read_raw_csv("categories")
customers = read_raw_csv("customers")
ordereditems = read_raw_csv("ordereditems")
orders = read_raw_csv("orders")
products = read_raw_csv("products")

+--------+------------+
|brand_id|  brand_name|
+--------+------------+
|       1|     Electra|
|       2|        Haro|
|       3|      Heller|
|       4| Pure Cycles|
|       5|     Ritchey|
|       6|     Strider|
|       7|Sun Bicycles|
|       8|       Surly|
|       9|        Trek|
+--------+------------+



In [None]:
# Inspect `brands`: print its schema, then preview the first rows.
brands.printSchema()
brands.show(n=20, truncate=True)

root
 |-- brand_id: integer (nullable = true)
 |-- brand_name: string (nullable = true)

+--------+------------+
|brand_id|  brand_name|
+--------+------------+
|       1|     Electra|
|       2|        Haro|
|       3|      Heller|
|       4| Pure Cycles|
|       5|     Ritchey|
|       6|     Strider|
|       7|Sun Bicycles|
|       8|       Surly|
|       9|        Trek|
+--------+------------+



In [None]:
# brand_id arrives from the CSV reader as a string; store it as a 32-bit int.
brands = brands.withColumn("brand_id", brands["brand_id"].cast("int"))

In [None]:
# Inspect `categories`: print its schema, then preview the first rows.
categories.printSchema()
categories.show(n=20, truncate=True)

root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)

+-----------+-------------------+
|category_id|      category_name|
+-----------+-------------------+
|          1|  Children Bicycles|
|          2|   Comfort Bicycles|
|          3|  Cruisers Bicycles|
|          4|Cyclocross Bicycles|
|          5|     Electric Bikes|
|          6|     Mountain Bikes|
|          7|         Road Bikes|
+-----------+-------------------+



In [None]:
# category_id arrives from the CSV reader as a string; store it as a 32-bit int.
categories = categories.withColumn("category_id", categories["category_id"].cast("int"))

In [None]:
# Inspect `customers`: print its schema, then preview the first rows.
customers.printSchema()
customers.show(n=20, truncate=True)

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: integer (nullable = true)

+-----------+----------+---------+--------------+--------------------+--------------------+---------------+-----+--------+
|customer_id|first_name|last_name|         phone|               email|              street|           city|state|zip_code|
+-----------+----------+---------+--------------+--------------------+--------------------+---------------+-----+--------+
|          1|     Debra|    Burks|          NULL|debra.burks@yahoo...|   9273 Thorne Ave. |   Orchard Park|   NY|   14127|
|          2|     Kasha|     Todd|          NULL|kasha.todd@yahoo.com|    910 Vine Street |       Campbell|   CA|   95008|
|          3|    Tameka|  

In [None]:
# Convert customer_id from str to int.
# zip_code is deliberately kept as a string. ZIP codes are identifiers, not
# quantities, so an integer cast would drop leading zeros (e.g. "02108" -> 2108).
# It would also turn ZIP+4 values such as "14127-1234" into NULL, or raise a
# cast error when ANSI mode is enabled.
customers = customers.withColumn("customer_id", col("customer_id").cast(IntegerType()))

In [None]:
# Inspect `ordereditems`: print its schema, then preview the first rows.
ordereditems.printSchema()
ordereditems.show(n=20, truncate=True)

root
 |-- order_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- list_price: float (nullable = true)
 |-- discount: float (nullable = true)

+--------+-------+----------+--------+----------+--------+
|order_id|item_id|product_id|quantity|list_price|discount|
+--------+-------+----------+--------+----------+--------+
|       1|      1|        20|       1|    599.99|     0.2|
|       1|      2|         8|       2|   1799.99|    0.07|
|       1|      3|        10|       2|    1549.0|    0.05|
|       1|      4|        16|       2|    599.99|    0.05|
|       1|      5|         4|       1|   2899.99|     0.2|
|       2|      1|        20|       1|    599.99|    0.07|
|       2|      2|        16|       2|    599.99|    0.05|
|       3|      1|         3|       1|    999.99|    0.05|
|       3|      2|        20|       1|    599.99|    0.05|
|       4|      1|         2|       2|   

In [None]:
# Cast the ordered-items columns (read from CSV as strings) to typed columns:
# - order_id, item_id, product_id, quantity -> 32-bit int
# - list_price, discount -> double
# Prices and discounts use DoubleType rather than FloatType. Single-precision
# floats keep only ~7 significant digits, which is too coarse for currency and
# for amounts derived from it (e.g. quantity * list_price * (1 - discount)).
# Use DecimalType instead if exact decimal arithmetic is required.
ordereditems = (ordereditems
             .withColumn("order_id", col("order_id").cast(IntegerType()))
             .withColumn("item_id", col("item_id").cast(IntegerType()))
             .withColumn("product_id", col("product_id").cast(IntegerType()))
             .withColumn("quantity", col("quantity").cast(IntegerType()))
             .withColumn("list_price", col("list_price").cast(DoubleType()))
             .withColumn("discount", col("discount").cast(DoubleType())))


In [None]:
# Inspect `orders`: print its schema, then preview the first rows.
orders.printSchema()
orders.show(n=20, truncate=True)

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)

+--------+-----------+------------+----------+-------------+------------+--------+--------+
|order_id|customer_id|order_status|order_date|required_date|shipped_date|store_id|staff_id|
+--------+-----------+------------+----------+-------------+------------+--------+--------+
|       1|        259|           4|2016-01-01|   2016-01-03|  2016-01-03|       1|       2|
|       2|       1212|           4|2016-01-01|   2016-01-04|  2016-01-03|       2|       6|
|       3|        523|           4|2016-01-02|   2016-01-05|  2016-01-03|       2|       7|
|       4|        175|           4|2016-01-03|   2016-01-04|  2016-01-05|       1|       3|
|       5|   

In [None]:
# Target types for the orders columns, which the CSV reader yields as strings.
# The three *_date columns are parsed by Spark's string-to-date cast.
# NOTE(review): unparseable date strings become null unless ANSI mode is on,
# in which case they raise. Confirm how missing shipped dates are encoded in
# the raw file.
orders_column_types = {
    "order_id": IntegerType(),
    "customer_id": IntegerType(),
    "order_status": IntegerType(),
    "order_date": DateType(),
    "required_date": DateType(),
    "shipped_date": DateType(),
    "store_id": IntegerType(),
    "staff_id": IntegerType(),
}

# Replace each column with its cast version, in the order listed above.
for column_name, column_type in orders_column_types.items():
    orders = orders.withColumn(column_name, col(column_name).cast(column_type))


In [None]:
# Inspect `products`: print its schema, then preview the first rows.
products.printSchema()
products.show(n=20, truncate=True)

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- model_year: integer (nullable = true)
 |-- list_price: float (nullable = true)

+----------+--------------------+--------+-----------+----------+----------+
|product_id|        product_name|brand_id|category_id|model_year|list_price|
+----------+--------------------+--------+-----------+----------+----------+
|         1|     Trek 820 - 2016|       9|          6|      2016|    379.99|
|         2|Ritchey Timberwol...|       5|          6|      2016|    749.99|
|         3|Surly Wednesday F...|       8|          6|      2016|    999.99|
|         4|Trek Fuel EX 8 29...|       9|          6|      2016|   2899.99|
|         5|Heller Shagamaw F...|       3|          6|      2016|   1320.99|
|         6|Surly Ice Cream T...|       8|          6|      2016|    469.99|
|         7|Trek Slash 8 27.5...|       9|   

In [None]:
# Cast the product columns (read from CSV as strings) to typed columns:
# - product_id, brand_id, category_id, model_year -> 32-bit int
# - list_price -> double
# list_price uses DoubleType rather than FloatType. Single-precision floats
# keep only ~7 significant digits, which is too coarse for currency values.
products = (products
             .withColumn("product_id", col("product_id").cast(IntegerType()))
             .withColumn("brand_id", col("brand_id").cast(IntegerType()))
             .withColumn("category_id", col("category_id").cast(IntegerType()))
             .withColumn("model_year", col("model_year").cast(IntegerType()))
             .withColumn("list_price", col("list_price").cast(DoubleType())))

In [None]:
# Save transformed data
# Each table is written as CSV with a header row, overwriting previous output.
# Spark writes a *directory* at each path (e.g. .../brands.csv/). Because of
# repartition(1), that directory holds a single part-*.csv file alongside
# Spark's job marker files.
# CSV stores no column types, so consumers must re-apply the casts above or
# infer them. Consider Parquet/Delta if downstream readers allow it.
TRANSFORMED_DATA_DIR = "/mnt/bikestore/transformed-data"

transformed_tables = {
    "brands": brands,
    "categories": categories,
    "customers": customers,
    "ordereditems": ordereditems,
    "orders": orders,
    "products": products,
}

for table_name, table_df in transformed_tables.items():
    (table_df
     .repartition(1)
     .write.mode("overwrite")
     .option("header", "true")
     .csv(f"{TRANSFORMED_DATA_DIR}/{table_name}.csv"))