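### ORDERS table

ORDER_ID - integer

ORDER_TIMESTAMP - timestamp (parsed from the raw ORDER_DATETIME string)

CUSTOMER_ID - integer

STORE_NAME - string (looked up from the STORES table via STORE_ID)

Keep only records where ORDER_STATUS = 'COMPLETE'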

### ORDER_ITEMS table

ORDER_ID - integer 

PRODUCT_ID - integer 

UNIT_PRICE - double 

QUANTITY - integer 


### PRODUCTS table

PRODUCT_ID - integer 

PRODUCT_NAME - string

UNIT_PRICE - double 

### CUSTOMERS table

CUSTOMER_ID - integer 

FULL_NAME - string 

EMAIL_ADDRESS - string

In [0]:
from pyspark.sql.functions import * 

In [0]:
# Bronze-layer CSV inputs; every file lives under the same base directory.
BRONZE_DIR = "/FileStore/tables/bronze"
customers_path = f"{BRONZE_DIR}/customers.csv"
order_items_path = f"{BRONZE_DIR}/order_items.csv"
orders_path = f"{BRONZE_DIR}/orders.csv"
products_path = f"{BRONZE_DIR}/products.csv"
stores_path = f"{BRONZE_DIR}/stores.csv"

In [0]:
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

# Explicit schemas for the bronze CSV files, so column types are fixed rather than inferred.
# NOTE(review): Spark file sources generally relax nullable=False when reading, so the
# non-null flags below document intent but are not enforced; add explicit checks if needed.

# ORDER_DATETIME is kept as a string here and parsed into ORDER_TIMESTAMP later.
orders_schema = StructType([
    StructField("ORDER_ID", IntegerType(), False),
    StructField("ORDER_DATETIME", StringType(), False),
    StructField("CUSTOMER_ID", IntegerType(), False),
    StructField("ORDER_STATUS", StringType(), False),
    StructField("STORE_ID", IntegerType(), False)
])

order_items_schema = StructType([
    StructField("ORDER_ID", IntegerType(), False),
    StructField("LINE_ITEM_ID", IntegerType(), False),
    StructField("PRODUCT_ID", IntegerType(), False),
    StructField("UNIT_PRICE", DoubleType(), False),
    StructField("QUANTITY", IntegerType(), False)
])

products_schema = StructType([
    StructField("PRODUCT_ID", IntegerType(), False),
    StructField("PRODUCT_NAME", StringType(), False),
    StructField("UNIT_PRICE", DoubleType(), False)
])

customers_schema = StructType([
    StructField("CUSTOMER_ID", IntegerType(), False),
    StructField("FULL_NAME", StringType(), False),
    StructField("EMAIL_ADDRESS", StringType(), False)
])

# In the stores data, WEB_ADDRESS is empty for physical stores and LATITUDE/LONGITUDE
# are empty for the 'Online' store, so these three fields must be nullable.
stores_schema = StructType([
    StructField("STORE_ID", IntegerType(), False),
    StructField("STORE_NAME", StringType(), False),
    StructField("WEB_ADDRESS", StringType(), True),
    StructField("LATITUDE", DoubleType(), True),
    StructField("LONGITUDE", DoubleType(), True)
])

In [0]:
# Load bronze customers with the explicit schema; the first CSV line is a header row.
customers_df = (
    spark.read
    .schema(customers_schema)
    .option("header", True)
    .csv(customers_path)
)
customers_df.display()

CUSTOMER_ID,FULL_NAME,EMAIL_ADDRESS
286,Wilfred Welch,wilfred.welch@internalmail
287,Kristina Nunez,kristina.nunez@internalmail
288,Mable Ballard,mable.ballard@internalmail
289,Diane Wilkerson,diane.wilkerson@internalmail
290,Sheryl Banks,sheryl.banks@internalmail
291,Opal Cruz,opal.cruz@internalmail
292,Dale Hughes,dale.hughes@internalmail
293,Diana Fowler,diana.fowler@internalmail
294,Travis Schwartz,travis.schwartz@internalmail
295,Anthony Boone,anthony.boone@internalmail


In [0]:
# Load bronze order line items with the explicit schema; the first CSV line is a header row.
order_items_df = (
    spark.read
    .schema(order_items_schema)
    .option("header", True)
    .csv(order_items_path)
)
order_items_df.display()

ORDER_ID,LINE_ITEM_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,1,26,48.75,1
334,2,46,39.16,4
334,3,12,10.48,4
335,1,32,5.65,2
336,1,2,29.55,5
336,2,20,28.21,5
337,1,32,5.65,4
337,2,29,24.71,4
337,3,45,31.68,3
338,1,35,7.18,2


In [0]:
# Load bronze orders with the explicit schema; the first CSV line is a header row.
orders_df = (
    spark.read
    .schema(orders_schema)
    .option("header", True)
    .csv(orders_path)
)
orders_df.display()

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,06-JAN-22 09.35.42.00,355,COMPLETE,1
448,06-JAN-22 10.23.14.00,155,COMPLETE,1
449,06-JAN-22 01.21.54.00,242,COMPLETE,1
450,06-JAN-22 05.57.04.00,49,COMPLETE,1
451,06-JAN-22 10.39.07.00,204,COMPLETE,1
452,07-JAN-22 01.11.46.00,216,COMPLETE,1
453,07-JAN-22 06.53.06.00,4,COMPLETE,4
454,07-JAN-22 03.55.15.00,388,COMPLETE,1
455,07-JAN-22 06.38.38.00,291,COMPLETE,1
456,08-JAN-22 12.52.12.00,272,COMPLETE,1


In [0]:
# (name, type) pairs for orders_df; ORDER_DATETIME is still a string because orders_schema declares it StringType.
orders_df.dtypes

In [0]:
# Load bronze products with the explicit schema; the first CSV line is a header row.
products_df = (
    spark.read
    .schema(products_schema)
    .option("header", True)
    .csv(products_path)
)
products_df.display()

PRODUCT_ID,PRODUCT_NAME,UNIT_PRICE
16,Women's Socks (Grey),39.89
17,Women's Sweater (Brown),24.46
18,Women's Jacket (Black),14.34
19,Men's Coat (Red),28.21
20,Girl's Shorts (Green),38.34
21,Girl's Pyjamas (White),39.78
22,Men's Shorts (Black),10.33
23,Men's Pyjamas (Blue),48.39
24,Boy's Sweater (Red),9.8
25,Girl's Jeans (Grey),48.75


In [0]:
# Load bronze stores with the explicit schema; the first CSV line is a header row.
stores_df = (
    spark.read
    .schema(stores_schema)
    .option("header", True)
    .csv(stores_path)
)
stores_df.display()

STORE_ID,STORE_NAME,WEB_ADDRESS,LATITUDE,LONGITUDE
1,Online,https://www.example.com,,
2,San Francisco,,37.529395,-122.267237
3,Seattle,,47.6053,-122.33221
4,New York City,,40.745216,-73.980518
5,Chicago,,41.878751,-87.636675
6,London,,51.519281,-0.087296
7,Bucharest,,44.43225,26.10626
8,Berlin,,52.5161,13.3873
9,Utrecht,,52.103263,5.061644
10,Madrid,,40.4929,-3.8737


In [0]:
# Inspect the (name, type) pairs of stores_df as read with stores_schema.
stores_df.dtypes

In [0]:
# Preview: parse the Oracle-style ORDER_DATETIME string (e.g. '06-JAN-22 09.35.42.00')
# into a Spark timestamp, keeping only the parsed column.
# NOTE(review): 'HH' assumes a 24-hour clock; the source strings carry no AM/PM marker,
# so confirm with the upstream export that hours are not 12-hour values.
orders_df_timestamp = orders_df.select(
    to_timestamp(orders_df['ORDER_DATETIME'], 'dd-MMM-yy HH.mm.ss.SS').alias('ORDER_TIMESTAMP')
)

orders_df_timestamp

In [0]:
# Render the single parsed ORDER_TIMESTAMP column to check the format string matched.
orders_df_timestamp.display()

ORDER_TIMESTAMP
2022-01-06T09:35:42.000+0000
2022-01-06T10:23:14.000+0000
2022-01-06T01:21:54.000+0000
2022-01-06T05:57:04.000+0000
2022-01-06T10:39:07.000+0000
2022-01-07T01:11:46.000+0000
2022-01-07T06:53:06.000+0000
2022-01-07T03:55:15.000+0000
2022-01-07T06:38:38.000+0000
2022-01-08T12:52:12.000+0000


In [0]:
# Re-display the raw orders (ORDER_DATETIME still a string) to compare against the parsed values.
# NOTE(review): same call as in the load cell; consider removing to keep the notebook concise.
orders_df.display()

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,06-JAN-22 09.35.42.00,355,COMPLETE,1
448,06-JAN-22 10.23.14.00,155,COMPLETE,1
449,06-JAN-22 01.21.54.00,242,COMPLETE,1
450,06-JAN-22 05.57.04.00,49,COMPLETE,1
451,06-JAN-22 10.39.07.00,204,COMPLETE,1
452,07-JAN-22 01.11.46.00,216,COMPLETE,1
453,07-JAN-22 06.53.06.00,4,COMPLETE,4
454,07-JAN-22 03.55.15.00,388,COMPLETE,1
455,07-JAN-22 06.38.38.00,291,COMPLETE,1
456,08-JAN-22 12.52.12.00,272,COMPLETE,1


In [0]:
# Keep every order column and add ORDER_TIMESTAMP parsed from the ORDER_DATETIME string.
order_datetime_format = 'dd-MMM-yy HH.mm.ss.SS'
new_orders_df = orders_df.withColumn(
    'ORDER_TIMESTAMP',
    to_timestamp(orders_df['ORDER_DATETIME'], order_datetime_format),
)
new_orders_df.display()

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID,ORDER_TIMESTAMP
447,06-JAN-22 09.35.42.00,355,COMPLETE,1,2022-01-06T09:35:42.000+0000
448,06-JAN-22 10.23.14.00,155,COMPLETE,1,2022-01-06T10:23:14.000+0000
449,06-JAN-22 01.21.54.00,242,COMPLETE,1,2022-01-06T01:21:54.000+0000
450,06-JAN-22 05.57.04.00,49,COMPLETE,1,2022-01-06T05:57:04.000+0000
451,06-JAN-22 10.39.07.00,204,COMPLETE,1,2022-01-06T10:39:07.000+0000
452,07-JAN-22 01.11.46.00,216,COMPLETE,1,2022-01-07T01:11:46.000+0000
453,07-JAN-22 06.53.06.00,4,COMPLETE,4,2022-01-07T06:53:06.000+0000
454,07-JAN-22 03.55.15.00,388,COMPLETE,1,2022-01-07T03:55:15.000+0000
455,07-JAN-22 06.38.38.00,291,COMPLETE,1,2022-01-07T06:38:38.000+0000
456,08-JAN-22 12.52.12.00,272,COMPLETE,1,2022-01-08T12:52:12.000+0000


In [0]:
# The raw string column is redundant once ORDER_TIMESTAMP holds the parsed value.
redundant_cols = ('ORDER_DATETIME',)
new_orders_df = new_orders_df.drop(*redundant_cols)
new_orders_df.dtypes

In [0]:
# Current column names; DataFrame.columns is already a list of str, so no conversion
# (and no loop variable shadowing pyspark's `col`) is needed.
new_orders_df.columns

In [0]:
# Preview the store lookup. Joining on an equality expression keeps both STORE_ID
# columns (rendered as STORE_ID and STORE_ID.1 in the output).
new_orders_df.join(
    stores_df,
    on=new_orders_df['STORE_ID'] == stores_df['STORE_ID'],
    how='left',
).display()

ORDER_ID,CUSTOMER_ID,ORDER_STATUS,STORE_ID,ORDER_TIMESTAMP,STORE_ID.1,STORE_NAME,WEB_ADDRESS,LATITUDE,LONGITUDE
447,355,COMPLETE,1,2022-01-06T09:35:42.000+0000,1,Online,https://www.example.com,,
448,155,COMPLETE,1,2022-01-06T10:23:14.000+0000,1,Online,https://www.example.com,,
449,242,COMPLETE,1,2022-01-06T01:21:54.000+0000,1,Online,https://www.example.com,,
450,49,COMPLETE,1,2022-01-06T05:57:04.000+0000,1,Online,https://www.example.com,,
451,204,COMPLETE,1,2022-01-06T10:39:07.000+0000,1,Online,https://www.example.com,,
452,216,COMPLETE,1,2022-01-07T01:11:46.000+0000,1,Online,https://www.example.com,,
453,4,COMPLETE,4,2022-01-07T06:53:06.000+0000,4,New York City,,40.745216,-73.980518
454,388,COMPLETE,1,2022-01-07T03:55:15.000+0000,1,Online,https://www.example.com,,
455,291,COMPLETE,1,2022-01-07T06:38:38.000+0000,1,Online,https://www.example.com,,
456,272,COMPLETE,1,2022-01-08T12:52:12.000+0000,1,Online,https://www.example.com,,


In [0]:
# Projection of every column (same as indexing the DataFrame with its column list).
new_orders_df.select(*new_orders_df.columns)

In [0]:
# Keep only completed orders and attach STORE_NAME from stores_df via STORE_ID.
# The filter touches only order columns, so applying it before the left join gives the
# same rows; joining on the column name avoids carrying two STORE_ID columns.
# NOTE(review): this overwrites new_orders_df without STORE_ID, so re-running this cell
# on its own fails; re-run the preceding cells first.
completed_orders = new_orders_df.filter(new_orders_df['ORDER_STATUS'] == 'COMPLETE')
new_orders_df = (
    completed_orders
    .join(stores_df, on='STORE_ID', how='left')
    .select('ORDER_ID', 'CUSTOMER_ID', 'ORDER_STATUS', 'ORDER_TIMESTAMP', 'STORE_NAME')
)
new_orders_df.display()

ORDER_ID,CUSTOMER_ID,ORDER_STATUS,ORDER_TIMESTAMP,STORE_NAME
447,355,COMPLETE,2022-01-06T09:35:42.000+0000,Online
448,155,COMPLETE,2022-01-06T10:23:14.000+0000,Online
449,242,COMPLETE,2022-01-06T01:21:54.000+0000,Online
450,49,COMPLETE,2022-01-06T05:57:04.000+0000,Online
451,204,COMPLETE,2022-01-06T10:39:07.000+0000,Online
452,216,COMPLETE,2022-01-07T01:11:46.000+0000,Online
453,4,COMPLETE,2022-01-07T06:53:06.000+0000,New York City
454,388,COMPLETE,2022-01-07T03:55:15.000+0000,Online
455,291,COMPLETE,2022-01-07T06:38:38.000+0000,Online
456,272,COMPLETE,2022-01-08T12:52:12.000+0000,Online


In [0]:
# The previous cell keeps only COMPLETE orders, so ORDER_STATUS carries no information.
columns_to_drop = ['ORDER_STATUS']
new_orders_df = new_orders_df.drop(*columns_to_drop)
new_orders_df.display()

ORDER_ID,CUSTOMER_ID,ORDER_TIMESTAMP,STORE_NAME
447,355,2022-01-06T09:35:42.000+0000,Online
448,155,2022-01-06T10:23:14.000+0000,Online
449,242,2022-01-06T01:21:54.000+0000,Online
450,49,2022-01-06T05:57:04.000+0000,Online
451,204,2022-01-06T10:39:07.000+0000,Online
452,216,2022-01-07T01:11:46.000+0000,Online
453,4,2022-01-07T06:53:06.000+0000,New York City
454,388,2022-01-07T03:55:15.000+0000,Online
455,291,2022-01-07T06:38:38.000+0000,Online
456,272,2022-01-08T12:52:12.000+0000,Online


In [0]:
# Keep the ORDER_ITEMS columns listed in the target schema (drops LINE_ITEM_ID).
# No PRODUCTS columns are needed, so no join is performed: a left join on PRODUCT_ID
# would add nothing here and would duplicate line items if PRODUCT_ID were ever repeated
# in products_df.
# NOTE(review): items of non-COMPLETE orders are not removed here, unlike the orders
# table; confirm whether they should be (e.g. a semi-join against the filtered orders).
order_items_df = order_items_df.select('ORDER_ID', 'PRODUCT_ID', 'UNIT_PRICE', 'QUANTITY')
order_items_df.display()

ORDER_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,26,48.75,1
334,46,39.16,4
334,12,10.48,4
335,32,5.65,2
336,2,29.55,5
336,20,28.21,5
337,32,5.65,4
337,29,24.71,4
337,45,31.68,3
338,35,7.18,2


In [0]:
# Re-display customers before writing them to the silver layer.
# NOTE(review): the rendered output contains FULL_NAME and EMAIL_ADDRESS values — review or
# clear it before sharing the notebook.
customers_df.display()

CUSTOMER_ID,FULL_NAME,EMAIL_ADDRESS
286,Wilfred Welch,wilfred.welch@internalmail
287,Kristina Nunez,kristina.nunez@internalmail
288,Mable Ballard,mable.ballard@internalmail
289,Diane Wilkerson,diane.wilkerson@internalmail
290,Sheryl Banks,sheryl.banks@internalmail
291,Opal Cruz,opal.cruz@internalmail
292,Dale Hughes,dale.hughes@internalmail
293,Diana Fowler,diana.fowler@internalmail
294,Travis Schwartz,travis.schwartz@internalmail
295,Anthony Boone,anthony.boone@internalmail


In [0]:
# Persist the cleaned DataFrames to the silver layer as Parquet.
# mode("overwrite") replaces each target directory, so the cell can be re-run without
# recursively deleting the whole silver folder first (which would also remove any other
# data stored there).
path = "/FileStore/tables/silver/"
silver_outputs = {
    "orders.parquet": new_orders_df,
    "customers.parquet": customers_df,
    "products.parquet": products_df,
    "order_items.parquet": order_items_df,
}
for file_name, output_df in silver_outputs.items():
    output_df.write.mode("overwrite").parquet(path + file_name)

In [0]:
# List the silver directory to confirm the Parquet outputs were written.
dbutils.fs.ls("/FileStore/tables/silver/")