In [0]:
from pyspark.sql.types import IntegerType,StringType,DoubleType,StructField,StructType

In [0]:
orders_path="/FileStore/tBronze/orders.csv"

In [0]:
orders_schema=StructType([
    StructField("ORDER_ID",IntegerType(),False),
    StructField("ORDER_DATETIME",StringType(),False),
    StructField("CUSTOMER_ID",IntegerType(),False),
    StructField("ORDER_STATUS",StringType(),False),
    StructField("STORE_ID",IntegerType(),False)
])

In [0]:
orders=spark.read.csv(path=orders_path,header=True,schema=orders_schema)

In [0]:
display(orders)

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,06-JAN-22 09.35.42.00,355,COMPLETE,1
448,06-JAN-22 10.23.14.00,155,COMPLETE,1
449,06-JAN-22 01.21.54.00,242,COMPLETE,1
450,06-JAN-22 05.57.04.00,49,COMPLETE,1
451,06-JAN-22 10.39.07.00,204,COMPLETE,1
452,07-JAN-22 01.11.46.00,216,COMPLETE,1
453,07-JAN-22 06.53.06.00,4,COMPLETE,4
454,07-JAN-22 03.55.15.00,388,COMPLETE,1
455,07-JAN-22 06.38.38.00,291,COMPLETE,1
456,08-JAN-22 12.52.12.00,272,COMPLETE,1


In [0]:
from pyspark.sql.functions import to_timestamp

In [0]:
orders = orders.select('ORDER_ID', 
                       to_timestamp(orders['order_datetime'], "dd-MM-yy kk.mm.ss.SS").alias("ORDER_TIMESTAMP"),
                       'CUSTOMER_ID', 'ORDER_STATUS','STORE_ID')

In [0]:
orders.dtypes

In [0]:
orders.display()

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,,355,COMPLETE,1
448,,155,COMPLETE,1
449,,242,COMPLETE,1
450,,49,COMPLETE,1
451,,204,COMPLETE,1
452,,216,COMPLETE,1
453,,4,COMPLETE,4
454,,388,COMPLETE,1
455,,291,COMPLETE,1
456,,272,COMPLETE,1


In [0]:
orders=orders.filter(orders['order_status']=="COMPLETE")

In [0]:
##Reading storecsv
stores_path="/FileStore/tBronze/stores.csv"
store_schema=StructType([
    StructField("STORE_ID",IntegerType(),False),
    StructField("STORE_NAME",StringType(),False),
    StructField("WEB_ADDRESS",StringType(),False),
    StructField("LATITUDE",DoubleType(),False),
    StructField("LONGITUDE",DoubleType(),False)
])
stores=spark.read.csv(path=stores_path,header=True,schema=store_schema)

In [0]:
stores.display()

STORE_ID,STORE_NAME,WEB_ADDRESS,LATITUDE,LONGITUDE
1,Online,https://www.example.com,,
2,San Francisco,,37.529395,-122.267237
3,Seattle,,47.6053,-122.33221
4,New York City,,40.745216,-73.980518
5,Chicago,,41.878751,-87.636675
6,London,,51.519281,-0.087296
7,Bucharest,,44.43225,26.10626
8,Berlin,,52.5161,13.3873
9,Utrecht,,52.103263,5.061644
10,Madrid,,40.4929,-3.8737


In [0]:
orders=orders.join(stores,orders['store_id']==stores['store_id'],'left').select ('ORDER_ID','ORDER_TIMESTAMP','CUSTOMER_ID','STORE_NAME')

In [0]:
orders.display()

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,STORE_NAME
447,,355,Online
448,,155,Online
449,,242,Online
450,,49,Online
451,,204,Online
452,,216,Online
453,,4,New York City
454,,388,Online
455,,291,Online
456,,272,Online


In [0]:
##writing the orders dataframe as a paraquet file in the silver layer, should us mode='overwrite' in this instance
orders.write.parquet("/FileStore/silver/orders",mode='overwrite')

In [0]:
order_items_path="/FileStore/tBronze/order_items.csv"
order_items_schema = StructType([
                    StructField("ORDER_ID", IntegerType(), False),
                    StructField("LINE_ITEM_ID", IntegerType(), False),
                    StructField("PRODUCT_ID", IntegerType(), False),
                    StructField("UNIT_PRICE", DoubleType(), False),
                    StructField("QUANTITY", IntegerType(), False)
                    ]
                    )
order_items=spark.read.csv(path=order_items_path,header=True,schema=order_items_schema)

In [0]:
order_items.display()

ORDER_ID,LINE_ITEM_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,1,26,48.75,1
334,2,46,39.16,4
334,3,12,10.48,4
335,1,32,5.65,2
336,1,2,29.55,5
336,2,20,28.21,5
337,1,32,5.65,4
337,2,29,24.71,4
337,3,45,31.68,3
338,1,35,7.18,2


In [0]:
order_items=order_items.drop('LINE_ITEM_ID')

In [0]:
order_items.display()

ORDER_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,26,48.75,1
334,46,39.16,4
334,12,10.48,4
335,32,5.65,2
336,2,29.55,5
336,20,28.21,5
337,32,5.65,4
337,29,24.71,4
337,45,31.68,3
338,35,7.18,2


In [0]:
order_items.write.parquet("/FileStore/silver/order_items",mode='overwrite')

In [0]:
products_path = "/FileStore/tBronze/products.csv"

products_schema = StructType([
                    StructField("PRODUCT_ID", IntegerType(), False),
                    StructField("PRODUCT_NAME", StringType(), False),
                    StructField("UNIT_PRICE", DoubleType(), False)
                    ]
                    )

products=spark.read.csv(path=products_path, header=True, schema=products_schema)

In [0]:
products.display()

PRODUCT_ID,PRODUCT_NAME,UNIT_PRICE
16,Women's Socks (Grey),39.89
17,Women's Sweater (Brown),24.46
18,Women's Jacket (Black),14.34
19,Men's Coat (Red),28.21
20,Girl's Shorts (Green),38.34
21,Girl's Pyjamas (White),39.78
22,Men's Shorts (Black),10.33
23,Men's Pyjamas (Blue),48.39
24,Boy's Sweater (Red),9.8
25,Girl's Jeans (Grey),48.75


In [0]:
products.write.parquet('/FileStore/silver/products',mode='overwrite')

In [0]:
# Reading the customers csv file
customers_path = "/FileStore/tBronze/customers.csv"

customers_schema = StructType([
                    StructField("CUSTOMER_ID", IntegerType(), False),
                    StructField("FULL_NAME", StringType(), False),
                    StructField("EMAIL_ADDRESS", StringType(), False)
                    ]
                    )

customers=spark.read.csv(path=customers_path, header=True, schema=customers_schema)

In [0]:
customers.display()

CUSTOMER_ID,FULL_NAME,EMAIL_ADDRESS
286,Wilfred Welch,wilfred.welch@internalmail
287,Kristina Nunez,kristina.nunez@internalmail
288,Mable Ballard,mable.ballard@internalmail
289,Diane Wilkerson,diane.wilkerson@internalmail
290,Sheryl Banks,sheryl.banks@internalmail
291,Opal Cruz,opal.cruz@internalmail
292,Dale Hughes,dale.hughes@internalmail
293,Diana Fowler,diana.fowler@internalmail
294,Travis Schwartz,travis.schwartz@internalmail
295,Anthony Boone,anthony.boone@internalmail


In [0]:
customers.write.parquet('/FileStore/silver/customers',mode='overwrite')