<a href="https://colab.research.google.com/github/RajuKGosala-45/E-Commerce-dataset-PySpark-Practices/blob/main/E_Commerce_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Install in Google Colab

In [None]:
# Install a headless JDK 17, which Spark needs for its JVM backend
!apt-get update -qq
!apt-get install -y openjdk-17-jdk-headless -qq

# Install PySpark and point it at the JDK
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-17-openjdk-amd64/bin"



W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
(Reading database ... 121713 files and directories currently installed.)
Preparing to unpack .../openjdk-17-jdk-headless_17.0.17+10-1~22.04_amd64.deb ...
Unpacking openjdk-17-jdk-headless:amd64 (17.0.17+10-1~22.04) over (17.0.16+8~us1-0ubuntu1~22.04.1) ...
Preparing to unpack .../openjdk-17-jre-headless_17.0.17+10-1~22.04_amd64.deb ...
Unpacking openjdk-17-jre-headless:amd64 (17.0.17+10-1~22.04) over (17.0.16+8~us1-0ubuntu1~22.04.1) ...
Setting up openjdk-17-jre-headless:amd64 (17.0.17+10-1~22.04) ...
Installing new version of config file /etc/java-17-openjdk/security/default.policy ...
Installing new version of config file /etc/java-17-openjdk/security/java.security ...
Setting up openjdk-17-jdk-headless:amd64 (17.0.17+10-1~22.04) ...


# Day 51 - PySpark Data Profiling and Loading

### Set up SparkSession and Load Data

In [1]:
from pyspark.sql import SparkSession
# Wildcard import: it shadows Python built-ins such as max, sum and round with Spark SQL functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("E-Commerce Business").getOrCreate()

# Map each table name to its CSV file in the Colab workspace
Files = {
    "events": "/content/events.csv",
    "order_items": "/content/order_items.csv",
    "orders": "/content/orders.csv",
    "products": "/content/products.csv",
    "reviews": "/content/reviews.csv",
    "users": "/content/users.csv"
}

# Load every CSV into a dict of DataFrames keyed by table name
df = {name: spark.read.csv(path, header=True, inferSchema=True) for name, path in Files.items()}

### Print Schema and Sample

In [2]:
# Use a distinct loop variable so the `df` dict is not overwritten by the loop
for name, table_df in df.items():
  print(f"\n Dataset: {name.upper()}")
  table_df.printSchema()
  table_df.show(5)


 Dataset: EVENTS
root
 |-- event_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)

+---------+-------+----------+----------+--------------------+
| event_id|user_id|product_id|event_type|     event_timestamp|
+---------+-------+----------+----------+--------------------+
|E00000001|U009798|   P001393|      cart|2025-07-08 14:28:...|
|E00000002|U005881|   P000669|      view|2025-10-19 23:00:...|
|E00000003|U006348|   P001404|      view|2025-05-09 07:02:...|
|E00000004|U002664|   P000400|      cart|2025-07-19 22:47:...|
|E00000005|U005776|   P000392|      view|2024-10-24 10:20:...|
+---------+-------+----------+----------+--------------------+
only showing top 5 rows


 Dataset: ORDER_ITEMS
root
 |-- order_item_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- user_id: st
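Why reusing `df` as the loop variable is risky: a `for` loop rebinds its target name in the enclosing scope, so after `for name, df in df.items()` the name `df` refers to the last DataFrame rather than the dictionary, and any later `df.items()` fails. The plain-Python sketch below reproduces the effect, with hypothetical lists standing in for DataFrames.

```python
# Hypothetical stand-in: lists play the role of DataFrames
df = {"events": ["e1", "e2"], "users": ["u1"]}

for name, df in df.items():   # `df` is rebound on every iteration
    pass

# After the loop, `df` is the last value, not the dict
print(type(df).__name__)       # list
print(hasattr(df, "items"))    # False: a later df.items() would raise AttributeError

# Safer pattern: keep the container name and use a different loop variable
tables = {"events": ["e1", "e2"], "users": ["u1"]}
for name, table in tables.items():
    pass
print(type(tables).__name__)   # dict
```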

### Missing Values Summary

In [None]:

from pyspark.sql.functions import col, count, isnan, lit, when
from pyspark.sql.types import NumericType, StringType

# Reload the tables in case an earlier cell rebound the name `df`
df = {name: spark.read.csv(path, header=True, inferSchema=True) for name, path in Files.items()}

for name, current_df in df.items():
  print(f"\n Missing Values Summary For: {name.upper()}")

  # Count rows once per table; each count() launches a Spark job
  total_rows = current_df.count() or 1  # avoid dividing by zero on an empty table

  null_expressions = []
  for field in current_df.schema:
    column_name = field.name

    # Start with the isNull() check for all column types
    condition = col(column_name).isNull()

    # Add check for empty strings if the column is of StringType
    if isinstance(field.dataType, StringType):
      condition = condition | (col(column_name) == "")

    # Add check for NaN if the column is of NumericType
    # NumericType includes DoubleType, FloatType, IntegerType, etc.
    if isinstance(field.dataType, NumericType):
      condition = condition | isnan(col(column_name))

    # count() skips nulls, so count a literal 1 for each missing cell
    # (counting col(column_name) itself would never register a null)
    null_expressions.append(
        (count(when(condition, lit(1))) / total_rows * 100).alias(column_name)
    )

  null_df = current_df.select(null_expressions)
  null_df.show()


 Missing Values Summary For: EVENTS
+--------+-------+----------+----------+---------------+
|event_id|user_id|product_id|event_type|event_timestamp|
+--------+-------+----------+----------+---------------+
|     0.0|    0.0|       0.0|       0.0|            0.0|
+--------+-------+----------+----------+---------------+


 Missing Values Summary For: ORDER_ITEMS
+-------------+--------+----------+-------+--------+----------+----------+
|order_item_id|order_id|product_id|user_id|quantity|item_price|item_total|
+-------------+--------+----------+-------+--------+----------+----------+
|          0.0|     0.0|       0.0|    0.0|     0.0|       0.0|       0.0|
+-------------+--------+----------+-------+--------+----------+----------+


 Missing Values Summary For: ORDERS
+--------+-------+----------+------------+------------+
|order_id|user_id|order_date|order_status|total_amount|
+--------+-------+----------+------------+------------+
|     0.0|    0.0|       0.0|         0.0|         0.0
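The per-column rule above (null, empty string for string columns, NaN for numeric columns, divided by the row count) can be checked without Spark. A minimal pure-Python sketch, assuming rows are dicts and using a hypothetical helper `missing_percentages`; like the Spark version, it adds one per missing cell rather than counting the missing value itself.

```python
import math

def missing_percentages(rows, columns):
    """Percent of missing cells per column: None, "" (strings) or NaN (floats)."""
    total = len(rows) or 1          # guard against an empty table
    result = {}
    for column in columns:
        missing = 0
        for row in rows:
            value = row.get(column)
            if value is None:
                missing += 1
            elif isinstance(value, str) and value == "":
                missing += 1
            elif isinstance(value, float) and math.isnan(value):
                missing += 1
        result[column] = missing / total * 100
    return result

# Hypothetical sample rows
rows = [
    {"user_id": "U1", "total_amount": 10.0},
    {"user_id": "", "total_amount": float("nan")},
    {"user_id": "U3", "total_amount": None},
    {"user_id": "U4", "total_amount": 5.5},
]
print(missing_percentages(rows, ["user_id", "total_amount"]))
# {'user_id': 25.0, 'total_amount': 50.0}
```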

### Duplicate Check Using Primary Keys

In [None]:
# Expected primary key for each table
Primary_keys = {
    "users": "user_id",
    "orders": "order_id",
    "order_items": "order_item_id",
    "products": "product_id",
    "reviews": "review_id",
    "events": "event_id"
}

# A key is duplicated if its value appears in more than one row
for name, table_df in df.items():
  key = Primary_keys.get(name)
  if key:
    dup_count = table_df.groupBy(key).count().filter("count > 1").count()
    print(f"Duplicate {key} in {name}: {dup_count}")

Duplicate event_id in events: 0
Duplicate order_item_id in order_items: 0
Duplicate order_id in orders: 0
Duplicate product_id in products: 0
Duplicate review_id in reviews: 0
Duplicate user_id in users: 0
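The `groupBy(key).count().filter("count > 1").count()` pattern returns the number of distinct key values that occur more than once, not the number of surplus rows. A small pure-Python sketch of both measures, with hypothetical rows and a hypothetical helper name:

```python
from collections import Counter

def count_duplicate_keys(rows, key):
    """Number of distinct key values that occur in more than one row.

    Mirrors df.groupBy(key).count().filter("count > 1").count().
    """
    counts = Counter(row[key] for row in rows)
    return sum(1 for n in counts.values() if n > 1)

rows = [{"order_id": "O1"}, {"order_id": "O2"}, {"order_id": "O1"},
        {"order_id": "O1"}, {"order_id": "O3"}, {"order_id": "O3"}]

print(count_duplicate_keys(rows, "order_id"))  # 2 (O1 and O3)

# Surplus rows that a deduplication would remove
surplus = sum(n - 1 for n in Counter(r["order_id"] for r in rows).values())
print(surplus)  # 3
```

Either measure being zero means the key is unique, which is what the output above reports for every table.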


### Summary Statistics for Numerical Columns


In [None]:
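for name, table_df in df.items():
  print(f"\n Summary Stats for: {name.upper()}")
  # Numeric Spark type names as reported by DataFrame.dtypes (decimal appears as e.g. "decimal(10,2)")
  numeric_cols = [c for c, t in table_df.dtypes
                  if t in ("tinyint", "smallint", "int", "bigint", "float", "double") or t.startswith("decimal")]
  if numeric_cols:
    table_df.select(numeric_cols).describe().show()
print("\n Day 51 Completed - Data Profiling Done ")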


 Summary Stats for: EVENTS

 Summary Stats for: ORDER_ITEMS
+-------+------------------+------------------+-----------------+
|summary|          quantity|        item_price|       item_total|
+-------+------------------+------------------+-----------------+
|  count|             43525|             43525|            43525|
|   mean|1.3973118897185526|196.63697622056128|273.8350132108008|
| stddev|0.6607172757815206|  297.259223858562|471.5851039154428|
|    min|                 1|              1.11|             1.11|
|    max|                 3|           2338.13|          7014.39|
+-------+------------------+------------------+-----------------+


 Summary Stats for: ORDERS
+-------+-----------------+
|summary|     total_amount|
+-------+-----------------+
|  count|            20000|
|   mean|595.9334475000019|
| stddev|776.0633359009934|
|    min|             1.11|
|    max|          7950.74|
+-------+-----------------+


 Summary Stats for: PRODUCTS
+-------+------------------+-----
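`describe()` reports count, mean, stddev, min and max over the non-null values, and Spark's `stddev` is the sample standard deviation (divisor n - 1), which matches Python's `statistics.stdev`. A small sketch on hypothetical values, useful for sanity-checking a row of the tables above; the helper name `describe` is illustrative only.

```python
import statistics

def describe(values):
    """count, mean, sample stddev, min, max over the non-null values."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        # Sample stddev (n - 1); None for a single value (Spark 3.x returns null there)
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present),
        "max": max(present),
    }

stats = describe([1, 2, 3, 4, None])
print(stats["count"], stats["mean"], round(stats["stddev"], 4))  # 4 2.5 1.291
```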

# Day 52 - Customer Segmentation with an RFM Model Using PySpark

### 1. Set Up SparkSession and Load Data

In [6]:
from pyspark.sql import SparkSession
# Wildcard import: max and sum below resolve to Spark SQL functions, not Python built-ins
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("E-Commerce Business").getOrCreate()
events_df = spark.read.csv("/content/events.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv("/content/order_items.csv", header=True, inferSchema=True)
orders_df = spark.read.csv("/content/orders.csv", header=True, inferSchema=True)
products_df = spark.read.csv("/content/products.csv", header=True, inferSchema=True)
reviews_df = spark.read.csv("/content/reviews.csv", header=True, inferSchema=True)
users_df = spark.read.csv("/content/users.csv", header=True, inferSchema=True)

# Preview the orders table
orders_df.show(truncate=False)


+---------+-------+--------------------------+------------+------------+
|order_id |user_id|order_date                |order_status|total_amount|
+---------+-------+--------------------------+------------+------------+
|O00000001|U009310|2025-09-09 14:52:37.292731|processing  |689.66      |
|O00000002|U003247|2025-04-15 01:18:27.193404|completed   |1666.85     |
|O00000003|U007252|2025-04-27 15:37:48.008624|processing  |665.06      |
|O00000004|U008986|2025-10-04 20:35:22.204857|cancelled   |689.5       |
|O00000005|U008537|2024-11-13 08:15:18.498252|cancelled   |860.5       |
|O00000006|U003449|2024-10-13 09:02:26.848944|completed   |2258.34     |
|O00000007|U006985|2024-05-01 04:44:56.418188|returned    |769.71      |
|O00000008|U007661|2024-03-03 23:33:22.844913|completed   |15.21       |
|O00000009|U000610|2025-10-29 07:00:11.203638|cancelled   |229.02      |
|O00000010|U003175|2024-08-13 07:04:24.298799|returned    |107.63      |
|O00000011|U004470|2025-08-14 09:59:55.223975|shipp

### 2. Convert the Order Date Column

In [10]:
# Cast the timestamp to a date, keeping the calendar day and dropping the time
orders_df = orders_df.withColumn("order_date", col("order_date").cast("date"))
orders_df.show(truncate=False)

+---------+-------+----------+------------+------------+
|order_id |user_id|order_date|order_status|total_amount|
+---------+-------+----------+------------+------------+
|O00000001|U009310|2025-09-09|processing  |689.66      |
|O00000002|U003247|2025-04-15|completed   |1666.85     |
|O00000003|U007252|2025-04-27|processing  |665.06      |
|O00000004|U008986|2025-10-04|cancelled   |689.5       |
|O00000005|U008537|2024-11-13|cancelled   |860.5       |
|O00000006|U003449|2024-10-13|completed   |2258.34     |
|O00000007|U006985|2024-05-01|returned    |769.71      |
|O00000008|U007661|2024-03-03|completed   |15.21       |
|O00000009|U000610|2025-10-29|cancelled   |229.02      |
|O00000010|U003175|2024-08-13|returned    |107.63      |
|O00000011|U004470|2025-08-14|shipped     |1016.61     |
|O00000012|U009480|2025-08-27|cancelled   |600.23      |
|O00000013|U003631|2025-01-03|processing  |15.87       |
|O00000014|U009493|2025-06-15|shipped     |520.96      |
|O00000015|U003336|2024-07-16|r
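Casting a timestamp to `date` keeps only the calendar day (in the session time zone) and discards the time of day; it truncates rather than rounds. The same idea in plain Python, using an `order_date` value from the output above plus a hypothetical late-evening timestamp:

```python
from datetime import date, datetime

ts = datetime.fromisoformat("2025-09-09 14:52:37.292731")
print(ts.date())  # 2025-09-09

# A late-evening timestamp stays on the same day: truncated, not rounded
late = datetime.fromisoformat("2025-10-19 23:59:59")
print(late.date())  # 2025-10-19
```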

### 3. Select the Latest Reference Date (Business Standard)

In [14]:
# Use the most recent order date in the data as "today" when measuring recency
max_order_date = orders_df.agg(
    max("order_date")
).collect()[0][0]

print(max_order_date)

2025-11-14
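The reference date is what recency is measured against in step 6: `datediff(end, start)` returns the number of days from `start` to `end`, which in Python is `(end - start).days`. A worked check using the reference date printed above and a hypothetical last-purchase date:

```python
from datetime import date

reference_date = date(2025, 11, 14)   # max(order_date) printed above
last_purchase = date(2025, 11, 6)     # hypothetical customer's last order

recency_days = (reference_date - last_purchase).days
print(recency_days)  # 8

# A customer whose last order falls on the reference date gets recency 0
print((reference_date - reference_date).days)  # 0
```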


### 4. Aggregate Order Items to Compute Total Spent per Order

In [25]:
# Sum item_price * quantity over all items belonging to each order
orders_total_spent_df = order_items_df.groupBy("order_id").agg(
    sum(col("item_price") * col("quantity")).alias("total_spent")
)
orders_total_spent_df.show()

+---------+------------------+
| order_id|       total_spent|
+---------+------------------+
|O00000313|            313.65|
|O00000363|             39.34|
|O00001031|            543.25|
|O00001112|            159.88|
|O00001185|            438.46|
|O00001428|           2079.99|
|O00001430|            383.75|
|O00001436| 90.44000000000001|
|O00002065|            270.36|
|O00002631|            117.48|
|O00002701|            1314.8|
|O00002727|             95.91|
|O00003025|             63.48|
|O00003294|              63.4|
|O00003498|             99.56|
|O00004218|242.73000000000002|
|O00004394|            108.39|
|O00004407| 660.7099999999999|
|O00004642|             150.4|
|O00004787|             128.9|
+---------+------------------+
only showing top 20 rows
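Values such as `90.44000000000001` are binary floating-point artifacts of multiplying and summing doubles, not data errors. A pure-Python sketch of the per-order aggregation with hypothetical items shows the effect and two remedies: accumulate in `Decimal`, or round to two places at the end (in Spark, for example, with `round(..., 2)` from `pyspark.sql.functions`).

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical order_items rows: (order_id, item_price, quantity)
items = [
    ("O1", 0.1, 1),
    ("O1", 0.2, 1),
    ("O2", 19.99, 3),
]

float_totals = defaultdict(float)
exact_totals = defaultdict(Decimal)
for order_id, price, qty in items:
    float_totals[order_id] += price * qty
    # Decimal(str(x)) keeps the two-decimal price exactly as written
    exact_totals[order_id] += Decimal(str(price)) * qty

print(float_totals["O1"])            # 0.30000000000000004
print(exact_totals["O1"])            # 0.3
print(round(float_totals["O1"], 2))  # 0.3
```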



### 5. Join Orders with Users and Order Totals

In [35]:
# Inner joins: keep only users who placed orders, and only orders that have items
Customer_orders = users_df.join(orders_df, on="user_id", how="inner")\
                         .join(orders_total_spent_df, on="order_id", how="inner")
Customer_orders.show()

+---------+-------+-----------------+--------------------+------+--------------------+-----------+----------+------------+------------+------------------+
| order_id|user_id|             name|               email|gender|                city|signup_date|order_date|order_status|total_amount|       total_spent|
+---------+-------+-----------------+--------------------+------+--------------------+-----------+----------+------------+------------+------------------+
|O00000001|U009310| Christine Snyder|  john14@example.com|Female|           Jonestown| 2024-02-22|2025-09-09|  processing|      689.66| 689.6600000000001|
|O00000002|U003247|    Ryan Valencia|stevensonanthony@...|  Male|          South Mary| 2024-03-23|2025-04-15|   completed|     1666.85|           1666.85|
|O00000003|U007252|        Tammy Fox|roberthenderson@e...| Other|           Wilsonton| 2024-06-27|2025-04-27|  processing|      665.06| 665.0600000000001|
|O00000004|U008986|      Kurt Howard| nancy40@example.org|Female|     
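Both joins are inner joins, so a user with no orders, or an order with no rows in `order_items`, drops out of `Customer_orders` and therefore out of the RFM table. A plain-Python sketch of that filtering with hypothetical ids:

```python
# Hypothetical keys
users = {"U1", "U2", "U3"}                          # U3 never ordered
orders = {"O1": "U1", "O2": "U1", "O3": "U2"}       # order_id -> user_id
order_totals = {"O1": 50.0, "O2": 20.0}             # O3 has no items

# users INNER JOIN orders ON user_id, then INNER JOIN order_totals ON order_id
joined = [
    (order_id, user_id, order_totals[order_id])
    for order_id, user_id in orders.items()
    if user_id in users and order_id in order_totals
]
print(joined)  # [('O1', 'U1', 50.0), ('O2', 'U1', 20.0)]
print(sorted({user_id for _, user_id, _ in joined}))  # ['U1']
```

If inactive customers should appear in the segmentation, a left join starting from `users_df` would keep them, at the cost of nulls in the order columns.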

### 6. RFM Calculations (Recency, Frequency, Monetary)

In [41]:
# Per customer: last order date, number of orders and total spend.
# Recency = days from the last purchase to the reference date (max_order_date).
rfm_df = Customer_orders.groupBy("user_id")\
  .agg(
      max("order_date").alias("Last_Purchase_date"),
      count("order_id").alias("Frequency"),
      sum("total_spent").alias("Monetary")
  )\
  .withColumn("Recency", datediff(lit(max_order_date), col("Last_Purchase_date")))\
  .select("user_id", "Recency", "Frequency", "Monetary")

rfm_df.show()

+-------+-------+---------+------------------+
|user_id|Recency|Frequency|          Monetary|
+-------+-------+---------+------------------+
|U004486|    178|        4|           2272.84|
|U008621|      8|        2|            261.86|
|U007574|    181|        2|            384.17|
|U004751|     28|        8|           1757.97|
|U008751|    119|        3|            3011.2|
|U009989|    244|        4|2829.5899999999997|
|U008646|    454|        4|3014.5300000000007|
|U005460|    266|        2|372.44000000000005|
|U000332|    137|        3|            3225.4|
|U003768|     88|        3|           2204.21|
|U009781|     40|        5|2347.7599999999998|
|U002403|    342|        1|             21.38|
|U001211|    325|        3|392.53999999999996|
|U007541|     34|        3|           2386.54|
|U005423|    363|        4|           2245.54|
|U007683|    181|        3|             152.0|
|U008063|     41|        3|           4439.82|
|U007925|     50|        1|            293.07|
|U008341|    
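Because `orders_total_spent_df` has one row per order, each order appears once in `Customer_orders`, so `count("order_id")` counts orders rather than items. The aggregation can be mirrored in plain Python on hypothetical joined rows:

```python
from datetime import date

reference_date = date(2025, 11, 14)  # max_order_date from step 3

# Hypothetical joined rows: (user_id, order_id, order_date, total_spent)
customer_orders = [
    ("U1", "O1", date(2025, 11, 6), 50.0),
    ("U1", "O2", date(2025, 5, 1), 20.0),
    ("U2", "O3", date(2025, 10, 15), 99.5),
]

# user_id -> (last purchase date, frequency, monetary)
rfm = {}
for user_id, _order_id, order_date, spent in customer_orders:
    last, freq, money = rfm.get(user_id, (order_date, 0, 0.0))
    rfm[user_id] = (max(last, order_date), freq + 1, money + spent)

for user_id, (last, freq, money) in sorted(rfm.items()):
    recency = (reference_date - last).days
    print(user_id, recency, freq, money)
# U1 8 2 70.0
# U2 30 1 99.5
```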

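### 7. Replace Null Monetary Values with 0

In [38]:
# Cast to double and replace nulls with 0. Spark resolves column names
# case-insensitively by default, so "monetary" replaces the "Monetary" column.
rfm_df = rfm_df.withColumn("monetary", coalesce(col("monetary").cast(DoubleType()), lit(0.0)))
rfm_df.show()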

+-------+-------+---------+------------------+
|user_id|Recency|Frequency|          monetary|
+-------+-------+---------+------------------+
|U004486|    178|        4|           2272.84|
|U008621|      8|        2|            261.86|
|U007574|    181|        2|            384.17|
|U004751|     28|        8|           1757.97|
|U008751|    119|        3|            3011.2|
|U009989|    244|        4|2829.5899999999997|
|U008646|    454|        4|3014.5300000000007|
|U005460|    266|        2|372.44000000000005|
|U000332|    137|        3|            3225.4|
|U003768|     88|        3|           2204.21|
|U009781|     40|        5|2347.7599999999998|
|U002403|    342|        1|             21.38|
|U001211|    325|        3|392.53999999999996|
|U007541|     34|        3|           2386.54|
|U005423|    363|        4|           2245.54|
|U007683|    181|        3|             152.0|
|U008063|     41|        3|           4439.82|
|U007925|     50|        1|            293.07|
|U008341|    
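With the inner joins in step 5 every customer in `rfm_df` has at least one order, so `Monetary` should rarely be null here; replacing nulls with 0 matters mainly if the base table changes, for example a left join from `users` that keeps customers without orders. A plain-Python sketch of the rule, with a hypothetical helper and values:

```python
def monetary_or_zero(value):
    """Return the monetary value as a float, treating a missing value as 0.0."""
    return float(value) if value is not None else 0.0

# Hypothetical Monetary values after a left join (U3 has no orders)
monetary = {"U1": 70, "U2": "99.5", "U3": None}
cleaned = {user_id: monetary_or_zero(v) for user_id, v in monetary.items()}
print(cleaned)  # {'U1': 70.0, 'U2': 99.5, 'U3': 0.0}
```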

### 8. Score Generation

In [39]:
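# Score each dimension from 1 (weakest) to 5 (strongest); in each when() chain
# the first matching condition wins, so thresholds are listed from strictest down
rfm_scored = rfm_df.withColumn(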
    "R_score", when(col("recency") <= 30, 5)
                .when(col("recency") <= 60, 4)
                .when(col("recency") <= 120, 3)
                .when(col("recency") <= 180, 2)
                .otherwise(1)
).withColumn(
    "F_score", when(col("frequency") >= 10, 5)
                .when(col("frequency") >= 6, 4)
                .when(col("frequency") >= 3, 3)
                .when(col("frequency") >= 2, 2)
                .otherwise(1)
).withColumn(
    "M_score", when(col("monetary") >= 1000, 5)
                .when(col("monetary") >= 500, 4)
                .when(col("monetary") >= 200, 3)
                .when(col("monetary") >= 50, 2)
                .otherwise(1)
)

# Final RFM score: sum of the three scores, ranging from 3 to 15
final_rfm = rfm_scored.withColumn("RFM_score",
                                  col("R_score") + col("F_score") + col("M_score"))

final_rfm.show(20, truncate=False)

+-------+-------+---------+------------------+-------+-------+-------+---------+
|user_id|Recency|Frequency|monetary          |R_score|F_score|M_score|RFM_score|
+-------+-------+---------+------------------+-------+-------+-------+---------+
|U004486|178    |4        |2272.84           |2      |3      |5      |10       |
|U008621|8      |2        |261.86            |5      |2      |3      |10       |
|U007574|181    |2        |384.17            |1      |2      |3      |6        |
|U004751|28     |8        |1757.97           |5      |4      |5      |14       |
|U008751|119    |3        |3011.2            |3      |3      |5      |11       |
|U009989|244    |4        |2829.5899999999997|1      |3      |5      |9        |
|U008646|454    |4        |3014.5300000000007|1      |3      |5      |9        |
|U005460|266    |2        |372.44000000000005|1      |2      |3      |6        |
|U000332|137    |3        |3225.4            |2      |3      |5      |10       |
|U003768|88     |3        |2
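Each `when(...).otherwise(...)` chain is evaluated top to bottom and the first matching branch wins, so it behaves like an ordered list of thresholds. A pure-Python mirror of the same thresholds (hypothetical helper names) is handy for unit-testing the boundaries; the sample calls use rows from the output above.

```python
def r_score(recency):
    # Fewer days since the last purchase -> higher score
    for limit, score in ((30, 5), (60, 4), (120, 3), (180, 2)):
        if recency <= limit:
            return score
    return 1

def f_score(frequency):
    # More orders -> higher score
    for limit, score in ((10, 5), (6, 4), (3, 3), (2, 2)):
        if frequency >= limit:
            return score
    return 1

def m_score(monetary):
    # Higher total spend -> higher score
    for limit, score in ((1000, 5), (500, 4), (200, 3), (50, 2)):
        if monetary >= limit:
            return score
    return 1

def rfm_score(recency, frequency, monetary):
    return r_score(recency) + f_score(frequency) + m_score(monetary)

# Rows taken from the output above
print(rfm_score(178, 4, 2272.84))  # 10
print(rfm_score(181, 2, 384.17))   # 6
print(rfm_score(28, 8, 1757.97))   # 14
```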

### 9. Assign Customer Segments

In [40]:
# Map the combined score (3 to 15) to a segment label; the first matching branch wins
segment_df = final_rfm.withColumn(
    "segment",
    when(col("RFM_score") >= 12, " Loyal / VIP")
    .when(col("RFM_score") >= 9, " High Value")
    .when(col("RFM_score") >= 6, " Regular")
    .when(col("RFM_score") >= 4, " At Risk")
    .otherwise(" Lost / Inactive")
)

segment_df.show(20, False)

+-------+-------+---------+------------------+-------+-------+-------+---------+----------------+
|user_id|Recency|Frequency|monetary          |R_score|F_score|M_score|RFM_score|segment         |
+-------+-------+---------+------------------+-------+-------+-------+---------+----------------+
|U004486|178    |4        |2272.84           |2      |3      |5      |10       | High Value     |
|U008621|8      |2        |261.86            |5      |2      |3      |10       | High Value     |
|U007574|181    |2        |384.17            |1      |2      |3      |6        | Regular        |
|U004751|28     |8        |1757.97           |5      |4      |5      |14       | Loyal / VIP    |
|U008751|119    |3        |3011.2            |3      |3      |5      |11       | High Value     |
|U009989|244    |4        |2829.5899999999997|1      |3      |5      |9        | High Value     |
|U008646|454    |4        |3014.5300000000007|1      |3      |5      |9        | High Value     |
|U005460|266    |2  
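Enumerating every possible score shows how the segment thresholds partition the 3 to 15 range; notably, "Lost / Inactive" is reachable only with the minimum score of 3 (a 1 in every dimension). A plain-Python sketch with a hypothetical helper; the labels here omit the leading space used in the cell above, which would otherwise have to be matched exactly in any later filter such as `segment == " High Value"`.

```python
def segment(rfm_score):
    # Same ordered thresholds as the Spark when() chain; first match wins
    if rfm_score >= 12:
        return "Loyal / VIP"
    if rfm_score >= 9:
        return "High Value"
    if rfm_score >= 6:
        return "Regular"
    if rfm_score >= 4:
        return "At Risk"
    return "Lost / Inactive"

# Scores range from 3 (all 1s) to 15 (all 5s)
counts = {}
for score in range(3, 16):
    label = segment(score)
    counts[label] = counts.get(label, 0) + 1
print(counts)
# {'Lost / Inactive': 1, 'At Risk': 2, 'Regular': 3, 'High Value': 3, 'Loyal / VIP': 4}
```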