## Data transformation and Feature Engineering

In [1]:
import os
os.chdir('C:/Users/giddy/Documents/RECOMMENDATION_SYSTEM')
import findspark
findspark.init()
from pathlib import Path
from dataclasses import dataclass
from src.constants import *
from glob import glob
from src.logger import logging


from pyspark import StorageLevel
from pyspark.sql import SparkSession,Window
from pyspark.sql.types import StructField,StructType,StringType,IntegerType,FloatType,TimestampType
from pyspark.ml.feature import FeatureHasher, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark import storagelevel
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import math
from src.utils.commons import spark_session,read_yaml
from src.cloud_storage.S3_object_store import S3Client

In [2]:
@dataclass
class DataTransformationConfig:
    source_datapath : Path
    source_parquetpath :Path
    schema : dict



class ConfigurationManager:
    def __init__(self,config=CONFIG_FILE_PATH,
                 params=PARAMS_FILE_PATH,
                 schema=SCHEMA_FILE_PATH):
        
        self.config = read_yaml(str(config))
        self.params = read_yaml(str(params))
        self.schema  = read_yaml(str(schema))

    def get_data_transformation_config(self) -> DataTransformationConfig:

        config = self.config.DATA_TRANSFORMATION
        schema = self.schema['schema']

        config_object = DataTransformationConfig(
            source_datapath=config.source_datapath,
            source_parquetpath = config.source_parquetpath,
            schema=self.schema
        )

        return config_object
    

class DataTransformation:

    def __init__(self,config = DataTransformationConfig):

        self.config = config
        self.spark = spark_session()

    def read_schema(self):

        fields = []

        logging.info('Reading schema from schema.yaml')
        logging.info(f'schema {self.config.schema}')

        for  field_dict in self.config.schema['schema']['fields']:
            field_type = getattr(types, field_dict['type'])()
            fields.append(StructField(field_dict['name'], field_type))

        return StructType(fields)
    
    def load_data(self):
        try:
            parquet_path = self.config.source_parquetpath  # Path for Parquet files
            
            # Check if the Parquet file exists
            if os.path.exists(parquet_path):
                logging.info(f'Parquet file exists, loading data from {parquet_path}')
                return parquet_path  # Return the path to the existing Parquet file
            else:
                # Load data from CSV if Parquet file is not available
                logging.info(f'Loading data from {self.config.source_datapath}')
                datapath = glob(self.config.source_datapath)  # List of CSV files
                self.schema = self.read_schema()  # Read or define the schema
                

                df = None
                for path in datapath:
                    logging.info(f'data path :{path}')
                    # Load each CSV file and combine them into a DataFrame
                    temp_df = self.spark.read.csv(path, schema=self.schema, header=True)
                    df = temp_df if df is None else df.union(temp_df)
                
                    logging.info(f'number of data point {df.count()}')

                # Order by event_time if needed
                df = df.orderBy('event_time')

                # Save the processed DataFrame to Parquet for future use
                logging.info(f'Saving processed data as Parquet at {parquet_path}')
                df.write.parquet(parquet_path,mode='overwrite')

                # Return the path to the newly saved Parquet file
                return parquet_path

        except Exception as e:
            logging.error(f'Could not load data: {e}')
            return None

    def feature_engineering(self):

        try: 

            logging.info('Starting feature engineering')

            data_path = self.load_data()
            data = self.spark.read.parquet(data_path) # read data

            data = data.sample(fraction=0.001,seed=42)

            # handling the missing values
            data = data.drop('category_code')
            data = data.fillna({'brand':'UNKNOWN'})

            # Add the 'is_brand_missing' column to indicate if the brand is missing
            data = data.withColumn('is_brand_missing', F.when(F.col('brand') == 'UNKNOWN',F.lit(1)).otherwise(F.lit(0)))
            data = data.dropna(subset=['user_session'])

            # dropduplicates 
            data = data.dropDuplicates()

            data = data.withColumn('price',F.abs(data['price']))

            # purchases made in the last 2 days
            logging.info('creating features : two_day_purchase')
            train_purchase = data.filter(data['event_type'] == 'purchase')
            train_purchase_per_2day = train_purchase.withColumn('two_day_purchase', 
                                                                        F.from_unixtime(
                                                                            F.floor(F.unix_timestamp(F.col('event_time')) / (2 * 86400))* (2 * 86400) 
                                                                            )
                                                                        )
            train_purchase_per_2day = train_purchase_per_2day.groupBy('user_id','two_day_purchase').agg(F.count('*').alias('Num_purchase_per_2day'))
            train_purchase_per_2day = train_purchase_per_2day.withColumnRenamed("user_id", "train_user_id") # rename 'user_id to prevent ambitiuity
            # Rename user_id in train_purchase_per_2day to avoid duplication in final result
            train_purchase_per_2day = train_purchase_per_2day.withColumnRenamed("user_id", "train_user_id")

            data = data.join(
                train_purchase_per_2day,
                (data['user_id'] == train_purchase_per_2day['train_user_id']) & 
                (F.floor(F.unix_timestamp(data['event_time']) / (2*86400)) ==  F.floor(F.unix_timestamp(train_purchase_per_2day['two_day_purchase']) / (2 * 86400))),
                how='left'
            ).drop('two_day_purchase','train_user_id').na.fill({'Num_purchase_per_2day' : 0 })

            logging.info('combined feature(two_day_purchase) to main dataframe')


            # average purchase price 
            logging.info('creating feature : Avg_purchase_price')
            avg_purchase_price = train_purchase.groupBy('user_id').agg(F.format_number(F.mean('price'),2).cast(FloatType()).alias('Avg_purchase_price'))
            avg_purchase_price = avg_purchase_price.withColumnRenamed("user_id", "train_user_id")
            data = data.join(
                avg_purchase_price,
                data['user_id'] == avg_purchase_price['train_user_id'],
                how='left'
            ).drop('train_user_id').na.fill({'Avg_purchase_price' : 0})
            logging.info('combined feature(Avg__purchase_price) to main dataframe')


            # normalized price
            logging.info('creating feature : normalised_product_price')
            window_cat = Window.partitionBy('category_id')
            data = data.withColumn('avg_cat_price',F.avg('price').over(window_cat))
            data = data.withColumn('normalised_price',F.col('price')/F.col('avg_cat_price')).drop('avg_cat_price')



            # most purchase category of user 
            # groupby user id and category and create a window specification to get the top three brands
            logging.info('creating most purchase category')
            category_count = train_purchase.groupBy('user_id','category_id').agg(F.count('category_id').alias('category_count'))
            window_spec = Window.partitionBy('User_id').orderBy(F.desc('category_count'))
            ranked_categorys = category_count.withColumn('rank' ,F.row_number().over(window_spec))
            top_categorys = ranked_categorys.filter(F.col("rank") <= 3)

            top_categorys = top_categorys.groupBy("user_id").pivot("rank", [1, 2, 3]).agg(F.first("category_id"))
            # rename the columns
            top_brands = top_categorys.select(
                F.col("user_id"),
                F.col("1").alias("1_most_purchased_category"),
                F.col("2").alias("2_most_purchased_category"),
                F.col("3").alias("3_most_purchased_category")
            )
            # Get the most purchased brand for each user
            most_purchased_category = category_count.withColumn('rank', F.row_number().over(Window.partitionBy('user_id').orderBy(F.desc('category_count'))))
            most_purchased_category = most_purchased_category.filter(F.col('rank') == 1).select('user_id', 'category_id')

            # Join the top brands with the most purchased brand
            top_category_with_most_purchased = top_brands.join(most_purchased_category, on='user_id', how='left')
            top_category_with_most_purchased = top_category_with_most_purchased.withColumnsRenamed({'user_id':'train_user_id','category_id':'preferred_category'})
            # Fill missing values with the user's most purchased brand
            # Replace missing values with the preferred brand
            top_brands_with_most_purchased = top_category_with_most_purchased.withColumn(
                '1_most_purchased_category',
                F.when(F.col('1_most_purchased_category').isNull(), F.col('preferred_category')).otherwise(F.col('1_most_purchased_category'))
            ).withColumn(
                '2_most_purchased_category',
                F.when(F.col('2_most_purchased_category').isNull(), F.col('preferred_category')).otherwise(F.col('2_most_purchased_category'))
            ).withColumn(
                '3_most_purchased_category',
                F.when(F.col('3_most_purchased_category').isNull(), F.col('preferred_category')).otherwise(F.col('3_most_purchased_category'))
            )

            data = data.join(
                top_brands_with_most_purchased,
                data['user_id'] == top_brands_with_most_purchased['train_user_id'],
                how = 'left'
            ).drop('train_user_id','preferred_category').na.fill({'1_most_purchased_category':'No purchase',
                                                            '2_most_purchased_category':'No purchase',
                                                            '3_most_purchased_category' : 'No purchase'})
            
            logging.info('combined feature(most _purchase_category) to main dataframe')


            # duration of users session
            logging.info('creating feature : user_session_duration')
            session_duration = data.groupBy('user_id','user_session').agg(
                F.min(F.col('event_time')).alias('session_start'),
                F.max(F.col('event_time')).alias('session_end')
            ).withColumn('session_duration', F.unix_timestamp('session_end') - F.unix_timestamp('session_start'))
            # combine 
            data = data.join(
                session_duration,
                on = ['user_id','user_session'],
                how='left'

            ).drop('session_start','session_end')
            logging.info('combined feature(user_session_duration) to main dataframe')

            # avearge session price for view,cart and purchase
            logging.info('creating feature : session_avg_price')
            session_avg_price = data.groupBy('user_id','user_session').agg(F.round(F.mean('price'),2).alias('Avg_session_prices'))
            data = data.join(
                session_avg_price,
                on = ['user_id','user_session'],
                how='left'
            )

            # total number of product viewed during a session
            logging.info('creating feature : num_session_views')
            session_view = data.filter(F.col('event_type') == 'view')
            num_session_view = session_view.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_views'))
            # combine dataframe
            data = data.join(
                num_session_view,
                on = ['user_id','user_session'],
                how='left'
            ).na.fill({'num_session_views' : 0})

            # number of product added to cart in a sesiion
            logging.info('creating feature : num_session_cart')
            session_cart = data.filter(F.col('event_type') == 'cart')
            num_session_cart = session_cart.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_cart'))
            # combine dataframe
            data = data.join(
                num_session_cart,
                on = ['user_id','user_session'],
                how='left'
            ).na.fill({'num_session_cart' : 0})

            # number of produt removed from cart
            logging.info('creating feature : num_session_frm_cart')
            session_frm_cart = data.filter(F.col('event_type') == 'remove_from_cart')
            num_session_frm_cart = session_frm_cart.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_frm_cart'))
            # combine dataframe
            data= data.join(
                num_session_frm_cart,
                on = ['user_id','user_session'],
                how='left'
            ).na.fill({'num_session_frm_cart' : 0})

            # session purchase
            logging.info('creating feature : num_session_purchase')
            session_purchase = data.filter(F.col('event_type') == 'purchase')
            num_session_purchase = session_purchase.groupBy('user_id', 'user_session').agg(F.count('event_type').alias('num_session_purchase'))
            data = data.join(
                num_session_purchase,
                on=['user_id', 'user_session'],
                how='left'
            ).na.fill({'num_session_purchase':0})

            # cart-to-purchase ration
            logging.info('creating feature : session_cart_to_purchase_ratio')
            data = data.withColumn('session_cart_to_purchase_ratio', F.when(F.col('num_session_purchase') == 0 , 0).otherwise(
                F.col('num_session_cart') / F.col('num_session_purchase')
            ))
            data = data.withColumn('session_cart_to_purchase_ratio', F.round(F.col('session_cart_to_purchase_ratio'), 2))

            # Calculate Cart Abandonment Rate and round to two decimal points
            logging.info('creating feature : session_cart_abondonment_rate')
            data = data.withColumn('session_cart_abandonment_rate', 
                                    F.when(F.col('num_session_cart') == 0, 0)  # Handle case where no products were added to the cart
                                    .otherwise(1 - (F.col('num_session_purchase') / F.col('num_session_cart'))))

            data = data.withColumn('session_cart_abandonment_rate', F.round(F.col('session_cart_abandonment_rate'), 2))

            # total product purchased
            logging.info('creating feature : total_num_product_purchased')
            product_purchase = train_purchase.groupBy('product_id').agg(
                F.count('product_id').alias('total_num_product_purchased')
            )
            data = data.join(
                product_purchase,
                on=['product_id'],
                how='left'
            ).na.fill({'total_num_product_purchased':0})

            # total number of product views
            logging.info('creating feature : total_product_views')
            train_views = data.filter(F.col('event_type') == 'view')
            # groupby by product
            product_views = train_views.groupBy('product_id').agg(F.count('product_id').alias('total_product_views'))
            data = data.join(
                product_views,
                on = ['product_id'],
                how='left'
            ).na.fill({'total_product_views':0})

            #popularity percentile
            # Compute max view_count per category
            logging.info('createing feature : popularity percentile')
            data = data.withColumn(
                "max_view_category",
                F.max("total_product_views").over(window_cat)
            )
            # Compute popularity percentile
            data = data.withColumn(
                "popularity_percentile",
                F.when(F.col("max_view_category") > 0, F.col("total_product_views") / F.col("max_view_category")).otherwise(0)
            ).drop("max_view_category")


            # total product add to cart
            logging.info('creating feature : total_product_cart')
            train_cart = data.filter(F.col('event_type') == 'cart')
            product_cart = train_cart.groupBy('product_id').agg(F.count('product_id').alias('total_product_cart'))

            data = data.join(
                product_cart,
                on=['product_id'],
                how='left'
            ).na.fill({'total_product_cart':0})

            # product cart  purchase conversion rate 
            logging.info('creating feature :  product cart_purchase_conversion rate')
            data = data.withColumn('product_cart_to_purchase_conversion_ratio', F.when(F.col('total_num_product_purchased') == 0 , 0).otherwise(
                F.col('total_product_cart') / F.col('total_num_product_purchased')
            ))

            data = data.withColumn('product_cart_to_purchase_conversion_ratio', F.round(F.col('product_cart_to_purchase_conversion_ratio'), 2))

            #user_product_view_count
            logging.info('creating feature : user_product_view')
            user_product_view = train_views.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_view'))
            data = data.join(
                user_product_view,
                on=['user_id','product_id'],
                how='left'
            ).na.fill({'user_product_view':0})

            #user_product_add to cart _count
            logging.info('creating feature : user_product_cart')
            user_product_cart = train_cart.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_cart'))

            data = data.join(
                user_product_cart,
                on=['user_id','product_id'],
                how='left'
            ).na.fill({'user_product_cart':0})


            #user_product_add to cart _count
            logging.info('creating feature : user_product_purchase')
            user_product_purchase = train_purchase.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_purchase'))
            data = data.join(
                user_product_purchase,
                on=['user_id','product_id'],
                how='left'
            ).na.fill({'user_product_purchase':0})

            # user product remove from cart
            logging.info('creating feature : user_product_frm_cart')
            data_frm_cart = data.filter(F.col('event_type') == 'remove_from_cart')
            user_product_frm_cart =  data_frm_cart.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_frm_cart'))
            data = data.join(
                user_product_frm_cart,
                on = ['user_id','product_id'],
                how='left'
            ).na.fill({'user_product_frm_cart' : 0})


            logging.info('creating feature: interaction_weight')

            data = data.withColumn('interaction_weight' , F.when(
                    F.col('event_type') == 'view',0.3).when(
                        F.col('event_type') == 'cart' ,0.7
                    ).when(
                        F.col('event_type') == 'purchase',2
                    ).when(
                        F.col('event_type') == 'remove_from_cart',-0.3
                    )
                      ).drop('event_type')

            # interaction score 
            logging.info('creating feature : interaction score')
            data = data.withColumn('interaction_score' ,
                                    F.col('user_product_view') * F.col('interaction_weight') + 
                                   F.col('user_product_cart')*F.col('interaction_weight') + 
                                   F.col('user_product_purchase') * F.col('interaction_weight') + 
                                   F.col('user_product_frm_cart') * F.col('interaction_weight'))

            # Temporalfeatures
            logging.info('creating feature : Temporal features')
            data = data.withColumn('day',F.day(F.col('event_time')))
            data = data.withColumn('week',F.dayofweek(F.col('event_time')))
            data = data.withColumn('hour',F.hour(F.col('event_time')))
            data  = data.withColumn('minutes',F.minute(F.col('event_time')))
            data = data.withColumn('seconds',F.second(F.col('event_time')))
            # add cyclic time features 
            data = data.withColumn(
                'hour_sin' , F.sin(2 * math.pi * F.col('hour') / 24)
            ).withColumn('hour_cos',F.cos(2 * math.pi * F.col('hour') / 24))

            data = data.withColumn(
                'minutes_sin' , F.sin(2 * math.pi * F.col('minutes') / 24)
            ).withColumn('minutes_cos',F.cos(2 * math.pi * F.col('minutes') / 24))

            data = data.withColumn(
                'second_sin',F.sin(2 * math.pi * F.col('seconds') / 24)
            ).withColumn('second_cos',F.cos(2 * math.pi * F.col('seconds') / 24))

            data = data.orderBy('event_time').drop('hour','minutes','seconds','event_time')
            #data.show()
            

            return data

        except Exception as e :
            logging.error(f'could not complete feature engineering because : {e}')

    def feature_transformation(self):

            try:
                logging.info('Performing feature transformation')

                data = self.feature_engineering()

               

                logging.info('creating hashed features')
                # Define the features to be hashed
                features_to_hash = ['user_id', 'product_id', 'category_id', 'brand', 'user_session','1_most_purchased_category','2_most_purchased_category',
                                        '3_most_purchased_category']

                # Create featurehasher transformers
                hashingTFs = [FeatureHasher(inputCols=[col], outputCol=f"{col}_hash", numFeatures=8000) for col in features_to_hash] 

                # Assemble all hashed features into a single vector
                logging.info('Assembling features into one vector space')
                assembler = VectorAssembler(inputCols=[f"{col}_hash" for col in features_to_hash], outputCol="Assembled_hashed_features")

                #from pyspark.ml import Pipeline
                pipeline = Pipeline(stages= hashingTFs)

                # Fit and transform the data
                logging.info('Performing feature transformation')
                data = pipeline.fit(data).transform(data)
                data.write.parquet('Artifacts/feature_engineered_data.parquet')
                
                # load to s3 bucket
                logging.info('loading into s3 bucket')
                s3 = S3Client()
                s3.create_bucket('product-recommendation-store')
                s3.upload_file(filename='Artifacts/feature_engineered_data.parquet',bucket='product-recommendation-store',object_name='feature_engineered_data.parquet')
                
            except Exception as e:
                logging.error(f'error occured : {e}')


        

        



        

    
    


In [6]:

manager = ConfigurationManager()
config = manager.get_data_transformation_config()
data_trans = DataTransformation(config)
ft = data_trans.feature_transformation()

[2025-01-04 16:16:40,438 ] 39 root - INFO - Yaml file:  config\config.yaml loaded suscessfully
[2025-01-04 16:16:40,449 ] 39 root - INFO - Yaml file:  params.yaml loaded suscessfully
[2025-01-04 16:16:40,466 ] 39 root - INFO - Yaml file:  schema.yaml loaded suscessfully
[2025-01-04 16:16:40,474 ] 205 root - INFO - Creating spark session


In [6]:
#  creat spark session

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Recommendation System") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()


In [4]:
schema = StructType([
    StructField('event_time',TimestampType(),False),
    StructField('event_type',StringType(),True),
    StructField('product_id',StringType(),True),
    StructField('category_id',StringType(),True),
    StructField('category_code',StringType(),True),
    StructField('brand',StringType(),True),
    StructField('price',FloatType(),True),
    StructField('user_id',StringType(),True),
    StructField('user_session',StringType(),True)]
)

# loading
oct_data = spark.read.csv('Artifacts/Raw_ingested_data/2019-Oct.csv',header=True,schema=schema)
nov_data = spark.read.csv('Artifacts/Raw_ingested_data/2019-Nov.csv',header=True,schema=schema)
dec_data = spark.read.csv('Artifacts/Raw_ingested_data/2019-Dec.csv',header=True,schema=schema)
jan_data = spark.read.csv('Artifacts/Raw_ingested_data/2020-Jan.csv',header=True,schema=schema)
feb_data = spark.read.csv('Artifacts/Raw_ingested_data/2020-feb.csv',header=True,schema=schema)




# split into training , validation and test data
training_data = oct_data.union(nov_data).union(dec_data).union(jan_data).union(feb_data)


# Step 1: Compute total interactions for each user
user_total_interactions = training_data.groupBy("user_id").agg(F.count("*").alias("total_interactions"))

# Step 2: Join the total interactions back to the original DataFrame
training_data = training_data.join(user_total_interactions, on="user_id")


user_window = Window.partitionBy("user_id").orderBy("event_time")
df_with_rank = training_data.withColumn("interaction_rank", F.row_number().over(user_window))


train_cutoff = 0.7  # 70% for training


df_with_rank = df_with_rank.withColumn(
    "is_train",
    F.when(
        (df_with_rank["interaction_rank"] <= (df_with_rank["total_interactions"] * train_cutoff)) | 
        (df_with_rank["total_interactions"] <= 2),  # Include customers with 1 interaction
        True
    ).otherwise(False)
)


train_df = df_with_rank.filter("is_train = true")
test_df = df_with_rank.filter("is_train = false")



new_training_data = train_df.select('*')
new_val_data = test_df.select('*')

#new_training_data = new_training_data.sample(fraction=0.01,seed=42)

# handling the missing values
new_training_data = new_training_data.drop('category_code')
new_training_data = new_training_data.fillna({'brand':'UNKNOWN'})

# Add the 'is_brand_missing' column to indicate if the brand is missing
new_training_data = new_training_data.withColumn('is_brand_missing', F.when(F.col('brand') == 'UNKNOWN',F.lit(1)).otherwise(F.lit(0)))
new_training_data = new_training_data.dropna(subset=['user_session'])

# dropduplicates 
new_training_data = new_training_data.dropDuplicates()

new_training_data = new_training_data.withColumn('price',F.abs(new_training_data['price']))

# purchases made in the last 2 days

train_purchase = new_training_data.filter(new_training_data['event_type'] == 'purchase')

train_purchase_per_2day = train_purchase.withColumn('two_day_purchase', 
                                                             F.from_unixtime(
                                                                 F.floor(F.unix_timestamp(F.col('event_time')) / (2 * 86400))* (2 * 86400) 
                                                                 )
                                                             )

train_purchase_per_2day = train_purchase_per_2day.groupBy('user_id','two_day_purchase').agg(F.count('*').alias('Num_purchase_per_2day'))
train_purchase_per_2day = train_purchase_per_2day.withColumnRenamed("user_id", "train_user_id") # rename 'user_id to prevent ambitiuity
# Rename user_id in train_purchase_per_2day to avoid duplication in final result
train_purchase_per_2day = train_purchase_per_2day.withColumnRenamed("user_id", "train_user_id")

new_training_data = new_training_data.join(
    train_purchase_per_2day,
    (new_training_data['user_id'] == train_purchase_per_2day['train_user_id']) & 
    (F.floor(F.unix_timestamp(new_training_data['event_time']) / (2*86400)) ==  F.floor(F.unix_timestamp(train_purchase_per_2day['two_day_purchase']) / (2 * 86400))),
    how='left'
).drop('two_day_purchase','train_user_id').na.fill({'Num_purchase_per_2day' : 0 })


# average purchase price 

avg_purchase_price = train_purchase.groupBy('user_id').agg(F.format_number(F.mean('price'),2).cast(FloatType()).alias('Avg_purchase_price'))
avg_purchase_price = avg_purchase_price.withColumnRenamed("user_id", "train_user_id")

new_training_data = new_training_data.join(
    avg_purchase_price,
    new_training_data['user_id'] == avg_purchase_price['train_user_id'],
    how='left'
).drop('train_user_id').na.fill({'Avg_purchase_price' : 0})


# most purchase category of user 
# groupby user id and brand and create a window specification to get the top three brands
category_count = train_purchase.groupBy('user_id','category_id').agg(F.count('category_id').alias('category_count'))

window_spec = Window.partitionBy('User_id').orderBy(F.desc('category_count'))
ranked_categorys = category_count.withColumn('rank' ,F.row_number().over(window_spec))
top_categorys = ranked_categorys.filter(F.col("rank") <= 3)

top_categorys = top_categorys.groupBy("user_id").pivot("rank", [1, 2, 3]).agg(F.first("category_id"))

# rename the columns
top_brands = top_categorys.select(
    F.col("user_id"),
    F.col("1").alias("1_most_purchased_category"),
    F.col("2").alias("2_most_purchased_category"),
    F.col("3").alias("3_most_purchased_category")
)

# Get the most purchased brand for each user
most_purchased_category = category_count.withColumn('rank', F.row_number().over(Window.partitionBy('user_id').orderBy(F.desc('category_count'))))
most_purchased_category = most_purchased_category.filter(F.col('rank') == 1).select('user_id', 'category_id')

# Join the top brands with the most purchased brand
top_category_with_most_purchased = top_brands.join(most_purchased_category, on='user_id', how='left')
top_category_with_most_purchased = top_category_with_most_purchased.withColumnsRenamed({'user_id':'train_user_id','category_id':'preferred_category'})
# Fill missing values with the user's most purchased brand
# Replace missing values with the preferred brand
top_brands_with_most_purchased = top_category_with_most_purchased.withColumn(
    '1_most_purchased_category',
    F.when(F.col('1_most_purchased_category').isNull(), F.col('preferred_category')).otherwise(F.col('1_most_purchased_category'))
).withColumn(
    '2_most_purchased_category',
    F.when(F.col('2_most_purchased_category').isNull(), F.col('preferred_category')).otherwise(F.col('2_most_purchased_category'))
).withColumn(
    '3_most_purchased_category',
    F.when(F.col('3_most_purchased_category').isNull(), F.col('preferred_category')).otherwise(F.col('3_most_purchased_category'))
)

new_training_data = new_training_data.join(
    top_brands_with_most_purchased,
    new_training_data['user_id'] == top_brands_with_most_purchased['train_user_id'],
    how = 'left'
).drop('train_user_id','preferred_category').na.fill({'1_most_purchased_category':'No purchase',
                                                   '2_most_purchased_category':'No purchase',
                                                   '3_most_purchased_category' : 'No purchase'})


# duration of users session

session_duration = new_training_data.groupBy('user_id','user_session').agg(
    F.min(F.col('event_time')).alias('session_start'),
    F.max(F.col('event_time')).alias('session_end')
).withColumn('session_duration', F.unix_timestamp('session_end') - F.unix_timestamp('session_start'))
# combine 
new_training_data = new_training_data.join(
    session_duration,
    on = ['user_id','user_session'],
    how='left'

).drop('session_start','session_end')

# avearge session price for view,cart and purchase

session_avg_price = new_training_data.groupBy('user_id','user_session').agg(F.round(F.mean('price'),2).alias('Avg_session_prices'))
new_training_data = new_training_data.join(
    session_avg_price,
    on = ['user_id','user_session'],
    how='left'
)

# total number of product viewed during a session

session_view = new_training_data.filter(F.col('event_type') == 'view')
num_session_view = session_view.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_views'))
# combine dataframe
new_training_data = new_training_data.join(
    num_session_view,
    on = ['user_id','user_session'],
    how='left'
).na.fill({'num_session_views' : 0})

# number of product added to cart in a sesiion
session_cart = new_training_data.filter(F.col('event_type') == 'cart')
num_session_cart = session_cart.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_cart'))
# combine dataframe
new_training_data = new_training_data.join(
    num_session_cart,
    on = ['user_id','user_session'],
    how='left'
).na.fill({'num_session_cart' : 0})

# number of produt removed from cart
session_frm_cart = new_training_data.filter(F.col('event_type') == 'remove_from_cart')
num_session_frm_cart = session_frm_cart.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_frm_cart'))
# combine dataframe

new_training_data= new_training_data.join(
    num_session_frm_cart,
    on = ['user_id','user_session'],
    how='left'
).na.fill({'num_session_frm_cart' : 0})

# session purchase
session_purchase = new_training_data.filter(F.col('event_type') == 'purchase')
num_session_purchase = session_purchase.groupBy('user_id', 'user_session').agg(F.count('event_type').alias('num_session_purchase'))
new_training_data = new_training_data.join(
    num_session_purchase,
    on=['user_id', 'user_session'],
    how='left'
).na.fill({'num_session_purchase':0})

# cart-to-purchase ration
new_training_data = new_training_data.withColumn('session_cart_to_purchase_ratio', F.when(F.col('num_session_purchase') == 0 , 0).otherwise(
    F.col('num_session_cart') / F.col('num_session_purchase')
))
new_training_data = new_training_data.withColumn('session_cart_to_purchase_ratio', F.round(F.col('session_cart_to_purchase_ratio'), 2))

# Calculate Cart Abandonment Rate and round to two decimal points
new_training_data = new_training_data.withColumn('session_cart_abandonment_rate', 
                           F.when(F.col('num_session_cart') == 0, 0)  # Handle case where no products were added to the cart
                           .otherwise(1 - (F.col('num_session_purchase') / F.col('num_session_cart'))))

new_training_data = new_training_data.withColumn('session_cart_abandonment_rate', F.round(F.col('session_cart_abandonment_rate'), 2))

# total product purchased
product_purchase = train_purchase.groupBy('product_id').agg(
    F.count('product_id').alias('total_num_product_purchased')
)
new_training_data = new_training_data.join(
    product_purchase,
    on=['product_id'],
    how='left'
).na.fill({'total_num_product_purchased':0})

# total number of product views
train_views = new_training_data.filter(F.col('event_type') == 'view')
# groupby by product
product_views = train_views.groupBy('product_id').agg(F.count('product_id').alias('total_product_views'))

new_training_data = new_training_data.join(
    product_views,
    on = ['product_id'],
    how='left'
).na.fill({'total_product_views':0})

# total product add to cart
train_cart = new_training_data.filter(F.col('event_type') == 'cart')
product_cart = train_cart.groupBy('product_id').agg(F.count('product_id').alias('total_product_cart'))

new_training_data = new_training_data.join(
    product_cart,
    on=['product_id'],
    how='left'
).na.fill({'total_product_cart':0})

# product cart  purchase conversion rate 
new_training_data = new_training_data.withColumn('product_cart_to_purchase_conversion_ratio', F.when(F.col('total_num_product_purchased') == 0 , 0).otherwise(
    F.col('total_product_cart') / F.col('total_num_product_purchased')
))

new_training_data = new_training_data.withColumn('product_cart_to_purchase_conversion_ratio', F.round(F.col('product_cart_to_purchase_conversion_ratio'), 2))

#user_product_view_count
user_product_view = train_views.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_view'))
new_training_data = new_training_data.join(
    user_product_view,
    on=['user_id','product_id'],
    how='left'
).na.fill({'user_product_view':0})

#user_product_add to cart _count
user_product_cart = train_cart.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_cart'))

new_training_data = new_training_data.join(
    user_product_cart,
    on=['user_id','product_id'],
    how='left'
).na.fill({'user_product_cart':0})


#user_product_add to cart _count
user_product_purchase = train_purchase.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_purchase'))
new_training_data = new_training_data.join(
    user_product_purchase,
    on=['user_id','product_id'],
    how='left'
).na.fill({'user_product_purchase':0})

# Temporalfeatures
new_training_data = new_training_data.withColumn('day',F.day(F.col('event_time')))
new_training_data = new_training_data.withColumn('hour',F.hour(F.col('event_time')))
new_training_data  = new_training_data.withColumn('minutes',F.minute(F.col('event_time')))
new_training_data = new_training_data.withColumn('seconds',F.second(F.col('event_time')))

# add cyclic time features 
new_training_data = new_training_data.withColumn(
    'hour_sin' , F.sin(2 * math.pi * F.col('hour') / 24)
).withColumn('hour_cos',F.cos(2 * math.pi * F.col('hour') / 24))

new_training_data = new_training_data.withColumn(
    'minutes_sin' , F.sin(2 * math.pi * F.col('minutes') / 24)
).withColumn('minutes_cos',F.cos(2 * math.pi * F.col('minutes') / 24))

new_training_data = new_training_data.withColumn(
    'second_sin',F.sin(2 * math.pi * F.col('seconds') / 24)
).withColumn('second_cos',F.cos(2 * math.pi * F.col('seconds') / 24))

new_training_data = new_training_data.withColumn('week',F.dayofweek(F.col('event_time'))).drop('hour','minutes','seconds','event_time').persist(StorageLevel.DISK_ONLY)

new_training_data.show()










+---------+----------+--------------------+----------------+-------------------+--------+-----+------------------+----------------+--------+----------------+---------------------+------------------+-------------------------+-------------------------+-------------------------+----------------+------------------+-----------------+----------------+--------------------+--------------------+------------------------------+-----------------------------+---------------------------+-------------------+------------------+-----------------------------------------+-----------------+-----------------+---------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|  user_id|product_id|        user_session|      event_type|        category_id|   brand|price|total_interactions|interaction_rank|is_train|is_brand_missing|Num_purchase_per_2day|Avg_purchase_price|1_most_purchased_category|2_most_purchased_catego

In [None]:
from pyspark.ml.feature import FeatureHasher, StringIndexer, VectorAssembler
from pyspark.sql import functions as F



# Define the features to be hashed
features_to_hash = ['user_id', 'product_id', 'category_id', 'brand', 'user_session','1_most_purchased_category','2_most_purchased_category',
                    '3_most_purchased_category']

# Create StringIndexers for categorical features


# Create HashingTF transformers
hashingTFs = [FeatureHasher(inputCols=[col], outputCol=f"{col}_hash", numFeatures=8000) for col in features_to_hash] 

# Assemble all hashed features into a single vector
assembler = VectorAssembler(inputCols=[f"{col}_hash" for col in features_to_hash], outputCol="Assembled_hashed_features")



# Create a pipeline to chain the transformations
#from pyspark.ml import Pipeline
#pipeline = Pipeline(stages= [string_indexers + hashingTFs + assembler])

# Fit and transform the data
#hashed_df = pipeline.fit(new_training_data).transform(new_training_data)

# Select the original columns and the hashed features
#selected_cols = [c for c in new_training_data.columns] + ["hashed_features"]
#hashed_df = hashed_df.select(*selected_cols)

# Show the resulting DataFrame
#hashed_df.show()

In [10]:
indexed_data.show()

+---------+----------+--------------------+----------------+-------------------+--------+-----+------------------+----------------+--------+----------------+---------------------+------------------+-------------------------+-------------------------+-------------------------+----------------+------------------+-----------------+----------------+--------------------+--------------------+------------------------------+-----------------------------+---------------------------+-------------------+------------------+-----------------------------------------+-----------------+-----------------+---------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-------------+
|  user_id|product_id|        user_session|      event_type|        category_id|   brand|price|total_interactions|interaction_rank|is_train|is_brand_missing|Num_purchase_per_2day|Avg_purchase_price|1_most_purchased_category|2_most_pu

In [22]:
# Run the second stage
hashed_data = hashingTFs[0].transform(indexed_data)

hashed_data.show()

+---------+----------+--------------------+----------------+-------------------+--------+-----+------------------+----------------+--------+----------------+---------------------+------------------+-------------------------+-------------------------+-------------------------+----------------+------------------+-----------------+----------------+--------------------+--------------------+------------------------------+-----------------------------+---------------------------+-------------------+------------------+-----------------------------------------+-----------------+-----------------+---------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-------------+-------------------+
|  user_id|product_id|        user_session|      event_type|        category_id|   brand|price|total_interactions|interaction_rank|is_train|is_brand_missing|Num_purchase_per_2day|Avg_purchase_price|1_most_purchase

In [8]:
new_training_data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- user_session: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- brand: string (nullable = false)
 |-- price: float (nullable = true)
 |-- total_interactions: long (nullable = false)
 |-- interaction_rank: integer (nullable = false)
 |-- is_train: boolean (nullable = false)
 |-- is_brand_missing: integer (nullable = false)
 |-- Num_purchase_per_2day: long (nullable = false)
 |-- Avg_purchase_price: float (nullable = false)
 |-- 1_most_purchased_category: string (nullable = false)
 |-- 2_most_purchased_category: string (nullable = false)
 |-- 3_most_purchased_category: string (nullable = false)
 |-- session_duration: long (nullable = true)
 |-- Avg_session_prices: double (nullable = true)
 |-- num_session_views: long (nullable = false)
 |-- num_session_cart: long (nullable = false)
 |-- num_session_frm_cart: long (nullable = false)
 |

In [7]:
schema = StructType([
    StructField('event_time',TimestampType(),False),
    StructField('event_type',StringType(),True),
    StructField('product_id',StringType(),True),
    StructField('category_id',StringType(),True),
    StructField('category_code',StringType(),True),
    StructField('brand',StringType(),True),
    StructField('price',FloatType(),True),
    StructField('user_id',StringType(),True),
    StructField('user_session',StringType(),True)]
)

In [8]:
oct_data = spark.read.csv('Artifacts/Raw_ingested_data/2019-Oct.csv',header=True,schema=schema)
nov_data = spark.read.csv('Artifacts/Raw_ingested_data/2019-Nov.csv',header=True,schema=schema)
dec_data = spark.read.csv('Artifacts/Raw_ingested_data/2019-Dec.csv',header=True,schema=schema)
jan_data = spark.read.csv('Artifacts/Raw_ingested_data/2020-Jan.csv',header=True,schema=schema)
feb_data = spark.read.csv('Artifacts/Raw_ingested_data/2020-feb.csv',header=True,schema=schema)

In [9]:
oct_data.show()

+-------------------+----------------+----------+-------------------+-------------+--------+-----+---------+--------------------+
|         event_time|      event_type|product_id|        category_id|category_code|   brand|price|  user_id|        user_session|
+-------------------+----------------+----------+-------------------+-------------+--------+-----+---------+--------------------+
|2019-10-01 01:00:00|            cart|   5773203|1487580005134238553|         NULL|  runail| 2.62|463240011|26dd6e6e-4dac-477...|
|2019-10-01 01:00:03|            cart|   5773353|1487580005134238553|         NULL|  runail| 2.62|463240011|26dd6e6e-4dac-477...|
|2019-10-01 01:00:07|            cart|   5881589|2151191071051219817|         NULL|  lovely|13.48|429681830|49e8d843-adf3-428...|
|2019-10-01 01:00:07|            cart|   5723490|1487580005134238553|         NULL|  runail| 2.62|463240011|26dd6e6e-4dac-477...|
|2019-10-01 01:00:15|            cart|   5881449|1487580013522845895|         NULL|  lovel

In [33]:
event_type = {
                    'cart':0.3,
                    'purchase' : 5,
                    'remove_from_cart': -0.3,
                    'view' : 0.1
                }

event_type.values()

dict_values([0.3, 5, -0.3, 0.1])

In [7]:
datasets = [oct_data,nov_data,dec_data,jan_data,feb_data]

for data in datasets:
    data.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (

In [None]:
# split into training , validation and test data

training_data = oct_data.union(nov_data).union(dec_data).union(jan_data).union(feb_data)




the number of users in entire data : 1639358


In [None]:

from pyspark.sql.functions import row_number


from pyspark.sql.functions import col, count

# Step 1: Compute total interactions for each user
user_total_interactions = training_data.groupBy("user_id").agg(count("*").alias("total_interactions"))

# Step 2: Join the total interactions back to the original DataFrame
training_data = training_data.join(user_total_interactions, on="user_id")


user_window = Window.partitionBy("user_id").orderBy("event_time")
df_with_rank = training_data.withColumn("interaction_rank", row_number().over(user_window))


train_cutoff = 0.7  # 70% for training


df_with_rank = df_with_rank.withColumn(
    "is_train",
    F.when(
        (df_with_rank["interaction_rank"] <= (df_with_rank["total_interactions"] * train_cutoff)) | 
        (df_with_rank["total_interactions"] <= 2),  # Include customers with 1 interaction
        True
    ).otherwise(False)
)



train_df = df_with_rank.filter("is_train = true")
test_df = df_with_rank.filter("is_train = false")

user_total_interactions.show() , train_df.show()

+---------+------------------+
|  user_id|total_interactions|
+---------+------------------+
|550807616|                 3|
|555465781|                 3|
|555474518|                18|
|463798468|                 4|
|537764904|               257|
|456746149|                26|
|555493043|                 1|
|258322598|               136|
|555515249|                 3|
|434884379|                81|
|525207529|              1599|
|553765542|                27|
|555531068|                 1|
|366952657|                 5|
|553929818|                55|
|552691394|                 1|
|555533666|                 2|
|470021639|               546|
|259672011|                33|
|555543739|                 2|
+---------+------------------+
only showing top 20 rows

+---------+-------------------+----------------+----------+-------------------+-------------+----------+-----+--------------------+------------------+----------------+-----+
|  user_id|         event_time|      event_type|product_

(None, None)

In [12]:
test_df.filter('user_id == 101571486').count()


11

In [13]:
train_df.filter('user_id == 101571486').count()

23

In [7]:
val_user_id = test_df.select('user_id').distinct()
train_user_id = train_df.select('user_id').distinct()

val_not_in_train = train_user_id.intersect(val_user_id)
print(f"Number of users in training in validation: {val_not_in_train.count()}")

Number of users in training in validation: 869866


In [8]:
print(f'total number of users in training data {train_user_id.count()}')
print(f'total number of users in validation data {val_user_id.count()}')

total number of users in training data 1639358
total number of users in validation data 869866


In [1]:
# check to see if the user_id are both traingin,validation and test data

train_user_id = train_df.select('user_id').distinct()
validation_user_id = val_df.select('user_id').distinct()
test_user_id = test_df.select('user_id').distinct()


print(f'total number of users in training data {train_user_id.count()}')
print(f'total number of users in validation data {validation_user_id.count()}')
print(f'total number of users in test data {test_user_id.count()}')

NameError: name 'train_df' is not defined

In [22]:
# Users in Validation but not in Training:

val_not_in_train = validation_user_id.intersect(train_user_id)
print(f"Number of users in validation in training: {val_not_in_train.count()}")

Number of users in validation in training: 513786


In [11]:
val_not_in_test = test_user_id.intersect(validation_user_id)
print(f"Number of users in test in validation: {val_not_in_test.count()}")


Number of users in test not in validation: 327020


In [12]:
test_not_in_train = test_user_id.intersect(train_user_id)
print(f"Number of users in test in train: {test_not_in_train.count()}")

Number of users in test not in train: 331069


There is an issue of cold start in the data given that majority of the user in the validation and test data are not present in the training data

In [13]:
train_product_id = training_data.select('product_id').distinct()
val_product_id = validation_data.select('product_id').distinct()
test_product_id = testing_data.select('product_id').distinct()


print(f'total number of product in training data {train_product_id.count()}')
print(f'total number of product in validation data {val_product_id.count()}')
print(f'total number of product in test data {test_product_id.count()}')


total number of product in training data 49341
total number of product in validation data 45484
total number of product in test data 48579


In [14]:
val_not_in_train = val_product_id.subtract(train_product_id)
print(f"Number of product in validation not in training: {val_not_in_train.count()}")

Number of product in validation not in training: 2087


In [15]:
val_not_in_test = val_product_id.subtract(test_product_id)
print(f"Number of product in validation not in test: {val_not_in_test.count()}")

Number of product in validation not in test: 2345


In [16]:
test_not_in_train = test_product_id.subtract(train_product_id)
print(f"Number of product in test not in train: {test_not_in_train.count()}")

Number of product in test not in train: 4623


In [17]:
# check missing values 
train_missing_values = training_data.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in training_data.columns])
val_missing_values = validation_data.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in validation_data.columns])


train_missing_values.show() , val_missing_values.show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     12069716|5155579|    0|      0|        2229|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|      4190033|1775630|    0|      0|        1314|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



(None, None)

In [23]:
# making a copy of the training , val dataframe and caching into memory before begining preprocessing

new_training_data = training_data.select('*')
new_val_data = validation_data.select('*')

# cache into memory

new_training_data = new_training_data.persist(StorageLevel.DISK_ONLY)
new_val_data = new_val_data



In [24]:
new_training_data.count()

1228741

####  handling the missing values

In [25]:
# handling the missing values

new_training_data = new_training_data.drop('category_code')
new_val_data = new_val_data.drop('category_code')


new_training_data = new_training_data.fillna({'brand':'UNKNOWN'})
new_val_data = new_val_data.fillna({'brand':'UNKNOWN'})

# Add the 'is_brand_missing' column to indicate if the brand is missing
new_training_data = new_training_data.withColumn('is_brand_missing', F.when(F.col('brand') == 'UNKNOWN',F.lit(1)).otherwise(F.lit(0)))

new_training_data = new_training_data.dropna(subset=['user_session'])
new_val_data = new_val_data.dropna(subset=['user_session'])

In [26]:
# check

train_missing_values = new_training_data.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in new_training_data.columns])
val_missing_values = new_val_data.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in new_val_data.columns])

train_missing_values.show() ,  val_missing_values.show()

+----------+----------+----------+-----------+-----+-----+-------+------------+----------------+
|event_time|event_type|product_id|category_id|brand|price|user_id|user_session|is_brand_missing|
+----------+----------+----------+-----------+-----+-----+-------+------------+----------------+
|         0|         0|         0|          0|    0|    0|      0|           0|               0|
+----------+----------+----------+-----------+-----+-----+-------+------------+----------------+

+----------+----------+----------+-----------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-----+-----+-------+------------+
|         0|         0|         0|          0|    0|    0|      0|           0|
+----------+----------+----------+-----------+-----+-----+-------+------------+



(None, None)

In [20]:
#### check for duplicates 

training_duplicates = new_training_data.count() - new_training_data.dropDuplicates().count()
val_duplicates = new_val_data.count() - new_val_data.dropDuplicates().count()


print(f'Number of duplicates rows in training data : {training_duplicates}')
print(f'Number of duplicates rows in val data : {val_duplicates}')

Number of duplicates rows in training data : 643522
Number of duplicates rows in val data : 224922


In [21]:
duplicates  =  new_training_data.groupBy(new_training_data.columns).count().filter("count > 1")
duplicates.show()

+-------------------+----------------+----------+-------------------+--------+-----+---------+--------------------+----------------+-----+
|         event_time|      event_type|product_id|        category_id|   brand|price|  user_id|        user_session|is_brand_missing|count|
+-------------------+----------------+----------+-------------------+--------+-----+---------+--------------------+----------------+-----+
|2019-10-01 01:18:44|remove_from_cart|   5857277|1487580005134238553|  runail| 2.62|446404021|abac4e23-d977-47d...|               0|    2|
|2019-10-01 02:16:05|remove_from_cart|   5861352|1487580009521479714| UNKNOWN| 3.65|550109931|1119c8b2-0b90-4fb...|               1|    2|
|2019-10-01 06:21:45|            cart|   5743974|1487580013053083824| italwax| 1.98|486637085|205bec28-f4a8-4c8...|               0|    3|
|2019-10-01 06:25:01|remove_from_cart|     36546|1487580007072007091|ingarden| 5.46|546705258|75d137b0-8ab4-50b...|               0|    2|
|2019-10-01 06:29:14|      

In [22]:
event_type_counts = duplicates.groupBy('event_type').count()

event_type_counts.show()

+----------------+------+
|      event_type| count|
+----------------+------+
|        purchase|   601|
|            view|   353|
|            cart| 54812|
|remove_from_cart|555921|
+----------------+------+



In [56]:
new_training_data = new_training_data.dropDuplicates()


In [57]:
new_val_data = new_val_data.dropDuplicates()

In [25]:
# check price column

price_data = new_training_data.filter(new_training_data['price'] < 0 )

price_data.show()
print(f' the number of rows with negative prices : {price_data.count()}')

+-------------------+----------+----------+-------------------+-------+------+---------+--------------------+----------------+
|         event_time|event_type|product_id|        category_id|  brand| price|  user_id|        user_session|is_brand_missing|
+-------------------+----------+----------+-------------------+-------+------+---------+--------------------+----------------+
|2019-10-02 09:30:03|  purchase|   5716855|1487580014042939619|UNKNOWN| -7.94|550375225|5ddec778-9464-451...|               1|
|2019-10-01 20:10:56|  purchase|   5716857|1487580014042939619|UNKNOWN|-23.81|552507528|dcdd60c6-1a70-442...|               1|
|2019-10-03 18:37:04|  purchase|   5716859|1487580014042939619|UNKNOWN|-47.62|555414763|479149eb-1807-417...|               1|
|2019-10-03 19:25:39|  purchase|   5670257|1487580014042939619|UNKNOWN|-15.87|556383221|4333d203-bc4d-4d0...|               1|
|2019-10-13 17:46:01|  purchase|   5716857|1487580014042939619|UNKNOWN|-23.81|559820267|f178c995-f004-404...|  

In [58]:
# convert negative prices to positive values

new_training_data = new_training_data.withColumn('price',F.abs(new_training_data['price']))
new_val_data = new_val_data.withColumn('price',F.abs(new_val_data['price']))

In [27]:
price_data = new_training_data.filter(new_training_data['price'] < 0 )

price_data.show()
print(f' the number of rows with negative prices : {price_data.count()}')

+----------+----------+----------+-----------+-----+-----+-------+------------+----------------+
|event_time|event_type|product_id|category_id|brand|price|user_id|user_session|is_brand_missing|
+----------+----------+----------+-----------+-----+-----+-------+------------+----------------+
+----------+----------+----------+-----------+-----+-----+-------+------------+----------------+

 the number of rows with negative prices : 0


In [59]:
new_training_data.limit(20).show()

+-------------------+----------------+----------+-------------------+-------+-----+---------+--------------------+----------------+
|         event_time|      event_type|product_id|        category_id|  brand|price|  user_id|        user_session|is_brand_missing|
+-------------------+----------------+----------+-------------------+-------+-----+---------+--------------------+----------------+
|2019-10-01 01:52:00|            view|   5664161|1487580009445982239|UNKNOWN| 5.16|553602927|4eecd233-e925-414...|               1|
|2019-10-01 01:57:40|            cart|   5841842|1487580007675986893|   milv| 1.59|532551564|d7296e7c-5d83-edc...|               0|
|2019-10-01 02:46:07|            view|   5849719|1783999063574708423|bluesky| 3.97|542286071|cc3a724d-7f50-409...|               0|
|2019-10-01 02:55:43|            cart|   5757469|1487580008246412266| matrix| 9.52|553103073|3072288c-6aeb-40e...|               0|
|2019-10-01 03:12:29|            view|   5692047|1487580007835370453|staleks

## create user features

In [29]:
# purchases made in the last 2 days

train_purchase = new_training_data.filter(new_training_data['event_type'] == 'purchase')

train_purchase_per_2day = train_purchase.withColumn('two_day_purchase', 
                                                             F.from_unixtime(
                                                                 F.floor(F.unix_timestamp(F.col('event_time')) / (2 * 86400))* (2 * 86400) 
                                                                 )
                                                             )

train_purchase_per_2day = train_purchase_per_2day.groupBy('user_id','two_day_purchase').agg(F.count('*').alias('Num_purchase_per_2day'))
train_purchase_per_2day = train_purchase_per_2day.withColumnRenamed("user_id", "train_user_id") # rename 'user_id to prevent ambitiuity

train_purchase_per_2day.show()

+-------------+-------------------+---------------------+
|train_user_id|   two_day_purchase|Num_purchase_per_2day|
+-------------+-------------------+---------------------+
|    231022172|2019-10-03 01:00:00|                    5|
|    544316901|2019-10-03 01:00:00|                   31|
|    518883976|2019-10-05 01:00:00|                   13|
|    530198286|2019-10-03 01:00:00|                   10|
|    450048082|2019-10-01 01:00:00|                    5|
|    240091903|2019-10-07 01:00:00|                   32|
|    313585738|2019-10-05 01:00:00|                   18|
|    342166464|2019-10-03 01:00:00|                   20|
|    276811622|2019-10-05 01:00:00|                   24|
|    546755599|2019-10-07 01:00:00|                   18|
|    556089514|2019-10-01 01:00:00|                    6|
|    556854936|2019-10-03 01:00:00|                   22|
|    557063130|2019-10-05 01:00:00|                    4|
|    533948073|2019-10-07 01:00:00|                    4|
|    370942347

In [30]:
# Rename user_id in train_purchase_per_2day to avoid duplication in final result
train_purchase_per_2day = train_purchase_per_2day.withColumnRenamed("user_id", "train_user_id")

new_training_data = new_training_data.join(
    train_purchase_per_2day,
    (new_training_data['user_id'] == train_purchase_per_2day['train_user_id']) & 
    (F.floor(F.unix_timestamp(new_training_data['event_time']) / (2*86400)) ==  F.floor(F.unix_timestamp(train_purchase_per_2day['two_day_purchase']) / (2 * 86400))),
    how='left'
).drop('two_day_purchase','train_user_id').na.fill({'Num_purchase_per_2day' : 0 })



In [31]:
# average purchase price 

avg_purchase_price = train_purchase.groupBy('user_id').agg(F.format_number(F.mean('price'),2).cast(FloatType()).alias('Avg_purchase_price'))
avg_purchase_price = avg_purchase_price.withColumnRenamed("user_id", "train_user_id")

avg_purchase_price.show()

new_training_data = new_training_data.join(
    avg_purchase_price,
    new_training_data['user_id'] == avg_purchase_price['train_user_id'],
    how='left'
).drop('train_user_id').na.fill({'Avg_purchase_price' : 0})



+-------------+------------------+
|train_user_id|Avg_purchase_price|
+-------------+------------------+
|    434738431|              2.75|
|    556907885|              4.02|
|    539446781|              2.75|
|    510281441|              4.61|
|    554557623|               1.8|
|    444737744|              2.59|
|    399573536|              6.63|
|    489117400|              4.74|
|    555790106|              3.78|
|    313585738|               3.9|
|    525207529|               3.8|
|    548479077|              4.16|
|    418130472|              2.44|
|    422640015|              4.57|
|    553967986|              3.22|
|    537764904|              5.17|
|    471200516|              2.06|
|    519092916|              5.36|
|    556680408|              2.76|
|    387203109|              1.74|
+-------------+------------------+
only showing top 20 rows



In [32]:
# most purchase category of user 

# groupby user id and brand and create a window specification to get the top three brands
brand_count = train_purchase.groupBy('user_id','brand').agg(F.count('brand').alias('brand_count'))

window_spec = Window.partitionBy('User_id').orderBy(F.desc('brand_count'))
ranked_brands = brand_count.withColumn('rank' ,F.row_number().over(window_spec))
top_brands = ranked_brands.filter(F.col("rank") <= 3)

top_brands = top_brands.groupBy("user_id").pivot("rank", [1, 2, 3]).agg(F.first("brand"))

# rename the columns
top_brands = top_brands.select(
    F.col("user_id"),
    F.col("1").alias("1_most_purchased_brand"),
    F.col("2").alias("2_most_purchased_brand"),
    F.col("3").alias("3_most_purchased_brand")
)

# Get the most purchased brand for each user
most_purchased_brand = brand_count.withColumn('rank', F.row_number().over(Window.partitionBy('user_id').orderBy(F.desc('brand_count'))))
most_purchased_brand = most_purchased_brand.filter(F.col('rank') == 1).select('user_id', 'brand')

# Join the top brands with the most purchased brand
top_brands_with_most_purchased = top_brands.join(most_purchased_brand, on='user_id', how='left')
top_brands_with_most_purchased = top_brands_with_most_purchased.withColumnsRenamed({'user_id':'train_user_id','brand':'preferred_brand'})
# Fill missing values with the user's most purchased brand
# Replace missing values with the preferred brand
top_brands_with_most_purchased = top_brands_with_most_purchased.withColumn(
    '1_most_purchased_brand',
    F.when(F.col('1_most_purchased_brand').isNull(), F.col('preferred_brand')).otherwise(F.col('1_most_purchased_brand'))
).withColumn(
    '2_most_purchased_brand',
    F.when(F.col('2_most_purchased_brand').isNull(), F.col('preferred_brand')).otherwise(F.col('2_most_purchased_brand'))
).withColumn(
    '3_most_purchased_brand',
    F.when(F.col('3_most_purchased_brand').isNull(), F.col('preferred_brand')).otherwise(F.col('3_most_purchased_brand'))
)



top_brands_with_most_purchased.show()

new_training_data = new_training_data.join(
    top_brands_with_most_purchased,
    new_training_data['user_id'] == top_brands_with_most_purchased['train_user_id'],
    how = 'left'
).drop('train_user_id','preferred_brand').na.fill({'1_most_purchased_brand':'UNKNOWN',
                                                   '2_most_purchased_brand':'UNKNOWN',
                                                   '3_most_purchased_brand' : 'UNKNOWN'})



+-------------+----------------------+----------------------+----------------------+---------------+
|train_user_id|1_most_purchased_brand|2_most_purchased_brand|3_most_purchased_brand|preferred_brand|
+-------------+----------------------+----------------------+----------------------+---------------+
|    100787781|               UNKNOWN|               markell|                 estel|        UNKNOWN|
|     10079204|                kaaral|                   cnd|                kaaral|         kaaral|
|    101025416|                 irisk|               UNKNOWN|                   cnd|          irisk|
|    103274988|             bpw.style|               UNKNOWN|             freedecor|      bpw.style|
|    103540490|               UNKNOWN|               UNKNOWN|               UNKNOWN|        UNKNOWN|
|    105075440|              skinlite|               UNKNOWN|              estelare|       skinlite|
|    107113173|              farmstay|              glysolid|              skinlite|       

### session based features 

In [33]:
# duration of users session

session_duration = new_training_data.groupBy('user_id','user_session').agg(
    F.min(F.col('event_time')).alias('session_start'),
    F.max(F.col('event_time')).alias('session_end')
).withColumn('session_duration', F.unix_timestamp('session_end') - F.unix_timestamp('session_start'))


session_duration.show()

# combine 

new_training_data = new_training_data.join(
    session_duration,
    on = ['user_id','user_session'],
    how='left'

).drop('session_start','session_end')

+---------+--------------------+-------------------+-------------------+----------------+
|  user_id|        user_session|      session_start|        session_end|session_duration|
+---------+--------------------+-------------------+-------------------+----------------+
|555451330|1da100d7-dd1d-4e5...|2019-10-01 01:36:57|2019-10-01 01:36:57|               0|
|458264120|bb72b357-2782-4d9...|2019-10-01 03:39:29|2019-10-01 03:39:29|               0|
|555482515|61fdb35c-409a-47e...|2019-10-01 05:23:00|2019-10-01 05:23:00|               0|
|324741429|9b57e28b-aaac-4b8...|2019-10-01 05:28:59|2019-10-01 05:28:59|               0|
|377119832|834f2167-3a25-4b0...|2019-10-01 05:42:11|2019-10-01 05:42:11|               0|
|444503648|527bb3d3-618a-400...|2019-10-01 05:45:16|2019-10-01 05:50:10|             294|
|380837147|a931d616-b6be-420...|2019-10-01 06:04:01|2019-10-01 06:13:13|             552|
|514703299|706b6a1e-7cd4-4fe...|2019-10-01 06:39:57|2019-10-01 06:39:57|               0|
|555497866

In [34]:
# avearge session price for view,cart and purchase

session_avg_price = new_training_data.groupBy('user_id','user_session').agg(F.round(F.mean('price'),2).alias('Avg_session_prices'))

session_avg_price.show()

new_training_data = new_training_data.join(
    session_avg_price,
    on = ['user_id','user_session'],
    how='left'
)

+---------+--------------------+------------------+
|  user_id|        user_session|Avg_session_prices|
+---------+--------------------+------------------+
|100787781|188a44b5-83f1-4f1...|               6.1|
|100787781|9a95e186-1c1a-4ea...|               4.2|
|100787781|ab88df75-5e32-419...|              5.07|
|100787781|a91ab4bf-6d76-46f...|              8.52|
|100787781|53029ceb-b63b-4a9...|             10.45|
|100787781|33e757b9-11c7-4e8...|             11.51|
|100787781|4eb304f5-b558-4a0...|              4.81|
|101025416|3506dd98-9567-48c...|             82.54|
|101025416|4c6468ec-42fe-489...|              5.43|
|101025416|bd5f70a4-fb1b-4ad...|             22.78|
|101025416|caba2c83-68dd-420...|              6.09|
|101025416|bc3b1686-872a-486...|              7.46|
|101025416|dd5a7463-a0be-4a2...|               5.6|
|101025416|c4acfcb3-79fd-4f4...|              7.68|
|103274988|09ac3d1b-aaf9-442...|              3.65|
|103274988|43063873-c7e8-4ac...|              6.52|
|103274988|9

In [35]:
# total number of product viewed during a session

session_view = new_training_data.filter(F.col('event_type') == 'view')
num_session_view = session_view.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_views'))

num_session_view.show()

# combine dataframe

new_training_data = new_training_data.join(
    num_session_view,
    on = ['user_id','user_session'],
    how='left'
).na.fill({'num_session_views' : 0})




+---------+--------------------+-----------------+
|  user_id|        user_session|num_session_views|
+---------+--------------------+-----------------+
|100787781|188a44b5-83f1-4f1...|                4|
|100787781|9a95e186-1c1a-4ea...|                1|
|100787781|a91ab4bf-6d76-46f...|               10|
|100787781|ab88df75-5e32-419...|                6|
|100787781|33e757b9-11c7-4e8...|                4|
|100787781|53029ceb-b63b-4a9...|                3|
|100787781|4eb304f5-b558-4a0...|                3|
|101025416|bd5f70a4-fb1b-4ad...|                3|
|101025416|caba2c83-68dd-420...|               39|
|101025416|3506dd98-9567-48c...|                1|
|101025416|dd5a7463-a0be-4a2...|                4|
|101025416|bc3b1686-872a-486...|               21|
|101025416|4c6468ec-42fe-489...|                1|
|101025416|c4acfcb3-79fd-4f4...|                1|
|101245475|ed58d2ad-5e4d-49b...|                1|
|101245475|d771406b-a890-4ad...|                1|
|101245475|fe95eee1-4c1f-4a5...

In [36]:
# number of product added ti cart in a sesiion

session_cart = new_training_data.filter(F.col('event_type') == 'cart')
num_session_cart = session_cart.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_cart'))

num_session_cart.show()

# combine dataframe

new_training_data = new_training_data.join(
    num_session_cart,
    on = ['user_id','user_session'],
    how='left'
).na.fill({'num_session_cart' : 0})



+---------+--------------------+----------------+
|  user_id|        user_session|num_session_cart|
+---------+--------------------+----------------+
|100787781|53029ceb-b63b-4a9...|               4|
|100787781|188a44b5-83f1-4f1...|              10|
|100787781|a91ab4bf-6d76-46f...|               8|
|100787781|ab88df75-5e32-419...|               4|
| 10079204|f873d16b-4efa-4e9...|               2|
|101025416|4c6468ec-42fe-489...|              11|
|101025416|caba2c83-68dd-420...|              12|
|101025416|bc3b1686-872a-486...|               4|
|102771868|e7c88b67-5db3-4ec...|               6|
|102771868|9f8bfb11-afbf-4a6...|               1|
|102771868|e3b8573f-c32b-44f...|               3|
|102771868|35c79850-8b0f-4d8...|               2|
|103156342|d82f0914-e57e-457...|               6|
|103274988|09ac3d1b-aaf9-442...|              13|
|103274988|43063873-c7e8-4ac...|               5|
|103540490|13c6b5da-c6c1-4b8...|               8|
|103540490|70337e79-b8ce-47f...|               8|


In [37]:
# number of produt removed from cart

session_frm_cart = new_training_data.filter(F.col('event_type') == 'remove_from_cart')
num_session_frm_cart = session_frm_cart.groupBy('user_id','user_session').agg(F.count('event_type').alias('num_session_frm_cart'))

num_session_frm_cart.show()

# combine dataframe

new_training_data= new_training_data.join(
    num_session_frm_cart,
    on = ['user_id','user_session'],
    how='left'
).na.fill({'num_session_frm_cart' : 0})




+---------+--------------------+--------------------+
|  user_id|        user_session|num_session_frm_cart|
+---------+--------------------+--------------------+
|100787781|188a44b5-83f1-4f1...|                   5|
|100787781|ab88df75-5e32-419...|                   7|
|100787781|a91ab4bf-6d76-46f...|                   4|
|101025416|caba2c83-68dd-420...|                  15|
|102771868|a1b846c0-bad5-47b...|                   3|
|102771868|cc1b259a-3b77-469...|                   5|
|102771868|e3b8573f-c32b-44f...|                   2|
|103156342|d82f0914-e57e-457...|                   1|
|103274988|43063873-c7e8-4ac...|                   1|
|103274988|09ac3d1b-aaf9-442...|                   2|
|103540490|f11f9da4-ac38-4d5...|                   2|
|103540490|70337e79-b8ce-47f...|                  17|
|105075440|6e6853cf-491b-412...|                   4|
|105075440|d336bb7a-fcb0-4f3...|                   4|
|105075440|d0381610-7532-4ea...|                   2|
|108242662|2eb8f823-8c17-42f

In [38]:
# session purchase


session_purchase = new_training_data.filter(F.col('event_type') == 'purchase')
num_session_purchase = session_purchase.groupBy('user_id', 'user_session').agg(F.count('event_type').alias('num_session_purchase'))

new_training_data = new_training_data.join(
    num_session_purchase,
    on=['user_id', 'user_session'],
    how='left'
).na.fill({'num_session_purchase':0})




In [39]:
# cart-to-purchase ration


new_training_data = new_training_data.withColumn('session_cart_to_purchase_ratio', F.when(F.col('num_session_purchase') == 0 , 0).otherwise(
    F.col('num_session_cart') / F.col('num_session_purchase')
))

new_training_data = new_training_data.withColumn('session_cart_to_purchase_ratio', F.round(F.col('session_cart_to_purchase_ratio'), 2))


In [40]:

# Calculate Cart Abandonment Rate and round to two decimal points
new_training_data = new_training_data.withColumn('session_cart_abandonment_rate', 
                           F.when(F.col('num_session_cart') == 0, 0)  # Handle case where no products were added to the cart
                           .otherwise(1 - (F.col('num_session_purchase') / F.col('num_session_cart'))))

new_training_data = new_training_data.withColumn('session_cart_abandonment_rate', F.round(F.col('session_cart_abandonment_rate'), 2)).persist(StorageLevel.DISK_ONLY)



In [41]:
new_training_data.limit(20).show()

+---------+--------------------+-------------------+----------------+----------+-------------------+----------+-----+----------------+---------------------+------------------+----------------------+----------------------+----------------------+----------------+------------------+-----------------+----------------+--------------------+--------------------+------------------------------+-----------------------------+
|  user_id|        user_session|         event_time|      event_type|product_id|        category_id|     brand|price|is_brand_missing|Num_purchase_per_2day|Avg_purchase_price|1_most_purchased_brand|2_most_purchased_brand|3_most_purchased_brand|session_duration|Avg_session_prices|num_session_views|num_session_cart|num_session_frm_cart|num_session_purchase|session_cart_to_purchase_ratio|session_cart_abandonment_rate|
+---------+--------------------+-------------------+----------------+----------+-------------------+----------+-----+----------------+---------------------+------

### Product features 

In [60]:
product_purchase = train_purchase.groupBy('product_id').agg(
    F.count('product_id').alias('total_num_product_purchased')
)


product_purchase.show()

new_training_data = new_training_data.join(
    product_purchase,
    on=['product_id'],
    how='left'
).na.fill({'total_num_product_purchased':0})



+----------+---------------------+
|product_id|num_product_purchased|
+----------+---------------------+
|   5833322|                  395|
|      5925|                   19|
|   5760785|                   79|
|   5809268|                   70|
|      6731|                  152|
|   5700501|                   81|
|   5724653|                   19|
|   5859440|                   59|
|   5677989|                   94|
|     15269|                    7|
|   5800056|                    6|
|   5697109|                   11|
|   5704999|                   63|
|   5883314|                  254|
|   5875362|                  166|
|   5699223|                   34|
|   5758067|                    6|
|   5783557|                   18|
|   5690569|                   13|
|   5727771|                    4|
+----------+---------------------+
only showing top 20 rows



In [61]:
new_training_data.show()

+----------+-------------------+----------------+-------------------+-------+-----+---------+--------------------+----------------+---------------------+
|product_id|         event_time|      event_type|        category_id|  brand|price|  user_id|        user_session|is_brand_missing|num_product_purchased|
+----------+-------------------+----------------+-------------------+-------+-----+---------+--------------------+----------------+---------------------+
|   5804167|2019-10-01 04:53:45|            cart|1487580005671109489| masura| 1.73|468353545|e85c79e0-3882-42d...|               0|                   30|
|   5610054|2019-10-01 05:46:31|            cart|1487580006551913373|   milv| 2.38|483832589|c294e056-2610-4fb...|               0|                   28|
|   5853713|2019-10-01 03:57:41|remove_from_cart|1487580009286598681| runail| 2.06|386491215|34179930-7376-48e...|               0|                  426|
|   5706784|2019-10-01 03:19:17|            cart|1487580005092295511| runail

In [63]:
null_count = new_training_data.filter(F.col('num_product_purchased').isNull()).count()
print(f"Number of nulls in 'num_product_purchased': {null_count}")


Number of nulls in 'num_product_purchased': 219788


In [64]:
# total number of product views

train_views = new_training_data.filter(F.col('event_type') == 'view')

# groupby by product
product_views = train_views.groupBy('product_id').agg(F.count('product_id').alias('total_product_views'))

product_views.show()

new_training_data = new_training_data.join(
    product_views,
    on = ['product_id'],
    how='left'
).na.fill({'total_product_views':0})



+----------+-------------------+
|product_id|total_product_views|
+----------+-------------------+
|   5886193|                 95|
|   5752486|                 41|
|   5760785|                374|
|   5618278|                154|
|   5667106|                 13|
|   5804345|                 33|
|   5877723|                 34|
|   5809268|                186|
|   5852507|                313|
|   5706454|                 55|
|   5712736|                526|
|   5813329|                 66|
|   5692484|                 29|
|   5699223|                117|
|   5884241|                 60|
|   5857316|                 46|
|   5835374|                 51|
|   5700214|                 44|
|   5837465|                309|
|   5869132|                554|
+----------+-------------------+
only showing top 20 rows



In [65]:
new_training_data.show()

+----------+-------------------+----------------+-------------------+-------+-----+---------+--------------------+----------------+---------------------+-------------------+
|product_id|         event_time|      event_type|        category_id|  brand|price|  user_id|        user_session|is_brand_missing|num_product_purchased|total_product_views|
+----------+-------------------+----------------+-------------------+-------+-----+---------+--------------------+----------------+---------------------+-------------------+
|   5804167|2019-10-01 04:53:45|            cart|1487580005671109489| masura| 1.73|468353545|e85c79e0-3882-42d...|               0|                   30|                107|
|   5610054|2019-10-01 05:46:31|            cart|1487580006551913373|   milv| 2.38|483832589|c294e056-2610-4fb...|               0|                   28|                184|
|   5853713|2019-10-01 03:57:41|remove_from_cart|1487580009286598681| runail| 2.06|386491215|34179930-7376-48e...|               0

In [66]:
# total product add to cart

train_cart = new_training_data.filter(F.col('event_type') == 'cart')

product_cart = train_cart.groupBy('product_id').agg(F.count('product_id').alias('total_product_cart'))

product_cart.show()

new_training_data = new_training_data.join(
    product_cart,
    on=['product_id'],
    how='left'
).na.fill({'total_product_cart':0})



+----------+------------------+
|product_id|total_product_cart|
+----------+------------------+
|   5773322|                42|
|   5563794|                54|
|   5848791|                 8|
|   5782947|                18|
|   5813912|                29|
|      6731|               500|
|   5875374|               181|
|   5854189|                16|
|   5819241|               210|
|   5784566|                46|
|   5794153|               260|
|   5875362|               657|
|   5833322|              1280|
|   5803877|                35|
|   5746402|               111|
|   5801756|                24|
|   5778722|                85|
|   5773340|               120|
|   5760785|               328|
|   5697109|                56|
+----------+------------------+
only showing top 20 rows



In [67]:
new_training_data.show()

+----------+-------------------+----------------+-------------------+-------+-----+---------+--------------------+----------------+---------------------+-------------------+------------------+
|product_id|         event_time|      event_type|        category_id|  brand|price|  user_id|        user_session|is_brand_missing|num_product_purchased|total_product_views|total_product_cart|
+----------+-------------------+----------------+-------------------+-------+-----+---------+--------------------+----------------+---------------------+-------------------+------------------+
|   5804167|2019-10-01 04:53:45|            cart|1487580005671109489| masura| 1.73|468353545|e85c79e0-3882-42d...|               0|                   30|                107|               144|
|   5610054|2019-10-01 05:46:31|            cart|1487580006551913373|   milv| 2.38|483832589|c294e056-2610-4fb...|               0|                   28|                184|               135|
|   5853713|2019-10-01 03:57:41|rem

In [70]:
# product cart  purchase conversion rate 


new_training_data = new_training_data.withColumn('product_cart_to_purchase_conversion_ratio', F.when(F.col('total_num_product_purchased') == 0 , 0).otherwise(
    F.col('total_product_cart') / F.col('total_num_product_purchased')
))

new_training_data = new_training_data.withColumn('product_cart_to_purchase_conversion_ratio', F.round(F.col('product_cart_to_purchase_conversion_ratio'), 2)).persist(StorageLevel.DISK_ONLY)

new_training_data.show()

+----------+-------------------+----------------+-------------------+-----------+-----+---------+--------------------+----------------+---------------------+-------------------+------------------+-----------------------------------------+
|product_id|         event_time|      event_type|        category_id|      brand|price|  user_id|        user_session|is_brand_missing|num_product_purchased|total_product_views|total_product_cart|product_cart_to_purchase_conversion_ratio|
+----------+-------------------+----------------+-------------------+-----------+-----+---------+--------------------+----------------+---------------------+-------------------+------------------+-----------------------------------------+
|   5886193|2019-10-01 08:33:52|            view|1487580013690618064|     consly| 5.87|541909826|01b9fb74-8c2b-4d2...|               0|                   10|                 95|                49|                                      4.9|
|   5833322|2019-10-05 07:19:55|        purc

### user - product interaction 

In [71]:
#user_product_view_count

user_product_view = train_views.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_view'))

user_product_view.show()

new_training_data = new_training_data.join(
    user_product_view,
    on=['user_id','product_id'],
    how='left'
).na.fill({'user_product_view':0})



+---------+----------+-----------------+
|  user_id|product_id|user_product_view|
+---------+----------+-----------------+
|542357047|   5852517|                1|
|515443537|   5687741|                1|
|511173605|   5693593|                1|
|411113964|   5801325|                2|
|463842147|   5826406|                1|
|556310217|   5838846|                1|
|557324555|   5740986|                1|
|556551445|   5588152|                1|
|556350711|   5649236|                1|
|550883221|   5877454|                1|
|503093887|   5857069|                1|
|555256312|   5712790|                1|
|556164626|   5872904|                5|
|359124175|   5851621|                1|
|557711264|   5785424|                1|
|557698181|   5635103|                1|
|528635770|   5769914|                2|
|392937743|   5837620|                2|
|557520412|   5861557|                1|
|556127613|   5877454|                1|
+---------+----------+-----------------+
only showing top

In [72]:
#user_product_add to cart _count

user_product_cart = train_cart.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_cart'))

user_product_cart.show()

new_training_data = new_training_data.join(
    user_product_cart,
    on=['user_id','product_id'],
    how='left'
).na.fill({'user_product_cart':0})


+---------+----------+-----------------+
|  user_id|product_id|user_product_cart|
+---------+----------+-----------------+
|510669794|      5953|                1|
|555813359|   5853604|                3|
|540249915|   5758061|                1|
|556043509|   5843998|                3|
|555873806|   5687470|                3|
|557585105|   5856550|                2|
|379259878|   5788435|                2|
|557489316|   5859475|                2|
|555778305|   5747359|                3|
|402334206|   5815083|                1|
|486328391|   5563754|                1|
|549172882|   5886746|                1|
|547733654|   5751383|                1|
|506513604|   5622677|                1|
|554797493|   5812117|                1|
|557326643|   5848635|                3|
|551676584|   5695489|                1|
|556122109|   5870668|                3|
|557227863|   5878510|                3|
|480151395|   5760785|                1|
+---------+----------+-----------------+
only showing top

In [73]:
#user_product_add to cart _count

user_product_purchase = train_purchase.groupBy('user_id','product_id').agg(F.count('event_type').alias('user_product_purchase'))

user_product_purchase.show()

new_training_data = new_training_data.join(
    user_product_purchase,
    on=['user_id','product_id'],
    how='left'
).na.fill({'user_product_purchase':0}).persist(StorageLevel.DISK_ONLY)

new_training_data.show()

+---------+----------+---------------------+
|  user_id|product_id|user_product_purchase|
+---------+----------+---------------------+
|477118331|   5683947|                    1|
|539457086|      4915|                    1|
|538498612|   5749149|                    1|
|555790106|   5797977|                    1|
|555996661|   5860437|                    1|
|474393926|   5700167|                    1|
|480463194|   5873653|                    1|
|555150103|   5760770|                    1|
|525401126|   5688125|                    1|
|535820863|   5873171|                    1|
|469091951|   5549838|                    1|
|509653293|   5866156|                    1|
|557495972|   5808546|                    1|
|553959742|   5856193|                    1|
|467567265|   5744098|                    1|
|545647953|   5818396|                    1|
|555738287|   5866174|                    1|
|519108765|   5877609|                    1|
|531299903|   5843567|                    1|
|489992190

In [None]:
# hour of the day

new_training_data = new_training_data.withColumn('hour',F.hour(F.col('event_time')))
new_training_data  = new_training_data.withColumn('minutes',F.minute(F.col('event_time')))
new_training_data = new_training_data.withColumn('seconds',F.second(F.col('event_time')))

# add cyclic time features 
new_training_data = new_training_data.withColumn(
    'hour_sin' , F.sin(2 * math.pi * F.col('hour') / 24)
).withColumn('hour_cos',F.cos(2 * math.pi * F.col('hour') / 24))

new_training_data = new_training_data.withColumn(
    'minutes_sin' , F.sin(2 * math.pi * F.col('minutes') / 24)
).withColumn('minutes_cos',F.cos(2 * math.pi * F.col('minutes') / 24))

new_training_data = new_training_data.withColumn(
    'second_sin',F.sin(2 * math.pi * F.col('seconds') / 24)
).withColumn('second_cos',F.cos(2 * math.pi * F.col('seconds') / 24))


new_training_data = new_training_data.withColumn('week',F.dayofweek(F.col('event_time')))
new_training_data.drop('hour','minutes','seconds')

new_training_data.show()

+---------+----------+-------------------+----------+-------------------+--------+-----+--------------------+----------------+---------------------+-------------------+------------------+-----------------------------------------+-----------------+-----------------+---------------------+----+-------+-------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|  user_id|product_id|         event_time|event_type|        category_id|   brand|price|        user_session|is_brand_missing|num_product_purchased|total_product_views|total_product_cart|product_cart_to_purchase_conversion_ratio|user_product_view|user_product_cart|user_product_purchase|hour|minutes|seconds|           hour_sin|            hour_cos|         minutes_sin|         minutes_cos|          second_sin|          second_cos|week|
+---------+----------+-------------------+----------+-------------------+--------+-----+--------------------+-------------

In [None]:
new_training_data.summary().show()