#Imports

In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.mllib.evaluation import RankingMetrics
from datetime import datetime, timedelta

# Configs

In [0]:
start_datetime = datetime.today() - timedelta(days=datetime.today().weekday() + 7)
end_datetime = start_datetime + timedelta(days=6)
start_date = start_datetime.strftime('%Y-%m-%d')
end_date = end_datetime.strftime('%Y-%m-%d')
k = 20

realestate = "main"
print(start_date, end_date)

2025-02-10 2025-02-16


#Functions

In [0]:
@F.udf(T.ArrayType(T.IntegerType()))
def sorter_fn(l):
    res = sorted(l, key=lambda x: x["catalog_position"])
    return [item['catalog_id'] for item in res]

#Raw Data

In [0]:
views_anonymous = spark.sql(f""" 
                            select concat(year, '-', month, '-',day) as date, cast(user_id as string), cast(catalog_id as int), catalog_position , session_id 
                            from silver.mixpanel_android__anonymous_catalog_views_report 
                            where concat(year, '-', month, '-',day) >= "{start_date}" and 
                            concat(year, '-', month, '-',day) <= "{end_date}" 
                            and origin = '{realestate}' 
                            and user_id is not null 
                            and catalog_id is not null 
                            and session_id is not null
                        """)

views_signed_up = spark.sql(
                        f"""
                            select concat(year, '-', month, '-',day) as date, cast(user_id as string), cast(catalog_id as int), catalog_position , session_id
                            from silver.mixpanel_android__catalog_views_report_final
                            where concat(year, '-', month, '-',day) >= "{start_date}" and 
                            concat(year, '-', month, '-',day) <= "{end_date}" and
                            origin = '{realestate}' and 
                            user_id is not null and catalog_id is not null and session_id is not null
                        """
                        )

views_raw_data = views_signed_up.unionByName(views_anonymous).cache()

preds = views_raw_data.filter(f"catalog_position <= {k}") \
            .groupBy('session_id', 'user_id') \
            .agg(F.collect_set(F.struct('catalog_id', 'catalog_position')).alias('preds')) \
            .withColumn('preds', sorter_fn('preds'))

views_cdf_data = views_raw_data.groupBy('date', 'user_id', 'catalog_id') \
                            .agg(F.count('*').alias('views')).cache()

In [0]:
clicks_anonymous = spark.sql(f"""
        select
         concat(year, '-', month, '-',day) as date,
          session_id,
          cast(user_id as string),
          catalog_id,
          count(*) as clicks
        from
          silver.mixpanel_android__anonymous_catalog_opened
        where
          concat(year, '-', month, '-',day) >= "{start_date}" and 
          concat(year, '-', month, '-',day) <= "{end_date}"
          and origin = '{realestate}'
          and user_id is not NULL
          and catalog_id is not NULL
          and session_id is not null
          group by 1, 2, 3, 4
    """)

clicks_signep_up = spark.sql(f"""
        select
         concat(year, '-', month, '-',day) as date,
          session_id,
          cast(user_id as string),
          catalog_id,
          count(*) as clicks
        from
          silver.mixpanel_android__catalog_opened_main
        where
          concat(year, '-', month, '-',day) >= "{start_date}" and 
          concat(year, '-', month, '-',day) <= "{end_date}"
          and origin = '{realestate}'
          and user_id is not NULL
          and catalog_id is not NULL
          and session_id is not null
          group by 1, 2, 3, 4
    """)

clicks_raw_data = clicks_signep_up.unionByName(clicks_anonymous).cache()
  
labels = clicks_raw_data.groupBy('session_id', 'user_id') \
            .agg(F.collect_set('catalog_id').alias('labels'))

clicks_cdf_data = clicks_raw_data.groupBy('date', 'user_id', 'catalog_id') \
                            .agg(F.sum('clicks').alias('clicks')).cache()

In [0]:
ranking_metrics_df = labels.join(preds,on=['session_id', 'user_id'],how="inner").cache()

In [0]:
scroll_depth_raw = spark.sql(
f"""
    select 
    concat(year, '-', month, '-',day) as date,
    cast(user_id as string),
    max(catalog_position) as scroll_depth
    from gold.unique_signed_up_catalog_views_deduped
    where concat(year, '-', month, '-',day) >= '{start_date}' and 
    concat(year, '-', month, '-',day) <= '{end_date}'
    and origin = '{realestate}'
    group by 1,2
""").cache() 

In [0]:
catalog_price = spark.sql(f"""
                            select dt as date, catalog_id, price
                            from gold.pricing_ranking_attributes 
                            where dt >= '{start_date}' and dt <= '{end_date}'
                    """)

In [0]:
od_data = spark.sql(
f"""
    select cast(user_id as string), count(distinct omb.order_id) as od
    from platinum.order_master_bi omb 
    where test=0 and verified=1 and omb.order_status in (4,6) and omb.order_date < '{start_date}'
    group by user_id 
"""
).cache()

In [0]:
app_open_anonymous = spark.sql(f"""
                        select distinct cast(distinct_id as string) as user_id , concat(year,'-',month,'-',day) as date
                        from silver.mixpanel_android__anonymous_app_open_pre_signup
                        where concat(year, '-', month, '-',day) >= '{start_date}' and 
                        concat(year, '-', month, '-',day) <= '{end_date}'
                        and distinct_id is not null
                    """)

app_open_signed_up = spark.sql(f"""
                        select distinct cast(user_id as string) , concat(year,'-',month,'-',day) as date
                        from silver.mixpanel_android__app_open 
                        where concat(year, '-', month, '-',day) >= '{start_date}' and 
                        concat(year, '-', month, '-',day) <= '{end_date}'
                        and user_id is not null
                    """)

app_open_raw_data = app_open_anonymous.unionByName(app_open_signed_up).cache()

# 22OD+ (Feed Only)

In [0]:
od_22_users = od_data.filter('od >= 22').select('user_id').cache()

## 80 VC Clicks (Clicks CDF)![](![path](path))

In [0]:
# Perform inner join on clicks_data and od_data on user_id
clicks_data = clicks_cdf_data.join(od_22_users, on="user_id", how="inner")

# Group by user_id and take sum of clicks
clicks_data = clicks_data.groupBy("date", "catalog_id").agg(F.sum("clicks").alias("clicks"))

w = Window.partitionBy("date").orderBy(F.col("clicks").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

clicks_data = clicks_data.withColumn("cf_sum", F.sum("clicks").over(w))

total_clicks = clicks_data.groupBy('date') \
                        .agg(F.sum("clicks").alias('total_clicks'))

clicks_data = clicks_data.join(total_clicks, on="date", how="inner") \
                    .withColumn("cf_perc", F.col("cf_sum")/F.col("total_clicks"))

clicks_cdf = clicks_data.filter("cf_perc <= 0.8") \
                .groupBy("date") \
                .agg(F.count('catalog_id').alias('clicks_cdf'))

clicks_cdf_avg = clicks_cdf.agg(F.avg('clicks_cdf').alias('avg_clicks_cdf')).collect()[0][0]

print("clicks_cdf_avg:", clicks_cdf_avg)

clicks_cdf_avg: 39500.42857142857


## 80 VC Views (Views CDF)


In [0]:
views_data = views_cdf_data.join(od_22_users, on="user_id", how="inner")

# Group by user_id and take sum of views
views_data = views_data.groupBy("date", "catalog_id").agg(F.sum("views").alias("views"))

w = Window.partitionBy("date").orderBy(F.col("views").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

views_data = views_data.withColumn("cf_sum", F.sum("views").over(w))

total_views = views_data.groupBy('date') \
                        .agg(F.sum("views").alias('total_views'))

views_data = views_data.join(total_views, on="date", how="inner") \
                    .withColumn("cf_perc", F.col("cf_sum")/F.col("total_views"))

views_cdf = views_data.filter("cf_perc <= 0.8") \
                .groupBy("date") \
                .agg(F.count('catalog_id').alias('views_cdf'))

views_cdf_avg = views_cdf.agg(F.avg('views_cdf').alias('avg_views_cdf')).collect()[0][0]

print("views_cdf_avg:", views_cdf_avg)

views_cdf_avg: 42717.57142857143


## Scroll Depth


In [0]:
scroll_depth = scroll_depth_raw.join(od_22_users, on="user_id", how="inner").drop("od")
scroll_depth = scroll_depth.select(F.avg('scroll_depth').alias('avg_scroll_depth'))
sd = scroll_depth.select("avg_scroll_depth").collect()[0][0]
print("scroll_depth:", sd)

scroll_depth: 81.32749970830149


## % of Users with atleast 1 FY Clicks


In [0]:
clicks_data = clicks_cdf_data.join(od_22_users, on="user_id", how="inner")
app_open_data = app_open_raw_data.join(od_22_users, on="user_id", how="inner")

num_users_with_fy_clicks = clicks_data.groupBy("date").agg(F.countDistinct("user_id").alias("num_users_with_fy_clicks"))

dau = app_open_data.groupBy("date").agg(F.countDistinct("user_id").alias("dau"))

perct_users_with_fy_clicks = num_users_with_fy_clicks.join(dau, ['date'], 'inner') \
                                               .withColumn("perct_users_with_fy_clicks", F.col("num_users_with_fy_clicks")/F.col("dau"))

# perct_users_with_fy_clicks.display()

perct_users_with_fy_clicks = perct_users_with_fy_clicks.agg(F.avg("perct_users_with_fy_clicks").alias("perct_users_with_fy_clicks")).collect()[0][0]

print("perct_users_with_fy_clicks:" ,perct_users_with_fy_clicks)

## Ranking Metrics


In [0]:
ans = ranking_metrics_df.join(od_22_users, 'user_id') \
        .select("preds", "labels") \
        .rdd

metrics = RankingMetrics(ans)

recall = metrics.recallAt(20)
print("recall:", recall)

ndcg = metrics.ndcgAt(20)
print("ndcg:", ndcg)



recall: 0.4205636996184287
ndcg: 0.20336934520315592


In [0]:
od_22_users.unpersist()

DataFrame[user_id: bigint]

In [0]:
metrics1 = dict()
metrics1["clicks_cdf"] = clicks_cdf_avg
metrics1["views_cdf"] = views_cdf_avg
metrics1["avg_scroll_depth"] = sd
metrics1["perct_users_with_fy_clicks"] = perct_users_with_fy_clicks
metrics1[f"ndcg@k"] = ndcg
metrics1[f"recall@k"] = recall
# metrics1["k"] = k

metrics1 = {k: str(v) for k, v in metrics1.items()}

# COMMAND ----------

table_schema = StructType([
    StructField("real_estate", StringType(), False),
    StructField("metrics_dict", MapType(StringType(), StringType()), False),
    StructField("start_date", StringType(), False),
    StructField("end_date", StringType(), False),
    StructField("start_datetime", TimestampType(), False),
    StructField("end_datetime", TimestampType(), False),
    ])
df1 = spark.createDataFrame([['FY 22OD+', metrics1, start_date, end_date, start_datetime, end_datetime]], table_schema)

df1.display()

# df.write.mode("append").saveAsTable("ds_silver_exp.prod_kaizen_re_metrics_full")

real_estate,metrics_dict,start_date,end_date,start_datetime,end_datetime
FY 22OD+,"Map(ndcg@k -> 0.20336934520315592, clicks_cdf -> 39500.42857142857, num_users_with_fy_clicks -> 2491794.714285714, recall@k -> 0.4205636996184287, avg_scroll_depth -> 81.32749970830149, views_cdf -> 42717.57142857143)",2025-02-03,2025-02-09,2025-02-03T17:35:54.835102+05:30,2025-02-09T17:35:54.835102+05:30


# < 22OD (FY+Homepage Only)


In [0]:
od_lt_22_users = od_data.filter('od < 22').select('user_id').cache()

## 80 VC Clicks (Clicks CDF)


In [0]:
# Perform inner join on clicks_data and od_data on user_id
clicks_data = clicks_cdf_data.join(od_lt_22_users, on="user_id", how="inner")

# Group by user_id and take sum of clicks
clicks_data = clicks_data.groupBy("date", "catalog_id").agg(F.sum("clicks").alias("clicks"))

w = Window.partitionBy("date").orderBy(F.col("clicks").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

clicks_data = clicks_data.withColumn("cf_sum", F.sum("clicks").over(w))

total_clicks = clicks_data.groupBy('date') \
                        .agg(F.sum("clicks").alias('total_clicks'))

clicks_data = clicks_data.join(total_clicks, on="date", how="inner") \
                    .withColumn("cf_perc", F.col("cf_sum")/F.col("total_clicks"))

clicks_cdf = clicks_data.filter("cf_perc <= 0.8") \
                .groupBy("date") \
                .agg(F.count('catalog_id').alias('clicks_cdf'))

clicks_cdf_avg = clicks_cdf.agg(F.avg('clicks_cdf').alias('avg_clicks_cdf')).collect()[0][0]

print("clicks_cdf_avg:", clicks_cdf_avg)

clicks_cdf_avg: 42933.28571428572


## 80 VC Views (Views CDF)


In [0]:
views_data = views_cdf_data.join(od_lt_22_users, on="user_id", how="inner")

# Group by user_id and take sum of views
views_data = views_data.groupBy("date", "catalog_id").agg(F.sum("views").alias("views"))

w = Window.partitionBy("date").orderBy(F.col("views").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

views_data = views_data.withColumn("cf_sum", F.sum("views").over(w))

total_views = views_data.groupBy('date') \
                        .agg(F.sum("views").alias('total_views'))

views_data = views_data.join(total_views, on="date", how="inner") \
                    .withColumn("cf_perc", F.col("cf_sum")/F.col("total_views"))

views_cdf = views_data.filter("cf_perc <= 0.8") \
                .groupBy("date") \
                .agg(F.count('catalog_id').alias('views_cdf'))

views_cdf_avg = views_cdf.agg(F.avg('views_cdf').alias('avg_views_cdf')).collect()[0][0]

print("views_cdf_avg:", views_cdf_avg)

views_cdf_avg: 45920.857142857145


## Scroll Depth

In [0]:
scroll_depth = scroll_depth_raw.join(od_lt_22_users, on="user_id", how="inner").drop("od")
scroll_depth = scroll_depth.select(F.avg('scroll_depth').alias('avg_scroll_depth'))
sd = scroll_depth.select("avg_scroll_depth").collect()[0][0]
print("scroll_depth:", sd)

scroll_depth: 82.71419130081183


## % of Users with atleast 1 FY Clicks
## 

In [0]:
clicks_data = clicks_cdf_data.join(od_lt_22_users, on="user_id", how="inner")
app_open_data = app_open_raw_data.join(od_lt_22_users, on="user_id", how="inner")

num_users_with_fy_clicks = clicks_data.groupBy("date").agg(F.countDistinct("user_id").alias("num_users_with_fy_clicks"))

dau = app_open_data.groupBy("date").agg(F.countDistinct("user_id").alias("dau"))

perct_users_with_fy_clicks = num_users_with_fy_clicks.join(dau, ['date'], 'inner') \
                                               .withColumn("perct_users_with_fy_clicks", F.col("num_users_with_fy_clicks")/F.col("dau"))

# perct_users_with_fy_clicks.display()

perct_users_with_fy_clicks = perct_users_with_fy_clicks.agg(F.avg("perct_users_with_fy_clicks").alias("perct_users_with_fy_clicks")).collect()[0][0]

print("perct_users_with_fy_clicks:" ,perct_users_with_fy_clicks)

## Clicks nDCG @ 20

In [0]:
ans = ranking_metrics_df.join(od_lt_22_users, 'user_id') \
        .select("preds", "labels") \
        .rdd
metrics = RankingMetrics(ans)

recall = metrics.recallAt(20)
print("recall:", recall)

ndcg = metrics.ndcgAt(20)
print("ndcg:", ndcg)



recall: 0.3912459735184172
ndcg: 0.17926070590589915


In [0]:
od_lt_22_users.unpersist()

DataFrame[user_id: bigint]

In [0]:
metrics2 = dict()
metrics2["clicks_cdf"] = clicks_cdf_avg
metrics2["views_cdf"] = views_cdf_avg
metrics2["avg_scroll_depth"] = sd
metrics2["perct_users_with_fy_clicks"] = perct_users_with_fy_clicks
metrics2[f"ndcg@k"] = ndcg
metrics2[f"recall@k"] = recall
# metrics2["k"] = k

metrics2 = {k: str(v) for k, v in metrics2.items()}

# COMMAND ----------

table_schema = StructType([
    StructField("real_estate", StringType(), False),
    StructField("metrics_dict", MapType(StringType(), StringType()), False),
    StructField("start_date", StringType(), False),
    StructField("end_date", StringType(), False),
    StructField("start_datetime", TimestampType(), False),
    StructField("end_datetime", TimestampType(), False),
    ])
df2 = spark.createDataFrame([['FY < 22OD', metrics2, start_date, end_date, start_datetime, end_datetime]], table_schema)

df2.display()

# df.write.mode("append").saveAsTable("ds_silver_exp.prod_kaizen_re_metrics_full")

real_estate,metrics_dict,start_date,end_date,start_datetime,end_datetime
FY < 22OD,"Map(ndcg@k -> 0.17926070590589915, clicks_cdf -> 42933.28571428572, num_users_with_fy_clicks -> 3646201.285714286, recall@k -> 0.3912459735184172, avg_scroll_depth -> 82.71419130081183, views_cdf -> 45920.857142857145)",2025-02-03,2025-02-09,2025-02-03T17:35:54.835102+05:30,2025-02-09T17:35:54.835102+05:30


# 0-1 FY Feed

## Click nDCG @ K

In [0]:
ans = ranking_metrics_df.join(od_data, 'user_id', 'leftanti') \
        .select("preds", "labels") \
        .rdd
metrics = RankingMetrics(ans)

recall = metrics.recallAt(20)
print("recall:", recall)

ndcg = metrics.ndcgAt(20)
print("ndcg:", ndcg)



recall: 0.406491456731617
ndcg: 0.19067172610388716


## scroll depth

In [0]:
scroll_depth = scroll_depth_raw.join(od_data, on="user_id", how="leftanti").drop("od")
scroll_depth = scroll_depth.select(F.avg('scroll_depth').alias('avg_scroll_depth'))
sd = scroll_depth.select("avg_scroll_depth").collect()[0][0]
print("scroll_depth:", sd)

scroll_depth: 79.51170923536905


## View weighted price

In [0]:
vwp = views_cdf_data.join(od_data, on="user_id", how="leftanti") \
                            .join(catalog_price, ['date', 'catalog_id'], 'inner') \
                            .withColumn('vwp', F.expr('price*views')) \
                            .groupBy('date') \
                            .agg((F.sum('vwp')/F.sum('views')).alias('vwp')) \
                            .select(F.avg('vwp')).collect()[0][0]

print('vwp:', vwp)

## 80 VC Clicks (Clicks CDF)

In [0]:
# Perform inner join on clicks_data and od_data on user_id
clicks_data = clicks_cdf_data.join(od_data, on="user_id", how="leftanti")

# Group by user_id and take sum of clicks
clicks_data = clicks_data.groupBy("date", "catalog_id").agg(F.sum("clicks").alias("clicks"))

w = Window.partitionBy("date").orderBy(F.col("clicks").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

clicks_data = clicks_data.withColumn("cf_sum", F.sum("clicks").over(w))

total_clicks = clicks_data.groupBy('date') \
                        .agg(F.sum("clicks").alias('total_clicks'))

clicks_data = clicks_data.join(total_clicks, on="date", how="inner") \
                    .withColumn("cf_perc", F.col("cf_sum")/F.col("total_clicks"))

clicks_cdf = clicks_data.filter("cf_perc <= 0.8") \
                .groupBy("date") \
                .agg(F.count('catalog_id').alias('clicks_cdf'))

clicks_cdf_avg = clicks_cdf.agg(F.avg('clicks_cdf').alias('avg_clicks_cdf')).collect()[0][0]

print("clicks_cdf_avg:", clicks_cdf_avg)

clicks_cdf_avg: 29611.0


## 80 VC Views (Views CDF)


In [0]:
views_data = views_cdf_data.join(od_data, on="user_id", how="leftanti")

# Group by user_id and take sum of views
views_data = views_data.groupBy("date", "catalog_id").agg(F.sum("views").alias("views"))

w = Window.partitionBy("date").orderBy(F.col("views").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

views_data = views_data.withColumn("cf_sum", F.sum("views").over(w))

total_views = views_data.groupBy('date') \
                        .agg(F.sum("views").alias('total_views'))

views_data = views_data.join(total_views, on="date", how="inner") \
                    .withColumn("cf_perc", F.col("cf_sum")/F.col("total_views"))

views_cdf = views_data.filter("cf_perc <= 0.8") \
                .groupBy("date") \
                .agg(F.count('catalog_id').alias('views_cdf'))

views_cdf_avg = views_cdf.agg(F.avg('views_cdf').alias('avg_views_cdf')).collect()[0][0]

print("views_cdf_avg:", views_cdf_avg)

views_cdf_avg: 27707.0


## % of Users with atleast 1 FY Clicks


In [0]:
clicks_data = clicks_cdf_data.join(od_data, on="user_id", how="leftanti")
app_open_data = app_open_raw_data.join(od_data, on="user_id", how="leftanti")

num_users_with_fy_clicks = clicks_data.groupBy("date").agg(F.countDistinct("user_id").alias("num_users_with_fy_clicks"))

dau = app_open_data.groupBy("date").agg(F.countDistinct("user_id").alias("dau"))

perct_users_with_fy_clicks = num_users_with_fy_clicks.join(dau, ['date'], 'inner') \
                                               .withColumn("perct_users_with_fy_clicks", F.col("num_users_with_fy_clicks")/F.col("dau"))

# perct_users_with_fy_clicks.display()

perct_users_with_fy_clicks = perct_users_with_fy_clicks.agg(F.avg("perct_users_with_fy_clicks").alias("perct_users_with_fy_clicks")).collect()[0][0]

print("perct_users_with_fy_clicks:" ,perct_users_with_fy_clicks)

In [0]:
metrics3 = dict()
metrics3["clicks_cdf"] = clicks_cdf_avg
metrics3["views_cdf"] = views_cdf_avg
metrics3[f"ndcg@k"] = ndcg
metrics3[f"recall@k"] = recall
metrics3["vwp"] = vwp
metrics3["perct_users_with_fy_clicks"] = perct_users_with_fy_clicks
metrics3 = {k: str(v) for k, v in metrics3.items()}

# COMMAND ----------

table_schema = StructType([
    StructField("real_estate", StringType(), False),
    StructField("user_cohort", StringType(), False),
    StructField("metrics_dict", MapType(StringType(), StringType()), False),
    StructField("start_date", StringType(), False),
    StructField("end_date", StringType(), False),
    StructField("start_datetime", TimestampType(), False),
    StructField("end_datetime", TimestampType(), False),
    ])
df3 = spark.createDataFrame([['main', "0-1 FY Feed", metrics3, start_date, end_date, start_datetime, end_datetime]], table_schema)

df3.display()

# df.write.mode("append").saveAsTable("ds_silver_exp.prod_kaizen_re_metrics_full")

real_estate,user_cohort,metrics_dict,start_date,end_date,start_datetime,end_datetime
main,0-1 FY Feed,"Map(clicks_cdf -> 29611.0, recall@k -> 0.406491456731617, ndcg@k -> 0.19067172610388716, views_cdf -> 27707.0)",2025-02-10,2025-02-16,2025-02-10T12:00:45.703077+05:30,2025-02-16T12:00:45.703077+05:30
