## 0. Notes:

In these notes I explain the thought process behind the code below and the different steps that were performed.

1. Loading the data and the libraries: in this section we combine the two tables, 'web navigation' and 'sku details'.
2. Data Preprocessing: in this section I work out how to create a score/rate that best describes a user's preference for a given product.
    - The simple way is to use the product views as the score: the more a user views a given product, the more they prefer it or are interested in it. However, this simplification doesn't really reflect the user's preference and wouldn't result in a good recommendation system, so a few key points are tackled in this section. I first broke down the product id (SKU) into its parts and kept only the first and third, which are the model and the color. The model id replaced the product id. I also checked the material part, but from the looks of it one model usually has one material, so it didn't make sense to include the material in the score/rate equation.
    - The second variable I was interested in was cart_adds, which carries much more weight than a product view: if a user adds a product to the cart, they are most likely interested in that product/type.
    - So now we have three variables: product views, cart adds and color. The next challenge is combining them into a meaningful score that describes the user's preference. This was done by introducing weights into the equation, where each variable has its own weight.
    - For starters, the product view weight is simply one. For cart adds, the question asked is how many views a user makes, on average, before adding a product to the cart; because there are some large outliers, I took the median, as it better represents users' general behavior. The weight of the final variable is a bit more complicated: the idea is to see how many different colors a user views on average when browsing products, which is around 1.92.
    - The color score gives more importance to a color that appears more often for product x than another color. For example, if product x has 2 colors (black and white) and users mostly viewed the black one, then black has a higher score than white.
    - Another factor included in the equation is a time decay factor, which gives more importance to a product that a given user viewed several times in one day than to a product the same user viewed several times across days that are far apart. The time decay factor follows a log approach in order to give importance to the recency of the view date. For instance, if user x viewed product y today and yesterday, the decay factor won't be the same as when user x viewed product y today and 3 months ago.
    - The score equation is as follows: ( 1 * product views + 2 * cart adds + 1.92 * color score ) * time decay factor.
3. Transforming the mcvisid and model id into numerical values with StringIndexer so they can be passed to the ALS models.
4. Splitting the data: the most recent interaction made by each user is stored in the test set and the rest in the train set. However, if a user made only one or a few interactions, it doesn't make sense to take the last one, so I checked what threshold would be reasonable in order to get a decent test set size. Users with 4 or more interactions are my 'concentrated datapoints': I take the last interaction they made and store it in the test set. This results in around 100k records in the test set.
5. The next part is tricky and can distort the model if not implemented correctly. Normalization has to be done after the train/test split in order to avoid data leakage, by which I mean passing information between the train and test sets. This is unlike the StringIndexer step, which has to be done before the split in order not to distort the dataset.
6. Model fitting: in this stage we try different hyperparameters (rank, maxIter and regParam); finding the best model shows which combination of hyperparameters resulted in the highest accuracy. In total we try 12 models (3 ranks x 2 maxIter values x 2 regParam values).
7. Model evaluation: in this stage we evaluate each model on the test set. This is done by first generating recommendations with each model, then evaluating them against the original values in the test set. The evaluation metric is precision@k, which shows on average how many of the recommended product models are related to the user's preference.
8. Using the best model, we look at some numeric/categorical visualizations; I am interested in seeing whether the last product the user viewed gets products recommended for it. (REPARTO)
9. A few discussion points that can be implemented later on. Cross validation could be implemented in order to get higher accuracy, and a few tweaks to the weights might help get better scores. I could also try introducing another variable into the score/rating equation; for example, the date variable could be broken down and used if the dataset contained all of the months in the year. This would introduce seasonality, where certain products get viewed during a specific period of the year. Another option is trying more than 12 models by increasing the number of hyperparameter combinations; unfortunately this was hard to do because the cluster kept being terminated after being idle for a given period. Changing the threshold of the train/test split might also lead to a better accuracy score, but this is unlikely, as the dataset is already very large and 100k records is a very good size for a test set.
10. This project can still have some work done on it, but it's a starting point. An item-based recommendation system could also be implemented; for instance, cosine similarity could be used to measure the similarity between products. Clustering techniques could then be applied after calculating the similarity, to check which types of products are grouped together based on their similarity scores.
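
As a rough illustration of the item-based idea in point 10, the sketch below computes cosine similarity between two product models in plain Python. The per-user score vectors and the names `cosine_similarity` and `model_scores` are made up for the example; nothing like this exists in the notebook yet.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length score vectors."""
    dot = math.fsum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(math.fsum(x * x for x in a))
    norm_b = math.sqrt(math.fsum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # a model with no interactions is treated as similar to nothing
    return dot / (norm_a * norm_b)

# Hypothetical scores: one entry per user, one vector per product model.
model_scores = {
    "model_a": [1.0, 0.0, 2.0],
    "model_b": [2.0, 0.0, 4.0],  # same users, twice the score: same direction
    "model_c": [0.0, 3.0, 0.0],  # liked by a different user only
}

sim_ab = cosine_similarity(model_scores["model_a"], model_scores["model_b"])
sim_ac = cosine_similarity(model_scores["model_a"], model_scores["model_c"])
print(sim_ab, sim_ac)
```

Models that are scored by the same users end up close to 1, while models with no users in common end up at 0, which is the kind of signal a clustering step could then group on.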

## 1. Load Data & Libraries

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StringIndexerModel
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import split
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import stddev, mean, col
from pyspark.sql.functions import max
from pyspark.sql.functions import min
from pyspark.mllib.recommendation import Rating
from pyspark.ml.evaluation import RankingEvaluator
from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
from pyspark.sql.functions import exp, sum, avg, when, from_unixtime, unix_timestamp, datediff, log1p, abs, udf, countDistinct,  count, explode, collect_list, array, struct, expr
from pyspark.ml.recommendation import ALSModel
import os
from pyspark.sql import functions as F
import pandas as pd

In [None]:
# set base path
BASE_PATH_RECOM = '/FileStore/tables/1_recommendation/'
# ls the files in the base path
dbutils.fs.ls(BASE_PATH_RECOM)

# load web navigation
df_web = (
  spark.read.format('csv')
  .option('header', 'true')
  .option('sep', ',')
  .load(BASE_PATH_RECOM + 'web_navigations.csv')
)

# load sku details
df_sku = (
  spark.read.format('csv')
  .option('header', 'true')
  .option('sep', ',')
  .load(BASE_PATH_RECOM + 'sku_details.csv')
)

df = (
    df_web
    .where("ADOBE_SKU is not NULL")
    .join(df_sku, on = 'ADOBE_SKU', how = 'left')   
)

## 2. Data Preprocessing

In [None]:
def Product_Key_Split (df):
    split_col=split(df['ADOBE_SKU'], '_')
    df = df.withColumn('Model', split_col.getItem(0)).withColumn('Material', split_col.getItem(1)).withColumn('Color', split_col.getItem(2))
    print("Product_Key_Split Completed")
    return df
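
To make the split concrete, here is a small plain-Python sketch of the same idea outside Spark. The SKU value `M1234_LEA_BLK` and the helper name `split_sku` are hypothetical; the only assumption carried over from the function above is that the SKU is underscore-delimited in the order model, material, color.

```python
def split_sku(sku):
    """Split an underscore-delimited SKU into (model, material, color)."""
    parts = sku.split("_")
    # Pad with None so that a SKU with fewer than three parts still unpacks cleanly.
    parts += [None] * (3 - len(parts))
    return parts[0], parts[1], parts[2]

model, material, color = split_sku("M1234_LEA_BLK")  # hypothetical SKU
print(model, material, color)
```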

In [None]:
def weights_calculation(df):
    # grouping by user and product model to get the total number of views and cart adds
    grouped_cart_df = (
        df
        .groupBy('mcvisid', 'Model')
        .agg(
            sum('Product_Views_custom').alias('total_views'),
            sum('Cart_Adds').alias('total_cart_adds')
        )
        .filter(col('total_cart_adds') >= 1)
    )
    
    # calculating the average number of views per cart add for each user
    result_cart_df = (
        grouped_cart_df
        .groupBy('mcvisid')
        .agg(avg(col('total_views') / col('total_cart_adds')).alias('average_views_per_cart_adds'))
        .filter(col('average_views_per_cart_adds').isNotNull())
    )
    
    # keep only rows that are a product view (uses the df passed in, not a global)
    df_filtered = df.filter(col('Product_Views_custom') == 1)
    # Calculating the distinct colors viewed per user
    distinct_colors_per_user = df_filtered.groupBy('mcvisid').agg(countDistinct('Color').alias('distinct_colors'))

    # Calculating the average distinct colors viewed per user
    average_distinct_views = distinct_colors_per_user.select(avg('distinct_colors').alias('average_distinct_views'))
    
    print("weights_calculation Completed")
    return result_cart_df, average_distinct_views
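
The notes mention taking the median of the views-per-cart-add ratio because of large outliers. A minimal plain-Python sketch of that step could look as follows; the `interactions` dictionary is invented data, and the variable names are illustrative rather than taken from the notebook.

```python
from statistics import mean, median

# Hypothetical (user, model) -> (total_views, total_cart_adds)
interactions = {
    ("u1", "m1"): (4, 2),
    ("u1", "m2"): (3, 1),
    ("u2", "m1"): (2, 1),
    ("u3", "m3"): (40, 1),  # an outlier user
}

# Per-user average of views / cart adds, keeping only pairs with at least one cart add.
ratios_by_user = {}
for (user, _model), (views, cart_adds) in interactions.items():
    if cart_adds >= 1:
        ratios_by_user.setdefault(user, []).append(views / cart_adds)
avg_by_user = {user: mean(ratios) for user, ratios in ratios_by_user.items()}

median_ratio = median(avg_by_user.values())
mean_ratio = mean(avg_by_user.values())
print(median_ratio, mean_ratio)
```

With this toy data the median stays at 2.5 while the mean is pulled up to about 14.8 by the single outlier, which is the motivation given in the notes for preferring the median.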

In [None]:
def color_score_calculation (view_name, df):
    # Interactions (views + 2 * cart adds) per model and color, read from the temp view `view_name`
    df_color_agg = spark.sql(f'''
        SELECT Model, Color, (Total_Views + Total_Cart_Adds) as Color_Score
        FROM 
         (
              SELECT Model, Color, SUM(Product_Views_custom) as Total_Views, SUM(2*Cart_Adds) as Total_Cart_Adds 
              FROM {view_name} 
              GROUP BY Model, Color
         )

    ''')
    
    # Interactions (views + 2 * cart adds) per model across all of its colors
    df_total_interactions = spark.sql(f'''
        SELECT Model, (Total_Views + Total_Cart_Adds) as Score
        FROM 
         (
              SELECT Model, SUM(Product_Views_custom) as Total_Views, SUM(2*Cart_Adds) as Total_Cart_Adds 
              FROM {view_name} 
              GROUP BY Model
         )

    ''')
    
    # Calculating each color's share of its model's total interactions
    df_partial_scores = df_color_agg.join(df_total_interactions, "Model").withColumn(
        "Final_Color_Score", col("Color_Score") / col("Score")
    )
    
    # Joining on both Model and Color so that each row gets the score of its own color
    # (joining on Model alone would duplicate every row once per color of that model)
    df = df.join(df_partial_scores.select("Model", "Color", "Final_Color_Score"), on = ["Model", "Color"])
    df = df.select("mcvisid","date", "Model", "Color", "REPARTO","Product_Views_custom", "Cart_Adds","Final_Color_Score")
    
    print("color_score_calculation Completed")
    return df
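
The color score is each color's share of its model's total interactions. The plain-Python sketch below works through that arithmetic on invented rows; `CART_ADD_FACTOR` mirrors the `2*Cart_Adds` term in the SQL, and all other names are illustrative.

```python
from collections import defaultdict

CART_ADD_FACTOR = 2  # same factor as the 2*Cart_Adds term in the SQL above

# Hypothetical rows: (model, color, product_views, cart_adds)
rows = [
    ("m1", "black", 6, 1),
    ("m1", "white", 2, 0),
    ("m2", "red", 5, 0),
]

color_totals = defaultdict(float)  # interactions per (model, color)
model_totals = defaultdict(float)  # interactions per model
for model, color, views, cart_adds in rows:
    interactions = views + CART_ADD_FACTOR * cart_adds
    color_totals[(model, color)] += interactions
    model_totals[model] += interactions

# Share of each color within its model; shares of one model add up to 1.
color_share = {
    key: total / model_totals[key[0]] for key, total in color_totals.items()
}
print(color_share)
```

Here black accounts for 8 of the 10 interactions of `m1`, so it scores 0.8 against 0.2 for white, and a model with a single color always scores 1.0.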

In [None]:
def max_date(df):
    # Calculating the maximum date for each user-product combination
    max_date_df = df.groupBy('mcvisid', 'Model').agg({'date': 'max'})\
                   .withColumnRenamed('max(date)', 'last_view_date')

    # Joining the max_date_df with the original df
    df_with_max_date = df.join(max_date_df, ['mcvisid', 'Model'])
    
    print("max_date Completed")
    return df_with_max_date

In [None]:
def Score_calculation(df, CA_W, Color_W):
    half_life_days = 7
    # Calculating the decay factor based on the last view date
    decay_factor = (
        when(datediff(col('date'), col('last_view_date')) == 0, 1)
        .otherwise(1 / log1p(1 + (abs(datediff(col('date'), col('last_view_date'))) / half_life_days)))
    )
    
    df_with_score = df.withColumn("Total_score", (col('Product_Views_custom') + CA_W * col('Cart_Adds') + Color_W * col("Final_Color_Score")) * decay_factor)
    score_df = df_with_score.groupBy('mcvisid', 'Model', 'date').agg(sum('Total_score').alias('Score'))
    
    lower_quartile = score_df.approxQuantile("Score", [0.25], 0.01)[0]
    upper_quartile = score_df.approxQuantile("Score", [0.75], 0.01)[0]
    iqr = upper_quartile - lower_quartile

    # Calculating lower and upper bounds
    lower_bound = lower_quartile - 1.5 * iqr
    upper_bound = upper_quartile + 1.5 * iqr
    print("Lower bound:", lower_bound)
    print("Upper bound:", upper_bound)
    
    # Setting scores below the lower bound to the lower bound, and scores above the upper bound to the upper bound
    # (the second step works on score_adj_df so that the lower-bound clipping is not discarded)
    score_adj_df = score_df.withColumn('Score', when(col('Score') < lower_bound, lower_bound).otherwise(col('Score')))
    score_adj_df = score_adj_df.withColumn('Score', when(col('Score') > upper_bound, upper_bound).otherwise(col('Score')))
    
    print("Score_calculation Completed")
    return score_adj_df
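
To see how the decay factor and the weighted score behave, the sketch below reproduces the two expressions from `Score_calculation` for a single row in plain Python. The function names `decay_factor` and `interaction_score` and the input values are illustrative; the formula and the default weights (2 and 1.92) are the ones used in this notebook.

```python
import math

def decay_factor(days_apart, half_life_days=7):
    """Same expression as in Score_calculation: 1 on the same day, else 1 / log1p(1 + gap / 7)."""
    if days_apart == 0:
        return 1.0
    return 1.0 / math.log1p(1 + abs(days_apart) / half_life_days)

def interaction_score(views, cart_adds, color_score, days_apart, cart_w=2, color_w=1.92):
    """(views + cart_w * cart_adds + color_w * color_score) * decay factor."""
    base = views + cart_w * cart_adds + color_w * color_score
    return base * decay_factor(days_apart)

for gap in (0, 1, 6, 90):
    print(gap, round(decay_factor(gap), 4))

# 3 views, 1 cart add, color share 0.8, on the day of the last view
print(interaction_score(3, 1, 0.8, 0))
```

Under this expression the factor is 1.0 for a same-day row, about 1.31 for a one-day gap, and drops below 1 only once the gap reaches 6 days, so it may be worth checking whether `log1p(1 + x)`, which equals ln(2 + x), is the intended form.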

In [None]:
df = Product_Key_Split(df)
sorted_df = df.orderBy(['mcvisid', 'date'])
views_per_cart_df, avg_distinct_color_views_score = weights_calculation(sorted_df)
value_list = avg_distinct_color_views_score.collect()
# Accessing the value from the list
avg_dist_color_views = value_list[0][0]
print(avg_dist_color_views)
display(views_per_cart_df)

In [None]:
display(sorted_df)

In [None]:
Cart_Adds_Weight = 2
Color_Weight = 1.92
sorted_df.createOrReplaceTempView('GeneralTable')
sorted_df = color_score_calculation('GeneralTable', sorted_df)
max_date_df = max_date(sorted_df)
adj_df =  Score_calculation(max_date_df, Cart_Adds_Weight, Color_Weight)
display(adj_df)
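
The outlier handling inside `Score_calculation` clips scores to the usual 1.5 x IQR bounds. A plain-Python sketch of that clipping is shown below; it uses exact quartiles from the `statistics` module rather than Spark's `approxQuantile`, so the bounds may differ slightly from the ones printed by the notebook, and the sample scores are invented.

```python
from statistics import quantiles

def clip_to_iqr_bounds(scores, whisker=1.5):
    """Clip every score into [Q1 - whisker * IQR, Q3 + whisker * IQR]."""
    q1, _median, q3 = quantiles(scores, n=4, method="inclusive")
    iqr = q3 - q1
    lower = q1 - whisker * iqr
    upper = q3 + whisker * iqr
    clipped = [min(max(score, lower), upper) for score in scores]
    return clipped, (lower, upper)

scores = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 100.0]  # hypothetical scores with one outlier
clipped, (lower, upper) = clip_to_iqr_bounds(scores)
print(lower, upper)
print(clipped)
```

Only the outlier is moved: it is pulled down to the upper bound, while the remaining scores are unchanged.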

## 3. String To Index (Products & Visitors)

In [None]:
def StringToIndex(df, val, val_date):
    indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in df.columns if column not in {val, val_date}]
    pipeline = Pipeline(stages=indexers)
    pipeline_model = pipeline.fit(df)
    df_indexed = pipeline_model.transform(df)
    string_indexer_models = [stage for stage in pipeline_model.stages if isinstance(stage, StringIndexerModel)]
    print("StringToIndex completed")
    return df_indexed, string_indexer_models
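
For intuition, the following plain-Python sketch mimics what the indexing step produces: labels ordered from most to least frequent (StringIndexer's default ordering), with the position in that list used as the numeric index. The helper `fit_label_index` and the sample values are illustrative only, and the alphabetical tie-break is an assumption made to keep the example deterministic.

```python
from collections import Counter

def fit_label_index(values):
    """Return labels ordered by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

observed_models = ["m2", "m1", "m2", "m3", "m2", "m1"]  # hypothetical Model column
labels = fit_label_index(observed_models)
to_index = {label: float(i) for i, label in enumerate(labels)}

indexed = [to_index[m] for m in observed_models]
print(labels)    # position in this list is the index
print(indexed)

# Mapping an index back to the original label, as done later with `.labels[...]`
print(labels[int(indexed[0])])
```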

In [None]:
df_idx, string_idx_models = StringToIndex(adj_df, 'Score', 'date')

StringToIndex completed


## 4. Splitting The Data Into Train & Test Sets

In [None]:
def interactions_per_user(df, val):
    # grouping by user (mcvisid) and counting the number of interactions
    user_counts = df.groupBy('mcvisid').agg(count('*').alias('interactions'))
    # filtering out users with fewer than `val` interactions
    user_counts = user_counts.filter(col('interactions') >= val)
    
    record_count = user_counts.count()
    print("Total record count:", record_count)
    return user_counts

In [None]:
def split_data(df, val):
    # joining back to original data to get latest interaction for each user
    latest_interactions = df.join(val, on='mcvisid') \
                            .orderBy(['mcvisid', 'date'], ascending=[True, False]) \
                            .withColumn('rank', row_number().over(Window.partitionBy('mcvisid').orderBy(desc('date')))) \
                            .filter(col('rank') == 1) \
                            .select('mcvisid', 'Model', 'mcvisid_index', 'Model_index','Score', 'date')

    # spliting data into train and test sets
    train = df.join(latest_interactions, on=['mcvisid', 'Model', 'mcvisid_index', 'Model_index','Score', 'date'], how='left_anti')
    test = latest_interactions.select('mcvisid', 'Model', 'mcvisid_index', 'Model_index', 'Score', 'date')
    
    print("split_data completed")
    return train, test
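
The split is a leave-last-out scheme restricted to sufficiently active users. The plain-Python sketch below shows the same logic on a handful of invented interactions; `leave_last_out` and its `min_interactions` parameter are illustrative names, and dates are assumed to be ISO strings so that they sort chronologically.

```python
from collections import defaultdict

def leave_last_out(interactions, min_interactions=4):
    """Move the latest interaction of each sufficiently active user to the test set."""
    by_user = defaultdict(list)
    for row in interactions:
        by_user[row[0]].append(row)

    train, test = [], []
    for rows in by_user.values():
        rows = sorted(rows, key=lambda r: r[2])  # oldest first
        if len(rows) >= min_interactions:
            test.append(rows[-1])
            train.extend(rows[:-1])
        else:
            train.extend(rows)  # users below the threshold stay entirely in train
    return train, test

# Hypothetical (user, model, date) rows
interactions = [
    ("u1", "a", "2023-01-01"), ("u1", "b", "2023-01-05"),
    ("u1", "c", "2023-01-03"), ("u1", "d", "2023-01-09"),
    ("u2", "a", "2023-01-02"), ("u2", "b", "2023-01-04"),
]
train, test = leave_last_out(interactions)
print(len(train), test)
```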

In [None]:
def set_size(df, df_train, df_test):
    df_size1 = df.count()
    print("Size of the entire dataset:", df_size1)
    df_size2 = df_train.count()
    print("Size of the train set:", df_size2)
    df_size3 = df_test.count()
    print("Size of the test set:", df_size3)

In [None]:
nb_of_users = interactions_per_user(df_idx, 4)
train_df, test_df = split_data(df_idx, nb_of_users)
set_size(df_idx, train_df, test_df)

Total record count: 109364
split_data completed


## 5. Normalizing The Scores

In [None]:
def normalize_score(df):
    # Min and max of the score, computed in a single pass over this DataFrame
    min_score, max_score = df.agg(F.min("Score"), F.max("Score")).first()

    # Min-max normalization with native column arithmetic (no Python UDF needed)
    score_normalized_df = df.withColumn("Score_Normalized", (col("Score") - min_score) / (max_score - min_score))
    
    print("normalize_score completed")
    return score_normalized_df
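
In this notebook `normalize_score` is applied to the train and test sets independently. One common alternative, sketched below in plain Python, is to take the minimum and maximum from the train set only and reuse them for the test set. The helper names `fit_min_max` and `apply_min_max` and the sample scores are hypothetical, and this is a possible variation rather than what the notebook does.

```python
def fit_min_max(train_scores):
    """Learn the normalization range from the train set only."""
    return min(train_scores), max(train_scores)

def apply_min_max(scores, lo, hi):
    """Scale scores with a previously fitted range."""
    span = hi - lo
    if span == 0:
        return [0.0 for _ in scores]  # all train scores identical: nothing to scale
    return [(score - lo) / span for score in scores]

train_scores = [1.0, 3.0, 5.0]  # hypothetical
test_scores = [2.0, 6.0]        # hypothetical

lo, hi = fit_min_max(train_scores)
train_norm = apply_min_max(train_scores, lo, hi)
test_norm = apply_min_max(test_scores, lo, hi)
print(train_norm, test_norm)
```

With this variant a test score can fall outside [0, 1] when it lies outside the range seen in training, which is expected and could be clipped if needed.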

In [None]:
train_norm_df = normalize_score(train_df)
test_norm_df = normalize_score(test_df)

normalize_score completed
normalize_score completed


In [None]:
train_norm_df = train_norm_df.withColumn("Score_Normalized", train_norm_df["Score_Normalized"].cast(FloatType()))
test_norm_df = test_norm_df.withColumn("Score_Normalized", test_norm_df["Score_Normalized"].cast(FloatType()))

In [None]:
# BASE_PATH_RECOM already ends with '/', so no extra separator is needed
train_norm_df.write.parquet(BASE_PATH_RECOM + "train_norm_dfII.parquet")

In [None]:
# BASE_PATH_RECOM already ends with '/', so no extra separator is needed
test_norm_df.write.parquet(BASE_PATH_RECOM + "test_norm_df.parquet")

## 6. ALS Model Fitting

In [None]:
models_folder = 'models_Trialiii_ALS'
model_path = f"{BASE_PATH_RECOM}{models_folder}/"  # BASE_PATH_RECOM already ends with '/'

In [None]:
train_norm_df = spark.read.parquet("/FileStore/tables/1_recommendation/train_norm_dfII.parquet/")

In [None]:
def als_models(ranks, iterations, regularization, u_col, i_col, r_col):
    als_list = []
    idx = 0
    for k in ranks:
        for itr in iterations:
            for reg_p in regularization:
                    model_name = f"model_{idx}"
                    idx = idx + 1
                    als_model = (ALS(maxIter=itr,regParam = reg_p, rank = k, implicitPrefs = True, userCol= u_col, 
                                     itemCol= i_col, ratingCol = r_col, coldStartStrategy="drop",nonnegative=True))
                    als_list.append((model_name, als_model))    
    print("list_of_models_complete")
    return als_list
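
The nested loops above enumerate a full grid, so the model names map to hyperparameters in a fixed order. The sketch below rebuilds that mapping with `itertools.product` in plain Python, without creating any ALS objects, using the same value lists as the fitting cell in this notebook; the `grid` structure itself is only illustrative.

```python
from itertools import product

ranks = [3, 5, 10]
max_iters = [10, 20]
reg_params = [0.05, 0.1]

# product() varies the last list fastest, matching the loop order rank -> maxIter -> regParam.
grid = [
    {"name": f"model_{idx}", "rank": rank, "maxIter": max_iter, "regParam": reg_param}
    for idx, (rank, max_iter, reg_param) in enumerate(product(ranks, max_iters, reg_params))
]

print(len(grid))
for entry in grid:
    print(entry)
```

This makes it easy to look up which combination a saved model such as `model_1` corresponds to (rank 3, maxIter 10, regParam 0.1).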

In [None]:
def model_fit(df_train, models):
    for name, model in models:
        fitted = model.fit(df_train)
        fitted.save(f"{model_path.rstrip('/')}/{name}")  # avoid a doubled '/' in the save path
        print(name, " complete")
    print("model_fit complete")

In [None]:
ranks = [3, 5, 10]
maxIters = [10, 20]
regParams = [0.05, 0.1]
train_norm_df.cache()
models = als_models(ranks, maxIters, regParams, "mcvisid_index", "Model_index", "Score_Normalized")
model_fit(train_norm_df, models)

## 7. ALS Model Evaluation

In [None]:
test_norm_df = spark.read.parquet("/FileStore/tables/1_recommendation/test_norm_df.parquet/")

In [None]:
def load_model(path, val):
    loaded_models = []
    for i in range(val):
        model_path_with_number = f"{path}model_{i}"
        loaded_models.append(ALSModel.load(model_path_with_number))
    print("load_model complete")
    return loaded_models

In [None]:
models_path = '/FileStore/tables/1_recommendation/models_Trialiii_ALS/'
models = load_model(models_path, 12) # 12 is the number of models created

load_model complete


In [None]:
def generate_recommendation(model, test_df, val):
    k = val
    top_k_recommendations = model.recommendForUserSubset(test_df, k)
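    # ALS returns each user's recommendations ordered by predicted rating (highest first).
    # sort_array on the struct would reorder them by Model_index instead, so the array is
    # kept as-is under the column name used downstream.
    sorted_recommendations = top_k_recommendations.withColumn("sorted_recommendations", F.col("recommendations"))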
    print("generate_recommendation complete")
    return sorted_recommendations

In [None]:
def calculate_precision_at_k(test_df, recommendations, k):
    ground_truth = test_df.select('mcvisid_index', 'Model_index')
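    # Joining recommendations and ground truth on the user index
    joined_df = recommendations.join(ground_truth, 'mcvisid_index')
    # Calculating precision@k. The intersection below is taken with the literal indices 0..k-1
    # (the k most frequent models under StringIndexer's default frequency ordering),
    # not with the joined ground-truth Model_index column.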
    precision_at_k = joined_df.withColumn(
        'intersection',
        expr(f'array_intersect(sorted_recommendations.Model_index, array({",".join(str(i) for i in range(k))}))')
    ).withColumn(
        'precision_at_k',
        expr(f'size(intersection) / {k}')
    ).agg({'precision_at_k': 'mean'}).first()[0]
    print("calculate_precision_at_k complete")
    return precision_at_k
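
For reference, precision@k measured against held-out interactions can be sketched in plain Python as below. The dictionaries `recs_by_user` and `truth_by_user` and both function names are hypothetical; this is a generic formulation of the metric under those assumptions, not the computation performed by the function above.

```python
def precision_at_k(recommended, relevant, k):
    """Fraction of the top-k recommended items that appear in the user's relevant set."""
    top_k = recommended[:k]
    hits = len(set(top_k) & set(relevant))
    return hits / k

def mean_precision_at_k(recs_by_user, truth_by_user, k):
    """Average precision@k over users that have both recommendations and ground truth."""
    users = [user for user in truth_by_user if user in recs_by_user]
    if not users:
        return 0.0
    total = 0.0
    for user in users:
        total += precision_at_k(recs_by_user[user], truth_by_user[user], k)
    return total / len(users)

# Hypothetical item indices: recommendations are ordered best first.
recs_by_user = {"u1": [10, 20, 30], "u2": [30, 40, 50]}
truth_by_user = {"u1": {20}, "u2": {60}}

score = mean_precision_at_k(recs_by_user, truth_by_user, k=2)
print(score)
```

With a single held-out item per user, as in the split used here, this formulation can never exceed 1/k, so hit rate or recall@k may be easier to interpret in that setting.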

In [None]:
k = 10
i = 0
for model in models:
    recommendations_df = generate_recommendation(model, test_norm_df, k)
    p_at_k = calculate_precision_at_k(test_norm_df, recommendations_df, k)
    print(f"model_{i}")
    print(f"Precision@{k}: {p_at_k:.4f}")
    i = i+1

generate_recommendation complete
calculate_precision_at_k complete
model_0
Precision@10: 0.5883
generate_recommendation complete
calculate_precision_at_k complete
model_1
Precision@10: 0.6080
generate_recommendation complete
calculate_precision_at_k complete
model_2
Precision@10: 0.4838
generate_recommendation complete
calculate_precision_at_k complete
model_3
Precision@10: 0.5215
generate_recommendation complete
calculate_precision_at_k complete
model_4
Precision@10: 0.4688
generate_recommendation complete
calculate_precision_at_k complete
model_5
Precision@10: 0.5134
generate_recommendation complete
calculate_precision_at_k complete
model_6
Precision@10: 0.3607
generate_recommendation complete
calculate_precision_at_k complete
model_7
Precision@10: 0.4176
generate_recommendation complete
calculate_precision_at_k complete
model_8
Precision@10: 0.2560
generate_recommendation complete
calculate_precision_at_k complete
model_9
Precision@10: 0.2507
generate_recommendation complete
calcula

## 8. Viewing Some Recommendations

In [None]:
def recommendation_evaluation(u_rec_list, test_df, string_indexer_models):
        rec_alias = u_rec_list.alias("rec")
        test_alias = test_df.alias("test")
        recommendations_with_mcvisid = rec_alias.join(test_alias, col("rec.mcvisid_index") == col("test.mcvisid_index"), "left")
        # Selecting relevant columns and renaming them
        recommendations_with_mcvisid = recommendations_with_mcvisid.select(col("test.mcvisid").alias("mcvisid"), col("rec.sorted_recommendations"))

        # Retrieving values from the recommendations column
        rows = recommendations_with_mcvisid.collect()
        x = 0
        print("mcvisid |", "| recommendations")
        for row in rows:
            mcvisid = row["mcvisid"]
            recommendations = row["sorted_recommendations"]
            # index 1 is the Model indexer; use the list passed in rather than the global variable
            original_values = [string_indexer_models[1].labels[int(rec[0])] for rec in recommendations]
            print(mcvisid, original_values)
            x = x+1
            if (x>10):
                break
        print("---------------------------------------------------------------------")

In [None]:
Final_recommendations_df = generate_recommendation(models[1], test_norm_df, k)
recommendation_evaluation(Final_recommendations_df, test_norm_df, string_idx_models)

In [None]:
specific_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

filtered_df = test_norm_df.filter(test_norm_df['mcvisid'] == specific_id)

filtered_df.show()