In [1]:
!pip install -q pyspark

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_timestamp, lower, lit

In [5]:
spark = SparkSession.builder.appName('Customer360Project').getOrCreate()


In [6]:
customer_fp = "Customer360Insights.csv"
meesho_fp = "kurtiData.csv"

customer360_df = spark.read.csv(customer_fp, header=True, inferSchema=True)
meesho_df = spark.read.csv(meesho_fp, header=True, inferSchema=True)


In [17]:
# --- Clean Customer360 ---
customer360_df_cleaned = customer360_df \
    .filter((col('CustomerID').isNotNull()) & (col('CustomerID') != '') & (col('SessionStart').isNotNull())) \
    .dropDuplicates() \
    .withColumn(
        'event_type',
        when(col('OrderConfirmation').isNotNull(), 'purchase')
        .when(col('CartAdditionTime').isNotNull(), 'add_to_cart')
        .when(col('SessionStart').isNotNull(), 'session_start')
        .otherwise('other')
    ) \
    .withColumn('event_type', lower(col('event_type'))) \
    .withColumn('user_id', col('CustomerID').cast('string')) \
    .withColumn('event_timestamp', to_timestamp(col('SessionStart'))) \
    .withColumnRenamed('Product', 'product_id') \
    .withColumn('price', col('Price').cast('double')) \
    .withColumn('source', lit('customer360'))

# --- Clean Meesho ---
# Assume all interactions as "interaction" if you have no event_type, or derive as needed
meesho_df_cleaned = meesho_df \
    .filter((col('user_id').isNotNull()) & (col('user_id') != '') & (col('timestamp').isNotNull())) \
    .dropDuplicates() \
    .withColumn('event_type', lit('interaction')) \
    .withColumn('event_type', lower(col('event_type'))) \
    .withColumn('user_id', col('user_id').cast('string')) \
    .withColumn('event_timestamp', to_timestamp(col('timestamp'))) \
    .withColumn('price', col('price').cast('double')) \
    .withColumn('source', lit('meesho'))

In [18]:
# Define your unified schema columns
common_columns = ['user_id', 'event_type', 'event_timestamp', 'product_id', 'price', 'source']

# Helper function to add missing columns as nulls if needed
def align_cols(df, cols):
    for colname in cols:
        if colname not in df.columns:
            df = df.withColumn(colname, lit(None))
    return df

customer360_aligned = align_cols(customer360_df_cleaned, common_columns).select(*common_columns)
meesho_aligned = align_cols(meesho_df_cleaned, common_columns).select(*common_columns)

In [20]:
# Union rows from both datasets, preserving all events & sources
blended_events_df = customer360_aligned.unionByName(meesho_aligned)

blended_events_df.groupBy("source").count().show()
blended_events_df.filter(blended_events_df.source == "customer360").show(5, truncate=False)
blended_events_df.filter(blended_events_df.source == "meesho").show(5, truncate=False)



+------+------+
|source| count|
+------+------+
|meesho|425998|
+------+------+

+-------+----------+---------------+----------+-----+------+
|user_id|event_type|event_timestamp|product_id|price|source|
+-------+----------+---------------+----------+-----+------+
+-------+----------+---------------+----------+-----+------+

+--------------------------------+-----------+-------------------+----------+-----+------+
|user_id                         |event_type |event_timestamp    |product_id|price|source|
+--------------------------------+-----------+-------------------+----------+-----+------+
|8c153b42a53c4c4a9ac69f058cc057c4|interaction|2028-10-06 18:20:48|1xiamj    |NULL |meesho|
|99026c6d9ed44b5c8cfa2250dcd78fbb|interaction|2002-10-11 11:18:56|1xiamj    |NULL |meesho|
|c204760c97304f6fb019260734971125|interaction|2018-11-12 04:13:30|1xiamj    |NULL |meesho|
|7bf28679b4c447ce868a63d67de20332|interaction|2020-05-23 04:55:36|1xiamj    |NULL |meesho|
|903c6789de714d7eb654e93e83d260f7|int

In [21]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# 1. Define a window by user, sorted by timestamp
window = Window.partitionBy('user_id').orderBy('event_timestamp')

# 2. Calculate time diff from previous event for each user
blended_events_df = blended_events_df.withColumn(
    'prev_event_time', F.lag('event_timestamp').over(window)
)

# 3. Define if there is a session break (gap > 30 min = 1800 seconds)
blended_events_df = blended_events_df.withColumn(
    'session_break',
    (F.unix_timestamp('event_timestamp') - F.unix_timestamp('prev_event_time') > 1800).cast('int')
)

# 4. Fill nulls in session_break (first row will be null)
blended_events_df = blended_events_df.fillna({'session_break': 0})

# 5. Generate session_id by cumulatively summing session_break per user
blended_events_df = blended_events_df.withColumn(
    'session_id', F.sum('session_break').over(window)
)


In [22]:
# Define scores for event types
score_expr = F.expr("""
    CASE
        WHEN event_type = 'purchase' THEN 5
        WHEN event_type = 'add_to_cart' THEN 3
        WHEN event_type = 'interaction' THEN 1
        WHEN event_type = 'session_start' THEN 1
        ELSE 0
    END
""")

# Assign score for each row/event
blended_events_df = blended_events_df.withColumn('engagement_score', score_expr)

# To get total engagement per user:
user_engagement = blended_events_df.groupBy('user_id').agg(
    F.sum('engagement_score').alias('total_engagement_score'),
    F.count('*').alias('total_events')
)
user_engagement.show(10)


+--------------------+----------------------+------------+
|             user_id|total_engagement_score|total_events|
+--------------------+----------------------+------------+
|073fb454dfad4df9a...|                    15|          15|
|2f08b15fdbff4c509...|                    13|          13|
|e84bc6e7c23e40929...|                    13|          13|
|31566d13883241eea...|                    13|          13|
|c175da6894564b4da...|                    12|          12|
|0dfae0d064384a108...|                    15|          15|
|54d507de281349c5a...|                    13|          13|
|b586a6d6759541558...|                    19|          19|
|ade219f55aa3400f9...|                    13|          13|
|412c58dfea0c43afb...|                    16|          16|
+--------------------+----------------------+------------+
only showing top 10 rows



In [23]:
from pyspark.sql.functions import col, min as min_, trunc, month, year, countDistinct

# 1. Find each user's cohort month (month of first event)
user_cohort = blended_events_df.groupBy('user_id').agg(
    trunc(min_('event_timestamp'), 'MM').alias('cohort_month')
)

# 2. Join back to main event log to assign each event its user's cohort
events_with_cohort = blended_events_df.join(user_cohort, on='user_id', how='left')

# 3. For each row, extract the event month
events_with_cohort = events_with_cohort.withColumn(
    'event_month', trunc(col('event_timestamp'), 'MM')
)

# 4. Group by cohort and event month, count unique users retained
retention_table = events_with_cohort.groupBy('cohort_month', 'event_month').agg(
    countDistinct('user_id').alias('retained_users')
).orderBy('cohort_month', 'event_month')

retention_table.show(15, truncate=False)


+------------+-----------+--------------+
|cohort_month|event_month|retained_users|
+------------+-----------+--------------+
|2001-09-01  |2001-09-01 |792           |
|2001-09-01  |2001-10-01 |34            |
|2001-09-01  |2001-11-01 |26            |
|2001-09-01  |2001-12-01 |26            |
|2001-09-01  |2002-01-01 |22            |
|2001-09-01  |2002-02-01 |32            |
|2001-09-01  |2002-03-01 |28            |
|2001-09-01  |2002-04-01 |28            |
|2001-09-01  |2002-05-01 |30            |
|2001-09-01  |2002-06-01 |26            |
|2001-09-01  |2002-07-01 |33            |
|2001-09-01  |2002-08-01 |29            |
|2001-09-01  |2002-09-01 |28            |
|2001-09-01  |2002-10-01 |28            |
|2001-09-01  |2002-11-01 |25            |
+------------+-----------+--------------+
only showing top 15 rows



In [24]:
blended_events_df.toPandas().to_csv('/content/blended_events.csv', index=False)
retention_table.toPandas().to_csv('/content/retention_table.csv', index=False)


In [25]:
from google.colab import files
files.download('/content/blended_events.csv')
files.download('/content/retention_table.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>