In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
 # Azure storage access info
blob_account_name = "bgupb202402juanbarriento"
blob_container_name = "marketplace"

# SECURITY: a SAS token is a credential and must not be stored in the notebook source.
# It is fetched at run time from a Databricks secret scope; store the token once under
# the scope/key below (names are placeholders -- adjust them to your workspace).
# NOTE(review): the token that used to be hard-coded here has been exposed and should be
# revoked/rotated in the storage account.
blob_sas_token = dbutils.secrets.get(scope="marketplace-storage", key="blob-sas-token")

# Allow SPARK to read from Blob remotely
wasbs_path = f'wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/'
# Register the SAS token for this container/account pair in the Spark session config
spark.conf.set(
    f'fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net',
    blob_sas_token,
)
print('Remote blob path: ' + wasbs_path)

Remote blob path: wasbs://marketplace@bgupb202402juanbarriento.blob.core.windows.net/


In [0]:
# Read the "silver" Delta table from the remote container configured above
silver_df = (
    spark.read
    .format("delta")
    .load(f"{wasbs_path}silver")
)

# Transformations

Based on month-to-month sales, classify each client by revenue growth.
1. If the growth is >= 10%, set the target to 1.
1. Otherwise, set the target to 0.

In [0]:
%sql
-- List the distinct event types present in the silver table
SELECT DISTINCT event_type FROM jdbr_mkp.mkp_silver

event_type
purchase
view
cart


In [0]:
# Monthly revenue per user: keep purchase events only and add up their prices.
# `sum` here is pyspark.sql.functions.sum (brought in by the wildcard import).
purchase_df = (
    silver_df
    .where(col('event_type') == 'purchase')
    .groupBy('user_id', 'year', 'month')
    .agg(sum('price').alias('purchase_amount'))
    .orderBy('user_id', 'year', 'month')
)

In [0]:
# Inspect the per-user monthly purchase totals
display(purchase_df)

In [0]:
# One window per user, rows ordered chronologically by year and month
window_specs = (
    Window
    .partitionBy('user_id')
    .orderBy('user_id', 'year', 'month')
)

In [0]:
# purchase_lag = purchase_amount of the user's previous row in window order (null on the
# first row). purchase_df only contains months with purchases, so the previous row is the
# previous month *with a purchase*, not necessarily the previous calendar month.
purchase_lag_df = purchase_df.withColumn(
    'purchase_lag',
    lag('purchase_amount', 1).over(window_specs).cast('double'),
)

In [0]:
# Inspect the monthly totals together with the previous-row amount
display(purchase_lag_df)

In [0]:
# Absolute revenue change versus the previous row (null when there is no previous row)
grow_delta_df = purchase_lag_df.withColumn(
    'delta_revenue',
    (col('purchase_amount') - col('purchase_lag')).cast('double'),
)

In [0]:
# Relative revenue change: delta divided by the previous amount (a fraction, 0.1 == 10%)
grow_percent_df = grow_delta_df.withColumn(
    'percent_delta_revenue',
    (col('delta_revenue') / col('purchase_lag')).cast('double'),
)

In [0]:
# Inspect the absolute and relative month-over-month revenue change
display(grow_percent_df)

In [0]:
# Label each user-month: 1 when revenue grew by at least 10% versus the previous row, else 0.
#
# Rows whose growth is undefined (percent_delta_revenue is null, e.g. the user's first
# purchase month, which has no previous amount) are removed BEFORE labelling. The previous
# version filtered on `target IS NOT NULL` after `when(...).otherwise(0)`; because
# `otherwise(0)` never yields null, that filter was a no-op and those rows were silently
# labelled 0 ("no growth").
grow_target_df = (
    grow_percent_df
    .filter(col('percent_delta_revenue').isNotNull())
    .withColumn('target', when(col('percent_delta_revenue') >= 0.1, 1).otherwise(0))
)

In [0]:
# Inspect the labelled rows (still including the intermediate revenue columns)
display(grow_target_df)

In [0]:
# Keep only the join keys and the label; the intermediate revenue columns
# (purchase_amount, purchase_lag, delta_revenue, percent_delta_revenue) are discarded
grow_target_df = grow_target_df.select('user_id', 'year', 'month', 'target')

In [0]:
# Persist the target table as Delta, replacing any previous version
(
    grow_target_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/grow_target")
)

# Aggregations

## Number of sessions by user, year, month

In [0]:
# Number of distinct sessions each user had in each (year, month)
sessions_per_user_year_month_df = (
    silver_df
    .groupBy("user_id", "year", "month")
    .agg(countDistinct("user_session").alias('session_per_user_year_month'))
    .orderBy("user_id", "year", "month")
)

In [0]:
# Preview the first 20 rows; head(20) collects them to the driver as a list of Row objects
display(sessions_per_user_year_month_df.head(20))

user_id,year,month,session_per_user_year_month
100037567,2019,12,10
100037567,2020,1,75
100140882,2019,12,12
100140882,2020,1,32
100140882,2020,2,29
100140882,2020,3,13
100140882,2020,4,4
100594172,2019,12,1
100705743,2019,12,2
100705743,2020,1,1


In [0]:
%sql
-- SQL equivalent of the DataFrame aggregation above:
-- distinct sessions per user, year and month
SELECT user_id, year, month, COUNT(DISTINCT user_session) as session_per_user_year_month
FROM jdbr_mkp.mkp_silver
GROUP BY user_id, year, month
ORDER BY user_id, year, month

In [0]:
# Same query as the %sql cell, run through spark.sql so the result is bound to a DataFrame.
# NOTE(review): `df` is not referenced by any other cell visible in this notebook.
df = spark.sql("""
SELECT user_id, year, month, COUNT(DISTINCT user_session) as session_per_user_year_month
FROM jdbr_mkp.mkp_silver
GROUP BY user_id, year, month
ORDER BY user_id, year, month""")

## user session delta time

In [0]:
# First and last event timestamp per user and month. The aggregates are deliberately left
# un-aliased: later cells refer to them by their default names
# "min(event_time_timestamp)" / "max(event_time_timestamp)".
session_duration_df = (
    silver_df
    .groupBy("user_id", "year", "month")
    .agg(
        min("event_time_timestamp"),
        max("event_time_timestamp"),
    )
)

In [0]:
# Preview the raw difference between last and first event timestamp (before casting to long)
display(session_duration_df.withColumn("session_duration", col("max(event_time_timestamp)") - col("min(event_time_timestamp)")))

In [0]:
# session_duration = (last event timestamp - first event timestamp) per user, year and
# month, cast to long. This is the span of the user's activity in the month across all of
# their sessions; the grouping does not include user_session.
#
# The frame is derived from silver_df in a single aggregation instead of re-assigning
# session_duration_df from itself: the previous form dropped the min/max columns it read,
# so running the cell a second time failed because those columns no longer existed.
session_duration_df = (
    silver_df
    .groupBy("user_id", "year", "month")
    .agg(
        (max("event_time_timestamp") - min("event_time_timestamp"))
        .cast('long')
        .alias("session_duration")
    )
)

In [0]:
# Inspect the final session_duration feature
display(session_duration_df)

In [0]:
# Persist the session_duration feature as Delta, replacing any previous version
(
    session_duration_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/session_duration")
)

In [0]:
# Persist the sessions-per-month feature as Delta, replacing any previous version
(
    sessions_per_user_year_month_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/sessions_per_user_year_month")
)

## Conversion rate
Number of purchase events divided by the number of view events, per user and month.


In [0]:
# Conversion rate per user and month = purchase events / view events.
#
# * count(when(...)) counts only the rows matching the condition (when() yields null
#   otherwise and count() skips nulls); count() itself never returns null, so the former
#   fillna on the two counters was a no-op and has been removed.
# * The division is guarded: with no view events the rate is left null instead of dividing
#   by zero. This matches the result of the plain division in Spark's default mode and
#   avoids a divide-by-zero error when spark.sql.ansi.enabled is true.
convertion_rate_per_user_year_month_df = (
    silver_df
    .groupBy('user_id', 'year', 'month')
    .agg(
        count(when(col('event_type') == 'view', True)).alias('num_views'),
        count(when(col('event_type') == 'purchase', True)).alias('num_purchase'),
    )
    .withColumn(
        'convertion_rate',
        when(col('num_views') > 0, col('num_purchase') / col('num_views')),
    )
    .drop('num_purchase', 'num_views')
    .orderBy('user_id', 'year', 'month')
)

In [0]:
# Preview the first 10 rows of the conversion-rate feature
display(convertion_rate_per_user_year_month_df.head(10))

user_id,year,month,convertion_rate
100037567,2019,12,0.0
100037567,2020,1,0.0
100140882,2019,12,0.0
100140882,2020,1,0.0
100140882,2020,2,0.0
100140882,2020,3,0.0
100140882,2020,4,0.0
100594172,2019,12,0.0
100705743,2019,12,0.0
100705743,2020,1,0.0


## Average Purchase Order Value
Average value of purchases by user, year, and month

In [0]:
# Mean price of the purchase events of each user in each (year, month)
avg_purchase_per_user_year_month_df = (
    silver_df
    .where(col('event_type') == 'purchase')
    .groupBy('user_id', 'year', 'month')
    .agg(avg('price').alias('mean_price'))
    .orderBy('user_id', 'year', 'month')
)

In [0]:
# Preview the first 10 rows of the average-purchase feature
display(avg_purchase_per_user_year_month_df.head(10))

user_id,year,month,mean_price
101875240,2020,1,184.52
107620212,2020,1,244.28
128968633,2019,12,119.59666666666668
136662675,2019,12,102.65
145611266,2019,12,39.9
145611266,2020,4,41.66
177507559,2019,12,154.16
184265397,2020,2,90.07
192078182,2020,1,308.86
192078182,2020,3,110.72


## Cart Abandonment Rate

In [0]:
# Cart abandonment per (user, year, month, session, product):
#   (times added to cart - times purchased) / times added to cart
#
# * pivot() is given its values explicitly. Without them Spark first runs an extra job to
#   discover the distinct event types, and the output schema depends on the data (the
#   'cart' or 'purchase' column would be missing if no such event existed).
# * Event types absent for a group come out of the pivot as null, hence the fillna(0).
# * The division is guarded: when the product was purchased without any cart event in the
#   session (cart == 0) the rate is left null rather than dividing by zero. This is the
#   value the plain division gives in Spark's default mode, and it avoids a
#   divide-by-zero error when spark.sql.ansi.enabled is true.
# NOTE(review): the rate is negative when purchase > cart for a group; confirm whether
# that case should be clipped to 0.
abandon_rate_per_session_df = (
    silver_df
    .filter(col('event_type').isin('cart', 'purchase'))
    .groupBy('user_id', 'year', 'month', 'user_session', 'product_id')
    .pivot('event_type', ['cart', 'purchase'])
    .agg(count('product_id'))
    .fillna({'cart': 0, 'purchase': 0})
    .withColumn(
        'cart_abandone_rate',
        when(col('cart') > 0, (col('cart') - col('purchase')) / col('cart')),
    )
)

In [0]:
# Preview the first 20 per-session, per-product abandonment rows
display(abandon_rate_per_session_df.head(20))

user_id,year,month,user_session,product_id,cart,purchase,cart_abandone_rate
587850980,2019,12,c7fa246d-c438-4965-8ab7-3f0004f9104b,1005112,1,1,0.0
519275530,2019,12,7e620fda-357c-4139-82ca-22104b1793c9,1005169,1,1,0.0
539302311,2019,12,23d07d81-12db-449b-8b87-4c820b8c5d3f,1005115,2,1,0.5
571712863,2019,12,9ae23bbf-b1a8-4a88-ae35-3005c2858aac,20000683,1,1,0.0
515535741,2019,11,60d69a60-0cf7-4bdb-b6f7-bf056da450a1,1005133,1,0,1.0
554080544,2019,11,7f088988-ec78-4f60-8653-9c9c1b984571,1005017,1,1,0.0
572131463,2019,11,4b2e145b-2540-4b6e-b84b-28c54f1bfb70,1004833,1,1,0.0
560971004,2019,11,df9dc246-362d-48fe-9f16-0ad05da5701a,1004358,1,0,1.0
513538467,2020,4,64897071-e4b0-4285-af2a-1ff76b8989ac,1004886,4,1,0.75
553709996,2019,10,a8b5ee88-bae6-bd1c-1a46-2aa533bc5909,1004794,2,0,1.0


In [0]:
# Average the per-session, per-product abandonment rates up to user / year / month
avg_cart_abandone_rate_df = (
    abandon_rate_per_session_df
    .groupBy('user_id', 'year', 'month')
    .agg(avg('cart_abandone_rate').alias('avg_cart_abandone_rate'))
    .orderBy('user_id', 'year', 'month')
)

In [0]:
# Persist the conversion-rate feature as Delta, replacing any previous version
(
    convertion_rate_per_user_year_month_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/convertion_rate_per_user_year_month")
)

In [0]:
# Persist the average-purchase feature as Delta, replacing any previous version
(
    avg_purchase_per_user_year_month_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/avg_purchase_per_user_year_month")
)

In [0]:
# Persist the cart-abandonment feature as Delta, replacing any previous version
(
    avg_cart_abandone_rate_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/avg_cart_abandone_rate")
)

## Features Join

In [0]:
# Read back the target table and every feature table persisted as Delta in the cells above
delta_reader = spark.read.format("delta")

# Target (label) table
grow_target_df = delta_reader.load(f"{wasbs_path}features/grow_target")

# Feature tables
sessions_per_user_year_month_df = delta_reader.load(f"{wasbs_path}features/sessions_per_user_year_month")
session_duration_df = delta_reader.load(f"{wasbs_path}features/session_duration")
avg_cart_abandone_rate_df = delta_reader.load(f"{wasbs_path}features/avg_cart_abandone_rate")
avg_purchase_per_user_year_month_df = delta_reader.load(f"{wasbs_path}features/avg_purchase_per_user_year_month")
convertion_rate_per_user_year_month_df = delta_reader.load(f"{wasbs_path}features/convertion_rate_per_user_year_month")

### Joining data frames

In [0]:
def join_dfs(list_df, base_df=None, keys=('user_id', 'year', 'month')):
    """Left-join a sequence of feature DataFrames onto a base DataFrame.

    Args:
        list_df: Iterable of DataFrames to join, applied in order. Each one must
            contain the columns named in ``keys``.
        base_df: DataFrame the features are joined onto. When omitted, the
            notebook-level ``grow_target_df`` is used (the original behaviour).
        keys: Column names used as the join key. Defaults to
            ``('user_id', 'year', 'month')``.

    Returns:
        The base DataFrame with every feature DataFrame left-joined on ``keys``;
        the base itself when ``list_df`` is empty. Rows of the base that have no
        match in a feature DataFrame are kept, with nulls in that feature's columns.
    """
    # The global is only looked up when no explicit base is supplied, and at call time,
    # so the function keeps working exactly as before for existing callers.
    joined_df = grow_target_df if base_df is None else base_df
    join_keys = list(keys)
    for feature_df in list_df:
        joined_df = joined_df.join(feature_df, join_keys, how='left')
    return joined_df

In [0]:
# Left-join every feature table onto the target table (one row per user, year and month)
features_df = join_dfs(
    [
        sessions_per_user_year_month_df,
        session_duration_df,
        avg_cart_abandone_rate_df,
        avg_purchase_per_user_year_month_df,
        convertion_rate_per_user_year_month_df,
    ]
)
display(features_df)

In [0]:
# Persist the joined feature dataset as Delta, replacing any previous version
(
    features_df.write
    .format("delta")
    .mode("overwrite")
    .save(f"{wasbs_path}features/features_df")
)

#### Features Dataset

The feature dataset, which will be used in Machine Learning, consists of the following columns:

| Column Name                   | Data Type  | Description |
|-------------------------------|------------|-------------|
| **year**                       | integer    | The year of the data. |
| **month**                      | integer    | The month of the data. |
| **target**                     | integer    | The target variable. Based on month-to-month sales, clients are classified by growth. If the growth is >= 10%, the value is set to 1; otherwise, it is set to 0. |
| **session_per_user_year_month** | long       | The total number of sessions per user within the given year and month. |
| **session_duration**           | long       | The time span (in seconds) between the user's first and last event within the given year and month, across all of the user's sessions. |
| **avg_cart_abandone_rate**  | double     | The average cart abandonment rate for users within the given time period. |
| **mean_price**                 | double     | The average price of purchases within the given year and month. |
| **convertion_rate**            | double     | The conversion rate: the ratio of purchase events to view events for the user within the given year and month (the column is spelled `convertion_rate` in the dataset). |
