In [0]:
from pyspark.context import SparkContext
from pyspark.sql.functions import to_timestamp
from pyspark.sql.session import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
# Obtain the Spark session through the documented builder entry point. getOrCreate()
# returns the already-active session when one exists (as in a Databricks notebook)
# instead of hand-wrapping a SparkContext. `sc` is kept for code that expects it.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Bronze-layer CSV inputs, all under one base directory.
BRONZE_DIR = 'dbfs:/FileStore/tables/bronze'

orders_path = f'{BRONZE_DIR}/orders.csv'
customers_path = f'{BRONZE_DIR}/customers.csv'
store_path = f'{BRONZE_DIR}/stores.csv'
order_items_path = f'{BRONZE_DIR}/order_items.csv'
products_path = f'{BRONZE_DIR}/products.csv'

In [0]:

# Explicit schemas for the bronze orders/stores CSVs, so the reader does not infer types.
# With a user-supplied schema the CSV reader maps file columns to fields by position.
# NOTE(review): Spark file sources may relax `nullable=False` on read. Treat these flags
# as documentation of intent, not enforcement, and confirm before relying on them.
orders_schema = StructType([
    StructField("ORDER_ID", IntegerType(), False),
    StructField("ORDER_DATETIME", StringType(), False),  # kept as text; parsed to a timestamp later
    StructField("CUSTOMER_ID", IntegerType(), False),
    StructField("ORDER_STATUS", StringType(), False),
    StructField("STORE_ID", IntegerType(), False),
])

# NOTE(review): only the first two columns of stores.csv are declared. Confirm the file
# starts with STORE_ID, STORE_NAME in that order, since mapping is positional.
store_schema = StructType([
    StructField("STORE_ID", IntegerType(), False),
    StructField("STORE_NAME", StringType(), False),
])

In [0]:
# Load the bronze orders and stores CSVs with their explicit schemas.
# Each file's first row is a header line and is skipped.
orders_df = spark.read.schema(orders_schema).option("header", True).csv(orders_path)

store_df = spark.read.schema(store_schema).option("header", True).csv(store_path)

In [0]:
# Preview the raw (bronze) orders exactly as parsed from CSV.
orders_df.display()

In [0]:
# Preview the raw (bronze) stores exactly as parsed from CSV.
store_df.display()

In [0]:
# Attach the store name to each order.
# Joining on the column *name*, rather than an equality expression, keeps a single
# STORE_ID column in the result, so a later reference to STORE_ID is not ambiguous.
# This is an inner join, as before: orders whose STORE_ID has no match in store_df
# are dropped.
orders_silver = orders_df.join(store_df, on='STORE_ID', how='inner')

In [0]:
# Preview orders enriched with the store columns from the join.
orders_silver.display()

In [0]:
# Pattern for the ORDER_DATETIME strings: two-digit day, abbreviated month name,
# two-digit year, then hour.minute.second separated by dots. The trailing ".00" is
# matched as literal characters.
# NOTE(review): strings that deviate from this (e.g. different fractional digits)
# will not parse. Verify against the actual bronze data.
ORDER_DATETIME_FORMAT = 'dd-MMM-yy HH.mm.ss.00'

# Keep only completed orders, then project the silver-layer columns.
# Filtering *before* the select means the predicate uses a column that is still
# present, rather than relying on Spark to resolve ORDER_STATUS through a projection
# that has already dropped it.
selected_orders = (
    orders_silver
    .where(orders_silver['ORDER_STATUS'] == 'COMPLETE')
    .select(
        'ORDER_ID',
        to_timestamp('ORDER_DATETIME', ORDER_DATETIME_FORMAT).alias('ORDER_TIMESTAMP'),
        'CUSTOMER_ID',
        'STORE_NAME',
    )
)

In [0]:
# Preview the completed orders with parsed ORDER_TIMESTAMP and STORE_NAME.
selected_orders.display()

In [0]:
# Persist the silver orders as Parquet, replacing whatever is already at the path.
(
    selected_orders.write
    .mode("overwrite")
    .parquet('dbfs:/FileStore/tables/silver/orders')
)

In [0]:
# Read the silver orders back to confirm the write.
# `header` is a CSV-reader option with no meaning for Parquet (Parquet files carry their
# own schema), so it is not passed here.
spark.read.parquet('dbfs:/FileStore/tables/silver/orders').display()


In [0]:

# Explicit schema for the bronze customers CSV (fields are matched to file columns
# by position). The type names come from the notebook's top-level imports.
customers_schema = StructType([
    StructField("CUSTOMER_ID", IntegerType(), False),
    StructField("FULL_NAME", StringType(), False),
    StructField("EMAIL_ADDRESS", StringType(), False),
])


In [0]:
# Load the bronze customers CSV with its explicit schema; the header row is skipped.
customers_df = (
    spark.read
    .schema(customers_schema)
    .option("header", True)
    .csv(customers_path)
)

In [0]:
# Preview the customers table.
# NOTE(review): this renders FULL_NAME and EMAIL_ADDRESS. Clear or redact the output
# before sharing or committing the notebook.
customers_df.display()

In [0]:
# Persist customers to the silver layer as Parquet, overwriting previous output.
(
    customers_df.write
    .mode("overwrite")
    .parquet('dbfs:/FileStore/tables/silver/customers')
)

In [0]:

# Explicit schema for the bronze products CSV (fields matched by position).
# The type names come from the notebook's top-level imports.
product_schema = StructType([
    StructField("PRODUCT_ID", IntegerType(), False),
    StructField("PRODUCT_NAME", StringType(), False),
    StructField("UNIT_PRICE", DoubleType(), False),
])

In [0]:
# Load the bronze products CSV with its explicit schema; the header row is skipped.
products_df = spark.read.schema(product_schema).option("header", True).csv(products_path)

In [0]:
# Preview the products table as parsed from CSV.
products_df.display()

In [0]:
# Persist products to the silver layer as Parquet, overwriting previous output.
products_writer = products_df.write.mode('overwrite')
products_writer.parquet('dbfs:/FileStore/tables/silver/products')

In [0]:

# Explicit schema for the bronze order-items CSV (fields matched by position).
# The type names come from the notebook's top-level imports.
order_items_schema = StructType([
    StructField("ORDER_ID", IntegerType(), False),
    StructField("LINE_ITEM_ID", IntegerType(), False),
    StructField("PRODUCT_ID", IntegerType(), False),
    StructField("UNIT_PRICE", DoubleType(), False),
    StructField("QUANTITY", IntegerType(), False),
])

In [0]:
# Load the bronze order-items CSV with its explicit schema; the header row is skipped.
# `header` is passed as the bool True, matching every other read in this notebook,
# rather than the string "True".
order_items_df = spark.read.csv(path=order_items_path, schema=order_items_schema, header=True)

In [0]:
# Preview the raw order line items as parsed from CSV.
order_items_df.display()

In [0]:
# Silver order items: keep the order/product keys, unit price and quantity.
# NOTE(review): LINE_ITEM_ID is not carried over. Confirm this is intended, since
# without it two identical lines on the same order cannot be told apart.
silver_item_columns = ['ORDER_ID', 'PRODUCT_ID', 'UNIT_PRICE', 'QUANTITY']
ordered_items = order_items_df.select(*silver_item_columns)

In [0]:
# Preview the projected silver order items before writing.
ordered_items.display()

In [0]:
# Persist order items to the silver layer as Parquet, overwriting previous output.
(
    ordered_items.write
    .mode('overwrite')
    .parquet('dbfs:/FileStore/tables/silver/order_items')
)