# Data Ingestion: Landing_to_Staging

## Big Data and Cloud Computing Assignment Group 18
#### Datasets: 
There is a single customer dataset (a JSON file) that only needs to be loaded once.
- customers.json => cust_id, gender, age, full_name, email, customer since, place name, county, city, state, zip, region

There is one orders file (CSV) for each month from October 2020 to September 2021. A new file arrives every month and has to be loaded as it arrives.
- orders_MM_YY.csv (e.g. orders_10_20.csv) => index, order_id, order_date, item_id, qty_ordered, price, value, discount_amount, total, category, payment_method, cust_id, final_status
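The monthly files follow the `orders_MM_YY.csv` naming pattern that appears in the landing-zone listing below. Here is a minimal sketch that enumerates the twelve expected file names in plain Python. It assumes the pattern holds for every month, and `expected_order_files` is a hypothetical helper that is not part of the notebook:

```python
from datetime import date


def expected_order_files(start=date(2020, 10, 1), months=12):
    """Return the file names expected for `months` consecutive months.

    Assumes every monthly file is named orders_MM_YY.csv.
    """
    names = []
    year, month = start.year, start.month
    for _ in range(months):
        names.append(f"orders_{month:02d}_{year % 100:02d}.csv")
        # Advance one calendar month, rolling over into the next year after December.
        month += 1
        if month > 12:
            month, year = 1, year + 1
    return names


files = expected_order_files()
print(files[0], files[-1], len(files))  # orders_10_20.csv orders_09_21.csv 12
```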


In [None]:
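#### Importing packages
# Explicit imports avoid shadowing Python built-ins (e.g. sum, max) with Spark functions.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, to_date, date_format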

## Landing Zone
Newly generated files land in the landing zone (dbfs:/FileStore/tables/).

In [None]:
%fs ls dbfs:/FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/BDCC/,BDCC/,0,0
dbfs:/FileStore/tables/business.json,business.json,55412111,1694586639000
dbfs:/FileStore/tables/customers.json,customers.json,15815082,1722965143000
dbfs:/FileStore/tables/movies.txt,movies.txt,236344,1694583795000
dbfs:/FileStore/tables/orders_01_21.csv,orders_01_21.csv,1578303,1722965281000
dbfs:/FileStore/tables/orders_02_21.csv,orders_02_21.csv,924491,1722965282000
dbfs:/FileStore/tables/orders_03_21.csv,orders_03_21.csv,2522578,1722965273000
dbfs:/FileStore/tables/orders_04_21.csv,orders_04_21.csv,3919934,1722965273000
dbfs:/FileStore/tables/orders_05_21.csv,orders_05_21.csv,1359574,1722965274000
dbfs:/FileStore/tables/orders_06_21.csv,orders_06_21.csv,2588185,1722965276000


In [None]:
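# Fall back to the pre-Spark-3.0 (SimpleDateFormat) date parser. Under the default policy,
# patterns such as "MM/dd/yyyy" can fail on single-digit values like "8/22/2006".
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")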

## Preparing the Staging Zone


Drop the staging database if it exists and recreate it, so that the notebook can be re-run from scratch without errors.

In [None]:
%sql 

-- This cell removes everything from the staging database.
-- Run it only when you want to recreate the zone and its tables from scratch.

DROP DATABASE IF EXISTS staging CASCADE;

CREATE DATABASE staging;

Drop only the staging tables if they exist (the database is kept).

In [None]:
%sql

-- Run this cell only when you want to drop the staging tables (not the database).
DROP TABLE IF EXISTS staging.customers;
DROP TABLE IF EXISTS staging.orders;

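### Ingest the Customer Data (one-time load)
Some keys in the file contain spaces (`Customer Since`, `E Mail`, `Place Name`), which are not accepted as column names when the table is written. The columns are therefore renamed after reading. The remaining renames only make the capitalisation consistent.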

In [None]:
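# Define the custom schema (field names match the JSON keys).
# Spark marks columns read from files as nullable, so the False flags are not enforced.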
cust_fields = StructType([StructField("City", StringType(), True),
                     StructField("County", StringType(), True),
                     StructField("Customer Since", StringType(), False),
                     StructField("E Mail", StringType(), True), 
                     StructField("Gender", StringType(), False),
                     StructField("Place Name", StringType(), False),
                     StructField("Region", StringType(), True),
                     StructField("State", StringType(), True),
                     StructField("Zip", IntegerType(), False),
                     StructField("age", StringType(), True),
                     StructField("cust_id", StringType(), False),
                     StructField("full_name", StringType(), True)])

cust_df = spark.read.json("/FileStore/tables/customers.json",
                          schema=cust_fields)

cust_df = (cust_df.withColumnRenamed("Customer Since", "Customer_Since")
                  .withColumnRenamed("E Mail", "E_Mail")
                  .withColumnRenamed("Place Name", "Place_Name")
                  .withColumnRenamed("cust_id", "Cust_Id")
                  .withColumnRenamed("full_name", "Full_Name"))             
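The renames above follow a simple rule: spaces become underscores and each word is capitalised. Below is a plain-Python sketch of a hypothetical helper, `to_table_column` (not part of the notebook), that applies that rule. Note that it also turns `age` into `Age`, which the notebook does later with `withColumn`:

```python
def to_table_column(name):
    """Make a JSON key usable as a table column name.

    Spaces become underscores and each underscore-separated word is capitalised,
    e.g. "E Mail" -> "E_Mail", "cust_id" -> "Cust_Id".
    """
    words = name.replace(" ", "_").split("_")
    return "_".join(w[:1].upper() + w[1:] for w in words)


raw_columns = ["City", "County", "Customer Since", "E Mail", "Gender", "Place Name",
               "Region", "State", "Zip", "age", "cust_id", "full_name"]
renamed = [to_table_column(c) for c in raw_columns]
print(renamed)

# Hypothetical one-step usage in the notebook (untested):
# cust_df = cust_df.toDF(*[to_table_column(c) for c in cust_df.columns])
```

`DataFrame.toDF(*cols)` replaces all column names positionally. This would avoid listing each rename by hand, at the cost of making the mapping less explicit.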

In [None]:
cust_df.show(5)

+-----------+-----------+--------------+--------------------+------+-----------+-------+-----+-----+----+-------+-----------------+
|       City|     County|Customer_Since|              E_Mail|Gender| Place_Name| Region|State|  Zip| age|Cust_Id|        Full_Name|
+-----------+-----------+--------------+--------------------+------+-----------+-------+-----+-----+----+-------+-----------------+
|     Vinson|     Harmon|     8/22/2006|jani.titus@gmail.com|     F|     Vinson|  South|   OK|73571|43.0|60124.0|      Titus, Jani|
|     Graham|   Bradford|      2/4/1981| lee.eaker@gmail.com|     M|     Graham|  South|   FL|32042|28.0|42485.0|       Eaker, Lee|
|Grand Forks|Grand Forks|     6/27/2010|jason.simoneau@gm...|     M|Grand Forks|Midwest|   ND|58201|65.0|53620.0|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|      4/3/1992|grover.bayless@ya...|     M|Laupahoehoe|   West|   HI|96764|33.0|56836.0|  Bayless, Grover|
|     Glendo|     Platte|     6/21/2015|albertina.bensen@...|     F|     Gle

#### Casting fields to the types needed later

In [None]:
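# Customer_Since is month-first, e.g. 8/22/2006 -> 2006-08-22.
cust_df = cust_df.withColumn("Customer_Since", to_date(col("Customer_Since"), "MM/dd/yyyy"))
# Column names are case-insensitive by default, so these calls replace "Zip" and "age"
# in place (renaming them ZIP and Age); Zip is already an integer in the schema.
cust_df = cust_df.withColumn("ZIP", col("Zip").cast("integer"))
# age and Cust_Id arrive as strings such as "43.0" and "60124.0".
cust_df = cust_df.withColumn("Age", col("age").cast("integer"))
cust_df = cust_df.withColumn("Cust_Id", col("Cust_Id").cast("integer"))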

cust_df.show(5)

+-----------+-----------+--------------+--------------------+------+-----------+-------+-----+-----+---+-------+-----------------+
|       City|     County|Customer_Since|              E_Mail|Gender| Place_Name| Region|State|  ZIP|Age|Cust_Id|        Full_Name|
+-----------+-----------+--------------+--------------------+------+-----------+-------+-----+-----+---+-------+-----------------+
|     Vinson|     Harmon|    2006-08-22|jani.titus@gmail.com|     F|     Vinson|  South|   OK|73571| 43|  60124|      Titus, Jani|
|     Graham|   Bradford|    1981-02-04| lee.eaker@gmail.com|     M|     Graham|  South|   FL|32042| 28|  42485|       Eaker, Lee|
|Grand Forks|Grand Forks|    2010-06-27|jason.simoneau@gm...|     M|Grand Forks|Midwest|   ND|58201| 65|  53620|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|    1992-04-03|grover.bayless@ya...|     M|Laupahoehoe|   West|   HI|96764| 33|  56836|  Bayless, Grover|
|     Glendo|     Platte|    2015-06-21|albertina.bensen@...|     F|     Glendo|   
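For reference, here are the same conversions applied to the sample values shown above, in plain Python. This only illustrates the expected results and is not how Spark evaluates them. The `parse_customer_since` and `to_int` helpers are hypothetical:

```python
from datetime import datetime


def parse_customer_since(value):
    """Month-first date string such as '8/22/2006' -> ISO date '2006-08-22'."""
    # When parsing, %m and %d accept one- or two-digit values.
    return datetime.strptime(value, "%m/%d/%Y").date().isoformat()


def to_int(value):
    """Float-formatted string such as '43.0' -> 43 (fractional part truncated)."""
    return int(float(value))


print(parse_customer_since("8/22/2006"), to_int("43.0"), to_int("60124.0"))
# 2006-08-22 43 60124
```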

In [None]:
cust_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Customer_Since: date (nullable = true)
 |-- E_Mail: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Place_Name: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Cust_Id: integer (nullable = true)
 |-- Full_Name: string (nullable = true)



Before writing to the staging table, check which table folders already exist in the staging database location.

In [None]:
%fs ls dbfs:/user/hive/warehouse/staging.db/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/staging.db/customers/,customers/,0,0
dbfs:/user/hive/warehouse/staging.db/orders/,orders/,0,0


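Remove any leftover table folders (for example from earlier test runs) before ingesting data into the staging zone. Otherwise `saveAsTable` can fail because the managed table's location already exists.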

In [None]:
%fs rm -r dbfs:/user/hive/warehouse/staging.db/customers

##### Write the Customer data to the table

In [None]:
cust_df.write.saveAsTable("staging.customers")

In [None]:
%sql
SELECT * FROM staging.customers limit 3;

City,County,Customer_Since,E_Mail,Gender,Place_Name,Region,State,ZIP,Age,Cust_Id,Full_Name
Vinson,Harmon,2006-08-22,jani.titus@gmail.com,F,Vinson,South,OK,73571,43,60124,"Titus, Jani"
Graham,Bradford,1981-02-04,lee.eaker@gmail.com,M,Graham,South,FL,32042,28,42485,"Eaker, Lee"
Grand Forks,Grand Forks,2010-06-27,jason.simoneau@gmail.com,M,Grand Forks,Midwest,ND,58201,65,53620,"Simoneau, Jason"


### Ingest the Orders Data (base partition, October 2020)

In [None]:
# Define the custom schema
orders_fields = StructType([StructField("Index", IntegerType(), True),
                     StructField("Order_Id", IntegerType(), False),
                     StructField("Order_Date", StringType(), False),
                     StructField("Item_Id", StringType(), False), 
                     StructField("Qty_Ordered", IntegerType(), False),
                     StructField("Price", StringType(), False),
                     StructField("Value", StringType(), True),
                     StructField("Discount_Amount", StringType(), True),
                     StructField("Total", StringType(), True),
                     StructField("Category", StringType(), True),
                     StructField("Payment_Method", StringType(), True),
                     StructField("Cust_Id", IntegerType(), False),
                     StructField("Final_Status", StringType(), True)])

orders_df = spark.read.csv("/FileStore/tables/orders_10_20.csv",
                           header=True,
                           schema=orders_fields)
orders_df.show(5)

+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+------------+
|Index| Order_Id|Order_Date|Item_Id|Qty_Ordered|Price| Value|Discount_Amount| Total|         Category|Payment_Method|Cust_Id|Final_Status|
+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+------------+
|    0|100354678|  01/10/20| 574772|         21| 89.9|1798.0|            0.0|1798.0|    Men's Fashion|           cod|  60124|    received|
|    1|100354678|  01/10/20| 574774|         11| 19.0| 190.0|            0.0| 190.0|    Men's Fashion|           cod|  60124|    received|
|    2|100354680|  01/10/20| 574777|          9|149.9|1199.2|            0.0|1199.2|    Men's Fashion|           cod|  60124|    received|
|    3|100354680|  01/10/20| 574779|          9| 79.9| 639.2|            0.0| 639.2|    Men's Fashion|           cod|  60124|    received|
|    7|100354677|  01/10/20

In [None]:
# Parse day-first dates (dd/MM/yy) and derive the monthly partition key.
orders_df = orders_df.withColumn("Order_Date", to_date(col("Order_Date"), "dd/MM/yy"))
orders_df = orders_df.withColumn("Year_Month", date_format(col("Order_Date"), "yyyy-MM"))

orders_df.show(5)

+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+------------+----------+
|Index| Order_Id|Order_Date|Item_Id|Qty_Ordered|Price| Value|Discount_Amount| Total|         Category|Payment_Method|Cust_Id|Final_Status|Year_Month|
+-----+---------+----------+-------+-----------+-----+------+---------------+------+-----------------+--------------+-------+------------+----------+
|    0|100354678|2020-10-01| 574772|         21| 89.9|1798.0|            0.0|1798.0|    Men's Fashion|           cod|  60124|    received|   2020-10|
|    1|100354678|2020-10-01| 574774|         11| 19.0| 190.0|            0.0| 190.0|    Men's Fashion|           cod|  60124|    received|   2020-10|
|    2|100354680|2020-10-01| 574777|          9|149.9|1199.2|            0.0|1199.2|    Men's Fashion|           cod|  60124|    received|   2020-10|
|    3|100354680|2020-10-01| 574779|          9| 79.9| 639.2|            0.0| 639.2|    Men's Fashio
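Order dates are day-first, so `01/10/20` is 1 October 2020 (not 10 January). That is why the output shows `2020-10-01` and the partition key `2020-10`. Below is a small plain-Python sketch of that derivation. The `order_partition` helper is hypothetical, and Python's two-digit-year rule is assumed to match Spark's for these 2020–2021 files:

```python
from datetime import datetime


def order_partition(order_date):
    """Parse a day-first 'dd/MM/yy' order date; return (ISO date, Year_Month value)."""
    parsed = datetime.strptime(order_date, "%d/%m/%y").date()
    return parsed.isoformat(), parsed.strftime("%Y-%m")


print(order_partition("01/10/20"))  # ('2020-10-01', '2020-10')
```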

In [None]:
orders_df.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Order_Id: integer (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Item_Id: string (nullable = true)
 |-- Qty_Ordered: integer (nullable = true)
 |-- Price: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Discount_Amount: string (nullable = true)
 |-- Total: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- Cust_Id: integer (nullable = true)
 |-- Final_Status: string (nullable = true)
 |-- Year_Month: string (nullable = true)



In [None]:
%fs ls dbfs:/user/hive/warehouse/staging.db/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/staging.db/customers/,customers/,0,0
dbfs:/user/hive/warehouse/staging.db/orders/,orders/,0,0


In [None]:
%fs rm -r dbfs:/user/hive/warehouse/staging.db/orders

In [None]:
orders_df.write.format("parquet").partitionBy("Year_Month").saveAsTable("staging.orders")
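`partitionBy("Year_Month")` writes one Hive-style folder per distinct value, such as `Year_Month=2020-10/`. The partition value is stored in the folder name rather than inside the Parquet files. Below is a toy, in-memory sketch of that grouping. The rows and the `partition_dirs` helper are hypothetical, and Spark performs this step itself:

```python
from collections import defaultdict


def partition_dirs(rows, column="Year_Month"):
    """Group rows into Hive-style folders named "<column>=<value>".

    The partition column is dropped from each stored row, mirroring how the
    value lives in the folder name instead of the data files.
    """
    folders = defaultdict(list)
    for row in rows:
        folder = f"{column}={row[column]}"
        folders[folder].append({k: v for k, v in row.items() if k != column})
    return dict(folders)


# Hypothetical toy rows; only the Year_Month values matter here.
rows = [
    {"Order_Id": 1, "Year_Month": "2020-10"},
    {"Order_Id": 2, "Year_Month": "2020-10"},
    {"Order_Id": 3, "Year_Month": "2020-11"},
]
for folder, stored in sorted(partition_dirs(rows).items()):
    print(folder, len(stored))
# Year_Month=2020-10 2
# Year_Month=2020-11 1
```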

In [None]:
%sql

SELECT * FROM staging.orders limit 3;

Index,Order_Id,Order_Date,Item_Id,Qty_Ordered,Price,Value,Discount_Amount,Total,Category,Payment_Method,Cust_Id,Final_Status,Year_Month
0,100354678,2020-10-01,574772,21,89.9,1798.0,0.0,1798.0,Men's Fashion,cod,60124,received,2020-10
1,100354678,2020-10-01,574774,11,19.0,190.0,0.0,190.0,Men's Fashion,cod,60124,received,2020-10
2,100354680,2020-10-01,574777,9,149.9,1199.2,0.0,1199.2,Men's Fashion,cod,60124,received,2020-10


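## Ingest New Order Partitions
Each later month is loaded with the same schema and transformations as the base partition and appended to `staging.orders`. Because the write uses append mode, running the function twice for the same month adds duplicate rows.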

In [None]:
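def NewOrderPartition(mth, yr):
    """Load orders_<mth>_<yr>.csv from the landing zone and append it to staging.orders.

    mth and yr are two-digit strings, e.g. NewOrderPartition('11', '20') for November 2020.
    """
    filename = "orders_" + mth + "_" + yr + ".csv"
    path = "/FileStore/tables/" + filename
    print("Loading partition, ", path)
    df = spark.read.csv(path, header=True, schema=orders_fields)
    # Same transformations as the base partition: day-first date and Year_Month key.
    df = df.withColumn("Order_Date", to_date(col("Order_Date"), "dd/MM/yy"))
    df = df.withColumn("Year_Month", date_format(col("Order_Date"), "yyyy-MM"))
    # Append the new month's Year_Month folder to the existing table.
    df.write.format("parquet").mode("append").partitionBy("Year_Month").saveAsTable("staging.orders")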
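Because the function appends, a guard that skips months whose partition already exists could make re-runs safer. The sketch below assumes that each monthly file only contains orders from its own month. This is not guaranteed, because the function derives `Year_Month` from `Order_Date`. `partition_value` and `should_load` are hypothetical helpers, and `existing` would come from `SHOW PARTITIONS staging.orders`:

```python
def partition_value(mth, yr):
    """Map NewOrderPartition-style arguments, e.g. ('11', '20'), to '2020-11'.

    Assumes two-digit years belong to the 2000s.
    """
    if not (len(mth) == 2 and mth.isdigit() and 1 <= int(mth) <= 12):
        raise ValueError(f"month must be '01'..'12', got {mth!r}")
    if not (len(yr) == 2 and yr.isdigit()):
        raise ValueError(f"year must be two digits, got {yr!r}")
    return f"20{yr}-{mth}"


def should_load(mth, yr, existing_partitions):
    """True if the month's partition is not loaded yet, so appending it will not duplicate rows."""
    return f"Year_Month={partition_value(mth, yr)}" not in existing_partitions


# In the notebook, the existing partitions could be collected with (untested):
# existing = {row[0] for row in spark.sql("SHOW PARTITIONS staging.orders").collect()}
existing = {"Year_Month=2020-10", "Year_Month=2020-11"}  # as shown after loading November
for mth, yr in [("11", "20"), ("12", "20")]:
    if should_load(mth, yr, existing):
        print("would load", mth, yr)  # i.e. NewOrderPartition(mth, yr)
    else:
        print("skipping", mth, yr, "(already loaded)")
```

An alternative is to overwrite only the affected partitions instead of appending. That changes the write mode and is not what this notebook does.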

In [None]:
NewOrderPartition('11','20')

Loading partition,  /FileStore/tables/orders_11_20.csv


In [None]:
%fs ls dbfs:/user/hive/warehouse/staging.db/orders/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/staging.db/orders/Year_Month=2020-10/,Year_Month=2020-10/,0,0
dbfs:/user/hive/warehouse/staging.db/orders/Year_Month=2020-11/,Year_Month=2020-11/,0,0
dbfs:/user/hive/warehouse/staging.db/orders/_SUCCESS,_SUCCESS,0,1723745623000


In [None]:
%sql
SHOW PARTITIONS staging.orders

partition
Year_Month=2020-10
Year_Month=2020-11


In [None]:
NewOrderPartition('12','20')
NewOrderPartition('01','21')
NewOrderPartition('02','21')
NewOrderPartition('03','21')
NewOrderPartition('04','21')

Loading partition,  /FileStore/tables/orders_12_20.csv
Loading partition,  /FileStore/tables/orders_01_21.csv
Loading partition,  /FileStore/tables/orders_02_21.csv
Loading partition,  /FileStore/tables/orders_03_21.csv
Loading partition,  /FileStore/tables/orders_04_21.csv


In [None]:
%sql
SHOW PARTITIONS staging.orders

partition
Year_Month=2020-10
Year_Month=2020-11
Year_Month=2020-12
Year_Month=2021-01
Year_Month=2021-02
Year_Month=2021-03
Year_Month=2021-04


In [None]:
NewOrderPartition('05','21')
NewOrderPartition('06','21')
NewOrderPartition('07','21')
NewOrderPartition('08','21')
NewOrderPartition('09','21')

Loading partition,  /FileStore/tables/orders_05_21.csv
Loading partition,  /FileStore/tables/orders_06_21.csv
Loading partition,  /FileStore/tables/orders_07_21.csv
Loading partition,  /FileStore/tables/orders_08_21.csv
Loading partition,  /FileStore/tables/orders_09_21.csv


In [None]:
%sql
SHOW PARTITIONS staging.orders

partition
Year_Month=2020-10
Year_Month=2020-11
Year_Month=2020-12
Year_Month=2021-01
Year_Month=2021-02
Year_Month=2021-03
Year_Month=2021-04
Year_Month=2021-05
Year_Month=2021-06
Year_Month=2021-07



