In [144]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build (or reuse, via getOrCreate) a Spark session running on the local master.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("local")
    .getOrCreate()
)


In [145]:
## Load the dataset schemas (one entry per dataset name) from the JSON file

import json

file_path = 'data/retail_db/schemas.json'
# Open through a context manager so the file handle is closed as soon as
# parsing finishes instead of lingering until garbage collection.
with open(file_path) as schema_file:
    schemas = json.load(schema_file)

In [146]:
## Helper that returns a dataset's column names in positional order

def get_column_names(schemas, dsName):
    """Return the column names of dataset ``dsName`` ordered by position.

    Args:
        schemas: mapping from dataset name to an iterable of column
            descriptors; each descriptor is indexed with the keys
            ``'column_name'`` and ``'column_position'``.
        dsName: key of the dataset to look up in ``schemas``.

    Returns:
        A list of ``'column_name'`` values sorted by ascending
        ``'column_position'``.

    Raises:
        KeyError: if ``dsName`` is not in ``schemas`` or a descriptor lacks
            one of the two keys.
    """
    # Copy before sorting so the caller's schema list is left untouched.
    ordered = list(schemas[dsName])
    ordered.sort(key=lambda entry: entry['column_position'])

    names = []
    for entry in ordered:
        names.append(entry['column_name'])
    return names

In [147]:
## Resolve the ordered column names for each source dataset

# The three lookups run in the listed order and unpack into the names used by
# the read step (note: `order_colum` is spelled this way where it is used later).
customers_column, order_colum, order_item_column = (
    get_column_names(schemas, dataset_name)
    for dataset_name in ('customers', 'orders', 'order_items')
)

In [148]:
### Read the sources

def _read_headerless_csv(directory, column_names):
    """Read the CSV data under ``directory`` (no header row) and name its columns.

    ``column_names`` is applied positionally via ``toDF``.
    """
    return spark.read.option('header', 'false').csv(directory).toDF(*column_names)


# customers
customerDataset = _read_headerless_csv('data/retail_db/customers/', customers_column)
# orders
ordersDataset = _read_headerless_csv('data/retail_db/orders/', order_colum)
# order items
orderItemDataset = _read_headerless_csv('data/retail_db/order_items/', order_item_column)


In [149]:
# Process the orders dataset
## Keep only COMPLETE orders placed in 2013 and convert order_date to a date.
order_day = to_date(col("order_date"))
placed_in_2013 = year(order_day) == "2013"
is_complete = col("order_status") == "COMPLETE"

ordersDataset = (
    ordersDataset
    .filter(placed_in_2013 & is_complete)
    .select(
        col("order_id"),
        order_day.alias("order_date"),
        col("order_customer_id"),
        col("order_status"),
    )
)

In [150]:
# Process the customer dataset: derive a display name and a single-line address.
full_name = concat_ws(" ", col("customer_fname"), col("customer_lname"))
full_address = concat_ws(
    ",",
    col("customer_street"),
    col("customer_city"),
    col("customer_state"),
    col("customer_zipcode"),
)

# Keep only the id plus the two derived columns.
customerDataset = customerDataset.select(
    col("customer_id"),
    full_name.alias("customer_name"),
    full_address.alias("address"),
)

In [151]:
## Aggregate order items into one rounded total per order
# `sum` and `round` here are the pyspark.sql.functions versions pulled in by the
# star import, not the Python builtins.
# NOTE(review): this multiplies order_item_subtotal by order_item_quantity; if the
# subtotal already includes the quantity, totals are overstated for quantity > 1 —
# confirm against the order_items schema.
line_amount = expr("order_item_subtotal * order_item_quantity")

orderItemDataset_agg = (
    orderItemDataset
    .groupBy("order_item_order_id")
    .agg(round(sum(line_amount), 2).alias("totalPrice"))
)



In [152]:
## Build the main dataset
## Step 1: attach each order to its customer (inner join drops unmatched rows).
customer_matches_order = (
    customerDataset["customer_id"] == ordersDataset["order_customer_id"]
)
customerOrder = customerDataset.join(
    ordersDataset, on=customer_matches_order, how="inner"
)

## Step 2: attach the per-order total from orderItemDataset_agg.
order_matches_total = (
    customerOrder["order_id"] == orderItemDataset_agg["order_item_order_id"]
)
mainDataset = customerOrder.join(
    orderItemDataset_agg, on=order_matches_total, how="inner"
)

In [155]:
# Sum the per-order totals for each customer / order-status combination,
# rounded to two decimals.
group_keys = ["customer_id", "customer_name", "address", "order_status"]

finalDataSet = (
    mainDataset
    .groupBy(*group_keys)
    .agg(round(sum(col("totalPrice")), 2).alias("totalAmount"))
    .select(*group_keys, "totalAmount")
)

In [156]:
# Preview the first 10 rows without truncating long cell values.
finalDataSet.show(n=10, truncate=False)

+-----------+---------------+-------------------------------------------+------------+-----------+
|customer_id|customer_name  |address                                    |order_status|totalAmount|
+-----------+---------------+-------------------------------------------+------------+-----------+
|993        |Mary Davis     |1202 Quaking Embers Link,Fairfield,OH,45014|COMPLETE    |859.88     |
|8716       |Evelyn Smith   |1260 Stony Village,Caguas,PR,00725         |COMPLETE    |2139.86    |
|4750       |Mary Smith     |9406 Old Lagoon Canyon,Martinsburg,WV,25401|COMPLETE    |2759.81    |
|5790       |Mary Young     |4174 Dewy Pioneer Court,Fremont,CA,94536   |COMPLETE    |2169.52    |
|4065       |Mary Mccullough|7284 Hidden Elk Forest,Caguas,PR,00725     |COMPLETE    |1389.72    |
|5830       |Mary Walker    |5012 Tawny Brook Vale,Caguas,PR,00725      |COMPLETE    |199.99     |
|4251       |Mary Serrano   |9714 Quaking Elk Green,Caguas,PR,00725     |COMPLETE    |1189.98    |
|9189     