## Bronze to Silver

Refer to the "bronze to silver" image for an overview of the changes this notebook makes.

Orders dataframe

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Reading the orders csv file from the bronze layer.
# ORDER_DATETIME is read as a string because its text is not in a timestamp format
# Spark parses by default; it is converted explicitly in the next cell.
# Note: every field below is declared nullable=False, but the CSV reader does not
# enforce this — the printed schema reports all columns as nullable = true.

# Single source of truth for the input location (previously this variable pointed at
# a non-bronze path and was never used by the read below).
orders_path = "dbfs:/FileStore/tables/medallion_customer_orders/bronze/orders.csv"

# Column names are upper-case throughout so the silver output is consistent
# (ORDER_ID here matches the later selects and the order_items schema).
order_schema = StructType([
    StructField("ORDER_ID", IntegerType(), False),
    StructField("ORDER_DATETIME", StringType(), False),
    StructField("CUSTOMER_ID", IntegerType(), False),
    StructField("ORDER_STATUS", StringType(), False),
    StructField("STORE_ID", IntegerType(), False),
])

orders = spark.read.csv(orders_path, header=True, schema=order_schema)

In [0]:
# Replace the ORDER_DATETIME text with a parsed timestamp column named ORDER_TIMESTAMP;
# ORDER_ID, CUSTOMER_ID, ORDER_STATUS and STORE_ID pass through unchanged.
# Pattern letters: dd = day, MMM = month abbreviation, yy = 2-digit year,
# kk = clock-hour-of-day (1-24), mm = minute, ss = second, SS = fraction of second.
order_ts_col = to_timestamp(orders['order_datetime'], "dd-MMM-yy kk.mm.ss.SS").alias('ORDER_TIMESTAMP')

order_columns = ['ORDER_ID', order_ts_col, 'CUSTOMER_ID', 'ORDER_STATUS', 'STORE_ID']
orders = orders.select(*order_columns)

In [0]:
# Confirming the data types: ORDER_TIMESTAMP should now be a timestamp, and the
# other columns should keep the types declared in order_schema.
# Note: every column is reported as nullable = true even though order_schema
# declares nullable=False; the CSV reader does not enforce that flag.
orders.printSchema()

root
 |-- ORDER_ID: integer (nullable = true)
 |-- ORDER_TIMESTAMP: timestamp (nullable = true)
 |-- CUSTOMER_ID: integer (nullable = true)
 |-- ORDER_STATUS: string (nullable = true)
 |-- STORE_ID: integer (nullable = true)



In [0]:
# Keep only orders whose ORDER_STATUS is exactly 'COMPLETE'; rows with any other
# status are removed. The filtered result replaces `orders`.
is_complete = orders["ORDER_STATUS"] == 'COMPLETE'
orders = orders.where(is_complete)

Stores dataframe (used to look up store names for orders)

In [0]:
# Read the stores csv file from the bronze layer with an explicit schema.
# Each field is declared nullable=False (the CSV reader does not enforce this).
stores_path = "dbfs:/FileStore/tables/medallion_customer_orders/bronze/stores.csv"

stores_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in [
        ("STORE_ID", IntegerType()),
        ("STORE_NAME", StringType()),
        ("WEB_ADDRESS", StringType()),
        ("LATITUDE", DoubleType()),
        ("LONGITUDE", DoubleType()),
    ]
])

stores = (
    spark.read
    .schema(stores_schema)
    .option("header", True)
    .csv(stores_path)
)

Join orders with stores

In [0]:
# Left join orders (left side) to stores on STORE_ID so each order picks up its
# STORE_NAME. Orders with no matching store are kept, with a NULL STORE_NAME.
# Only the columns needed for the silver orders table are retained.
store_match = orders['store_id'] == stores['store_id']
orders = (
    orders
    .join(stores, on=store_match, how='left')
    .select('ORDER_ID', 'ORDER_TIMESTAMP', 'CUSTOMER_ID', 'STORE_NAME')
)

In [0]:
# Display the joined orders dataframe (ORDER_ID, ORDER_TIMESTAMP, CUSTOMER_ID,
# STORE_NAME) for a visual check before it is written out.
# display() comes from the notebook environment (presumably Databricks, given the dbfs:/ paths).
display(orders)

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,STORE_NAME
447,2022-01-06T09:35:42Z,355,Online
448,2022-01-06T10:23:14Z,155,Online
449,2022-01-06T01:21:54Z,242,Online
450,2022-01-06T05:57:04Z,49,Online
451,2022-01-06T10:39:07Z,204,Online
452,2022-01-07T01:11:46Z,216,Online
453,2022-01-07T06:53:06Z,4,New York City
454,2022-01-07T03:55:15Z,388,Online
455,2022-01-07T06:38:38Z,291,Online
456,2022-01-08T12:52:12Z,272,Online


In [0]:
# Persist the silver orders as parquet, replacing any previous output at this path.
orders.write.mode("overwrite").parquet("/FileStore/tables/medallion_customer_orders/silver/orders/")

Order items parquet

In [0]:
# Read the order_items csv file from the bronze layer with an explicit schema.
# Each field is declared nullable=False (the CSV reader does not enforce this).
order_items_path = "dbfs:/FileStore/tables/medallion_customer_orders/bronze/order_items.csv"

order_items_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in [
        ("ORDER_ID", IntegerType()),
        ("LINE_ITEM_ID", IntegerType()),
        ("PRODUCT_ID", IntegerType()),
        ("UNIT_PRICE", DoubleType()),
        ("QUANTITY", IntegerType()),
    ]
])

order_items = (
    spark.read
    .schema(order_items_schema)
    .option("header", True)
    .csv(order_items_path)
)

In [0]:
# LINE_ITEM_ID is not needed in the silver layer: drop it, keep every other
# order_items column, and assign the result back to order_items.
unneeded_item_columns = ['LINE_ITEM_ID']
order_items = order_items.drop(*unneeded_item_columns)

In [0]:
# Persist the silver order_items as parquet, replacing any previous output at this path.
order_items.write.mode("overwrite").parquet("/FileStore/tables/medallion_customer_orders/silver/order_items/")

Products parquet

In [0]:
# Read the products csv file from the bronze layer with an explicit schema.
# Each field is declared nullable=False (the CSV reader does not enforce this).
products_path = "dbfs:/FileStore/tables/medallion_customer_orders/bronze/products.csv"

products_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in [
        ("PRODUCT_ID", IntegerType()),
        ("PRODUCT_NAME", StringType()),
        ("UNIT_PRICE", DoubleType()),
    ]
])

products = (
    spark.read
    .schema(products_schema)
    .option("header", True)
    .csv(products_path)
)

In [0]:
# Persist the silver products as parquet, replacing any previous output at this path.
products.write.mode("overwrite").parquet("/FileStore/tables/medallion_customer_orders/silver/products/")

Customers parquet

In [0]:
# Read the customers csv file from the bronze layer with an explicit schema.
# Each field is declared nullable=False (the CSV reader does not enforce this).
customers_path = "dbfs:/FileStore/tables/medallion_customer_orders/bronze/customers.csv"

customers_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in [
        ("CUSTOMER_ID", IntegerType()),
        ("FULL_NAME", StringType()),
        ("EMAIL_ADDRESS", StringType()),
    ]
])

customers = (
    spark.read
    .schema(customers_schema)
    .option("header", True)
    .csv(customers_path)
)

In [0]:
# Persist the silver customers as parquet, replacing any previous output at this path.
customers.write.mode('overwrite').parquet('/FileStore/tables/medallion_customer_orders/silver/customers/')