In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PysparkProjectSalesAnalysis").getOrCreate()
spark

In [2]:
df = spark.read.csv("./sales_data_sample.csv",header=True,inferSchema=True)
# print(df)
df.show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0| 2/24/2003

In [3]:
col=df.columns
print(col)

['ORDERNUMBER', 'QUANTITYORDERED', 'PRICEEACH', 'ORDERLINENUMBER', 'SALES', 'ORDERDATE', 'STATUS', 'QTR_ID', 'MONTH_ID', 'YEAR_ID', 'PRODUCTLINE', 'MSRP', 'PRODUCTCODE', 'CUSTOMERNAME', 'PHONE', 'ADDRESSLINE1', 'ADDRESSLINE2', 'CITY', 'STATE', 'POSTALCODE', 'COUNTRY', 'TERRITORY', 'CONTACTLASTNAME', 'CONTACTFIRSTNAME', 'DEALSIZE']


In [4]:
# KPIs
# Sales Performance
# 1. Total Sales: Sum of all `SALES`.
# 2. Average Sales per Order: Average of `SALES` per order.
# 3. Sales by Product Line: Sum of `SALES` for each `PRODUCTLINE`.
# 4. Sales by Territory: Sum of `SALES` for each `TERRITORY`.
# 5. Monthly Sales: Sum of `SALES` grouped by `MONTH_ID` and `YEAR_ID`.

# Order Performance
# 6. Total Orders: Count of all unique `ORDERNUMBER`.
# 7. Average Order Value (AOV): Total `SALES` divided by the number of orders.
# 8. Order Quantity per Customer: Average of `QUANTITYORDERED` per customer.

# Customer Metrics
# 9. Top Customers by Sales: Customers with the highest total `SALES`.
# 10. Customer Order Frequency: Count of orders per customer.
# 11. Average Sales per Customer: Average of `SALES` per customer.

# Product Performance
# 12. Top-Selling Products: Products with the highest total `QUANTITYORDERED`.
# 13. Sales by Product: Total `SALES` for each `PRODUCTCODE`.

# Time-Based Metrics
# 14. Quarterly Sales: Sum of `SALES` grouped by `QTR_ID` and `YEAR_ID`.
# 15. Yearly Sales Growth: Percentage growth in `SALES` year over year.

# Operational Metrics
# 16. Order Status Distribution: Count of orders in each `STATUS` category.

In [5]:
#  Total Sales: Sum of all `SALES` group By country
from pyspark.sql.functions import sum as _sum

TotalSalesByCountry = df.groupBy("COUNTRY").agg(_sum("SALES").alias("Total Sales By Country"))
TotalSalesByCountry.show()

+-----------+----------------------+
|    COUNTRY|Total Sales By Country|
+-----------+----------------------+
|     Sweden|             210014.21|
|Philippines|              94015.73|
|  Singapore|    288488.41000000003|
|    Germany|    220472.08999999994|
|     France|    1110916.5199999993|
|    Belgium|             108412.62|
|    Finland|     329581.9100000001|
|      Italy|    374674.30999999976|
|     Norway|     307463.7000000001|
|      Spain|    1215686.9200000009|
|    Denmark|             245637.15|
|    Ireland|              57756.43|
|        USA|            3627982.83|
|         UK|     478880.4600000001|
|Switzerland|    117713.55999999998|
|     Canada|    224078.55999999994|
|      Japan|    188167.80999999997|
|  Australia|     630623.1000000001|
|    Austria|             202062.53|
+-----------+----------------------+



In [6]:
#Average Sales Group By Country

from pyspark.sql.functions import avg

TotalAvgViaCountry = df.groupBy("COUNTRY").agg(avg("SALES").alias("Total AVG"))
TotalAvgViaCountry.show()

+-----------+------------------+
|    COUNTRY|         Total AVG|
+-----------+------------------+
|     Sweden|3684.4598245614034|
|Philippines|3615.9896153846153|
|  Singapore| 3651.752025316456|
|    Germany| 3556.001451612902|
|     France|3537.9507006369404|
|    Belgium| 3285.230909090909|
|    Finland|3582.4120652173924|
|      Italy|3315.7018584070775|
|     Norway|3617.2200000000016|
|      Spain|3554.6401169590667|
|    Denmark| 3899.002380952381|
|    Ireland|       3609.776875|
|        USA|3613.5287151394423|
|         UK|3325.5587500000006|
|Switzerland| 3797.211612903225|
|     Canada|3201.1222857142848|
|      Japan|3618.6117307692302|
|  Australia| 3408.773513513514|
|    Austria| 3673.864181818182|
+-----------+------------------+



In [7]:
# Total Sales Product 
TotalSalesViaProduct = df.groupBy("PRODUCTLINE").sum("SALES")
TotalSalesViaProduct.show()

+----------------+------------------+
|     PRODUCTLINE|        sum(SALES)|
+----------------+------------------+
|     Motorcycles|1166388.3400000003|
|    Vintage Cars|1903150.8399999992|
|           Ships|         714437.13|
|Trucks and Buses|1127789.8399999996|
|    Classic Cars| 3919615.659999997|
|          Trains|226243.46999999997|
|          Planes| 975003.5700000001|
+----------------+------------------+



In [14]:
#Total Sales for each Territory

TotalSalesForeacgTerritory = df.groupBy("Territory").sum("SALES").alias("Sales Via Territory")
TotalSalesForeacgTerritory.show()


+---------+------------------+
|Territory|        sum(SALES)|
+---------+------------------+
|       NA| 3852061.390000001|
|     APAC| 746121.8300000002|
|    Japan| 455173.2200000002|
|     EMEA|4979272.4099999955|
+---------+------------------+



In [None]:
# Total Sales Via Month Sales

MonthlySales = df.groupBy("YEAR_ID","MONTH_ID").sum("SALES")
MonthlySales.show()

+-------+--------+------------------+
|YEAR_ID|MONTH_ID|        sum(SALES)|
+-------+--------+------------------+
|   2005|       5|457861.05999999965|
|   2004|       6|         286674.22|
|   2003|       2|140836.19000000003|
|   2004|       8| 461501.2700000001|
|   2003|      10|         568290.97|
|   2004|      10|         552924.25|
|   2003|       3|          174504.9|
|   2003|      12|261876.46000000005|
|   2003|       9|         263973.36|
|   2003|       1|          129753.6|
|   2004|       7| 327144.0899999998|
|   2004|       4|206148.12000000008|
|   2005|       1|         339543.42|
|   2003|      11|1029837.6600000001|
|   2005|       3| 374262.7599999999|
|   2004|       2| 311419.5299999999|
|   2003|       5|         192673.11|
|   2003|       7|187731.87999999998|
|   2004|       9|320750.91000000003|
|   2003|       4|201609.55000000002|
+-------+--------+------------------+
only showing top 20 rows



In [None]:
# Order Performance
# Total Orders: Count of all unique ORDERNUMBER.
# Average Order Value (AOV): Total SALES divided by the number of orders.
# Order Quantity per Customer: Average of QUANTITYORDERED per customer.

In [None]:
# Total Orders: Count of all unique ORDERNUMBER.
ORDERNUMBERCount = df.groupBy("ORDERNUMBER").count()
ORDERNUMBERCount.show()

+-----------+-----+
|ORDERNUMBER|count|
+-----------+-----+
|      10206|   11|
|      10362|    4|
|      10121|    5|
|      10230|    8|
|      10395|    4|
|      10416|   14|
|      10257|    5|
|      10264|    7|
|      10128|    4|
|      10183|   12|
|      10143|   16|
|      10366|    3|
|      10267|    6|
|      10293|    9|
|      10195|   10|
|      10303|    2|
|      10162|   10|
|      10168|   18|
|      10334|    6|
|      10371|   12|
+-----------+-----+
only showing top 20 rows



In [None]:
# Average Order Value (AOV): Total SALES divided by the number of orders.
from pyspark.sql.functions import count
AverageOrderValue = df.groupBy("ORDERNUMBER").agg(_sum("SALES")/count("ORDERNUMBER").alias("AverageValue"))
AverageOrderValue.show()

+-----------+-------------------------------------------------+
|ORDERNUMBER|(sum(SALES) / count(ORDERNUMBER) AS AverageValue)|
+-----------+-------------------------------------------------+
|      10206|                               3514.7463636363636|
|      10362|                                        3382.3925|
|      10121|                                         3794.392|
|      10230|                                       4658.31125|
|      10395|                                        5080.3825|
|      10416|                               2964.9957142857143|
|      10257|                                          3225.62|
|      10264|                                2792.621428571428|
|      10128|                                          4362.02|
|      10183|                               3338.4716666666664|
|      10143|                                      2858.649375|
|      10366|                                          5542.72|
|      10267|                           

In [28]:
# Order Quantity per Customer: Average of QUANTITYORDERED per customer.
OrderQualityPerCustomerAverage = df.groupby(["CUSTOMERNAME"]).agg(avg("QUANTITYORDERED").alias("Average Quantity Ordered")).show()
# df.select("QUANTITYORDERED").show()


+--------------------+------------------------+
|        CUSTOMERNAME|Average Quantity Ordered|
+--------------------+------------------------+
| Suominen Souveniers|       34.36666666666667|
|  Amica Models & Co.|       32.42307692307692|
|Collectables For ...|                  33.125|
|         CAF Imports|                    36.0|
|   giftsbymail.co.uk|       34.42307692307692|
|       Rovelli Gifts|                  34.375|
|     Lyon Souveniers|                    34.2|
|   La Rochelle Gifts|       34.56603773584906|
| L'ordine Souveniers|       32.82051282051282|
|Signal Collectibl...|      34.266666666666666|
|     Vitachrome Inc.|                   31.48|
|Volvo Model Repli...|       34.05263157894737|
|Daedalus Designs ...|                   34.95|
|Classic Legends Inc.|                    36.0|
|     Mini Wheels Co.|       32.95238095238095|
|  Signal Gift Stores|       32.03448275862069|
|La Corne D'abonda...|       36.34782608695652|
|Royal Canadian Co...|       33.57692307

In [None]:
# Customer Metrics
# Top Customers by Sales: Customers with the highest total SALES.
# Customer Order Frequency: Count of orders per customer.
# Average Sales per Customer: Average of SALES per customer.

In [35]:
# Top Customers by Sales: Customers with the highest total SALES

Total_Sales_by_Customer = df.groupby("CUSTOMERNAME").agg(avg("SALES").alias("Total Sales"))
total_customer_by_sales = Total_Sales_by_Customer.orderBy("Total Sales",ascending=False)
total_customer_by_sales.show()
# print(df.columns)

+--------------------+------------------+
|        CUSTOMERNAME|       Total Sales|
+--------------------+------------------+
|    Super Scale Inc.| 4674.827647058824|
|         Mini Caravy| 4233.604210526316|
|La Corne D'abonda...|  4226.24695652174|
|        Royale Belge|         4180.0125|
|  Muscle Machine Inc| 4119.519583333333|
|     Gift Depot Inc.|4075.7916000000005|
|UK Collectables, ...| 4069.250689655172|
|Danish Wholesale ...|4028.9333333333334|
|Dragon Souveniers...|  4023.01581395349|
|The Sharp Gifts W...| 4000.256749999999|
|Volvo Model Repli...|3987.0989473684212|
|Australian Gift N...|3964.6079999999993|
|Tekni Collectable...| 3963.247142857143|
|Diecast Classics ...|3939.9400000000005|
|Diecast Collectables|3936.6544444444444|
|     Lyon Souveniers|3928.5170000000007|
|Classic Legends Inc.|3889.7599999999998|
|Online Diecast Cr...|  3873.09705882353|
|Blauer See Auto, Co.|3871.4359090909084|
|Toms Spezialitten...|3857.9453846153847|
+--------------------+------------

In [42]:
# Customer Order Frequency: Count of orders per customer.
from pyspark.sql.functions import count
CountOrderPerCustomer = df.groupBy("CUSTOMERNAME").agg(count("ORDERNUMBER").alias("Order Count"))
CountOrderPerCustomer.show()
# print(df.columns)

+--------------------+-----------+
|        CUSTOMERNAME|Order Count|
+--------------------+-----------+
| Suominen Souveniers|         30|
|  Amica Models & Co.|         26|
|Collectables For ...|         24|
|         CAF Imports|         13|
|   giftsbymail.co.uk|         26|
|       Rovelli Gifts|         48|
|     Lyon Souveniers|         20|
|   La Rochelle Gifts|         53|
| L'ordine Souveniers|         39|
|Signal Collectibl...|         15|
|     Vitachrome Inc.|         25|
|Volvo Model Repli...|         19|
|Daedalus Designs ...|         20|
|Classic Legends Inc.|         20|
|     Mini Wheels Co.|         21|
|  Signal Gift Stores|         29|
|La Corne D'abonda...|         23|
|Royal Canadian Co...|         26|
|Online Diecast Cr...|         34|
|     Cruz & Sons Co.|         26|
+--------------------+-----------+
only showing top 20 rows



In [47]:
# Average Sales per Customer: Average of SALES per customer.
AvgSalesPerCustomer = df.groupBy("CUSTOMERNAME").agg(avg("SALES").alias("AvgSalesPerCustomer"))
AvgSalesPerCustomer.show()
# print(df.columns)

+--------------------+-------------------+
|        CUSTOMERNAME|AvgSalesPerCustomer|
+--------------------+-------------------+
| Suominen Souveniers|  3798.704999999999|
|  Amica Models & Co.| 3619.8946153846164|
|Collectables For ...|          3399.0825|
|         CAF Imports|  3818.619230769231|
|   giftsbymail.co.uk| 3009.2630769230764|
|       Rovelli Gifts|  2874.077500000001|
|     Lyon Souveniers| 3928.5170000000007|
|   La Rochelle Gifts| 3398.5830188679242|
| L'ordine Souveniers| 3656.4443589743596|
|Signal Collectibl...| 3347.9006666666673|
|     Vitachrome Inc.| 3521.6504000000004|
|Volvo Model Repli...| 3987.0989473684212|
|Daedalus Designs ...|          3452.6205|
|Classic Legends Inc.| 3889.7599999999998|
|     Mini Wheels Co.| 3546.4847619047614|
|  Signal Gift Stores|   2853.48551724138|
|La Corne D'abonda...|   4226.24695652174|
|Royal Canadian Co...| 2870.5711538461537|
|Online Diecast Cr...|   3873.09705882353|
|     Cruz & Sons Co.| 3615.9896153846153|
+----------

In [None]:
# Product Performance
# Top-Selling Products: Products with the highest total QUANTITYORDERED.
# Sales by Product: Total SALES for each PRODUCTCODE.

In [51]:
# Top-Selling Products: Products with the highest total QUANTITYORDERED.
# print(df.columns)

TotalSellingProducts = df.groupBy("PRODUCTCODE").agg(_sum("QUANTITYORDERED").alias("Total Selling Products"))
TotalSellingProducts.show()




+-----------+----------------------+
|PRODUCTCODE|Total Selling Products|
+-----------+----------------------+
|   S18_4600|                  1031|
|   S18_1749|                   802|
|   S12_3891|                   921|
|   S18_2248|                   743|
|  S700_1138|                   902|
|   S32_1268|                   873|
|   S12_1099|                   838|
|   S18_2795|                   789|
|   S24_1937|                   844|
|   S32_3522|                   957|
|   S18_1097|                   999|
|   S18_1662|                   940|
|   S12_1666|                   972|
|   S24_3969|                   745|
|   S24_1578|                   931|
|   S24_4048|                   844|
|   S18_3320|                   909|
|   S24_3816|                   870|
|   S18_3136|                   873|
|   S32_2509|                   955|
+-----------+----------------------+
only showing top 20 rows



In [52]:
# Sales by Product: Total  for each PRODUCTCODE.
TotalSalesForEachProductCode = df.groupBy("PRODUCTCODE").agg(_sum("SALES").alias("Total Sales for each product code"))
TotalSalesForEachProductCode.show()

+-----------+---------------------------------+
|PRODUCTCODE|Total Sales for each product code|
+-----------+---------------------------------+
|   S18_4600|               123723.07999999999|
|   S18_1749|                        127310.42|
|   S12_3891|                        145332.04|
|   S18_2248|               52161.649999999994|
|  S700_1138|                64362.16999999999|
|   S32_1268|                         90826.41|
|   S12_1099|                        137177.01|
|   S18_2795|                         125199.3|
|   S24_1937|                         47981.09|
|   S32_3522|                77278.62000000001|
|   S18_1097|                        106086.56|
|   S18_1662|                        139421.97|
|   S12_1666|                        136692.72|
|   S24_3969|                         33181.66|
|   S24_1578|                         99240.03|
|   S24_4048|                99789.06999999999|
|   S18_3320|                99535.45999999999|
|   S24_3816|                70170.62000

In [None]:
# Time-Based Metrics
# Quarterly Sales: Sum of SALES grouped by QTR_ID and YEAR_ID.
# Yearly Sales Growth: Percentage growth in SALES year over year.

In [58]:
# Quarterly Sales: Sum of SALES grouped by QTR_ID and YEAR_ID.
QuarterlySales = df.groupBy(["QTR_ID","YEAR_ID"]).agg(_sum("SALES").alias("Quaterly Sales"))
QuarterlySales = QuarterlySales.orderBy(["YEAR_ID","QTR_ID"])
QuarterlySales.show()

+------+-------+------------------+
|QTR_ID|YEAR_ID|    Quaterly Sales|
+------+-------+------------------+
|     1|   2003| 445094.6900000002|
|     2|   2003|         562365.22|
|     3|   2003| 649514.5399999999|
|     4|   2003|1860005.0899999987|
|     1|   2004| 833730.6800000005|
|     2|   2004| 766260.7299999996|
|     3|   2004|1109396.2700000005|
|     4|   2004|2014774.9199999995|
|     1|   2005|1071992.3600000003|
|     2|   2005| 719494.3500000001|
+------+-------+------------------+



In [64]:
# Yearly Sales Growth: Percentage growth in SALES year over year.

# Step 1: Calculate the total sales for each year
# yearly_sales = df.groupBy("YEAR_ID").agg(_sum("SALES").alias("Total Sales")).orderBy("YEAR_ID")

# # Step 2: Calculate the year-over-year growth percentage
# # Use the Window function to get the previous year's sales
# window_spec = Window.orderBy("YEAR_ID")
# yearly_sales_with_lag = yearly_sales.withColumn("Previous Year Sales", lag(col("Total Sales")).over(window_spec))

# # Calculate the percentage growth
# yearly_sales_growth = yearly_sales_with_lag.withColumn(
#     "Yearly Sales Growth (%)",
#     ((col("Total Sales") - col("Previous Year Sales")) / col("Previous Year Sales")) * 100
# )

# # Show the results
# yearly_sales_growth.show()
#!important

from pyspark.sql.window import Window
from pyspark.sql.functions import lag,col

yearly_sales = df.groupBy("YEAR_ID").agg(_sum("SALES").alias("Total Sales")).orderBy("YEAR_ID")

Window_Spec = Window.orderBy("YEAR_ID")
yearly_sales_with_lag = yearly_sales.withColumn("Previous Year Sales",lag(col("Total Sales")).over(Window_Spec))

yearly_sales_groth = yearly_sales_with_lag.withColumn(
    "Yearly Sales Groth (%)",
    ((col("Total Sales")-col("Previous Year Sales"))/col("Previous Year Sales"))*100
)

yearly_sales_groth.show()




+-------+-----------------+-------------------+----------------------+
|YEAR_ID|      Total Sales|Previous Year Sales|Yearly Sales Groth (%)|
+-------+-----------------+-------------------+----------------------+
|   2003|3516979.540000001|               NULL|                  NULL|
|   2004|4724162.599999997|  3516979.540000001|    34.324426578836324|
|   2005|       1791486.71|  4724162.599999997|    -62.07821657112308|
+-------+-----------------+-------------------+----------------------+



In [None]:
# Operational Metrics
# Order Status Distribution: Count of orders in each STATUS category.

In [76]:
# print(df.columns)
CountOrderInStatusCategory = df.groupBy("STATUS").agg(count("ORDERNUMBER").alias("Order Count"))
CountOrderPerCustomer.show()

+--------------------+-----------+
|        CUSTOMERNAME|Order Count|
+--------------------+-----------+
| Suominen Souveniers|         30|
|  Amica Models & Co.|         26|
|Collectables For ...|         24|
|         CAF Imports|         13|
|   giftsbymail.co.uk|         26|
|       Rovelli Gifts|         48|
|     Lyon Souveniers|         20|
|   La Rochelle Gifts|         53|
| L'ordine Souveniers|         39|
|Signal Collectibl...|         15|
|     Vitachrome Inc.|         25|
|Volvo Model Repli...|         19|
|Daedalus Designs ...|         20|
|Classic Legends Inc.|         20|
|     Mini Wheels Co.|         21|
|  Signal Gift Stores|         29|
|La Corne D'abonda...|         23|
|Royal Canadian Co...|         26|
|Online Diecast Cr...|         34|
|     Cruz & Sons Co.|         26|
+--------------------+-----------+
only showing top 20 rows

