<a href="https://colab.research.google.com/github/Bhaktispace/CustomerEngagementIntelligencePipeline/blob/main/CustomerEngagementIntelligencePipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



# Step 1: Loading libraries

In [2]:
import os

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark import StorageLevel


spark = SparkSession.builder.appName('CustomerEngagementPipeline').getOrCreate()

# Step 2: Loading Datasets

In [4]:
if not os.path.exists('/content/drive'):
    print("CRITICAL: Drive is not mounted!")
else:
    # List the first level of your Drive
    print("Folders found in Drive:", os.listdir('/content/drive/MyDrive/Colab Notebooks/Datasets/CustomerEngagament'))

Folders found in Drive: ['orders.parquet', 'web_events.json', 'customers.parquet', 'customer_engagement.csv']


In [5]:
root_dir = '/content/drive/MyDrive/Colab Notebooks/Datasets/CustomerEngagament'

In [6]:
df_customers = spark.read.option("recursiveFileLookup", "true").parquet(f"{root_dir}/customers.parquet/")
df_orders = spark.read.option("recursiveFileLookup", "true").parquet(f"{root_dir}/orders.parquet/")
df_events = spark.read.option("recursiveFileLookup", "true").json(f"{root_dir}/web_events.json/")


# Step 3: Viewing the datasets

In [7]:
df_customers.show(5)

+-----------+--------------------+-----------+-------+
|customer_id|               email|signup_date|country|
+-----------+--------------------+-----------+-------+
|          1|customer_1@email.com| 2023-09-21|     US|
|          2|customer_2@email.com| 2023-06-13|     UK|
|          3|customer_3@email.com| 2023-08-25|     IN|
|          4|customer_4@email.com| 2023-05-14|     UK|
|          5|customer_5@email.com| 2023-05-03|     IN|
+-----------+--------------------+-----------+-------+
only showing top 5 rows


In [8]:
df_customers.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- signup_date: string (nullable = true)
 |-- country: string (nullable = true)



In [9]:
df_orders.show(5)

+--------+-----------+----------+------------+
|order_id|customer_id|order_date|order_amount|
+--------+-----------+----------+------------+
|  101024|        179|2023-09-27|       469.0|
|  101025|        179|2023-01-15|      622.02|
|  101026|        179|2023-11-27|      425.46|
|  101027|        179|2024-04-22|      989.28|
|  101028|        179|2024-05-10|      837.43|
+--------+-----------+----------+------------+
only showing top 5 rows


In [10]:
df_orders.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_amount: double (nullable = true)



In [11]:
df_events.show(5)

+-----------+-------------------+-----------+
|customer_id|         event_time| event_type|
+-----------+-------------------+-----------+
|        264|2023-10-04 04:14:00|add_to_cart|
|        264|2024-03-02 15:00:00|  page_view|
|        264|2023-08-29 05:26:00|add_to_cart|
|        264|2024-07-11 02:44:00|add_to_cart|
|        264|2023-02-21 17:51:00|  page_view|
+-----------+-------------------+-----------+
only showing top 5 rows


In [12]:
df_events.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)



# Step 4: Basic Data quality Checks

In [13]:
assert df_customers.filter(F.col("customer_id").isNull()).count() == 0
assert df_orders.filter(F.col("order_id").isNull()).count() == 0
assert df_events.filter(F.col("event_time").isNull()).count() == 0

In [14]:
invalid_orders = df_orders.join(
    df_customers,
    "customer_id",
    "left_anti"
)

assert invalid_orders.count() == 0

In [15]:
df_customers.count() == df_customers.select("customer_id").distinct().count()

True

In [16]:
df_customers.filter(F.col("customer_id").isNull()).count() == 0

True

In [17]:
df_customers.count()

500

Need to check later the latest event date and customers who signed up after the last event date should be removed from the analysis

In [18]:
df_orders.filter(F.col("order_id").isNull()|
                 F.col("customer_id").isNull()|
                 F.col("order_date").isNull()
                 ).count()

0

In [19]:
df_orders.filter(F.col("order_amount") < 0).count()

0

In [20]:
df_orders.join(df_customers, "customer_id", "left_anti").count() == 0

True

There are no orphan orders

Need to check later orders placed after the last event date should be removed from the analysis

In [21]:
df_events.filter(
    F.col("customer_id").isNull() |
    F.col("event_time").isNull() |
    F.col("event_type").isNull()
).count() == 0

True

In [22]:
df_events.join(df_customers, "customer_id", "left_anti").count() == 0

True

All the events have matching customer

# Step 5: Data Preprocessing

In [23]:
df_customers = df_customers.withColumn("signup_date", F.col('signup_date').cast('date'))

In [24]:
df_customers.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- country: string (nullable = true)



In [25]:
df_customers.show(5)

+-----------+--------------------+-----------+-------+
|customer_id|               email|signup_date|country|
+-----------+--------------------+-----------+-------+
|          1|customer_1@email.com| 2023-09-21|     US|
|          2|customer_2@email.com| 2023-06-13|     UK|
|          3|customer_3@email.com| 2023-08-25|     IN|
|          4|customer_4@email.com| 2023-05-14|     UK|
|          5|customer_5@email.com| 2023-05-03|     IN|
+-----------+--------------------+-----------+-------+
only showing top 5 rows


In [26]:
df_orders = df_orders.withColumn("order_date", F.col('order_date').cast('date'))

In [27]:
df_orders.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: date (nullable = true)
 |-- order_amount: double (nullable = true)



In [28]:
df_orders.show(5)

+--------+-----------+----------+------------+
|order_id|customer_id|order_date|order_amount|
+--------+-----------+----------+------------+
|  101024|        179|2023-09-27|       469.0|
|  101025|        179|2023-01-15|      622.02|
|  101026|        179|2023-11-27|      425.46|
|  101027|        179|2024-04-22|      989.28|
|  101028|        179|2024-05-10|      837.43|
+--------+-----------+----------+------------+
only showing top 5 rows


In [29]:
df_events = df_events.withColumn("event_time", F.col('event_time').cast('timestamp'))

In [30]:
df_events.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)



In [31]:
df_events.show(5)

+-----------+-------------------+-----------+
|customer_id|         event_time| event_type|
+-----------+-------------------+-----------+
|        264|2023-10-04 04:14:00|add_to_cart|
|        264|2024-03-02 15:00:00|  page_view|
|        264|2023-08-29 05:26:00|add_to_cart|
|        264|2024-07-11 02:44:00|add_to_cart|
|        264|2023-02-21 17:51:00|  page_view|
+-----------+-------------------+-----------+
only showing top 5 rows


# Step 6: Sessionization of Web Events

Creating sessions sessions for each customer the duration of a session is 30 mins

In [32]:
window_spec = Window.partitionBy("customer_id").orderBy("event_time")

In [33]:
events_session = df_events.withColumn("prev_time", F.lag("event_time").over(window_spec)
).withColumn("time_diff", F.unix_timestamp("event_time")-F.unix_timestamp("prev_time")
).withColumn("new_session", F.when(F.expr("time_diff > 1800 or prev_time is Null"),1
                                   ).otherwise(0)
)

In [34]:
events_session.show(5)

+-----------+-------------------+-----------+-------------------+---------+-----------+
|customer_id|         event_time| event_type|          prev_time|time_diff|new_session|
+-----------+-------------------+-----------+-------------------+---------+-----------+
|          1|2023-03-24 14:25:00|   checkout|               NULL|     NULL|          1|
|          1|2023-10-15 17:28:00|add_to_cart|2023-03-24 14:25:00| 17722980|          1|
|          1|2023-12-18 11:55:00|  page_view|2023-10-15 17:28:00|  5509620|          1|
|          1|2024-03-08 22:32:00|  page_view|2023-12-18 11:55:00|  7036620|          1|
|          1|2024-06-02 10:37:00|add_to_cart|2024-03-08 22:32:00|  7387500|          1|
+-----------+-------------------+-----------+-------------------+---------+-----------+
only showing top 5 rows


In [35]:
events_session = events_session.withColumn("session_id", F.sum("new_session").over(window_spec))

In [36]:
events_session.show()

+-----------+-------------------+-----------+-------------------+---------+-----------+----------+
|customer_id|         event_time| event_type|          prev_time|time_diff|new_session|session_id|
+-----------+-------------------+-----------+-------------------+---------+-----------+----------+
|          1|2023-03-24 14:25:00|   checkout|               NULL|     NULL|          1|         1|
|          1|2023-10-15 17:28:00|add_to_cart|2023-03-24 14:25:00| 17722980|          1|         2|
|          1|2023-12-18 11:55:00|  page_view|2023-10-15 17:28:00|  5509620|          1|         3|
|          1|2024-03-08 22:32:00|  page_view|2023-12-18 11:55:00|  7036620|          1|         4|
|          1|2024-06-02 10:37:00|add_to_cart|2024-03-08 22:32:00|  7387500|          1|         5|
|          1|2024-07-14 14:06:00|add_to_cart|2024-06-02 10:37:00|  3641340|          1|         6|
|          1|2024-07-18 18:58:00|  page_view|2024-07-14 14:06:00|   363120|          1|         7|
|         

In [37]:
as_of_date = events_session.select(
    F.max("event_time").alias("as_of_date")
).collect()[0]["as_of_date"]

In [38]:
print(as_of_date)

2024-12-31 23:40:00


In [39]:
events_session.groupBy("event_type").count().show()

+-----------+-----+
| event_type|count|
+-----------+-----+
|  page_view| 7969|
|add_to_cart| 8032|
|   checkout| 2722|
+-----------+-----+



The Page View Events should have the maximum number of events followed by Add to Cart and then Checkout. But here the Add To Cart Events is the highest followed by Page View and then Checkout. This indicates that a significant number of expected user interactions are missing from the data. This indicates that some form of clarification is required from the data engineering or product analytics team. However, for the purpose of this project, the analysis is performed using the data exactly as provided, with assumptions and limitations clearly documented.

In [40]:
win_count = Window.partitionBy("customer_id", "session_id")

events_session.withColumn("total_events_in_session", F.count("*").over(win_count)) \
              .filter(F.col("total_events_in_session") > 1) \
              .orderBy("customer_id", "event_time") \
              .show()

+-----------+-------------------+-----------+-------------------+---------+-----------+----------+-----------------------+
|customer_id|         event_time| event_type|          prev_time|time_diff|new_session|session_id|total_events_in_session|
+-----------+-------------------+-----------+-------------------+---------+-----------+----------+-----------------------+
|         25|2024-09-10 20:19:00|add_to_cart|2024-09-01 02:36:00|   841380|          1|        80|                      2|
|         25|2024-09-10 20:43:00|add_to_cart|2024-09-10 20:19:00|     1440|          0|        80|                      2|
|         47|2023-07-10 21:04:00|  page_view|2023-07-05 04:28:00|   491760|          1|        10|                      2|
|         47|2023-07-10 21:31:00|add_to_cart|2023-07-10 21:04:00|     1620|          0|        10|                      2|
|         50|2024-12-06 21:28:00|  page_view|2024-11-20 05:24:00|  1440240|          1|       105|                      2|
|         50|202

The dataset contains only three event types: page_view, add_to_cart, and checkout. Many sessions consist of a single event, and the maximum number of events observed within any session is two. This indicates that a significant number of expected user interactions are missing from the data.

This pattern suggests there may be underlying business or tracking logic that determines which events are captured and persisted, and it is something that would typically require clarification from the data engineering or product analytics team. However, for the purpose of this project, the analysis is performed using the data exactly as provided, with assumptions and limitations clearly documented.

In [41]:
df_sessions_30 = (
    events_session
    .filter(F.col("event_time") >= F.date_sub(F.lit(as_of_date), 30))
    .groupBy("customer_id")
    .agg(F.countDistinct("session_id").alias("sessions_last_30_days"))
)

In [42]:
df_sessions_30.show()

+-----------+---------------------+
|customer_id|sessions_last_30_days|
+-----------+---------------------+
|          2|                    3|
|          3|                    1|
|          4|                    2|
|          5|                    1|
|          8|                    1|
|          9|                    4|
|         10|                    5|
|         12|                    3|
|         14|                    1|
|         15|                    2|
|         16|                    2|
|         17|                    1|
|         18|                    1|
|         19|                    2|
|         21|                    1|
|         22|                    2|
|         23|                    1|
|         24|                    1|
|         25|                    8|
|         26|                    4|
+-----------+---------------------+
only showing top 20 rows


Sessionized web events are aggregated to compute the number of distinct sessions per customer in the last 30 days. This intermediate fact table captures recent behavioral engagement and is designed to be joined downstream with customer and order data to support customer-level engagement scoring.

# Step 7: Building Order Aggregate data frame

In [43]:
df_orders_agg = df_orders.groupBy("customer_id").agg(
    F.count("order_id").alias("total_orders"),
    F.round(F.sum("order_amount"),2).alias("total_revenue"),
    F.max("order_date").alias("last_order_date")
)

In [44]:
df_orders_agg.show()

+-----------+------------+-------------+---------------+
|customer_id|total_orders|total_revenue|last_order_date|
+-----------+------------+-------------+---------------+
|        191|           7|      1822.87|     2024-12-12|
|        418|           7|      1275.18|     2024-10-13|
|        222|           5|       1082.6|     2024-05-29|
|        270|           7|      1957.87|     2024-11-25|
|        293|           2|       725.96|     2023-12-10|
|        243|          15|      7809.11|     2024-08-12|
|        278|           7|      2792.65|     2024-09-29|
|        367|           2|       386.95|     2024-09-30|
|        442|          10|      5670.12|     2024-11-08|
|        296|           3|       861.68|     2024-10-08|
|        277|           2|        210.7|     2023-06-20|
|        287|          14|      8211.63|     2024-11-28|
|        348|           6|       2079.4|     2024-05-26|
|        415|          11|      6847.19|     2024-12-21|
|        299|           2|     

This aggregation transforms raw order transactions into customer-level features, enabling recency, frequency, and monetary analysis and serving as the foundation for engagement scoring and BI reporting.

# Step 8: Cleaning of Customers dataset

In [45]:
df_customers.filter(F.col("signup_date")> as_of_date).count()

0

There are no customers in the dataset after the maximum event date in the dataset. No cleaning of the customer data is required.

# Step 9: Cleaning of Orders dataset

In [46]:
df_orders.filter(F.col("order_date") > as_of_date).count()

0

There are no orders placed after the maximum event date. No cleaning of the orders dataset needed.

# Step 10: Joining all the data

In [47]:
customer_engagement = df_customers \
                      .join(df_orders_agg, "customer_id", "left") \
                      .join(df_sessions_30, "customer_id", "left")\
                      .fillna({
        "total_orders": 0,
        "total_revenue": 0.0,
        "sessions_last_30_days": 0
    })

In [48]:
customer_engagement.show()

+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+
|customer_id|               email|signup_date|country|total_orders|total_revenue|last_order_date|sessions_last_30_days|
+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+
|          1|customer_1@email.com| 2023-09-21|     US|           1|       236.23|     2023-03-24|                    0|
|          2|customer_2@email.com| 2023-06-13|     UK|          14|      9554.23|     2024-10-04|                    3|
|          3|customer_3@email.com| 2023-08-25|     IN|           2|       677.18|     2023-09-17|                    1|
|          4|customer_4@email.com| 2023-05-14|     UK|           4|      1342.01|     2024-06-05|                    2|
|          5|customer_5@email.com| 2023-05-03|     IN|           6|      1087.36|     2024-01-08|                    1|
|          6|customer_6@email.com| 2023-

This step consolidates customer, order, and session-level facts into a single customer-grain dataset, ensuring all customers—including inactive ones—are represented with consistent, analytics-ready features.

# Step 11: Engagement Score

In [49]:
customer_engagement = customer_engagement.withColumn('recency', F.datediff(F.lit(as_of_date), F.col('last_order_date')))
'''customer_engagement = customer_engagement.withColumn(
    'recency',
    F.when(F.col('last_order_date').isNull(),
           F.datediff(F.lit(as_of_date), F.col('signup_date')))
    .otherwise(F.datediff(F.lit(as_of_date), F.col('last_order_date')))
)'''

"customer_engagement = customer_engagement.withColumn(\n    'recency',\n    F.when(F.col('last_order_date').isNull(),\n           F.datediff(F.lit(as_of_date), F.col('signup_date')))\n    .otherwise(F.datediff(F.lit(as_of_date), F.col('last_order_date')))\n)"

In [50]:
customer_engagement.show()

+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+-------+
|customer_id|               email|signup_date|country|total_orders|total_revenue|last_order_date|sessions_last_30_days|recency|
+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+-------+
|          1|customer_1@email.com| 2023-09-21|     US|           1|       236.23|     2023-03-24|                    0|    648|
|          2|customer_2@email.com| 2023-06-13|     UK|          14|      9554.23|     2024-10-04|                    3|     88|
|          3|customer_3@email.com| 2023-08-25|     IN|           2|       677.18|     2023-09-17|                    1|    471|
|          4|customer_4@email.com| 2023-05-14|     UK|           4|      1342.01|     2024-06-05|                    2|    209|
|          5|customer_5@email.com| 2023-05-03|     IN|           6|      1087.36|     2024-01-08|       

Recency is calculated as the number of days since a customer's most recent purchase. This enables normalization and weighting within an engagament score, where more recent activity indicates higher customer engagement.

In [51]:
def min_max_norm(df, col_name):
  stats = df.agg(
      F.min(col_name).alias('min_val'),
      F.max(col_name).alias('max_val')
  ).collect()[0]

  min_val, max_val = stats['min_val'], stats['max_val']
  denom = max_val - min_val
  if denom == 0:
    return F.lit(0.0)
  return (F.col(col_name) - min_val) / denom

In [52]:
customer_engagement = customer_engagement.withColumn("recency_score", 1 - min_max_norm(customer_engagement, "recency"))

In [53]:
customer_engagement = customer_engagement.withColumn("order_score", min_max_norm(customer_engagement, "total_orders"))

In [54]:
customer_engagement = customer_engagement.withColumn("session_score", min_max_norm(customer_engagement, "sessions_last_30_days"))

In [55]:
customer_engagement.select(
    F.mean("total_revenue").alias("avg_revenue"),
    F.expr("percentile_approx(total_revenue, 0.5)").alias("median_revenue"),
    F.max("total_revenue").alias("max_revenue")
).show()

+-----------+--------------+-----------+
|avg_revenue|median_revenue|max_revenue|
+-----------+--------------+-----------+
|  2387.5044|       1141.04|   10841.98|
+-----------+--------------+-----------+



The data is skewed. There is a large gap between max_revenue and median which indicates a heavy tail.

In [56]:
customer_engagement.selectExpr(
    "percentile_approx(total_revenue, 0.5) as p50",
    "percentile_approx(total_revenue, 0.9) as p90",
    "percentile_approx(total_revenue, 0.99) as p99"
).show()

+-------+-------+--------+
|    p50|    p90|     p99|
+-------+-------+--------+
|1141.04|7673.01|10333.25|
+-------+-------+--------+



In [57]:
customer_engagement = customer_engagement.withColumn("log_revenue", F.log1p(F.col("total_revenue")))
customer_engagement = customer_engagement.withColumn("monetary_score", min_max_norm(customer_engagement, "log_revenue"))

In [58]:
customer_engagement = customer_engagement.withColumn("frequency_score", 0.6 * F.col("order_score") + 0.4 * F.col("session_score"))

In [59]:
customer_engagement = customer_engagement.withColumn('engagement_score',
                                                     F.round(0.4 * F.col("recency_score") + 0.3 * F.col("frequency_score") + 0.3 * F.col("monetary_score")
                                                         ,2
                                                     ))

In [60]:
customer_engagement = customer_engagement.withColumn(
    "engagement_segment",
    F.when(F.col("engagement_score") >= 0.7, "High")
    .when(F.col("engagement_score") >= 0.4, "Medium")
    .when(F.col("engagement_score") >= 0.2, "Low")
    .otherwise("At Risk")
)

In [61]:
customer_engagement = customer_engagement.withColumn(
    "engagement_status",
    F.when(
        F.col("last_order_date").isNull() &
        (F.col("sessions_last_30_days") == 0),
        "No Activity"
    )
    .when(
        F.col("last_order_date").isNull(),
        "Browsing Only"
    )
    .otherwise("Active")
)

In [62]:
customer_engagement = customer_engagement.withColumn(
    "engagement_score",
    F.when(
        F.col("engagement_score").isNull(),
        F.lit(0.0)
    ).otherwise(F.col("engagement_score"))
)

Null scores are handled through explicit customer status labeling to distinguish inactive and new customers from genuinely low-engagement users, ensuring analytical and business correctness.

In [63]:
customer_engagement.persist(StorageLevel.MEMORY_AND_DISK)
customer_engagement.count()

500

In [64]:
assert (
    customer_engagement.groupBy("customer_id")
    .count()
    .filter("count > 1")
    .count() == 0
)

In [65]:
# Added later
assert customer_engagement.filter(
    F.col("engagement_score").isNull()
).count() == 0, "Null engagement scores found"

# Verify score ranges
assert customer_engagement.filter(
    (F.col("engagement_score") < 0) | (F.col("engagement_score") > 1)
).count() == 0, "Engagement scores out of range"

In [66]:
customer_engagement.show()

+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+-------+-------------------+-------------------+-------------+------------------+------------------+-------------------+----------------+------------------+-----------------+
|customer_id|               email|signup_date|country|total_orders|total_revenue|last_order_date|sessions_last_30_days|recency|      recency_score|        order_score|session_score|       log_revenue|    monetary_score|    frequency_score|engagement_score|engagement_segment|engagement_status|
+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+-------+-------------------+-------------------+-------------+------------------+------------------+-------------------+----------------+------------------+-----------------+
|          1|customer_1@email.com| 2023-09-21|     US|           1|       236.23|     2023-03-24|                    0

In [67]:
high_risk_customers = customer_engagement.filter((
    F.col("engagement_score") < 0.3) & (F.col("engagement_status") == "Active")
)

In [68]:
high_risk_customers.show()

+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+-------+--------------------+-------------------+-------------+------------------+-------------------+-------------------+----------------+------------------+-----------------+
|customer_id|               email|signup_date|country|total_orders|total_revenue|last_order_date|sessions_last_30_days|recency|       recency_score|        order_score|session_score|       log_revenue|     monetary_score|    frequency_score|engagement_score|engagement_segment|engagement_status|
+-----------+--------------------+-----------+-------+------------+-------------+---------------+---------------------+-------+--------------------+-------------------+-------------+------------------+-------------------+-------------------+----------------+------------------+-----------------+
|          1|customer_1@email.com| 2023-09-21|     US|           1|       236.23|     2023-03-24|               

In [69]:
print(f"Customer count: {customer_engagement.count()}")
print(f"Avg engagement score: {customer_engagement.agg(F.avg('engagement_score')).first()[0]}")

Customer count: 500
Avg engagement score: 0.57936


In [70]:
#customer_engagement.write.mode('overwrite').option("header", "true").csv(f"{root_dir}/customer_engagement.csv")

In [71]:
#from google.colab import files
#files.download(f"{root_dir}/customer_engagement.csv")