# Data Loading

The data set is provided by Ben Roshan and published on [Kaggle](https://www.kaggle.com/benroshan/ecommerce-data). It consists of three CSV files.

To load data in Spark, you need a SparkSession. In a Databricks notebook like this one, it is available through the `spark` variable.
The SparkSession has a `read` property that returns a DataFrameReader. Its `format()` function selects the data source and its `option()` function configures how the source is read; both return the DataFrameReader again.
The DataFrameReader's `load()` function takes the path of the file or data we want to load and returns a DataFrame.

When writing data processing code in Spark, you will see **method chaining** again and again: functions called one after another in a single expression. This is by design. Below is an example.

In [0]:
# Create a reusable CSV reader with some default options
spark_csv_reader = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
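
To make the chain less opaque, the same reader can be built one call at a time. This is only a sketch: it assumes `spark` is a SparkSession, and the helper name `build_csv_reader` is hypothetical and not part of the notebook.

```python
def build_csv_reader(spark):
    """Build the same CSV reader as the chained one-liner, step by step.

    `spark` is assumed to be a SparkSession (hypothetical helper, not a Spark API).
    """
    reader = spark.read                            # a DataFrameReader
    reader = reader.format("csv")                  # each call returns a DataFrameReader ...
    reader = reader.option("header", "true")       # ... so the next call can follow directly
    reader = reader.option("inferSchema", "true")
    return reader
```

Because every call hands back the reader, writing the calls on one line and writing them as separate assignments configure it in the same way.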

In [0]:
# Load CSV files into these variables 
sales_target = spark_csv_reader.load("dbfs:/FileStore/shared_uploads/dtuworkshop@outlook.com/Sales_target.csv")
sales_order_header = spark_csv_reader.load("dbfs:/FileStore/shared_uploads/dtuworkshop@outlook.com/List_of_Orders.csv")
sales_order_detail = spark_csv_reader.load("dbfs:/FileStore/shared_uploads/dtuworkshop@outlook.com/Order_Details.csv")

# Sales Order Header Data

This data is held in the `sales_order_header` variable. It needs some processing:

1. Records with a missing ID need to be removed
2. Order Date is in String format and needs to become a date
3. We prefer snake_case column names

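Since Spark's code is **lazily evaluated**, at this point the variable does not hold the rows themselves, only a description of how to produce them. This lazy evaluation has one main benefit: optimisation. Spark can look at the whole chain of operations before it runs any of them.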

Question to the class: When does Spark perform the execution of data processing?
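
One way to picture the answer is a toy model in plain Python. The `LazyPipeline` class below is hypothetical and is not how Spark is implemented; it only mimics the idea that transformations are recorded and an action runs them.

```python
class LazyPipeline:
    """Toy stand-in for a lazily evaluated DataFrame (not Spark's implementation)."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)  # recorded transformations, not yet applied

    def map(self, func):
        # Transformation: only records the step and returns a new pipeline
        return LazyPipeline(self._rows, self._steps + (func,))

    def collect(self):
        # Action: now every recorded step actually runs
        result = list(self._rows)
        for step in self._steps:
            result = [step(row) for row in result]
        return result


calls = []


def double(x):
    calls.append(x)  # lets us observe when the work really happens
    return x * 2


pipeline = LazyPipeline([1, 2, 3]).map(double)
print(len(calls))          # prints 0: nothing has run yet
print(pipeline.collect())  # prints [2, 4, 6]: the action triggers the work
```

In the notebook, `withColumn()` and `withColumnRenamed()` play the role of `map()`, while `display()` and `saveAsTable()` play the role of `collect()`.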

In [0]:
display(sales_order_header.limit(10))

Order ID,Order Date,CustomerName,State,City
B-25601,01-04-2018,Bharat,Gujarat,Ahmedabad
B-25602,01-04-2018,Pearl,Maharashtra,Pune
B-25603,03-04-2018,Jahan,Madhya Pradesh,Bhopal
B-25604,03-04-2018,Divsha,Rajasthan,Jaipur
B-25605,05-04-2018,Kasheen,West Bengal,Kolkata
B-25606,06-04-2018,Hazel,Karnataka,Bangalore
B-25607,06-04-2018,Sonakshi,Jammu and Kashmir,Kashmir
B-25608,08-04-2018,Aarushi,Tamil Nadu,Chennai
B-25609,09-04-2018,Jitesh,Uttar Pradesh,Lucknow
B-25610,09-04-2018,Yogesh,Bihar,Patna


In [0]:
# Import this package because we need the to_date() function
import pyspark.sql.functions as fn

# Use withColumnRenamed(old_name, new_name) to rename columns
# Use withColumn(new_name, column_expression) to introduce new column (or replace if the name is same)
sales_order_header = sales_order_header.dropna("any")
sales_order_header = sales_order_header.withColumnRenamed(
  "Order ID", "order_id"
).withColumn(
  "order_date",
  fn.to_date(sales_order_header["Order Date"], "d-M-y")
).withColumnRenamed(
  "State", "state"
).withColumnRenamed(
  "City", "city"
).withColumnRenamed(
  "CustomerName", "customer_name"
)
# We don't need this column anymore
sales_order_header = sales_order_header.drop("Order Date")
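
The task list for this data only asks for records with a missing ID to be removed, whereas `dropna("any")` drops a row when any of its columns is null. If that is stricter than intended, `dropna` also accepts a `subset` argument. A minimal sketch, assuming it runs before the rename so the column is still called `Order ID`; the helper name `drop_rows_missing_order_id` is hypothetical:

```python
def drop_rows_missing_order_id(df, id_column="Order ID"):
    """Drop only the rows whose ID column is null (hypothetical helper).

    `df` is assumed to be a Spark DataFrame; nulls in other columns are kept.
    """
    return df.dropna(how="any", subset=[id_column])
```

Whether to keep rows that have an ID but lack, say, a city depends on what the later analysis needs.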

In [0]:
display(sales_order_header.limit(10))

order_id,customer_name,state,city,order_date
B-25601,Bharat,Gujarat,Ahmedabad,2018-04-01
B-25602,Pearl,Maharashtra,Pune,2018-04-01
B-25603,Jahan,Madhya Pradesh,Bhopal,2018-04-03
B-25604,Divsha,Rajasthan,Jaipur,2018-04-03
B-25605,Kasheen,West Bengal,Kolkata,2018-04-05
B-25606,Hazel,Karnataka,Bangalore,2018-04-06
B-25607,Sonakshi,Jammu and Kashmir,Kashmir,2018-04-06
B-25608,Aarushi,Tamil Nadu,Chennai,2018-04-08
B-25609,Jitesh,Uttar Pradesh,Lucknow,2018-04-09
B-25610,Yogesh,Bihar,Patna,2018-04-09
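
The converted dates can be cross-checked in plain Python. The raw strings are day-first, which is easy to misread for values such as `01-04-2018`. The function name `parse_order_date` is only for illustration, and `strptime` uses its own format codes rather than Spark's pattern letters.

```python
from datetime import datetime


def parse_order_date(text):
    """Parse a day-month-year string such as "01-04-2018" into a date."""
    return datetime.strptime(text, "%d-%m-%Y").date()


print(parse_order_date("01-04-2018"))  # prints 2018-04-01 (1 April, not 4 January)
```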


In [0]:
# An Action. Spark will start processing the data when we run this line.
sales_order_header.write.format("delta").saveAsTable("sales_order_header")
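
By default, `saveAsTable` raises an error when the table already exists, so running this cell a second time fails. If replacing the table is acceptable, one option is to set the save mode explicitly. A sketch under that assumption; the helper name `save_as_delta_table` is hypothetical:

```python
def save_as_delta_table(df, table_name, mode="overwrite"):
    """Write `df` as a Delta table, replacing an existing table by default.

    `df` is assumed to be a Spark DataFrame in an environment where Delta Lake
    is available (as on Databricks). Hypothetical helper, not part of the notebook.
    """
    df.write.format("delta").mode(mode).saveAsTable(table_name)
```

For example, `save_as_delta_table(sales_order_header, "sales_order_header")` could be re-run safely, at the cost of discarding the previous contents of the table.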

# Sales Order Detail

Only one processing step is needed for this data set:

1. We prefer snake_case column names, so we rename them.

In [0]:
display(sales_order_detail.limit(10))

Order ID,Amount,Profit,Quantity,Category,Sub-Category
B-25601,1275.0,-1148.0,7,Furniture,Bookcases
B-25601,66.0,-12.0,5,Clothing,Stole
B-25601,8.0,-2.0,3,Clothing,Hankerchief
B-25601,80.0,-56.0,4,Electronics,Electronic Games
B-25602,168.0,-111.0,2,Electronics,Phones
B-25602,424.0,-272.0,5,Electronics,Phones
B-25602,2617.0,1151.0,4,Electronics,Phones
B-25602,561.0,212.0,3,Clothing,Saree
B-25602,119.0,-5.0,8,Clothing,Saree
B-25603,1355.0,-60.0,5,Clothing,Trousers


In [0]:
sales_order_detail = sales_order_detail.withColumnRenamed(
  "Order ID", "order_id"
).withColumnRenamed(
  "Amount", "amount"
).withColumnRenamed(
  "Profit","profit"
).withColumnRenamed(
  "Quantity", "quantity"
).withColumnRenamed(
  "Category", "category"
).withColumnRenamed(
  "Sub-Category", "sub_category"
)
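
Renaming six columns by hand works, but the same rule could also be applied to every column at once. The sketch below is one possible approach, not the notebook's method: `to_snake_case` and `rename_all_to_snake_case` are hypothetical helpers, and the second one assumes `df` is a Spark DataFrame.

```python
import re


def to_snake_case(name):
    """Convert a column name such as "Sub-Category" or "CustomerName" to snake_case."""
    # Insert an underscore between a lowercase letter or digit and an uppercase letter
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Replace each run of spaces, hyphens or other non-alphanumerics with one underscore
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return name.strip("_").lower()


def rename_all_to_snake_case(df):
    """Return `df` with every column renamed (assumes a Spark DataFrame)."""
    return df.toDF(*[to_snake_case(column) for column in df.columns])


print(to_snake_case("Sub-Category"))  # prints sub_category
print(to_snake_case("Order ID"))      # prints order_id
```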

In [0]:
display(sales_order_detail.limit(10))

order_id,amount,profit,quantity,category,sub_category
B-25601,1275.0,-1148.0,7,Furniture,Bookcases
B-25601,66.0,-12.0,5,Clothing,Stole
B-25601,8.0,-2.0,3,Clothing,Hankerchief
B-25601,80.0,-56.0,4,Electronics,Electronic Games
B-25602,168.0,-111.0,2,Electronics,Phones
B-25602,424.0,-272.0,5,Electronics,Phones
B-25602,2617.0,1151.0,4,Electronics,Phones
B-25602,561.0,212.0,3,Clothing,Saree
B-25602,119.0,-5.0,8,Clothing,Saree
B-25603,1355.0,-60.0,5,Clothing,Trousers


In [0]:
sales_order_detail.write.format("delta").saveAsTable("sales_order_detail")

# Sales Target Data

Two processing tasks are needed for this data set:

1. The `Month of Order Date` column is a String, which makes calculations hard
2. The columns should be renamed to snake_case

In [0]:
# The "month" column does not exist yet, so preview the raw data without sorting
display(sales_target.limit(5))

In [0]:
# We actually don't need to re-import if it has been done in another cell
import pyspark.sql.functions as fn


sales_target = sales_target.withColumn(
  "month",
  # Take the month
  fn.from_unixtime(
    # Get a Unix Timestamp representation of the string
    fn.unix_timestamp(
      # Split "Apr-18" into "Apr" and "18" -- take the "Apr"
      fn.split(sales_target["Month of Order Date"], "-").getItem(0),
      'MMM'
    ),
    'M'
  ).cast("int")
).withColumn(
  "year",
  # Split "Apr-18" into "Apr" and "18" -- take the "18", cast to Integer and add 2000
  fn.split(sales_target["Month of Order Date"], "-").getItem(1).cast("int") + 2000
).withColumnRenamed(
  "Target",
  "target"
).withColumnRenamed(
  "Category", "category"
)
sales_target = sales_target.drop("Month of Order Date")
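
The intended result of this transformation can be checked on a single value in plain Python. This is only a cross-check: it assumes English month abbreviations (Python's default locale), and the function name `parse_month_year` is hypothetical. Note that `%y` maps 69 to 99 onto 1969 to 1999 instead of always adding 2000 as the cell above does; for this data the two agree.

```python
from datetime import datetime


def parse_month_year(text):
    """Split a value such as "Apr-18" into (month, year) integers."""
    # %b = abbreviated month name, %y = two-digit year
    parsed = datetime.strptime(text, "%b-%y")
    return parsed.month, parsed.year


print(parse_month_year("Apr-18"))  # prints (4, 2018)
print(parse_month_year("Jan-19"))  # prints (1, 2019)
```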

In [0]:
display(sales_target.sort("month").limit(5))

category,target,month,year
Electronics,16000.0,1,2019
Furniture,11500.0,1,2019
Clothing,16000.0,1,2019
Clothing,16000.0,2,2019
Furniture,11600.0,2,2019


In [0]:
sales_target.write.format("delta").saveAsTable("sales_target")