<a href="https://colab.research.google.com/github/BhanuSaketh/PySpark/blob/main/Insights_from_UCI_Online_Retail_Dataset_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 Load the Dataset


In [2]:
from pyspark.sql import SparkSession

In [5]:
spark=SparkSession.builder.appName("spark").getOrCreate()

In [61]:
df=spark.read.csv('online_retail.csv',header=True,inferSchema=True)
df.show(6)

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|United Kingdom|
+-------+---------+--------------------+--------+-------------------+----

Schema

In [12]:
df.printSchema()

root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: double (nullable = true)
 |-- Country: string (nullable = true)



**Data Cleaning**

In [62]:
from pyspark.sql.functions import col
df.filter(col("Customer ID").isNull()).count()

69868

In [65]:
df=df.na.drop(how='any')

In [66]:
df.filter(col("Customer ID").isNull()).count()

0

In [33]:
df = df.filter((col("Quantity") > 0) & (col("Price") > 0))

**Basic Descriptive Analysis**

In [35]:
print("Total Transactions:", df.select("Invoice").distinct().count())
print("Unique Customers:", df.select("Customer ID").distinct().count())

Total Transactions: 12303
Unique Customers: 3349


In [37]:
df.describe(["Quantity", "Price"]).show()

+-------+------------------+------------------+
|summary|          Quantity|             Price|
+-------+------------------+------------------+
|  count|            253459|            253459|
|   mean|14.396651923979816|3.3658279287785207|
| stddev|110.08397027703005| 36.60756906006961|
|    min|                 1|             0.001|
|    max|             19152|           10953.5|
+-------+------------------+------------------+



In [38]:
df.groupBy("Country").count().orderBy(col("count").desc()).show()

+--------------------+------+
|             Country| count|
+--------------------+------+
|      United Kingdom|230846|
|                EIRE|  5285|
|             Germany|  5029|
|              France|  3216|
|         Netherlands|  1795|
|               Spain|   778|
|         Switzerland|   675|
|             Belgium|   666|
|              Sweden|   589|
|            Portugal|   496|
|              Cyprus|   488|
|     Channel Islands|   433|
|               Italy|   306|
|             Austria|   301|
|             Denmark|   277|
|           Australia|   277|
|              Greece|   276|
|United Arab Emirates|   252|
|             Finland|   214|
|                 USA|   182|
+--------------------+------+
only showing top 20 rows



**Revenue Analysis**

In [40]:
df = df.withColumn("TotalRevenue", col("Quantity") * col("Price"))
df.select("Invoice", "TotalRevenue").show(5)

+-------+------------------+
|Invoice|      TotalRevenue|
+-------+------------------+
| 489434|              83.4|
| 489434|              81.0|
| 489434|              81.0|
| 489434|100.80000000000001|
| 489434|              30.0|
+-------+------------------+
only showing top 5 rows



In [41]:
df.agg({"TotalRevenue": "sum"}).show()

+-----------------+
|sum(TotalRevenue)|
+-----------------+
|5614640.971000284|
+-----------------+



In [42]:
df.groupBy("Country").sum("TotalRevenue").orderBy(col("sum(TotalRevenue)").desc()).show(10)

+--------------+------------------+
|       Country| sum(TotalRevenue)|
+--------------+------------------+
|United Kingdom| 4693642.889999666|
|          EIRE| 245800.4400000002|
|   Netherlands|181971.57999999978|
|       Germany| 129334.9810000001|
|        France| 96474.66000000018|
|       Denmark|43391.540000000015|
|        Sweden|41173.270000000004|
|         Spain| 23602.76000000001|
|   Switzerland| 20426.21999999999|
|       Belgium|16405.369999999995|
+--------------+------------------+
only showing top 10 rows



In [44]:
df.groupBy("Customer ID").sum("TotalRevenue").orderBy(col("sum(TotalRevenue)").desc()).show(10)

+-----------+------------------+
|Customer ID| sum(TotalRevenue)|
+-----------+------------------+
|    18102.0| 269041.6699999999|
|    14646.0|167028.64999999982|
|    14156.0|154877.14999999973|
|    13694.0|114527.41000000006|
|    14911.0| 84594.36999999997|
|    17511.0| 61806.31999999996|
|    15061.0| 52028.14999999998|
|    16754.0|50926.810000000005|
|    15311.0| 44291.61000000005|
|    17949.0| 43253.76000000001|
+-----------+------------------+
only showing top 10 rows



**Customer Segmentation**

In [46]:
df.groupBy("Customer ID").agg({"TotalRevenue": "avg"}).orderBy(col("avg(TotalRevenue)").desc()).show(10)

+-----------+------------------+
|Customer ID| avg(TotalRevenue)|
+-----------+------------------+
|    12918.0|           10953.5|
|    15760.0|           6958.17|
|    12737.0|           1855.25|
|    17940.0|       1411.209375|
|    14028.0|1155.1666666666667|
|    14255.0|           1000.63|
|    16532.0| 861.5999999999999|
|    17876.0|            809.51|
|    16308.0|             800.0|
|    17310.0|             765.0|
+-----------+------------------+
only showing top 10 rows



**Product Analysis**

In [47]:
df.groupBy("Description").sum("Quantity").orderBy(col("sum(Quantity)").desc()).show(10)

+--------------------+-------------+
|         Description|sum(Quantity)|
+--------------------+-------------+
|PACK OF 72 RETRO ...|        41230|
|WHITE HANGING HEA...|        41052|
|WORLD WAR 2 GLIDE...|        33756|
| BROCADE RING PURSE |        28524|
|60 TEATIME FAIRY ...|        27372|
|BLACK AND WHITE P...|        25685|
|ASSORTED COLOUR B...|        24653|
|PACK OF 60 PINK P...|        22262|
|COLOUR GLASS T-LI...|        22012|
|PACK OF 12 RED SP...|        21749|
+--------------------+-------------+
only showing top 10 rows



In [48]:
df.groupBy("Description").sum("TotalRevenue").orderBy(col("sum(TotalRevenue)").desc()).show(10)

+--------------------+------------------+
|         Description| sum(TotalRevenue)|
+--------------------+------------------+
|WHITE HANGING HEA...|109116.57999999942|
|REGENCY CAKESTAND...| 75588.44999999997|
|              Manual| 65992.77999999997|
|ASSORTED COLOUR B...| 39351.89000000018|
|             POSTAGE|          28785.18|
|EDWARDIAN PARASOL...|27499.400000000038|
|JUMBO BAG STRAWBERRY|          26548.35|
|VINTAGE UNION JAC...|           26060.5|
|SET/5 RED SPOTTY ...|25528.800000000054|
|       PARTY BUNTING|24622.149999999994|
+--------------------+------------------+
only showing top 10 rows



**Time-Based Analysis**

In [49]:
from pyspark.sql.functions import month

df.groupBy(month("InvoiceDate").alias("Month")).sum("TotalRevenue").orderBy(col("Month")).show()

+-----+------------------+
|Month| sum(TotalRevenue)|
+-----+------------------+
|    1| 557319.0620000134|
|    2|506371.06600001536|
|    3| 699608.9909999889|
|    4| 594609.1919999976|
|    5| 599985.7900000144|
|    6| 639066.5800000058|
|    7| 591636.7400000024|
|    8| 604242.6499999989|
|    9|135146.73999999883|
|   12| 686654.1599999949|
+-----+------------------+



In [52]:
df.printSchema()

root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- TotalRevenue: double (nullable = true)



**RFM (Recency, Frequency, Monetary) Analysis**

In [60]:
from pyspark.sql.functions import datediff, current_date, max, countDistinct,sum  # Import countDistinct

# Recency: Days since last purchase
recency_df = df.groupBy("Customer ID").agg(datediff(current_date(), max(col("InvoiceDate"))).alias("Recency"))

# Frequency: Total number of invoices per customer
frequency_df = df.groupBy("Customer ID").agg(countDistinct("Invoice").alias("Frequency"))

# Monetary: Total spend per customer
monetary_df = df.groupBy("Customer ID").agg(sum(col("TotalRevenue").cast("double")).alias("Monetary"))

# Combine all RFM features
rfm_df = recency_df.join(frequency_df, "Customer ID").join(monetary_df, "Customer ID")
rfm_df.show(10)

+-----------+-------+---------+------------------+
|Customer ID|Recency|Frequency|          Monetary|
+-----------+-------+---------+------------------+
|    16596.0|   5484|        2|208.82999999999998|
|    17884.0|   5272|       12|1705.2800000000004|
|    14285.0|   5308|        4|1374.4099999999999|
|    16822.0|   5481|        1|            181.39|
|    17072.0|   5420|        1|            282.05|
|    12671.0|   5401|        1| 2622.481000000001|
|    15893.0|   5276|        1|305.28000000000003|
|    12737.0|   5293|        2|            3710.5|
|    14452.0|   5316|        1|127.30000000000003|
|    17032.0|   5342|        8|3004.5999999999995|
+-----------+-------+---------+------------------+
only showing top 10 rows

