### Boiler Plate Codes

In [1]:

import os
import sys
import pandas as pd
import numpy as np
import datetime
import math
import pyspark
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
import pyspark.sql.functions as F
from delta import configure_spark_with_delta_pip

In [2]:
jars = ",".join(["/usr/local/spark/jars/delta-spark_2.12-3.0.0.jar",
"/usr/local/spark/jars/delta-storage-3.0.0.jar"])


In [3]:
# Check if SparkSession already exists - use it if exists
if 'spark' in locals() and spark._jsc is not None:
    print("Using existing SparkSession")
else:
    print("Creating new SparkSession")
    builder = SparkSession.builder \
        .appName("preprocessing_data")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        

    spark = builder.getOrCreate()
    print("Spark Session Initialized")

Creating new SparkSession
Spark Session Initialized


In [None]:
!ls /home/jovian/data/full_files

In [4]:
!pwd

/home/jovyan/work/notebooks


In [5]:
%cd ../..

/home/jovyan


In [6]:
!pwd

/home/jovyan


In [9]:
%ls -l /home/jovyan/data/full_files

total 2004
-rwxrwxrwx 1 root root  155860 Jul  7 11:48 [0m[01;32mProduct_Data.csv[0m*
-rwxrwxrwx 1 root root 1885285 Jul  7 11:48 [01;32mSales_Data.csv[0m*
-rwxrwxrwx 1 root root    1059 Jul  7 11:48 [01;32mStore_Data.csv[0m*


### reading the files

In [10]:
# input file paths
product_path = "/home/jovyan/data/full_files/Product_Data.csv"
sales_path = "/home/jovyan/data/full_files/Sales_Data.csv"
store_path = "/home/jovyan/data/full_files/Store_Data.csv"

##### For product

In [11]:
product_df = spark.read\
         .option("inferSchema","true")\
            .option("header","true")\
            .option("delimiter",",")\
            .csv(product_path)

In [12]:
product_df.limit(10).toPandas().head()

Unnamed: 0,Product ID,Product Name,Product Category,Supplier ID,Orig_Price,Date Added
0,PROD0001,Lamp,Furniture,SUP2175,408.78,4/17/22
1,PROD0002,Wardrobe,Clothing,SUP2727,881.32,1/23/21
2,PROD0003,Camera,Accessories,SUP1802,131.51,7/29/20
3,PROD0004,,Kitchenware,SUP0374,647.34,1/29/23
4,PROD0005,Wardrobe,Kitchenware,SUP1980,925.23,4/19/22


In [14]:
# cleaning the column names
from typing import Optional
def preprocessing_cols(df, ingestion_date : Optional[bool] = False):
    """
    cleans the column names and adds the ingestion date to the dataframe
    """
    new_column_name = [x.lower().replace(" ","_") for x in df.columns]
    df = df.toDF(*new_column_name)
    if ingestion_date:
        df = df.withColumn('ingestion_date',F.lit(str(datetime.date.today())))
    return df

In [15]:
from functools import wraps

In [16]:
products_main = preprocessing_cols(product_df,ingestion_date = False)

In [18]:
products_main.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- supplier_id: string (nullable = true)
 |-- orig_price: double (nullable = true)
 |-- date_added: string (nullable = true)



In [21]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [22]:
df2 = products_main.withColumn('date_added',F.to_date('date_added',"M/dd/yy"))

In [23]:
df2.limit(10).toPandas().head()

Unnamed: 0,product_id,product_name,product_category,supplier_id,orig_price,date_added
0,PROD0001,Lamp,Furniture,SUP2175,408.78,2022-04-17
1,PROD0002,Wardrobe,Clothing,SUP2727,881.32,2021-01-23
2,PROD0003,Camera,Accessories,SUP1802,131.51,2020-07-29
3,PROD0004,,Kitchenware,SUP0374,647.34,2023-01-29
4,PROD0005,Wardrobe,Kitchenware,SUP1980,925.23,2022-04-19


In [24]:
df2.count()

3010

In [25]:
df3 = df2.dropDuplicates()

In [26]:
df3.count()

3000

In [27]:
spark.sql("CREATE DATABASE IF NOT EXISTS retail ;")

DataFrame[]

In [29]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|   retail|
+---------+



In [31]:
spark.sql("USE retail;").show()

++
||
++
++



In [37]:
df3.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- supplier_id: string (nullable = true)
 |-- orig_price: double (nullable = true)
 |-- date_added: date (nullable = true)



In [34]:
from delta.tables import DeltaTable

In [38]:
df3.limit(10).show()

+----------+---------------+----------------+-----------+----------+----------+
|product_id|   product_name|product_category|supplier_id|orig_price|date_added|
+----------+---------------+----------------+-----------+----------+----------+
|  PROD0015|       Cookware|       Furniture|    SUP1269|     230.7|2023-06-27|
|  PROD0145|          Table|     Electronics|    SUP0119|    525.56|2020-02-05|
|  PROD0150|      Bookshelf|     Kitchenware|    SUP1528|    108.15|2022-06-09|
|  PROD0223|        T-shirt|      Home Decor|    SUP4918|    244.77|2021-02-27|
|  PROD0253|       Wardrobe|     Electronics|    SUP2070|    844.61|2021-07-01|
|  PROD0734|   Coffee Maker|      Appliances|    SUP1057|     48.94|2021-03-30|
|  PROD0793|          Table|      Appliances|    SUP3812|     522.5|2020-11-22|
|  PROD0861|       Cookware|        Footwear|    SUP2533|    530.51|2020-02-19|
|  PROD0912|Washing Machine|      Home Decor|    SUP3065|    190.03|2022-02-20|
|  PROD0939|   Coffee Maker|       Furni

In [40]:
df3.write.format("delta").mode("overwrite").save("/home/jovyan/data/preprocessing/products.delta")

In [41]:
### same for stores

In [42]:
store_df = spark.read\
         .option("inferSchema","true")\
            .option("header","true")\
            .option("delimiter",",")\
            .csv(store_path)

In [43]:
store_df.limit(10).toPandas().head()

Unnamed: 0,Store ID,Store Name,Region,Manager ID,Manager Name
0,SI01,CityGrocer,Midwest,MAN01,Sophia Lopez
1,SI02,FreshMart,Northeast,MAN02,Linda White
2,SI03,ShopEasy,Midwest,MAN03,Elijah Moore
3,SI04,CornerStore,West,MAN04,Liam Wilson
4,SI05,MarketPlace,West,MAN05,Lucas Martin


In [47]:
store_main = preprocessing_cols(store_df,ingestion_date = False)

In [48]:
store_main.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- store_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- manager_id: string (nullable = true)
 |-- manager_name: string (nullable = true)



In [49]:
store_main.count()

23

In [50]:
store_main = store_main.dropDuplicates()

In [51]:
store_main.count()

23

In [52]:
store_main.write.format("delta").mode("overwrite").save("/home/jovyan/data/preprocessing/stores.delta")

### Same for Sales Data

In [53]:
sales_df = spark.read\
         .option("inferSchema","true")\
            .option("header","true")\
            .option("delimiter",",")\
            .csv(sales_path)

In [54]:
sales_df.limit(10).toPandas().head(5)

Unnamed: 0,Transaction ID,Customer ID,Product ID,Quantity Sold,Price per Unit,Transaction Date,Salesperson ID,Payment Method,Store ID
0,TXN0001,C1175,PROD2089,12,98.8,11/21/23,SP059,Bank Transfer,SI16
1,TXN0002,C1213,PROD1359,2,168.66,11/15/23,SP012,Check,SI06
2,TXN0003,C1212,PROD1263,14,413.96,9/18/23,SP086,Bank Transfer,SI18
3,TXN0004,C1266,PROD0423,7,417.73,10/7/23,SP013,Check,SI11
4,TXN0005,C1185,PROD0382,7,450.67,10/4/23,SP052,Bank Transfer,SI09


In [55]:
sales_main = preprocessing_cols(sales_df,ingestion_date = False)

In [56]:
sales_main.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity_sold: integer (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- salesperson_id: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- store_id: string (nullable = true)



In [58]:
sales_main  = sales_main.withColumn('transaction_date',F.to_date('transaction_date','M/dd/yy'))

In [59]:
sales_main.limit(10).toPandas().head()

Unnamed: 0,transaction_id,customer_id,product_id,quantity_sold,price_per_unit,transaction_date,salesperson_id,payment_method,store_id
0,TXN0001,C1175,PROD2089,12,98.8,2023-11-21,SP059,Bank Transfer,SI16
1,TXN0002,C1213,PROD1359,2,168.66,2023-11-15,SP012,Check,SI06
2,TXN0003,C1212,PROD1263,14,413.96,2023-09-18,SP086,Bank Transfer,SI18
3,TXN0004,C1266,PROD0423,7,417.73,2023-10-07,SP013,Check,SI11
4,TXN0005,C1185,PROD0382,7,450.67,2023-10-04,SP052,Bank Transfer,SI09


In [60]:
sales_main.write.format("delta").mode("overwrite").save("/home/jovyan/data/preprocessing/sales.delta")