## Objective
We are given two datasets containing transactional data on products purchased online across US states. One dataset contains the *order* details and the other contains the *customer* information.

The fields present in the files are:
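- **orders.csv**: order_id, order_date, status, item_id, qty_ordered, price, value, discount_amount, total, category, payment_method, cust_id, plus two further columns holding the order year and month-year (read below as Year and Month_Year)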
- **customers.json**: City, County, Customer Since, E Mail, Gender, Place Name, Region, State, Zip, Age, cust_id, full_name


The objective of the assignment is to create a data lake with three zones, which we then use to draw insights about the market, products, and customers:
- landing
- staging
- curated

This notebook creates the landing zone and loads the staging zone; the curated zone is attached separately.

The Databricks public link to this code is https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1721482899250574/415017039183874/2654077590604412/latest.html



### 1. Creating a Landing Zone
- The following section lists the two input files, which have been uploaded to the Databricks File System (DBFS) and form the landing zone, and sets the date-parsing policy used when reading them.

In [None]:
# Run the following command to view the list of files uploaded on DBFS
%fs ls dbfs:/FileStore/tables/bdcc/

path,name,size,modificationTime
dbfs:/FileStore/tables/bdcc/customers.json,customers.json,15815082,1704873720000
dbfs:/FileStore/tables/bdcc/orders.csv,orders.csv,32138462,1704874151000


In [None]:
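# Use the Spark 2.x (legacy) parser for date/time strings. Spark 3's default parser is
# stricter and can reject date strings that the legacy parser accepted.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")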

### 2. Creating a Staging Zone
This section deals with
- Creating a staging schema and two tables - orders and customers - within it.
- Reading data from the given input files and loading them first into dataframes
- Applying column transformations if required
- Finally, loading the dataframes into the tables created above

#### 2a) Creating a staging schema for the orders and customers tables

In [None]:
%sql

-- Run the following command to create a new database called staging. DROP ... CASCADE first removes any existing staging database together with its tables. We then use this database to load the orders and customer information into tables.

DROP DATABASE IF EXISTS staging CASCADE;

CREATE DATABASE staging;

In [None]:
%sql

-- Drop any existing "orders" and "customers" tables. CASCADE above already removes them, so this is a safeguard for when this cell is run on its own.

DROP TABLE IF EXISTS staging.orders;
DROP TABLE IF EXISTS staging.customers;

#### 2b) Ingest Base Partition
The following section deals with 
- Defining the column types of the input data and applying transformations if needed.
- Loading the data into dataframes.

#### Reading the orders.csv file and loading it into a dataframe

In [None]:
from pyspark.sql.types import *

# We first load the orders information from the orders.csv file into a dataframe called orders_df.

# Define the custom schema. With header=True the header line is skipped and these fields
# are applied by position, which is how order_id, item_id, ... receive the names below.
# Spark treats columns read from files as nullable, so the False flags are not enforced
# (printSchema below shows nullable = true).
columns = StructType([StructField("OrderID", IntegerType(), False),
                      StructField("Order_Date", DateType(), False),
                      StructField("Status", StringType(), False),
                      StructField("ItemID", FloatType(), False),
                      StructField("Quantity_Ordered", FloatType(), False),
                      StructField("Price", FloatType(), False),
                      StructField("Value", FloatType(), False),
                      StructField("Discount_Amount", FloatType(), False),
                      StructField("Total", FloatType(), False),
                      StructField("Category", StringType(), False),
                      StructField("Payment_Method", StringType(), False),
                      StructField("CustID", StringType(), False),
                      StructField("Year", IntegerType(), False),
                      StructField("Month_Year", StringType(), False)
                      ])

# Read from the landing zone into a dataframe
orders_df = spark.read.csv("/FileStore/tables/bdcc/orders.csv",
                           header=True,
                           schema=columns
                           )
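As a rough illustration of what the read above does, here is a plain-Python sketch (standard library only, not Spark) of applying a schema by position to a CSV file whose header line is skipped. The sample row is hypothetical, modelled on the first record that `orders_df.show(10)` displays, and the header names `year` and `month_year` are placeholders because the file's own names for those two columns are not given. Turning empty fields into `None` is a simplifying assumption.

```python
import csv
import io
from datetime import date

# Hypothetical sample file. The names "year" and "month_year" and the exact raw
# formatting of each value are assumptions, modelled on the output of show(10).
SAMPLE_CSV = (
    "order_id,order_date,status,item_id,qty_ordered,price,value,"
    "discount_amount,total,category,payment_method,cust_id,year,month_year\n"
    "100354678,2020-10-01,received,574772.0,21.0,89.9,1798.0,0.0,1798.0,"
    "Men's Fashion,cod,60124.0,2020,Oct-2020\n"
)

# (name, converter) pairs in the same order as the StructType fields above.
SCHEMA = [
    ("OrderID", int), ("Order_Date", date.fromisoformat), ("Status", str),
    ("ItemID", float), ("Quantity_Ordered", float), ("Price", float),
    ("Value", float), ("Discount_Amount", float), ("Total", float),
    ("Category", str), ("Payment_Method", str), ("CustID", str),
    ("Year", int), ("Month_Year", str),
]


def read_with_schema(text, schema):
    """Skip the header line, then apply the schema to each row by position."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # header names are discarded; only column order matters
    rows = []
    for values in reader:
        rows.append({
            # Empty strings become None (a simplifying assumption for nulls).
            name: None if raw == "" else convert(raw)
            for (name, convert), raw in zip(schema, values)
        })
    return rows


orders = read_with_schema(SAMPLE_CSV, SCHEMA)
```

Because only position matters, a file whose columns were reordered would be read without error but into the wrong fields, so the schema order has to match the file.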

In [None]:
# Cast the quantity and ID columns to integers
# (ItemID and Quantity_Ordered were read as floats, CustID as a string)
orders_df = orders_df.withColumn("Quantity_Ordered", orders_df["Quantity_Ordered"].cast(IntegerType()))
orders_df = orders_df.withColumn("ItemID", orders_df["ItemID"].cast(IntegerType()))
orders_df = orders_df.withColumn("CustID", orders_df["CustID"].cast(IntegerType()))

# Display the first ten records
orders_df.show(10)

+---------+----------+--------------+------+----------------+-----+------+---------------+------+-----------------+--------------+------+----+----------+
|  OrderID|Order_Date|        Status|ItemID|Quantity_Ordered|Price| Value|Discount_Amount| Total|         Category|Payment_Method|CustID|Year|Month_Year|
+---------+----------+--------------+------+----------------+-----+------+---------------+------+-----------------+--------------+------+----+----------+
|100354678|2020-10-01|      received|574772|              21| 89.9|1798.0|            0.0|1798.0|    Men's Fashion|           cod| 60124|2020|  Oct-2020|
|100354678|2020-10-01|      received|574774|              11| 19.0| 190.0|            0.0| 190.0|    Men's Fashion|           cod| 60124|2020|  Oct-2020|
|100354680|2020-10-01|      complete|574777|               9|149.9|1199.2|            0.0|1199.2|    Men's Fashion|           cod| 60124|2020|  Oct-2020|
|100354680|2020-10-01|      complete|574779|               9| 79.9| 639.2|  
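The casts above work for the values shown, but `FloatType` is a 32-bit IEEE 754 float, which represents every whole number exactly only up to 2**24 (16,777,216). The plain-Python sketch below imitates `FloatType` by round-tripping a value through `struct`'s single-precision format; larger IDs would be rounded before the integer cast. If such IDs were possible, reading these columns as `DoubleType` (exact for whole numbers up to 2**53) would be a safer choice; this is a suggestion, not what the notebook does.

```python
import struct


def as_float32(x):
    """Round-trip a Python float through IEEE 754 single precision (like FloatType)."""
    return struct.unpack("f", struct.pack("f", x))[0]


print(int(as_float32(574772.0)))     # 574772: well below 2**24, exact
print(int(as_float32(16777216.0)))   # 16777216: 2**24 itself is exact
print(int(as_float32(16777217.0)))   # 16777216: 2**24 + 1 is rounded down
print(int(as_float32(123456789.0)))  # 123456792: off by 3
```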

In [None]:
orders_df.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Status: string (nullable = true)
 |-- ItemID: integer (nullable = true)
 |-- Quantity_Ordered: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- Value: float (nullable = true)
 |-- Discount_Amount: float (nullable = true)
 |-- Total: float (nullable = true)
 |-- Category: string (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- CustID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month_Year: string (nullable = true)



#### Reading the customers.json file and loading it into a dataframe by following similar steps

In [None]:

# First, load the data from the input customers.json into a dataframe called customers_df

# Define the custom schema. Every column is read as a string; "Customer Since",
# "age" and "cust_id" are converted to proper types in the next cell.
headers = StructType([
      StructField("City",StringType(),True),
      StructField("County",StringType(),True),
      StructField("Customer Since",StringType(),True),
      StructField("E Mail",StringType(),False),
      StructField("Gender",StringType(),True),
      StructField("Place Name",StringType(),True),
      StructField("Region",StringType(),True),
      StructField("State",StringType(),True),
      StructField("Zip",StringType(),True),
      StructField("age",StringType(),True),
      StructField("cust_id",StringType(),True),
      StructField("full_name",StringType(),True)
      ])

customers_df = spark.read.schema(headers) \
                         .json("/FileStore/tables/bdcc/customers.json")

# Display the first ten records
customers_df.show(10)

+-----------+-----------+--------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|       City|     County|Customer Since|              E Mail|Gender| Place Name|   Region|State|  Zip| age|cust_id|        full_name|
+-----------+-----------+--------------+--------------------+------+-----------+---------+-----+-----+----+-------+-----------------+
|     Vinson|     Harmon|     8/22/2006|jani.titus@gmail.com|     F|     Vinson|    South|   OK|73571|43.0|60124.0|      Titus, Jani|
|     Graham|   Bradford|      2/4/1981| lee.eaker@gmail.com|     M|     Graham|    South|   FL|32042|28.0|42485.0|       Eaker, Lee|
|Grand Forks|Grand Forks|     6/27/2010|jason.simoneau@gm...|     M|Grand Forks|  Midwest|   ND|58201|65.0|53620.0|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|      4/3/1992|grover.bayless@ya...|     M|Laupahoehoe|     West|   HI|96764|33.0|56836.0|  Bayless, Grover|
|     Glendo|     Platte|     6/21/2015|albertina.bensen@...| 
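`spark.read.json` expects JSON Lines by default (one complete JSON object per line; multi-line documents need the `multiLine` option), and with a user-supplied schema it keeps only the listed fields and returns null for any field a record lacks. A plain-Python sketch of that projection, assuming customers.json is in JSON Lines form; the sample records are hypothetical, modelled on the rows shown above, and the `extra` key is made up to show that unknown keys are dropped:

```python
import json

# Field names copied from the `headers` StructType above.
SCHEMA_FIELDS = ["City", "County", "Customer Since", "E Mail", "Gender",
                 "Place Name", "Region", "State", "Zip", "age", "cust_id",
                 "full_name"]

# Hypothetical JSON Lines input: one complete object per line.
SAMPLE_JSONL = "\n".join([
    '{"City": "Vinson", "County": "Harmon", "Customer Since": "8/22/2006", '
    '"E Mail": "jani.titus@gmail.com", "Gender": "F", "Place Name": "Vinson", '
    '"Region": "South", "State": "OK", "Zip": "73571", "age": "43.0", '
    '"cust_id": "60124.0", "full_name": "Titus, Jani", "extra": 1}',
    '{"City": "Graham", "cust_id": "42485.0"}',
])


def read_json_lines(text, fields):
    """Parse one JSON object per line and project it onto the schema fields."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        obj = json.loads(line)
        # Keys outside the schema are dropped; missing keys become None (null).
        records.append({name: obj.get(name) for name in fields})
    return records


customers = read_json_lines(SAMPLE_JSONL, SCHEMA_FIELDS)
```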

In [None]:
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import IntegerType

# Apply the following column transformations.
# "M/d/yyyy" matches dates such as 8/22/2006 (one- or two-digit month and day, four-digit year).
customers_df = customers_df.withColumn("Customer Since", to_date(col("Customer Since"), "M/d/yyyy"))
customers_df = customers_df.withColumn("age", customers_df["age"].cast(IntegerType()))
customers_df = customers_df.withColumn("cust_id", customers_df["cust_id"].cast(IntegerType()))

# Rename the columns to remove the spaces between words and use consistently capitalised names
customers_df = customers_df.withColumnRenamed("Customer Since", "CustomerSince")\
                           .withColumnRenamed("E Mail", "Email")\
                           .withColumnRenamed("Place Name", "PlaceName")\
                           .withColumnRenamed("age", "Age")\
                           .withColumnRenamed("cust_id", "CustID")\
                           .withColumnRenamed("full_name", "FullName")

customers_df.show(5)

+-----------+-----------+-------------+--------------------+------+-----------+-------+-----+-----+---+------+-----------------+
|       City|     County|CustomerSince|               Email|Gender|  PlaceName| Region|State|  Zip|Age|CustID|         FullName|
+-----------+-----------+-------------+--------------------+------+-----------+-------+-----+-----+---+------+-----------------+
|     Vinson|     Harmon|   2006-08-22|jani.titus@gmail.com|     F|     Vinson|  South|   OK|73571| 43| 60124|      Titus, Jani|
|     Graham|   Bradford|   1981-02-04| lee.eaker@gmail.com|     M|     Graham|  South|   FL|32042| 28| 42485|       Eaker, Lee|
|Grand Forks|Grand Forks|   2010-06-27|jason.simoneau@gm...|     M|Grand Forks|Midwest|   ND|58201| 65| 53620|  Simoneau, Jason|
|Laupahoehoe|     Hawaii|   1992-04-03|grover.bayless@ya...|     M|Laupahoehoe|   West|   HI|96764| 33| 56836|  Bayless, Grover|
|     Glendo|     Platte|   2015-06-21|albertina.bensen@...|     F|     Glendo|   West|   WY|8221
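The per-record effect of the transformations and renames above can be sketched in plain Python: parse the month/day/year text into a date, turn strings such as `"43.0"` into whole numbers, and rename the keys. This is an illustration, not what Spark executes; the input record is hypothetical (modelled on the first row shown above), `%m/%d/%Y` is Python's `strptime` syntax, which accepts one- or two-digit months and days, and `to_int` and `to_date_mdy` are helpers introduced only for this sketch.

```python
from datetime import datetime

# Hypothetical input record modelled on the first row of customers_df
# (values are still the strings produced by the StringType schema).
raw_customer = {
    "City": "Vinson", "Customer Since": "8/22/2006",
    "E Mail": "jani.titus@gmail.com", "Place Name": "Vinson",
    "age": "43.0", "cust_id": "60124.0", "full_name": "Titus, Jani",
}

# Old name -> new name, mirroring the withColumnRenamed calls above.
RENAMES = {
    "Customer Since": "CustomerSince", "E Mail": "Email",
    "Place Name": "PlaceName", "age": "Age",
    "cust_id": "CustID", "full_name": "FullName",
}


def to_int(value):
    """Hypothetical helper: '43.0' -> 43, None -> None."""
    return None if value is None else int(float(value))


def to_date_mdy(value):
    """Hypothetical helper: '8/22/2006' -> date(2006, 8, 22); None stays None."""
    return None if value is None else datetime.strptime(value, "%m/%d/%Y").date()


def transform_customer(record):
    out = dict(record)
    out["Customer Since"] = to_date_mdy(out.get("Customer Since"))
    out["age"] = to_int(out.get("age"))
    out["cust_id"] = to_int(out.get("cust_id"))
    # Rename keys; keys without an entry in RENAMES keep their name.
    return {RENAMES.get(key, key): value for key, value in out.items()}


customer = transform_customer(raw_customer)
```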

#### 2c) Loading data into tables
- Now that we have created two dataframes, the next step is to load this information into two tables
- The orders_df dataframe will be loaded into the staging.orders table and the customers_df will be loaded into the staging.customers table

In [None]:
# Run the following command to create a new table "orders" in the staging schema, stored as Parquet and partitioned by Year.

orders_df.write.format("parquet").partitionBy("Year").saveAsTable("staging.orders")
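`partitionBy("Year")` uses Hive-style partitioning: rows are grouped into one directory per distinct value, named `Year=<value>`, and the partition column is stored in the directory name rather than inside the Parquet files. That is the layout the `%fs ls` listing of the table shows (`Year=2020/`, `Year=2021/`), and it lets queries that filter on Year skip whole directories. A plain-Python sketch of the grouping, with hypothetical rows; null values go to Spark's `__HIVE_DEFAULT_PARTITION__` directory:

```python
from collections import defaultdict

# Spark's directory name for rows whose partition value is null.
NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_rows(rows, column):
    """Group rows Hive-style: one 'column=value' directory per distinct value.

    The partition column is dropped from each stored row, because its value
    is recovered from the directory name.
    """
    layout = defaultdict(list)
    for row in rows:
        value = row[column]
        directory = f"{column}={NULL_PARTITION if value is None else value}"
        layout[directory].append({k: v for k, v in row.items() if k != column})
    return dict(layout)


# Hypothetical rows: only the Year column matters for the layout.
sample_orders = [
    {"OrderID": 1, "Year": 2020},
    {"OrderID": 2, "Year": 2021},
    {"OrderID": 3, "Year": 2020},
]
layout = partition_rows(sample_orders, "Year")
```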

In [None]:
%fs ls dbfs:/user/hive/warehouse/staging.db/orders

path,name,size,modificationTime
dbfs:/user/hive/warehouse/staging.db/orders/Year=2020/,Year=2020/,0,0
dbfs:/user/hive/warehouse/staging.db/orders/Year=2021/,Year=2021/,0,0
dbfs:/user/hive/warehouse/staging.db/orders/_SUCCESS,_SUCCESS,0,1705774476000


In [None]:
%sql

-- Run the following command to show the partitions created in the orders table

SHOW PARTITIONS staging.orders;

partition
Year=2020
Year=2021


In [None]:
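# Run the following command to create the customers table in the staging database.
# No format is given, so the default table format is used (Delta on recent Databricks runtimes).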

customers_df.write.saveAsTable("staging.customers")

In [None]:
%sql

-- Query the first 10 records in the customers table (without ORDER BY, which rows are returned is not guaranteed)

SELECT *
FROM staging.customers
LIMIT 10;

City,County,CustomerSince,Email,Gender,PlaceName,Region,State,Zip,Age,CustID,FullName
Vinson,Harmon,2006-08-22,jani.titus@gmail.com,F,Vinson,South,OK,73571,43,60124,"Titus, Jani"
Graham,Bradford,1981-02-04,lee.eaker@gmail.com,M,Graham,South,FL,32042,28,42485,"Eaker, Lee"
Grand Forks,Grand Forks,2010-06-27,jason.simoneau@gmail.com,M,Grand Forks,Midwest,ND,58201,65,53620,"Simoneau, Jason"
Laupahoehoe,Hawaii,1992-04-03,grover.bayless@yahoo.com,M,Laupahoehoe,West,HI,96764,33,56836,"Bayless, Grover"
Glendo,Platte,2015-06-21,albertina.bensen@gmail.com,F,Glendo,West,WY,82213,73,60125,"Bensen, Albertina"
Farmington,Oconee,2016-10-19,drema.galle@gmail.com,F,Farmington,South,GA,30638,64,51286,"Galle, Drema"
Nashville,Davidson,2011-07-10,sheryll.newsome@gmail.com,F,Nashville,South,TN,37245,75,60126,"Newsome, Sheryll"
Warwick,Kent,1995-05-11,bernard.dewald@hotmail.co.uk,M,Warwick,Northeast,RI,2889,52,60127,"Dewald, Bernard"
Sarasota,Sarasota,1998-08-25,hilario.brammer@hotmail.com,M,Sarasota,South,FL,34277,31,60128,"Brammer, Hilario"
Brownstown,Fayette,2017-03-31,elizbeth.raminez@gmail.com,F,Brownstown,Midwest,IL,62418,71,56449,"Raminez, Elizbeth"
