Import Data

In [0]:
df = spark.read.csv("/FileStore/tables/Superstore.csv", header=True, inferSchema=True)
df.show(5)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420| South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset 

View Schemas and Columns

In [0]:
df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)



Check for Null Values

In [0]:
from pyspark.sql.functions import col

df.select([col(c).isNull().alias(c) for c in df.columns]).show(5)

+------+--------+----------+---------+---------+-----------+-------------+-------+-------+-----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country| City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+-----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
| false|   false|     false|    false|    false|      false|        false|  false|  false|false|false|      false| false|     false|   false|       false|       false|false|   false|   false| false|
| false|   false|     false|    false|    false|      false|        false|  false|  false|false|false|      false| false|     false|   false|       false|       false|false|   false|   false| false|
| fal

Dropping Null Values

In [0]:
df_clean = df.dropna()

Sales Trend Over Time

In [0]:
from pyspark.sql.functions import to_date, month, year

# Convert order date to actual date
df_clean = df_clean.withColumn("Order_Date", to_date(col("Order Date"), "MM/dd/yyyy"))

# Group sales by month
monthly_sales = df_clean.groupBy(year("Order_Date").alias("Year"), month("Order_Date").alias("Month")).sum("Sales")
monthly_sales.orderBy("Year", "Month").show()

+----+-----+------------------+
|Year|Month|        sum(Sales)|
+----+-----+------------------+
|2014|    1|14161.348999999998|
|2014|    2|          4119.816|
|2014|    3| 55526.19900000002|
|2014|    4|28139.560999999994|
|2014|    5|         23634.667|
|2014|    6| 34508.99560000003|
|2014|    7| 33500.87299999999|
|2014|    8| 27603.51249999999|
|2014|    9| 81496.80679999998|
|2014|   10|31394.940999999988|
|2014|   11| 78297.24069999997|
|2014|   12| 69379.83649999999|
|2015|    1|18085.115599999994|
|2015|    2|11924.271999999999|
|2015|    3|38621.291999999994|
|2015|    4|32640.482499999987|
|2015|    5|29325.970500000003|
|2015|    6|24659.684000000005|
|2015|    7| 28524.52099999999|
|2015|    8|36380.928199999995|
+----+-----+------------------+
only showing top 20 rows



Top-Selling Products / Categories

In [0]:
df_clean.groupBy("Sub-Category").sum("Sales").orderBy("sum(Sales)", ascending=False).show()

+------------+------------------+
|Sub-Category|        sum(Sales)|
+------------+------------------+
|      Phones| 329753.0880000001|
|      Chairs|328449.10300000076|
|     Storage|216803.21200000012|
|      Tables| 206965.5320000001|
|     Binders|199905.71700000006|
|    Machines|189238.63099999996|
| Accessories| 167380.3180000001|
|     Copiers|149528.02999999994|
|   Bookcases|114879.99629999997|
|  Appliances|        107532.161|
| Furnishings| 82752.23000000004|
|       Paper| 75356.11799999999|
|    Supplies| 45952.47000000001|
|         Art|27118.791999999954|
|   Envelopes|15339.489999999993|
|      Labels|         12486.312|
|   Fasteners|3008.6559999999995|
+------------+------------------+



Profitability Analysis

In [0]:
df_clean = df_clean.withColumn("Profit", col("Profit").cast("double"))
df_clean.groupBy("Category").sum("Profit").orderBy("sum(Profit)", ascending=False).show()

+---------------+------------------+
|       Category|       sum(Profit)|
+---------------+------------------+
|     Technology|145388.29659999989|
|Office Supplies|120632.87839999991|
|      Furniture| 19686.42720000003|
+---------------+------------------+



Sales by Region

In [0]:
from pyspark.sql.functions import col

# Convert "Sales" to numeric (double) before grouping
df_clean = df_clean.withColumn("Sales", col("Sales").cast("double"))

# Now perform group by Region
df_clean.groupBy("Region").sum("Sales").show()

+-------+------------------+
| Region|        sum(Sales)|
+-------+------------------+
|  South|388983.58500000037|
|Central| 497800.8728000007|
|   East| 672194.0539999981|
|   West| 713471.3445000004|
+-------+------------------+



Average order value

In [0]:
from pyspark.sql.functions import avg

df_clean.groupBy("Customer ID").agg(
    avg("Sales").alias("Avg_Order_Value"),
    avg("Profit").alias("Avg_Profit")
).orderBy("Avg_Order_Value", ascending=False).show()

+-----------+------------------+-------------------+
|Customer ID|   Avg_Order_Value|         Avg_Profit|
+-----------+------------------+-------------------+
|   MW-18235|1751.2920000000001|            555.172|
|   TC-20980|1728.8952727272726|  747.0402166666668|
|   SM-20320|1669.5366666666666|-132.04928666666666|
|   GT-14635| 1558.535333333333| -684.7764833333334|
|   TA-21385|1459.5620000000001| 470.37882999999994|
|   HL-15040|1170.2998181818182| 511.12992727272734|
|   BS-11365| 1166.850333333333|  290.6738222222222|
|   CC-12370|1102.6429090909091| 197.91357272727274|
|   SH-20635|          1048.196|           130.3129|
|   IM-15055| 949.4533333333334| 241.75822499999998|
|   CM-12385|           895.402|          389.98904|
|   JR-15700|            863.88|            107.985|
|   TS-21370| 848.9907857142856| 157.92221333333333|
|   RB-19360| 839.8521666666667|  387.5608833333333|
|   BM-11140| 829.2785714285716|      -106.96450625|
|   KC-16540|          781.6524| 141.884380000

Customer Segementation

In [0]:
df_clean.groupBy("Customer ID").sum("Sales").orderBy("sum(Sales)", ascending=False).show(10)

+-----------+------------------+
|Customer ID|        sum(Sales)|
+-----------+------------------+
|   SM-20320|          25043.05|
|   TC-20980|19017.847999999998|
|   RB-19360|         15117.339|
|   TA-21385|          14595.62|
|   AB-10105|14355.610999999997|
|   SC-20095|14142.333999999999|
|   KL-16645|         14071.917|
|   HL-15040|12873.297999999999|
|   SE-20110|12209.438000000002|
|   CC-12370|         12129.072|
+-----------+------------------+
only showing top 10 rows



Repeat Customer behaviour

In [0]:
from pyspark.sql.functions import countDistinct

df_clean.groupBy("Customer ID").agg(
    countDistinct("Order ID").alias("Total_Orders")
).orderBy("Total_Orders", ascending=False).show()

+-----------+------------+
|Customer ID|Total_Orders|
+-----------+------------+
|   EP-13915|          17|
|   SH-19975|          13|
|   CK-12205|          13|
|   EA-14035|          13|
|   NS-18640|          13|
|   PG-18820|          13|
|   ZC-21910|          13|
|   JE-15745|          13|
|   PK-19075|          12|
|   KB-16585|          12|
|   CL-12565|          12|
|   SJ-20125|          12|
|   HG-14965|          12|
|   AH-10690|          12|
|   SM-20950|          12|
|   DK-12835|          12|
|   BP-11095|          12|
|   LC-16885|          12|
|   CC-12220|          12|
|   RB-19465|          12|
+-----------+------------+
only showing top 20 rows

