<a href="https://colab.research.google.com/github/VICIWUOHA/Apache-Spark-Data-Trans/blob/main/Spark_Job_for_Data_Transformation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **SPARK Module for Data Cleaning**

This script, built on Apache Spark, was used to wrangle data from a Google Cloud data store before writing it into a Postgres database.

In [None]:
# Install jdk
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# Unzip the downloaded file
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
# Install Pyspark
!pip install -q findspark
!pip install pyspark
!pip install pyarrow


Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 48.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=2256f07ba5aff5f5ef9e15edd229d2680a3c90c9be89f94b81b66f942ce17392
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
import os

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.1-bin-hadoop3.2",
)



In [None]:
# Make the Spark installation importable from this kernel.
# findspark.init() is called with no argument, so it locates Spark through the
# SPARK_HOME environment variable set in the previous cell.

import findspark
findspark.init()

In [None]:
# Create (or reuse, via getOrCreate) a local SparkSession whose web UI listens on
# port 4050 - the port the ngrok tunnel below forwards.
# NOTE(review): master "local" runs with a single worker thread; "local[*]" would use all cores.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Bare trailing expression: the notebook renders the session summary as a sanity check.
spark

In [None]:
# To allow us view the Spark UI
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2022-05-16 13:11:31--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 18.205.222.128, 54.237.133.81, 54.161.241.46, ...
Connecting to bin.equinox.io (bin.equinox.io)|18.205.222.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2022-05-16 13:11:32 (13.8 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://6627-35-204-111-138.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p

In [None]:
# Load the raw product-order CSV folders (Jan-Feb and April) into Spark DataFrames.
from pyspark.sql.functions import input_file_name, substring_index, substring, regexp_replace, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

# Explicit (name, type, nullable) schema instead of inferSchema=True, whose guesses can be wrong.
# Spark's CSV reader does not enforce nullable=False: the printed schema still reports
# 'Order No' and 'Order Time' as nullable.
_ORDER_COLUMNS = [
    ('Order No', StringType(), False),
    ('Order Time', StringType(), False),
    ('Order Type', StringType(), True),
    ('Order Taken By', StringType(), True),
    ('Customer Name', StringType(), True),
    ('Customer Number', StringType(), True),
    ('Item name', StringType(), True),
    ('Quantity', FloatType(), True),
    ('Item Type', StringType(), True),
    ('Price', FloatType(), True),
    ('Item Tax', FloatType(), True),
    ('Item discount', FloatType(), True),
]
ordersSchema = StructType([StructField(name, dtype, nullable) for name, dtype, nullable in _ORDER_COLUMNS])


def _read_product_orders(folder):
    """Read every CSV in `folder` using `ordersSchema` and record each row's source path in `Filename`."""
    frame = spark.read.csv(folder, header=True, schema=ordersSchema)
    return frame.withColumn("Filename", input_file_name())


Jan_Feb_product_orders = _read_product_orders('/content/drive/MyDrive/SD Raw Data/Jan-Feb 2022/Product_Orders')
Jan_Feb_product_orders.printSchema()
Jan_Feb_product_orders.show(5)

April_product_orders = _read_product_orders('/content/drive/MyDrive/SD Raw Data/April 2022/Product_orders')
April_product_orders.printSchema()
April_product_orders.show(5)

root
 |-- Order No: string (nullable = true)
 |-- Order Time: string (nullable = true)
 |-- Order Type: string (nullable = true)
 |-- Order Taken By: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Customer Number: string (nullable = true)
 |-- Item name: string (nullable = true)
 |-- Quantity: float (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Item Tax: float (nullable = true)
 |-- Item discount: float (nullable = true)
 |-- Filename: string (nullable = false)

+--------+--------------------+----------+--------------+-------------+---------------+--------------------+--------+------------+------+--------+-------------+--------------------+
|Order No|          Order Time|Order Type|Order Taken By|Customer Name|Customer Number|           Item name|Quantity|   Item Type| Price|Item Tax|Item discount|            Filename|
+--------+--------------------+----------+--------------+-------------+---------------+

### **Data Exploration**

This confirms that Spark read all the CSV files in my folders at once.

- I'll do a tail preview and then find the length (row count) of each dataframe.

- I'll also check the number of distinct orders in each dataframe, to confirm that it matches what I have in my application / aggregated orders table.

In [None]:
# Preview the LAST 5 rows of the Jan-Feb and April frames and print each frame's row count.
# Per the Spark docs, monotonically_increasing_id() increases with partition and row position,
# so sorting it descending surfaces the rows that were read last.
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import desc,asc

Jan_Feb_tail, Apr_tail = (
    frame.withColumn("index", monotonically_increasing_id())
    for frame in (Jan_Feb_product_orders, April_product_orders)
)

# Jan-Feb preview keeps the generated index column visible.
_jan_feb_last_rows = Jan_Feb_tail.orderBy(desc("index"))
_jan_feb_last_rows.show(5)
print('Jan - Feb No of Rows in df :', Jan_Feb_product_orders.count())

# April preview hides the generated index column.
_april_last_rows = Apr_tail.orderBy(desc("index")).drop("index")
_april_last_rows.show(5)
print('April No of Rows in df :', April_product_orders.count())


+--------+--------------------+----------+--------------+-------------+---------------+-------------------+--------+------------+------+--------+-------------+--------------------+------+
|Order No|          Order Time|Order Type|Order Taken By|Customer Name|Customer Number|          Item name|Quantity|   Item Type| Price|Item Tax|Item discount|            Filename| index|
+--------+--------------------+----------+--------------+-------------+---------------+-------------------+--------+------------+------+--------+-------------+--------------------+------+
|  329385|28-Feb-2022 06:22 PM|   Walk-In|   Abel Affang|         null|           null|  Nutty  Chocolatta|     1.0|    Modifier| 650.0|     0.0|         null|file:///content/d...|204379|
|  329385|28-Feb-2022 06:22 PM|   Walk-In|   Abel Affang|         null|           null|           Assorted|     1.0|Regular Item|   0.0|     0.0|         null|file:///content/d...|204378|
|  329345|28-Feb-2022 05:25 PM|   Walk-In|   Abel Affang|   

## Getting Distinct Order Count Using **SparkSQL**

N.B. Because this project is proprietary, some columns have been omitted.

In [None]:
# Expose both raw frames to Spark SQL as temp views.
# createOrReplaceTempView overwrites any existing view of the same name, so re-registering
# a name after cleaning replaces the old view.
for _view_name, _frame in (("AprilpdtOrders", April_product_orders),
                           ("JanFebpdtOrders", Jan_Feb_product_orders)):
    _frame.createOrReplaceTempView(_view_name)


In [None]:
# Distinct order counts per period via Spark SQL. Column names containing spaces must be
# back-quoted (`Order No`); spark.sql() returns an ordinary Spark DataFrame.
Jan_Feb_distinct_order_count = spark.sql("select count(distinct(`Order No`)) As No_of_Jan_Feb_orders from JanFebpdtOrders")
April_distinct_order_count = spark.sql("select count(distinct(`Order No`)) As No_of_Apr_orders from AprilpdtOrders")

for _count_frame in (Jan_Feb_distinct_order_count, April_distinct_order_count):
    _count_frame.show()

# Two ways of pulling the scalar out of a one-row DataFrame: head(1) and collect().
_jan_feb_orders = Jan_Feb_distinct_order_count.head(1)[0]['No_of_Jan_Feb_orders']
print('No of Orders from Jan - Feb :', _jan_feb_orders)

_april_orders = April_distinct_order_count.collect()[0]['No_of_Apr_orders']
print('No of Orders in April :', _april_orders)

# These counts are compared against the application's order totals to confirm that every
# row was pulled in from the source storage.

+--------------------+
|No_of_Jan_Feb_orders|
+--------------------+
|               48022|
+--------------------+

+----------------+
|No_of_Apr_orders|
+----------------+
|           24307|
+----------------+

No of Orders from Jan - Feb : 48022
No of Orders in April : 24307


## **Data Cleaning of Columns and Creation of Additional Columns.**

On this dataset we need to do the following:

1) Replace all the **whitespace** in column names with underscores - **_**.

2) Make all column names **Camel_Case**.

3) More Transformation




 

In [None]:
# Both frames were read with the same ordersSchema (plus Filename), so a positional union
# lines the columns up; all remaining cleaning runs once on the merged frame.
Jan_Apr_product_orders = Jan_Feb_product_orders.union(April_product_orders)

# One-row preview with some columns hidden (presumably the proprietary ones mentioned above).
_hidden_columns = ('Order No', 'Item name', 'Order Taken By')
Jan_Apr_product_orders.drop(*_hidden_columns).show(1)

# Row count of the merged frame, rendered as the cell output.
Jan_Apr_product_orders.count()



+--------------------+----------+-------------+---------------+--------+------------+-----+--------+-------------+--------------------+
|          Order Time|Order Type|Customer Name|Customer Number|Quantity|   Item Type|Price|Item Tax|Item discount|            Filename|
+--------------------+----------+-------------+---------------+--------+------------+-----+--------+-------------+--------------------+
|01-Jan-2022 06:21 AM|   Walk-In|         null|           null|     1.0|Regular Item|900.0|     0.0|         null|file:///content/d...|
+--------------------+----------+-------------+---------------+--------+------------+-----+--------+-------------+--------------------+
only showing top 1 row



309631

### Clean Column Names

In [None]:
from pyspark.sql.column import Column
#  Clean Column names
from pyspark.sql.functions import initcap
from functools import reduce

# Current column names, in DataFrame order.
current_column_names = Jan_Apr_product_orders.columns
print(current_column_names)

# Target names: spaces replaced with underscores, positionally aligned with current_column_names.
new_column_names = [column_name.replace(" ", "_") for column_name in current_column_names]
print(new_column_names)

# Apply every rename in one projection with toDF (the same approach the Total Orders cleanup
# uses) instead of folding 13 chained withColumnRenamed calls. Then give four columns their
# warehouse names: Item_name, Order_Time, Order_No, Item_Type ->
# Product, Order_Date_Time, Order_Id, Product_Type.
Jan_Apr_product_orders = (
    Jan_Apr_product_orders
    .toDF(*new_column_names)
    .withColumnRenamed('Item_name', 'Product')
    .withColumnRenamed('Order_Time', 'Order_Date_Time')
    .withColumnRenamed('Order_No', 'Order_Id')
    .withColumnRenamed('Item_Type', 'Product_Type')
)

Jan_Apr_product_orders.printSchema()



['Order No', 'Order Time', 'Order Type', 'Order Taken By', 'Customer Name', 'Customer Number', 'Item name', 'Quantity', 'Item Type', 'Price', 'Item Tax', 'Item discount', 'Filename']
['Order_No', 'Order_Time', 'Order_Type', 'Order_Taken_By', 'Customer_Name', 'Customer_Number', 'Item_name', 'Quantity', 'Item_Type', 'Price', 'Item_Tax', 'Item_discount', 'Filename']
root
 |-- Order_Id: string (nullable = true)
 |-- Order_Date_Time: string (nullable = true)
 |-- Order_Type: string (nullable = true)
 |-- Order_Taken_By: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Number: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: float (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Item_Tax: float (nullable = true)
 |-- Item_discount: float (nullable = true)
 |-- Filename: string (nullable = false)



## Creating new **derived** Columns and **dropping** unwanted columns

In [None]:
from pyspark.sql.functions import col, to_date, to_timestamp, from_unixtime,unix_timestamp,date_format
from pyspark.sql import functions as F

# Order_Date_Time holds text like "28-Feb-2022 06:22 PM"; build the parse expression once
# and reuse it for both the time-of-day and the full timestamp columns.
_ORDER_TS_FORMAT = "dd-MMM-yyyy hh:mm a"
_order_ts = to_timestamp("Order_Date_Time", _ORDER_TS_FORMAT)

Jan_Apr_product_orders_2 = (
    Jan_Apr_product_orders
    # Store = source file name without directory or extension, with URL-encoded spaces restored.
    .withColumn("Store", regexp_replace(substring_index(substring_index("Filename", '/', -1), '.', 1), '%20', " "))
    # Date part only; "yyyy" keeps the year pattern consistent with _ORDER_TS_FORMAT.
    .withColumn("Order_Date", to_date(substring_index("Order_Date_Time", ' ', 1), "dd-MMM-yyyy"))
    # Raw "hh:mm AM/PM" text; superseded by the 24-hour Order_Time_2 below.
    .withColumn("Order_Time", substring_index("Order_Date_Time", ' ', -2))
    .withColumn("Order_Time_2", date_format(_order_ts, "HH:mm:ss"))
    .withColumn("Order_Date_Time_2", _order_ts)
    .withColumn("Sales", col("Price") * col("Quantity"))
    # A missing (null) price is not a price: flag it 0 like an explicit 0.0. Previously a null
    # Price fell through to otherwise(1) because `null == 0.0` is not true.
    .withColumn("Product_Has_Price", F.when(col("Price").isNull() | (col("Price") == 0.0), 0).otherwise(1))
    # Exact-case column name ("Item_discount"); "Item_Discount" only matched because Spark
    # resolves column names case-insensitively by default.
    .drop("Item_discount", "Filename", "Customer_Name", "Customer_Number")
)

Jan_Apr_product_orders_2.printSchema()


root
 |-- Order_Id: string (nullable = true)
 |-- Order_Date_Time: string (nullable = true)
 |-- Order_Type: string (nullable = true)
 |-- Order_Taken_By: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: float (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Item_Tax: float (nullable = true)
 |-- Store: string (nullable = false)
 |-- Order_Date: date (nullable = true)
 |-- Order_Time: string (nullable = true)
 |-- Order_Time_2: string (nullable = true)
 |-- Order_Date_Time_2: timestamp (nullable = true)
 |-- Sales: float (nullable = true)
 |-- Product_Has_Price: integer (nullable = false)



### **Data Validation Using Spark SQL**

In [None]:
# Confirm with Spark SQL that the Order_Date_Time text was parsed correctly.
Jan_Apr_product_orders_2.createOrReplaceTempView("TestCleaned")

# Rows whose raw timestamp text is present but failed to parse into a date/timestamp (expected: 0).
parse_failure_count = spark.sql("""SELECT COUNT(*) AS unparsed_rows
                                   FROM TestCleaned
                                   WHERE Order_Date_Time IS NOT NULL
                                     AND (Order_Date_Time_2 IS NULL OR Order_Date IS NULL)""")
parse_failure_count.show()

# Raw vs parsed ranges. The raw columns are strings, so their MIN/MAX are lexicographic
# (e.g. "12:05 AM" sorts after "01:00 PM"); the parsed columns - a timestamp and zero-padded
# 24-hour "HH:mm:ss" text - order chronologically.
Test_cleaned = spark.sql("""SELECT
                              MAX(Order_Time) AS max_UNcleaned_order_time,
                              MIN(Order_Time) AS min_UNcleaned_order_time,
                              MAX(Order_Time_2) AS max_cleaned_order_time,
                              MIN(Order_Time_2) AS min_cleaned_order_time,
                              MAX(Order_Date_Time) AS max_UNcleaned_order_date_time,
                              MIN(Order_Date_Time) AS min_UNcleaned_order_date_time,
                              MAX(Order_Date_Time_2) AS max_cleaned_order_date_time,
                              MIN(Order_Date_Time_2) AS min_cleaned_order_date_time
                              FROM TestCleaned""")
Test_cleaned.show()

### **Add blank Columns to dataframe**
These columns are present in data GENERATED by another system.
After this data cleaning, we combine (union) this dataframe with the data from the other system.

After this, we will:
- drop the remaining incorrect date & time columns
- rename the correct date & time columns to _**Order_Date** & **Order_Time**._

### Lastly,
We would Select the columns in the **order** which we want to ingest into our data warehouse

In [None]:
from pyspark.sql.functions import lit

# Typed, all-null placeholders for columns that only exist in the other source system's data.
_placeholder_columns = {
    "Price_Without_Discount": FloatType(),
    "Unit_Cost": FloatType(),
    "Product_Id": StringType(),
}

# Final column order for the warehouse load.
_final_product_order_columns = [
    "Order_Id", "Order_Date",
    "Order_Time", "Store", "Order_Type", "Order_Taken_By",
    "Product_Id", "Product", "Quantity", "Product_Type",
    "Price", "Sales", "Item_Tax", "Price_Without_Discount",
    "Unit_Cost", "Product_Has_Price",
]

_with_placeholders = Jan_Apr_product_orders_2
for _column_name, _column_type in _placeholder_columns.items():
    _with_placeholders = _with_placeholders.withColumn(_column_name, lit(None).cast(_column_type))

# Drop the raw date/time text columns, promote the parsed 24-hour time to Order_Time,
# zero-fill the numeric placeholders, and fix the column order.
Jan_Apr_product_orders_2 = (
    _with_placeholders
    .drop("Order_Date_Time", "Order_Time")
    .withColumnRenamed("Order_Time_2", "Order_Time")
    .na.fill({"Price_Without_Discount": 0.0, "Unit_Cost": 0.0})
    .select(*_final_product_order_columns)
)

Jan_Apr_product_orders_2.printSchema()
Jan_Apr_product_orders_2.createOrReplaceTempView("finalProductOrders")

root
 |-- Order_Id: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Time: string (nullable = true)
 |-- Store: string (nullable = false)
 |-- Order_Type: string (nullable = true)
 |-- Order_Taken_By: string (nullable = true)
 |-- Product_Id: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: float (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Sales: float (nullable = true)
 |-- Item_Tax: float (nullable = true)
 |-- Price_Without_Discount: float (nullable = false)
 |-- Unit_Cost: float (nullable = false)
 |-- Product_Has_Price: integer (nullable = false)



In [None]:
# # Used for cleaning  store name formerly
# April_product_orders.withColumn("Store_2",
#     regexp_replace(
#         (substring_index(
#             (substring_index('Store','/',-1)
#             .alias('Store_2')),'.',1)),'%20'," ")
#             .alias('Store_2')
#             ).show(5)
# # April_product_orders.show(5)

## **Export To Csv File**

- Prepare for deployment to a PostgreSQL DB

In [None]:
# Jan_Apr_product_orders_2.coalesce(1).write.option("header",True
#                                                   ).csv("/content/drive/MyDrive/SD Raw Data/product_orders_Jan_Feb_Apr_3"
#                                                         )



## **Importing Total Orders Datasets**

- Data from these files will be transformed so it is ready to load into the Data Warehouse.

- Expected Columns in DW

  Total_Orders ---> Order_Id, Order_date, Order_Time, Store, Order_Taken_By, Order_Products_Price,Product_Count, Order_Extra_Items_Price, Order_Amount, Payment_Channel, Payment_Mode, Order_Type, Order_State.

In [None]:

# Expected layout of the Total Orders export. Every header after the first starts with a space;
# the repeated "Net Amount" headers are disambiguated with _2.._6 suffixes here.
# NOTE: the read below currently relies on inferSchema; this schema is only used by the
# explicit-schema read that is kept commented out.
TotalordersSchema = StructType([
    StructField(_field_name, _field_type, _field_nullable)
    for _field_name, _field_type, _field_nullable in [
        ('Order No', StringType(), False),
        (' Order Time', StringType(), False),
        (' Order Type', StringType(), True),
        (' Location', StringType(), True),
        (' Order Taken By', StringType(), True),
        (' Ready At', StringType(), True),
        (' Picked At', StringType(), True),
        (' Picked By', StringType(), True),
        (' Delivered At', StringType(), True),
        (' Order Amount', FloatType(), True),
        (' Payments', StringType(), True),
        (' Tax', FloatType(), True),
        (' Tips', FloatType(), True),
        (' Amount Received', FloatType(), True),
        (' Received By', StringType(), True),
        (' Amount Returned', FloatType(), True),
        (' Status', StringType(), True),
        (' Notes', StringType(), True),
        (' Consumption Tax + VAT (12.5%)', FloatType(), True),
        (' Net Amount', FloatType(), True),
        (' Con Tax + VAT (12.5%)', FloatType(), True),
        (' Net Amount_2', FloatType(), True),
        (' VAT (0%)', FloatType(), True),
        (' Net Amount_3', FloatType(), True),
        (' VAT (7.5%)', FloatType(), True),
        (' Net Amount_4', FloatType(), True),
        (' VAT + Consumption Tax (10%)', FloatType(), True),
        (' Net Amount_5', FloatType(), True),
        (' VAT + Consumption Tax (12.5%)', FloatType(), True),
        (' Net Amount_6', FloatType(), True),
    ]
])

# sd_total_orders_raw = spark.read.option("header","true").csv("/content/drive/MyDrive/SD Raw Data/Jan_Feb_Apr_Totals",
#                                      schema=TotalordersSchema,
#                                      multiLine= True)

# multiLine=True because some records span several physical lines in the CSV.
# Order No is re-cast to string (presumably inferSchema reads it as a number).
sd_total_orders_raw = (
    spark.read.option("header", "true")
    .csv("/content/drive/MyDrive/SD Raw Data/Jan_Feb_Apr_Totals", inferSchema=True, multiLine=True)
    .withColumn('Order No', expr("CAST(`Order No` as String)"))
)

sd_total_orders_raw.printSchema()
print(' No of Rows in files : ', sd_total_orders_raw.count())




root
 |-- Order No: string (nullable = true)
 |--  Order Time: string (nullable = true)
 |--  Order Type: string (nullable = true)
 |--  Location: string (nullable = true)
 |--  Order Taken By: string (nullable = true)
 |--  Ready At: string (nullable = true)
 |--  Picked At: string (nullable = true)
 |--  Picked By: string (nullable = true)
 |--  Delivered At: string (nullable = true)
 |--  Order Amount: string (nullable = true)
 |--  Payments: string (nullable = true)
 |--  Tax: double (nullable = true)
 |--  Tips: double (nullable = true)
 |--  Amount Received: double (nullable = true)
 |--  Received By: string (nullable = true)
 |--  Amount Returned: double (nullable = true)
 |--  Status: string (nullable = true)
 |--  Notes: string (nullable = true)
 |--  Consumption Tax + VAT (12.5%): double (nullable = true)
 |--  Net Amount19: double (nullable = true)
 |--  Con Tax + VAT (12.5%): double (nullable = true)
 |--  Net Amount21: double (nullable = true)
 |--  VAT (0%): string (nulla

##### **Notes on Importing File**

 1) The syntax ``.withColumn('Order No', expr("CAST(`Order No` as String)"))`` is used here to force the `Order No` column to a string, overriding the type chosen by `inferSchema`.
 2) Without the `multiLine=True` argument, the number of imported rows was wrong: more rows were imported than actually exist.
This is because some records span multiple lines, which is evident when viewing the CSV in a text editor.

Explanation of this concept can be found here https://sparkbyexamples.com/spark/spark-read-multiline-multiple-line-csv-file/



## Transform Column Names

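Strip the leading/trailing whitespace from every column name, then replace spaces and special characters with underscores.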


In [None]:
# Normalise the Total Orders column names before dropping the all-null columns in the next cell.
# (sd_total_orders_raw.describe().show() gives a descriptive overview of which columns are empty.)
sd_old_col_names = sd_total_orders_raw.columns
print(sd_old_col_names)

import re


def _clean_column_name(raw_name):
    """Strip surrounding whitespace, then collapse every run of non-word characters into '_'."""
    return re.sub(r'\W+', '_', raw_name.strip())


# Every header after the first starts with a space, and several contain "+", "(", "%" or ".".
sd_new_col_names = [_clean_column_name(raw_name) for raw_name in sd_total_orders_raw.columns]
print(sd_new_col_names)

# Positional rename of every column to its cleaned name.
sd_total_orders_withcolrenamed = sd_total_orders_raw.toDF(*sd_new_col_names)

['Order No', ' Order Time', ' Order Type', ' Location', ' Order Taken By', ' Ready At', ' Picked At', ' Picked By', ' Delivered At', ' Order Amount', ' Payments', ' Tax', ' Tips', ' Amount Received', ' Received By', ' Amount Returned', ' Status', ' Notes', ' Consumption Tax + VAT (12.5%)', ' Net Amount19', ' Con Tax + VAT (12.5%)', ' Net Amount21', ' VAT (0%)', ' Net Amount23', ' VAT (7.5%)', ' Net Amount25', ' VAT + Consumption Tax (10%)', ' Net Amount27', ' VAT + Consumption Tax (12.5%)', ' Net Amount29']
['Order_No', 'Order_Time', 'Order_Type', 'Location', 'Order_Taken_By', 'Ready_At', 'Picked_At', 'Picked_By', 'Delivered_At', 'Order_Amount', 'Payments', 'Tax', 'Tips', 'Amount_Received', 'Received_By', 'Amount_Returned', 'Status', 'Notes', 'Consumption_Tax_VAT_12_5_', 'Net_Amount19', 'Con_Tax_VAT_12_5_', 'Net_Amount21', 'VAT_0_', 'Net_Amount23', 'VAT_7_5_', 'Net_Amount25', 'VAT_Consumption_Tax_10_', 'Net_Amount27', 'VAT_Consumption_Tax_12_5_', 'Net_Amount29']


#### **Preview the df again and then drop unwanted columns**

In [None]:
# Drop every column that holds no values at all, then give the key columns their warehouse
# names (Order_Time -> Order_Date_Time, Order_No -> Order_Id).
# (sd_total_orders_withcolrenamed.describe().show() gives a descriptive overview.)

# Non-null count for every column in ONE Spark job: F.count(column) skips nulls.
# The previous per-column filter(...).count() launched a separate job for each of the
# 30 columns, each re-reading the uncached CSV input.
_non_null_counts = (
    sd_total_orders_withcolrenamed
    .select([F.count(F.col(column)).alias(column) for column in sd_total_orders_withcolrenamed.columns])
    .first()
    .asDict()
)
non_null_columns = [column for column in sd_total_orders_withcolrenamed.columns if _non_null_counts[column] > 0]
sd_total_orders_withdroppednull_cols = sd_total_orders_withcolrenamed.select(*non_null_columns)

sd_total_orders_withdroppednull_cols = (
    sd_total_orders_withdroppednull_cols
    .withColumnRenamed('Order_Time', 'Order_Date_Time')
    .withColumnRenamed('Order_No', 'Order_Id')
)

### **Processing Data Types for Columns**

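- Extract Order_Date and Order_Time from Order_Date_Time (the original `Order Time` column), which is a datetime column.
- Our Order_Amount column is also faulty: we need to remove the commas and cast it as a float.
- Replace nulls in the remaining numeric columns with 0 (done with `COALESCE` for the tax columns when the tax table is extracted below).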



In [None]:
# Split the raw Order_Date_Time text into a DATE (Order_Date) and a 24-hour "HH:mm:ss" string
# (Order_Time), and turn Order_Amount - text with thousands separators - into a float.
# "dd-MMM-yyyy" keeps the year pattern consistent with the full "dd-MMM-yyyy hh:mm a" format.
# NOTE(review): this cell drops Order_Date_Time from the frame it reassigns, so it cannot be
# re-run without re-running the previous cell.
sd_total_orders_withdroppednull_cols = (
    sd_total_orders_withdroppednull_cols
    .withColumn('Order_Date', to_date(substring_index("Order_Date_Time", ' ', 1), "dd-MMM-yyyy"))
    .withColumn('Order_Time', date_format(to_timestamp("Order_Date_Time", "dd-MMM-yyyy hh:mm a"), "HH:mm:ss"))
    .withColumn('Order_Amount', F.regexp_replace(col("Order_Amount"), ",", '').cast(FloatType()))
    .drop('Order_Date_Time')
)

# sd_total_orders_withdroppednull_cols.show(5)

# validate schema
sd_total_orders_withdroppednull_cols.printSchema()

root
 |-- Order_Id: string (nullable = true)
 |-- Order_Type: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Order_Taken_By: string (nullable = true)
 |-- Ready_At: string (nullable = true)
 |-- Order_Amount: float (nullable = true)
 |-- Payments: string (nullable = true)
 |-- Tax: double (nullable = true)
 |-- Tips: double (nullable = true)
 |-- Amount_Received: double (nullable = true)
 |-- Received_By: string (nullable = true)
 |-- Amount_Returned: double (nullable = true)
 |-- Status: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- Consumption_Tax_VAT_12_5_: double (nullable = true)
 |-- Net_Amount19: double (nullable = true)
 |-- Con_Tax_VAT_12_5_: double (nullable = true)
 |-- Net_Amount21: double (nullable = true)
 |-- VAT_7_5_: double (nullable = true)
 |-- Net_Amount25: double (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Time: string (nullable = true)



### **Extract Necessary Tables from Dataframe**

In [None]:
# Register the cleaned totals as a SQL view and extract one row per taxed order (Tax > 0).
# The tax amounts are spread over several differently named export columns, so COALESCE
# picks the first non-null one (tax columns default to 0).
sd_total_orders_withdroppednull_cols.createOrReplaceTempView('sd_Total_Orders')

_tax_table_sql = """SELECT 
                          Order_Id, 
                          Order_Date,
                          Order_Amount,
                          Tax as Total_Tax,
                          COALESCE(COALESCE(Consumption_Tax_VAT_12_5_,Con_Tax_VAT_12_5_),0)  as Consumption_Tax ,
                          COALESCE(VAT_7_5_,0) as VAT,
                          COALESCE(COALESCE(Net_Amount19, Net_Amount21),Net_Amount25) as Net_Amount_After_Tax
                          FROM sd_Total_Orders
                          WHERE Tax > 0"""

sd_tax_table = spark.sql(_tax_table_sql)
sd_tax_table.show(5)

+--------+----------+------------+----------------+----------------+-----------------+--------------------+
|Order_Id|Order_Date|Order_Amount|       Total_Tax| Consumption_Tax|              VAT|Net_Amount_After_Tax|
+--------+----------+------------+----------------+----------------+-----------------+--------------------+
|  281511|2022-01-01|      4500.0|           500.0|           500.0|              0.0|              4000.0|
|  281525|2022-01-01|     10700.0|746.511627906976|             0.0|746.5116279069761|   9953.488372093025|
|  281530|2022-01-01|      2300.0|255.555555555556|255.555555555556|              0.0|   2044.444444444444|
|  281531|2022-01-01|      1100.0|122.222222222222|122.222222222222|              0.0|    977.777777777778|
|  281532|2022-01-01|       200.0|22.2222222222222|22.2222222222222|              0.0|   177.7777777777778|
+--------+----------+------------+----------------+----------------+-----------------+--------------------+
only showing top 5 rows



In [None]:
# Base columns for sd_final_total_orders, read from the sd_Total_Orders view. Location is
# renamed to Store, and the free-text columns (Location, Payments, Notes) are trimmed.
# NOTE(review): sd_Total_Orders has an Order_Type column, but it is not selected here and a
# later cell adds Order_Type as an all-null column - confirm this is intended.
_final_totals_sql = """SELECT 
                                        Order_Id, Order_Date, Order_Time, trim(Location) as Store, 
                                        Order_Taken_By, Order_Amount,trim(Payments) as Payments, trim(Notes) as Notes
                                        FROM sd_Total_Orders
                                      """

sd_final_total_orders_raw = spark.sql(_final_totals_sql)

In [None]:
# Payment_Channel: the Payments text before the first " -", then before the first literal "()".
# Notes: every run of non-word characters becomes a space, then the text is trimmed and
# title-cased, so no special characters reach the database.
from pyspark.sql.functions import trim, initcap

_payment_channel = substring_index(substring_index(col('Payments'), ' -', 1), '()', 1)
_cleaned_notes = initcap(trim(regexp_replace(col("Notes"), r'\W+', " ")))

sd_final_total_orders_raw_2 = (
    sd_final_total_orders_raw
    .withColumn("Payment_Channel", _payment_channel)
    .withColumn("Notes", _cleaned_notes)
)

# Inspection helpers:
# sd_final_total_orders_raw_2.filter("Notes is NOT NULL").show(10, truncate= False)
# sd_final_total_orders_raw_2.select(col('Payment_Channel')).distinct().show()

# Rejected alternatives (did not handle all the characters that needed replacing):
#   substring_index(col('Payments'), ' -', 1)
#   regexp_extract(col('Payments'), '[a-zA-Z]+', 0)
              # withColumn("Payment_Channel",regexp_extract(col('Payments'), '[a-zA-Z]+',0)



## **Final Addition of Columns**

In [None]:
# Build the export layout in a single projection:
#   - keep the cleaned order columns in their final order (Payments is dropped),
#   - derive Payment_Mode: exactly "Cash" -> "Cash"; anything else, including
#     NULL, -> "Non Cash",
#   - add typed NULL placeholder columns for the product/extra-item prices and
#     the order type/state/source, which are not filled in here.
sd_final_total_orders_raw_3 = sd_final_total_orders_raw_2.select(
    'Order_Id',
    'Order_Date',
    'Order_Time',
    'Store',
    'Order_Taken_By',
    lit(None).cast(FloatType()).alias('Order_Products_Price'),
    lit(None).cast(FloatType()).alias('Order_Extra_Items_Price'),
    'Order_Amount',
    'Payment_Channel',
    F.when(col("Payment_Channel") == "Cash", "Cash").otherwise("Non Cash").alias('Payment_Mode'),
    lit(None).cast(StringType()).alias('Order_Type'),
    lit(None).cast(StringType()).alias('Order_State'),
    lit(None).cast(StringType()).alias('Order_Source'),
    'Notes',
)

# Preview with: sd_final_total_orders_raw_3.show(5)

### **Replacing Names within the Store Column**


In [None]:
# Review the distinct raw store names before mapping them to their clean names.
# `distinct().collect()` returns rows in no guaranteed order, so the names are
# sorted to make the positional pairing with `actual_store_names` reproducible
# across runs. NULL stores are skipped: `DataFrame.replace` rejects mixed
# None/str replacement keys ("Mixed type replacements are not supported").
raw_store_names = sorted(
    row['Store']
    for row in sd_final_total_orders_raw_3.select('Store').distinct().collect()
    if row['Store'] is not None
)

print(raw_store_names)

# Clean store names, listed in the SAME order as the sorted `raw_store_names`
# printed above (values redacted here).
# NOTE(review): re-check this order against the printed list after any data change.
actual_store_names = ['xxxxxx','xxxxxxxxxx','xxxxxxxxxxx']

# zip() would silently drop unmatched names, so fail loudly on a length mismatch.
if len(raw_store_names) != len(actual_store_names):
    raise ValueError(
        f"Expected {len(raw_store_names)} clean store names for {raw_store_names}, "
        f"got {len(actual_store_names)}"
    )

store_name_map = dict(zip(raw_store_names, actual_store_names))

# Restrict the replacement to the Store column. Without `subset`, `replace`
# would also rewrite matching values in every other string column
# (e.g. Notes, Order_Taken_By).
sd_final_total_orders_raw_4 = sd_final_total_orders_raw_3.replace(store_name_map, subset=['Store'])

# sd_final_total_orders_raw_4.show(5)
sd_final_total_orders_raw_4.printSchema()

sd_final_total_orders_raw_4.count()



In [None]:
# Get the product count for each order from the products table.
# The cleaned orders are registered as the `finalTotalOrders` view, then
# left-joined on (Order_Id, Order_Date) to a per-order count of priced
# products (Product_Has_Price = 1) taken from the `finalProductOrders` view
# (presumably registered in an earlier cell -- not visible here).
# - Orders with no matching priced products get Product_Count = 0 via COALESCE.
# - Order_Id is cast to string.
# - The result is ordered by Order_Date, then Order_Time (ORDER BY 2, 3).
# NOTE(review): Order_Time is a string column (see the printSchema output), so
# ordering within a day is lexicographic -- confirm the time format is zero-padded.

sd_final_total_orders_raw_4.createOrReplaceTempView('finalTotalOrders')


sd_final_total_orders = spark.sql("""
                                  
                                  WITH prodcountTable AS(
                                    SELECT Order_Id, Order_Date, COUNT(Product) as Product_Count
                                    FROM finalProductOrders
                                    WHERE Product_Has_Price = 1
                                    GROUP BY 1,2
                                  )
                                  SELECT CAST(T.Order_Id as String) as Order_Id, T.Order_Date, T.Order_Time, T.Store, T.Order_Taken_By,
                                            T.Order_Products_Price,CAST(COALESCE(P.Product_Count,0) AS INT) as Product_Count, 
                                            T.Order_Extra_Items_Price,T.Order_Amount,T.Payment_Channel,
                                            T.Payment_Mode,T.Order_Type, T.Order_State, T.Order_Source,T.Notes
                                     FROM finalTotalOrders as T
                                     LEFT JOIN prodcountTable as P
                                     ON T.Order_Id = P.Order_Id
                                     AND T.Order_Date = P.Order_Date
                                     ORDER BY 2, 3
                                      """)

# sd_final_total_orders.show(5)
# Inspect the final schema and row count before export.
sd_final_total_orders.printSchema()
sd_final_total_orders.count()


root
 |-- Order_Id: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_Time: string (nullable = true)
 |-- Store: string (nullable = true)
 |-- Order_Taken_By: string (nullable = true)
 |-- Order_Products_Price: float (nullable = true)
 |-- Product_Count: integer (nullable = false)
 |-- Order_Extra_Items_Price: float (nullable = true)
 |-- Order_Amount: float (nullable = true)
 |-- Payment_Channel: string (nullable = true)
 |-- Payment_Mode: string (nullable = false)
 |-- Order_Type: string (nullable = true)
 |-- Order_State: string (nullable = true)
 |-- Order_Source: string (nullable = true)
 |-- Notes: string (nullable = true)



72329

In [None]:
# Final check before export: no order may be missing its Order_Amount.
# The assert stops a Restart & Run All here instead of letting incomplete rows
# reach the CSV export; the count is still displayed as the cell output.
null_order_amounts = spark.sql('Select * from finalTotalOrders WHERE Order_Amount IS NULL')
null_order_amount_count = null_order_amounts.count()
assert null_order_amount_count == 0, f"{null_order_amount_count} order(s) have a NULL Order_Amount"
null_order_amount_count


0

## **Export to CSV File**
 - This CSV output is the format used to load the data into the Postgres DB.

In [None]:
# sd_final_total_orders.coalesce(1).write.option("header",True
                                              #  ).csv("/content/drive/MyDrive/SD Raw Data/Transformed_Output/sd_total_orders_Jan_Feb_Apr_3"
                                                      #  )