<a href="https://colab.research.google.com/github/ShravankumarMR/BigData-Spark/blob/main/SalesDataAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
spark = SparkSession.builder.appName(' Sales Data Analysis ').getOrCreate()
sc = spark.sparkContext

In [None]:
import os
#importing os to set environment variable
def install_java():
  !apt-get install -y openjdk-17-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
!java -version
install_java()

openjdk version "17.0.16" 2025-07-15
OpenJDK Runtime Environment (build 17.0.16+8-Ubuntu-0ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 17.0.16+8-Ubuntu-0ubuntu122.04.1, mixed mode, sharing)


In [None]:
!apt-get update

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                               Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [2 InRelease 15.6 kB/128 kB 12%] [3 InRelease 43.1 kB/129 kB 33%] [Waiting f                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
0% [2 InRelease 21.4 kB/128 kB 17%] [3 InRelease 54.7 kB/129 kB 42%] [4 InRelea0% [2 InRelease 21.4 kB/128 kB 17%] [3 InRelease 57.6 kB/129 kB 45%] [Connected                                                                               Hit:5 https://cli.github.com/packages stabl

In [None]:
# Read data into df
inp = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/Input/US_Sales_Datasets.csv")

In [None]:
print(" Total records : ", inp.count() , " \n\n Sample records: \n")
for i in inp.take(5):
  print(i)

 Total records :  9648  

 Sample records: 

Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice Date='01-01-2020', Region='Northeast', State='New York', City='New York', Product="Men's Street Footwear", Price per Unit=50, Units Sold='1,200', Total Sales='6,00,000', Operating Profit='3,00,000', Operating Margin='50%', Sales Method='In-store')
Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice Date='02-01-2020', Region='Northeast', State='New York', City='New York', Product="Men's Athletic Footwear", Price per Unit=50, Units Sold='1,000', Total Sales='5,00,000', Operating Profit='1,50,000', Operating Margin='30%', Sales Method='In-store')
Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice Date='03-01-2020', Region='Northeast', State='New York', City='New York', Product="Women's Street Footwear", Price per Unit=40, Units Sold='1,000', Total Sales='4,00,000', Operating Profit='1,40,000', Operating Margin='35%', Sales Method='In-store')
Row(Retailer='Foot Locker', Retailer

In [None]:
inp.printSchema()

root
 |-- Retailer: string (nullable = true)
 |-- Retailer ID: integer (nullable = true)
 |-- Invoice Date: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price per Unit: integer (nullable = true)
 |-- Units Sold: string (nullable = true)
 |-- Total Sales: string (nullable = true)
 |-- Operating Profit: string (nullable = true)
 |-- Operating Margin: string (nullable = true)
 |-- Sales Method: string (nullable = true)



In [None]:
#convert string date column into date object
rawDataDF1 = inp.withColumn("Invoice Date", to_date(col("Invoice Date"), "dd-MM-yyyy"))

for i in rawDataDF1.take(5):
  print(i)

Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice Date=datetime.date(2020, 1, 1), Region='Northeast', State='New York', City='New York', Product="Men's Street Footwear", Price per Unit=50, Units Sold='1,200', Total Sales='6,00,000', Operating Profit='3,00,000', Operating Margin='50%', Sales Method='In-store')
Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice Date=datetime.date(2020, 1, 2), Region='Northeast', State='New York', City='New York', Product="Men's Athletic Footwear", Price per Unit=50, Units Sold='1,000', Total Sales='5,00,000', Operating Profit='1,50,000', Operating Margin='30%', Sales Method='In-store')
Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice Date=datetime.date(2020, 1, 3), Region='Northeast', State='New York', City='New York', Product="Women's Street Footwear", Price per Unit=40, Units Sold='1,000', Total Sales='4,00,000', Operating Profit='1,40,000', Operating Margin='35%', Sales Method='In-store')
Row(Retailer='Foot Locker', Retailer ID=11

In [None]:
rawDataDF2 = rawDataDF1.withColumn("Gender", split(col("Product"), "'s ").getItem(0)).withColumn("Category", split(col("Product"), "'s ").getItem(1))

rawDataDF3 = rawDataDF2.withColumn("Units Sold", regexp_replace(col("Units Sold"),",","")).withColumn("Units Sold", col("Units Sold").cast("Integer"))

rawDataDF4 = rawDataDF3.withColumn("Operating Margin", regexp_replace(col("Operating Margin"),"%","")).withColumn("Operating Margin", col("Operating Margin").cast("Integer"))

rawDataDF5 = rawDataDF4.withColumn("Total Sales", col("Units Sold")*col("Price per Unit")).withColumn("Total Sales", col("Total Sales").cast("Double"))

rawDataDF6 = rawDataDF5.withColumn("Operating Profit", col("Total Sales")*col("Operating Margin")/100  ).withColumn("Operating Profit", col("Operating Profit").cast("Double"))

rawDataDF7 = rawDataDF6.withColumnRenamed("Invoice Date","Invoice_Date").withColumnRenamed("Total Sales","Total_Sales").withColumnRenamed("Operating Profit","Operating_Profit").withColumnRenamed("Units Sold","Units_Sold")

In [None]:
print(" Sample records: \n")
for i in rawDataDF7.take(5):
  print(i)

 Sample records: 

Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice_Date=datetime.date(2020, 1, 1), Region='Northeast', State='New York', City='New York', Product="Men's Street Footwear", Price per Unit=50, Units_Sold=1200, Total_Sales=60000.0, Operating_Profit=30000.0, Operating Margin=50, Sales Method='In-store', Gender='Men', Category='Street Footwear')
Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice_Date=datetime.date(2020, 1, 2), Region='Northeast', State='New York', City='New York', Product="Men's Athletic Footwear", Price per Unit=50, Units_Sold=1000, Total_Sales=50000.0, Operating_Profit=15000.0, Operating Margin=30, Sales Method='In-store', Gender='Men', Category='Athletic Footwear')
Row(Retailer='Foot Locker', Retailer ID=1185732, Invoice_Date=datetime.date(2020, 1, 3), Region='Northeast', State='New York', City='New York', Product="Women's Street Footwear", Price per Unit=40, Units_Sold=1000, Total_Sales=40000.0, Operating_Profit=14000.0, Operating Margin=35

In [None]:
rawDataDF7.printSchema()

root
 |-- Retailer: string (nullable = true)
 |-- Retailer ID: integer (nullable = true)
 |-- Invoice_Date: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price per Unit: integer (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Total_Sales: double (nullable = true)
 |-- Operating_Profit: double (nullable = true)
 |-- Operating Margin: integer (nullable = true)
 |-- Sales Method: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Category: string (nullable = true)



## Total Sales, Total Profit, Average price per unit and Total units sold, Cost of Sales, Net Operating Margin â€“ Year wise

In [33]:
rawDataDF7.registerTempTable("sales")

q1 = spark.sql("""
    SELECT
        YEAR(Invoice_Date) AS Year,
        ROUND(SUM(Total_Sales) / 1000000, 2) AS Total_Sales_in_Mill,
        ROUND(SUM(Operating_Profit) / 1000000, 2) AS Total_Profit_in_Mill,
        CASE WHEN SUM(Units_Sold) > 0 THEN ROUND(SUM(Total_Sales) / SUM(Units_Sold), 2) ELSE 0 END AS Avg_Selling_Price,
        SUM(Units_Sold) AS Total_Units_Sold,
        ROUND((SUM(Total_Sales) - SUM(Operating_Profit)) / 1000000, 2) AS Cost_of_Sales_in_Mill,
        CASE WHEN SUM(Total_Sales) > 0 THEN ROUND((SUM(Operating_Profit) / SUM(Total_Sales)) * 100, 2) ELSE 0 END AS Net_Operating_Margin
    FROM sales
    GROUP BY YEAR(Invoice_Date)
    ORDER BY Year
""")

In [34]:
print(" Total sales by retailer: \n\n ")
q1.show(5)

 Total sales by retailer: 

 
+----+-------------------+--------------------+-----------------+----------------+---------------------+--------------------+
|Year|Total_Sales_in_Mill|Total_Profit_in_Mill|Avg_Selling_Price|Total_Units_Sold|Cost_of_Sales_in_Mill|Net_Operating_Margin|
+----+-------------------+--------------------+-----------------+----------------+---------------------+--------------------+
|2020|              24.24|                9.02|            52.42|          462349|                15.22|                37.2|
|2021|              95.93|               38.21|            47.57|         2016512|                57.72|               39.83|
+----+-------------------+--------------------+-----------------+----------------+---------------------+--------------------+



## Total sales by month

In [None]:
q2 = spark.sql("""
    SELECT
        DATE_FORMAT(Invoice_Date, 'MMMM') AS Month_Name,
        ROUND(SUM(Total_Sales), 2) AS Total_Sales
    FROM sales
    GROUP BY DATE_FORMAT(Invoice_Date, 'MMMM'), MONTH(Invoice_Date)
    ORDER BY MONTH(Invoice_Date)
""")

print("Total sales per month: \n \n")
q2.show()

Total sales per month: 
 

+----------+-----------+
|Month_Name|Total_Sales|
+----------+-----------+
|   January|  9744767.0|
|  February|  8263853.0|
|     March|  7694984.0|
|     April|  9691420.0|
|       May| 1.074172E7|
|      June|  9803147.0|
|      July|1.2550419E7|
|    August|1.2293226E7|
| September|1.0405584E7|
|   October|  8538758.0|
|  November|  9023440.0|
|  December|1.1415332E7|
+----------+-----------+



## Total sales by state

In [None]:
q3 = spark.sql("""
    SELECT
        State,
        ROUND(SUM(Total_Sales), 2) AS Total_Sales
    FROM sales
    GROUP BY State
    ORDER BY Total_Sales DESC
""")

print("Total sales per state: \n \n")
q3.show()

Total sales per state: 
 

+--------------+-----------+
|         State|Total_Sales|
+--------------+-----------+
|      New York|  8670464.0|
|    California|  8580508.0|
|       Florida|  7820589.0|
|         Texas|  6612371.0|
|South Carolina|  3593112.0|
|     Louisiana|  3377031.0|
|    Washington|  3222093.0|
|      Virginia|  3074415.0|
|        Oregon|  3047049.0|
|        Nevada|  2981134.0|
|North Carolina|  2936581.0|
|    New Mexico|  2824641.0|
|         Idaho|  2742753.0|
|        Hawaii|  2734457.0|
|       Georgia|  2708591.0|
|      Colorado|  2569036.0|
|     Tennessee|  2567190.0|
|       Alabama|  2513424.0|
| New Hampshire|  2339267.0|
|      Michigan|  2287283.0|
+--------------+-----------+
only showing top 20 rows



## Total sales by sales method

In [None]:
q4 = spark.sql("""
    SELECT
        `Sales Method`,
        ROUND(SUM(Total_Sales)/1000000, 2) AS Total_Sales_in_Millions
    FROM sales
    GROUP BY `Sales Method`
    ORDER BY Total_Sales_in_Millions DESC
""")


print("Total sales per sales method: \n \n")
q4.show()

Total sales per sales method: 
 

+------------+-----------------------+
|Sales Method|Total_Sales_in_Millions|
+------------+-----------------------+
|      Online|                  44.97|
|      Outlet|                  39.54|
|    In-store|                  35.66|
+------------+-----------------------+



## Total sales by region

In [None]:
q5 = spark.sql("""
    SELECT
        Region,
        ROUND(SUM(Total_Sales)/1000000, 2) AS Total_Sales_in_millions
    FROM sales
    GROUP BY Region
    ORDER BY Total_Sales_in_millions DESC
""")

print("Total sales per region: \n \n")
q5.show()

Total sales per region: 
 

+---------+-----------------------+
|   Region|Total_Sales_in_millions|
+---------+-----------------------+
|     West|                  36.44|
|Northeast|                  25.08|
|Southeast|                  21.37|
|    South|                   20.6|
|  Midwest|                  16.67|
+---------+-----------------------+



## Total sales by product

In [None]:
q6 = spark.sql("""
    SELECT
        Product,
        ROUND(SUM(Total_Sales)/1000000, 2) AS Total_Sales_in_millions
    FROM sales
    GROUP BY Product
    ORDER BY Total_Sales_in_millions DESC
""")

print("Total sales per Product: \n \n")
q6.show()

Total sales per Product: 
 

+--------------------+-----------------------+
|             Product|Total_Sales_in_millions|
+--------------------+-----------------------+
|Men's Street Foot...|                  27.68|
|     Women's Apparel|                  23.87|
|Men's Athletic Fo...|                  20.58|
|Women's Street Fo...|                   17.2|
|       Men's Apparel|                  16.52|
|Women's Athletic ...|                  14.32|
+--------------------+-----------------------+

