# Segmentation and Sales Analysis

In [8]:
# Set JAVA_HOME environment variable before importing PySpark
import os
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-17-openjdk-amd64'
print(f"JAVA_HOME: {os.environ.get('JAVA_HOME')}")

from ucimlrepo import fetch_ucirepo
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


JAVA_HOME: /usr/lib/jvm/java-17-openjdk-amd64


In [26]:
online_retail = fetch_ucirepo(id=352)
online_retail_full = online_retail.data.original
spark = SparkSession.builder.appName("ecommerce").getOrCreate()
print(spark.sparkContext._gateway.jvm.System.getProperty("java.version"))


17.0.17


In [None]:
online_retail_full.to_csv("online_retail.csv", index=False)
online_retail_spark = spark.read.csv("online_retail.csv", header=True, inferSchema=True)
online_retail_spark = spark.createDataFrame(online_retail.data.original)
online_retail_spark.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows


25/12/23 14:20:09 WARN TaskSetManager: Stage 70 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.
Traceback (most recent call last):
  File "/home/archawin/segment-sales/venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/archawin/segment-sales/venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
    outfile.flush()
BrokenPipeError: [Errno 32] Broken pipe


**Metadata and null checks**

In spark:

In [55]:
print(online_retail_spark.count())

print(online_retail_spark.printSchema())
print(online_retail_spark.rdd.getNumPartitions())
online_retail_spark.filter(F.col("CustomerID").isNull()).count()


25/12/23 14:59:09 WARN TaskSetManager: Stage 109 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


541909
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

None
32


25/12/23 14:59:09 WARN TaskSetManager: Stage 112 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


0

In SQL

In [36]:
online_retail_spark.createOrReplaceTempView("retail")
spark.sql("SELECT COUNT (*) FROM retail WHERE CustomerID IS NULL").show()

25/12/23 14:22:59 WARN TaskSetManager: Stage 91 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


+--------+
|count(1)|
+--------+
|       0|
+--------+



## Let us explore some basic information about the data

In [50]:
# Top countries by revenue
df_with_revenue = online_retail_spark.withColumn("Revenue", F.col("Quantity") * F.col("UnitPrice"))
top_countries = df_with_revenue.groupBy("Country") \
    .agg(F.sum("Revenue").alias("TotalRevenue")) \
    .orderBy(F.desc("TotalRevenue")) \
    .limit(5)

top_countries.show()
# Try with SQL, should match
print("SQL Output")
spark.sql("SELECT Country, SUM(Quantity * UnitPrice) as TotalRevenue FROM retail GROUP BY Country ORDER BY TotalRevenue DESC LIMIT 5").show()


25/12/23 14:51:39 WARN TaskSetManager: Stage 103 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


+--------------+------------------+
|       Country|      TotalRevenue|
+--------------+------------------+
|United Kingdom| 8187806.364000115|
|   Netherlands|         284661.54|
|          EIRE|263276.81999999995|
|       Germany|221698.20999999993|
|        France|197403.90000000002|
+--------------+------------------+

SQL Output


25/12/23 14:51:40 WARN TaskSetManager: Stage 106 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


+--------------+------------------+
|       Country|      TotalRevenue|
+--------------+------------------+
|United Kingdom| 8187806.364000115|
|   Netherlands|         284661.54|
|          EIRE|263276.81999999995|
|       Germany|221698.20999999993|
|        France|197403.90000000002|
+--------------+------------------+



In [60]:
# Top best-selling products by total quantity sold
top_products = online_retail_spark.groupBy("Description") \
    .agg(F.sum("Quantity").alias("TotalQuantity")).orderBy(F.desc("TotalQuantity")) \
    .limit(10)

top_products.show()

print("SQL Output")
spark.sql("SELECT Description, sum(Quantity) as TotalQuantity FROM retail GROUP BY Description ORDER BY TotalQuantity DESC LIMIT 10").show()

25/12/23 15:02:00 WARN TaskSetManager: Stage 124 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.
25/12/23 15:02:01 WARN TaskSetManager: Stage 127 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+-------------+
|         Description|TotalQuantity|
+--------------------+-------------+
|WORLD WAR 2 GLIDE...|        53847|
|JUMBO BAG RED RET...|        47363|
|ASSORTED COLOUR B...|        36381|
|      POPCORN HOLDER|        36334|
|PACK OF 72 RETROS...|        36039|
|WHITE HANGING HEA...|        35317|
|  RABBIT NIGHT LIGHT|        30680|
|MINI PAINT SET VI...|        26437|
|PACK OF 12 LONDON...|        26315|
|PACK OF 60 PINK P...|        24753|
+--------------------+-------------+

SQL Output




+--------------------+-------------+
|         Description|TotalQuantity|
+--------------------+-------------+
|WORLD WAR 2 GLIDE...|        53847|
|JUMBO BAG RED RET...|        47363|
|ASSORTED COLOUR B...|        36381|
|      POPCORN HOLDER|        36334|
|PACK OF 72 RETROS...|        36039|
|WHITE HANGING HEA...|        35317|
|  RABBIT NIGHT LIGHT|        30680|
|MINI PAINT SET VI...|        26437|
|PACK OF 12 LONDON...|        26315|
|PACK OF 60 PINK P...|        24753|
+--------------------+-------------+



                                                                                

In [None]:
# Compare the execution plans. They should be the same as Spark is API abstraction of SQL
df_plan = top_products.explain()
print("\n---\n")
sql_plan = spark.sql("SELECT Description, sum(Quantity) as TotalQuantity FROM retail GROUP BY Description ORDER BY TotalQuantity DESC LIMIT 10").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=10, orderBy=[TotalQuantity#758L DESC NULLS LAST], output=[Description#436,TotalQuantity#758L])
   +- HashAggregate(keys=[Description#436], functions=[sum(Quantity#437L)])
      +- Exchange hashpartitioning(Description#436, 200), ENSURE_REQUIREMENTS, [plan_id=1771]
         +- HashAggregate(keys=[Description#436], functions=[partial_sum(Quantity#437L)])
            +- Project [Description#436, Quantity#437L]
               +- Scan ExistingRDD[InvoiceNo#434,StockCode#435,Description#436,Quantity#437L,InvoiceDate#438,UnitPrice#439,CustomerID#440,Country#441]



---

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=10, orderBy=[TotalQuantity#790L DESC NULLS LAST], output=[Description#436,TotalQuantity#790L])
   +- HashAggregate(keys=[Description#436], functions=[sum(Quantity#437L)])
      +- Exchange hashpartitioning(Description#436, 200), ENSURE_REQUIREMENTS, [plan_

In [86]:
filtered_df = online_retail_spark.filter((online_retail_spark['Quantity'] > 0) & (online_retail_spark['UnitPrice'] > 0))
print(filtered_df.count())


# SQL
spark.sql("SELECT * FROM retail WHERE (Quantity > 0 AND UnitPrice > 0)").count()

25/12/23 16:52:43 WARN TaskSetManager: Stage 176 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


530104


25/12/23 16:52:43 WARN TaskSetManager: Stage 179 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.


530104

In [115]:
cleaned_df = filtered_df.withColumn('InvoiceDate', F.to_timestamp(F.col('InvoiceDate'), "M/d/yyyy H:mm"))
cleaned_df

cleaned_df.createOrReplaceTempView("retail_clean")


#SQL Query to check
spark.sql("SELECT InvoiceDate, hour(InvoiceDate) as Hour FROM retail_clean LIMIT 10").show()


+-------------------+----+
|        InvoiceDate|Hour|
+-------------------+----+
|2010-12-01 08:26:00|   8|
|2010-12-01 08:26:00|   8|
|2010-12-01 08:26:00|   8|
|2010-12-01 08:26:00|   8|
|2010-12-01 08:26:00|   8|
|2010-12-01 08:26:00|   8|
|2010-12-01 08:26:00|   8|
|2010-12-01 08:28:00|   8|
|2010-12-01 08:28:00|   8|
|2010-12-01 08:34:00|   8|
+-------------------+----+



25/12/23 17:12:33 WARN TaskSetManager: Stage 210 contains a task of very large size (1013 KiB). The maximum recommended task size is 1000 KiB.
Traceback (most recent call last):
  File "/home/archawin/segment-sales/venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/archawin/segment-sales/venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
    outfile.flush()
BrokenPipeError: [Errno 32] Broken pipe
