## Sales Data Exploration

Given sales data of a retail company. We have data for the year 2019 for every month.

data format: **csv**

In [4]:
# Importing the required libraries

import os
import glob

# spark packages
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, count, when, desc, sum, round, to_timestamp, max as max_, row_number, year, month
from pyspark.sql.window import Window

In [2]:
from delta import configure_spark_with_delta_pip

def create_spark_session(app_name: str = "SalesIngestion") -> SparkSession:
    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    
    # This function wraps your SparkSession with Delta Lake capabilities.
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    return spark

### Understand the different columns with in the given data and also the schema that can be infered from the data

In [3]:
spark = create_spark_session()

In [5]:
folder_path = "Sales_Data"

csv_files = glob.glob(os.path.join(folder_path, "*.csv"))

for file in csv_files:
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(file)
    print(file)
    print(len(df.columns))
    print()
    df.printSchema()

Sales_Data\Sales_April_2019.csv
6

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)

Sales_Data\Sales_August_2019.csv
6

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)

Sales_Data\Sales_December_2019.csv
6

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)

Sales_Data\Sales_February_2019.csv
6

root
 |-- Order ID: integer (nullable = true)
 |-- P

### Assumption 1:
For a Sales transaction, Order ID should not be nullable. similarly, Each order should atleast have a product with non zero quantity. similarly, for each order, the date should be recorded. 

So, the Order ID, Product, Quantity Ordered, Order Date should not have null values.

While Purchase address can be null and Price can be 0 (possible)

and Ideal schema should be

root

* |-- **Order ID**: integer (nullable = False)
* |-- **Product**: string (nullable = False)
* |-- **Quantity Ordered**: integer (nullable = False)
* |-- **Price Each**: double (nullable = true)
* |-- **Order Date**: timestamp (nullable = False)
* |-- **Purchase Address**: string (nullable = true)

### Exploring a single csv file to understand more about the data we are handling

In [6]:
df.describe().show(truncate=False)

+-------+------------------+------------+-------------------+------------------+--------------+----------------------------------+
|summary|Order ID          |Product     |Quantity Ordered   |Price Each        |Order Date    |Purchase Address                  |
+-------+------------------+------------+-------------------+------------------+--------------+----------------------------------+
|count  |11629             |11646       |11629              |11629             |11646         |11646                             |
|mean   |253751.81442944362|NULL        |1.1281279559721387 |179.40000687934585|NULL          |NULL                              |
|stddev |3235.175358525277 |NULL        |0.43507719933866423|328.59504155699716|NULL          |NULL                              |
|min    |248151            |20in Monitor|1                  |2.99              |09/01/19 05:10|1 11th St, San Francisco, CA 94016|
|max    |259357            |iPhone      |6                  |1700.0            |Ord

### Checking for Null values:

In [7]:
# Counting the number of null values in each column
null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null_count.show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|      57|     40|              57|        57|        40|              40|
+--------+-------+----------------+----------+----------+----------------+



In [8]:
df.filter(col('Order ID').isNull()).show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|Product|            NULL|      NULL|Order Date|Purchase Address|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       

In [9]:
df.filter(col('Product').isNull()).show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|            NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       

In [10]:
# Counting the number of null values in each column after filtering by Order ID
null_count = df.filter(col('Order ID').isNotNull()).select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null_count.show()

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|       0|      0|               0|         0|         0|               0|
+--------+-------+----------------+----------+----------+----------------+



**Observations:**
* There are null values in each columns.
* Product, Order Date and Purchase Address have some false values or garbage values. all these records have Order Id as NUll.

As per the assumption, for any sales transaction, Order Id should not be null. Hence removing the records with Order ID = Null

Seems like the removing the records which have Order ID = Null cleans all the records with null values in other columns

### Checking for Duplicate Values

In [19]:
df1 =  df.filter(col('Order ID').isNotNull())
print(df1.count())
print(df1.distinct().count())
print(df1.select(['Order ID', 'Product']).distinct().count())

11629
11611
11610


In [12]:
df1.groupBy(['Order ID', 'Product']).count().sort(desc('count')).show()

+--------+--------------------+-----+
|Order ID|             Product|count|
+--------+--------------------+-----+
|  250174|Apple Airpods Hea...|    2|
|  253981|Lightning Chargin...|    2|
|  256196|USB-C Charging Cable|    2|
|  256588|Apple Airpods Hea...|    2|
|  249895|34in Ultrawide Mo...|    2|
|  252537|    Wired Headphones|    2|
|  259296|Apple Airpods Hea...|    2|
|  249910|AAA Batteries (4-...|    2|
|  251220|    Wired Headphones|    2|
|  259297|Lightning Chargin...|    2|
|  258715|Lightning Chargin...|    2|
|  259035|    27in FHD Monitor|    2|
|  248787|AA Batteries (4-p...|    2|
|  257530|USB-C Charging Cable|    2|
|  248171|USB-C Charging Cable|    2|
|  255390|Lightning Chargin...|    2|
|  256763|    27in FHD Monitor|    2|
|  254945|Apple Airpods Hea...|    2|
|  255318|  Macbook Pro Laptop|    2|
|  248234|34in Ultrawide Mo...|    1|
+--------+--------------------+-----+
only showing top 20 rows



In [13]:
df1.filter((col('Order ID')== 251220)&(col('Product')=='Wired Headphones')).show()

+--------+----------------+----------------+----------+--------------+--------------------+
|Order ID|         Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+----------------+----------------+----------+--------------+--------------------+
|  251220|Wired Headphones|               2|     11.99|09/17/19 05:49|153 Meadow St, Po...|
|  251220|Wired Headphones|               1|     11.99|09/17/19 05:49|153 Meadow St, Po...|
+--------+----------------+----------------+----------+--------------+--------------------+



in the above records, we can observe that
* there are atleast (11629 - 11611) = 18 duplicate values in the data set
* for some records, every thing is same except for Quantity ordered. 

### Assumption 2: 
Multiple Products can be order with in one Order Id and have two records with Order ID with different products. Order ID belonging to one Product should have only 1 record at an Order Date time. 

As per the above assumption, the above could not be possible.

Hence, only taking the max value (Quantity Ordered) record of the above two.

In [21]:
df_dedup = df1.distinct()

# Define the window specification
windowSpec = Window.partitionBy("Order ID", "Product").orderBy(desc("Quantity Ordered"))

# Apply the window function and filter the results
df_dedup = df_dedup.withColumn("row_number", row_number().over(windowSpec)) \
             .filter(col("row_number") == 1) \
             .drop("row_number")

In [16]:
df_dedup.filter((col('Order ID')== 251220)&(col('Product')=='Wired Headphones')).show()

+--------+----------------+----------------+----------+--------------+--------------------+
|Order ID|         Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+----------------+----------------+----------+--------------+--------------------+
|  251220|Wired Headphones|               2|     11.99|09/17/19 05:49|153 Meadow St, Po...|
+--------+----------------+----------------+----------+--------------+--------------------+



In [1]:
df_dedup.groupBy(['Order ID', 'Product']).count().sort(desc('count')).show(5)

NameError: name 'df_dedup' is not defined

These above operations of handling null values, duplicates and invalid records must be done on each data set and form a single data fram before saving into the suitable format.

In [22]:
spark.stop()