In [1]:
# Import python libraries
import re
import time
import importlib

import numpy as np
import pandas as pd
import dask.dataframe as dd

from dask import delayed
from scipy.sparse import csr_matrix
from collections import defaultdict
from matplotlib import pyplot as plt
from typing import Any, Dict, List, Set, Tuple
from scipy.spatial.distance import pdist, squareform
from sklearn.metrics.pairwise import pairwise_distances


# Import project modules
import kdd_processor
importlib.reload(kdd_processor)
from kdd_processor import handle_data


# Ignore warnings
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Housekeeping ==> THIS GENERALLY STAYS THE SAME
SEED, NUM_RECOMMENDATIONS = 183, 10
task, task1_locales = 'task1', ['UK', 'DE', 'JP']
NUM_CORES, NUM_THREADS = 8, 16
data_path, output_path = '../../data/', '../../outputs/'
train_path, test_path = data_path + 'train/', data_path + 'test/'
output_file = output_path + task + '_predictions.parquet'
project_files = ['products_train.csv', 'sessions_train.csv', f'sessions_test_{task}.csv']
prod_dtypes = {
    'id': 'object',
    'locale': 'object',
    'title': 'object',
    'price': 'float64',
    'brand': 'object',
    'color': 'object',
    'size': 'object',
    'model': 'object',
    'material': 'object',
    'author': 'object',
    'desc': 'object'
}
sess_dtypes = {
    'session_id': 'int32'
}

In [3]:
# PARTITION SWITCH ==> CHANGE AS NEEDED
# Specify partitions, ids and sampling fractions here
num_partitions = 5000              # num. of subsets: None will automatically discard partition_ids below
partition_ids = {                   # partition id(s): 'all', or any one of {integer = n, range = range(0, 6), list = [5, 19]} 
    'products_train': [0],
    'sessions_train': [0],
    'sessions_test': 'all'
}
fraction = 1                       # fraction of each partition to sample from, 1 will not sample


# LOAD DATA HERE. This generally stays the same
products_train, sessions_train, sessions_test = handle_data(
    project_files,
    [train_path, train_path, test_path],
    task,
    task1_locales,
    num_partitions,
    partition_ids,
    fraction,
    SEED,
    prod_dtypes,
    sess_dtypes
)

Processed products file found at: ../../data/train/. Loading it now...
Processed sessions file found at: ../../data/train/. Loading it now...
Processed sessions test file found at: ../../data/test/. Loading it now...


### Dask
We will be using Dask, as it is a great tool for parallel computing.

Below are some examples. CAUTION: Dask dframes take time to compute(), uncomment as required

Resources: https://www.youtube.com/@Dask-dev/videos

In [4]:
# # # CAUTION: Dask dframes take time to compute, uncomment as required
# products_train.head(2)

In [5]:
# sessions_train.head(2)

In [6]:
# sessions_test.head(2)

In [7]:
# products_train.compute().info()

In [8]:
# sessions_train.compute().info()

In [9]:
# sessions_test.compute().info()

In [10]:
# products_train.compute().shape

In [11]:
# sessions_train.compute().shape

In [12]:
# sessions_test.compute().shape

Retrieving details for a sample product

In [13]:
# # Details for a product
# view_product = 'B06XKPB3GT'
# products_train[products_train['id'] == view_product].compute()

#### Identify product involvements
We will be identifying products that are in both products_train and sessions_train. This will be USEFUL while getting recommendations later on.

In [14]:
# Function to analyze dataframes
def analyze_dataframes(
    products_train: dd.DataFrame, 
    sessions_train: dd.DataFrame
) -> Tuple[pd.Series, pd.Series, Dict[str, int], Dict[str, List[int]]]:
    
    # 1. Identify unique product ids in products_train
    prod_in_pt = products_train['id'].unique().compute()

    # 2. Identify unique product ids in sessions_train['prev_items']
    sessions_train = sessions_train.assign(prev_items=sessions_train['prev_items'].str.split(','))
    prod_in_st = sessions_train['prev_items'].explode().unique().compute()

    # 3. Identify rows where each unique id in prod_in_pt can be found in products_train
    products_train = products_train.reset_index().rename(columns={'index': 'row'})
    prod_in_pt_rows = products_train[products_train['id'].isin(prod_in_pt)][['row', 'id']].compute().set_index('id')['row'].to_dict()

    # 4. Identify rows where each unique id in prod_in_st occurs in sessions_train['prev_items'] and count occurrences
    exploded_sessions_train = sessions_train.explode('prev_items').reset_index().rename(columns={'index': 'row'})
    unique_pairs = exploded_sessions_train[['row', 'prev_items']].drop_duplicates()
    prod_in_st_occs = unique_pairs[unique_pairs['prev_items'].isin(prod_in_st)].groupby('prev_items')['row'].apply(list, meta=('row', 'f8')).compute().to_dict()
    
    return prod_in_pt, prod_in_st, prod_in_pt_rows, prod_in_st_occs

# Function to view common products
def view_common_products(
    products_train: dd.DataFrame,
    sessions_train: dd.DataFrame,
    prod_pt: pd.Series,
    prod_st: pd.Series,
    prod_pt_rows: Dict[str, int],
    prod_st_occs: Dict[str, List[int]]
) -> pd.DataFrame:

    # Calculate common product ids
    common_ids = set(prod_pt) & set(prod_st)

    # Function to calculate partition number
    def get_partition(row_index, partition_size):
        return row_index // partition_size

    app_rows = []

    # Calculate partition sizes
    partition_size_products = len(products_train) // len(partition_ids['products_train'])
    partition_size_sessions = len(sessions_train) // len(partition_ids['sessions_train'])

    for common_id in common_ids:
        count_in_sessions = len(prod_st_occs[common_id])
        row_in_products = prod_pt_rows[common_id] + 1
        app_data = prod_st_occs[common_id]
        partition_numbers = [get_partition(x, partition_size_sessions) for x in app_data]

        app_rows.append([common_id, None, row_in_products, ",".join(map(str, partition_numbers)), ",".join(map(str, [x + 1 for x in app_data])), count_in_sessions])

    # Create a DataFrame to store the results
    app_df = pd.DataFrame(app_rows, columns=['id', 'part_prod', 'row_prod', 'part_sess', 'row_sess', 'count_sess'])
    app_df['part_prod'] = app_df['row_prod'].apply(lambda x: get_partition(x, partition_size_products))
    app_df = app_df[['id', 'part_prod', 'row_prod', 'part_sess', 'row_sess', 'count_sess']]
    app_df = app_df.sort_values(by=['row_prod']).reset_index(drop=True)
    
    return app_df

# Start timer
start_time = time.time()

# Analyze dataframes
prod_pt, prod_st, prod_pt_rows, prod_st_occs = analyze_dataframes(products_train, sessions_train)

# Intermediate timer
inter_time = time.time()
print(f'Extracting product occurrences took: {inter_time - start_time} seconds')

# View common products
common_products = view_common_products(products_train, sessions_train, prod_pt, prod_st, prod_pt_rows, prod_st_occs)

# End timer
end_time = time.time()
print(f'Retrieving common products took {end_time - inter_time} seconds')
print(f'{len(common_products)} common products in products_train and sessions_train, for partitions: {partition_ids["products_train"]}, {partition_ids["sessions_train"]}')

common_products

Extracting product occurrences took: 6.8472983837127686 seconds
Retrieving common products took 2.519829511642456 seconds
1 common products in products_train and sessions_train, for partitions: [0], [0]


Unnamed: 0,id,part_prod,row_prod,part_sess,row_sess,count_sess
0,B06XKPB3GT,0,13,0,209,1


In [15]:
common_products[common_products['count_sess'] > 1]

Unnamed: 0,id,part_prod,row_prod,part_sess,row_sess,count_sess


## User-based Collaborative Filtering
User/session-based collaborative filtering is a recommendation technique that uses the similarities between items to recommend similar products to users. For our case, item-based collaborative filtering can be used to recommend products based on their similarity in terms of features like product title, description, brand, or price. The intuition is that if a user engaged with a particular item, they are more likely to engage with similar items.

<img src='../../img/cf.png' width=600>

In [16]:
# Model 1
def SessionBasedCFModel1(products_train, sessions_train, hyperparams, top_n):
    def create_interaction_matrix(sessions_train):
        interactions = sessions_train.compute().groupby('session_id')['prev_items'].apply(lambda x: ','.join(x)).reset_index()
        interaction_matrix = interactions['prev_items'].str.get_dummies(sep=',')
        interaction_matrix.index = interactions['session_id']
        return interaction_matrix

    def calculate_similarity(interaction_matrix, metric):
        similarity_matrix = 1 - squareform(pdist(interaction_matrix, metric))
        similarity_df = pd.DataFrame(similarity_matrix, index=interaction_matrix.index, columns=interaction_matrix.index)
        return similarity_df

    def get_top_k_similar_sessions(similarity_df, k):
        top_k_similar_sessions = similarity_df.apply(lambda x: x.nlargest(k + 1).iloc[1:], axis=1)
        return top_k_similar_sessions

    def get_recommendations(top_k_similar_sessions, interaction_matrix):
        epsilon = 1e-9  # Small positive constant to avoid 0 values in similar_sessions
        session_similarity_dict = defaultdict(dict)
        for session_id, similar_sessions in top_k_similar_sessions.iterrows():
            similar_sessions = similar_sessions + epsilon  # Add epsilon to similar_sessions
            rec_items = interaction_matrix.loc[similar_sessions.index].apply(lambda x: np.dot(x, similar_sessions), axis=0)
            sorted_recommendations = rec_items.sort_values(ascending=False).head(top_n + 1).index.tolist()
            session_similarity_dict[session_id] = {item: rec_items[item] for item in sorted_recommendations if interaction_matrix.at[session_id, item] == 0}
        return session_similarity_dict

    interaction_matrix = create_interaction_matrix(sessions_train)
    epsilon = 1e-9  # Small positive constant to avoid 0 values in interaction_matrix
    interaction_matrix = interaction_matrix + epsilon  # Add epsilon to interaction_matrix
    # Filter out sessions with no interactions
    interaction_matrix = interaction_matrix.loc[(interaction_matrix != 0).any(axis=1)]
    similarity_df = calculate_similarity(interaction_matrix, hyperparams['similarity_metric'])
    top_k_similar_sessions = get_top_k_similar_sessions(similarity_df, hyperparams['top_k'])
    session_similarity_dict = get_recommendations(top_k_similar_sessions, interaction_matrix)

    return session_similarity_dict, interaction_matrix


# Function to view recommendations
def view_recs1(session_similarity_dict, interaction_matrix, prod_to_rec, n):
    # Check if the product exists in the interaction_matrix
    if prod_to_rec not in interaction_matrix.columns:
        print(f'{prod_to_rec} not found in the training data')
        return

    # Find sessions that interacted with the given product
    related_sessions = interaction_matrix[interaction_matrix[prod_to_rec] == 1].index

    # If no session interacted with the given product
    if len(related_sessions) == 0:
        print(f'No sessions found with {prod_to_rec} in the training data')
        return

    # Aggregate recommendations from the related sessions
    rec_dict = defaultdict(float)
    for session_id in related_sessions:
        recommendations = session_similarity_dict[session_id]
        for item, score in recommendations.items():
            rec_dict[item] += score

    # Get the top n recommendations
    top_n_recommendations = sorted(rec_dict.items(), key=lambda x: x[1], reverse=True)[:n]

    # Create a DataFrame to display the results
    df_recommendations = pd.DataFrame(top_n_recommendations, columns=["related_products", "score"])

    return df_recommendations

In [17]:
# Model 2
def create_interaction_matrix(sessions_train):
    interactions = sessions_train.compute().groupby('session_id')['prev_items'].apply(lambda x: ','.join(x)).reset_index()
    interaction_matrix = interactions['prev_items'].str.get_dummies(sep=',')
    interaction_matrix.index = interactions['session_id']
    return csr_matrix(interaction_matrix)

def calculate_similarity(interaction_matrix, metric):
    similarity_matrix = 1 - pairwise_distances(interaction_matrix, metric=metric, n_jobs=-1)
    return pd.DataFrame(similarity_matrix, index=interaction_matrix.index, columns=interaction_matrix.index)

def get_top_k_similar_sessions(similarity_df, k):
    top_k_indices = np.argpartition(similarity_df.values, -k-1, axis=1)[:, -k-1:-1]
    top_k_similar_sessions = pd.DataFrame(top_k_indices, index=similarity_df.index, columns=[f"Top{k+1}" for k in range(k)])
    return top_k_similar_sessions

def get_recommendations(top_k_similar_sessions, interaction_matrix, top_n):
    session_similarity_dict = defaultdict(dict)
    for session_id, similar_sessions in top_k_similar_sessions.iterrows():
        rec_items = interaction_matrix[similar_sessions.values].dot(interaction_matrix[session_id].T).toarray().sum(axis=0)
        top_item_indices = np.argpartition(rec_items, -top_n-1)[-top_n-1:-1]
        session_similarity_dict[session_id] = {item: rec_items[item] for item in top_item_indices if interaction_matrix[session_id, item] == 0}
    return session_similarity_dict

def SessionBasedCFModel2(products_train, sessions_train, hyperparams, top_n):
    interaction_matrix = create_interaction_matrix(sessions_train)
    similarity_df = calculate_similarity(interaction_matrix, hyperparams['similarity_metric'])
    top_k_similar_sessions = get_top_k_similar_sessions(similarity_df, hyperparams['top_k'])
    session_similarity_dict = get_recommendations(top_k_similar_sessions, interaction_matrix, top_n)
    return session_similarity_dict, interaction_matrix


# Function to view recommendations
def view_recs2(session_similarity_dict, interaction_matrix, prod_to_rec, n):
    related_sessions = interaction_matrix[:, interaction_matrix.columns.get_loc(prod_to_rec)].nonzero()[0]

    if len(related_sessions) == 0:
        print(f'{prod_to_rec} not found in the training data')
        return

    rec_dict = defaultdict(float)
    for session_id in related_sessions:
        recommendations = session_similarity_dict[session_id]
        for item, score in recommendations.items():
            rec_dict[item] += score

    top_n_recommendations = sorted(rec_dict.items(), key=lambda x: x[1], reverse=True)[:n]
    df_recommendations = pd.DataFrame(top_n_recommendations, columns=["related_products", "score"])
    return df_recommendations

In [18]:
# Hyperparams for SessionBasedCFModel
sbcf_hyperparams = {
    'similarity_metric': 'cosine',  # or 'jaccard', 'correlation', etc.
    'top_k': 5
}


# Specify product to get recommendations for
prod_to_rec = 'B06XKPB3GT'


# Train and predict
sbcf_similarity, sbcf_interaction_matrix = SessionBasedCFModel1(products_train, sessions_train, sbcf_hyperparams, NUM_RECOMMENDATIONS)


# View recommendations
sbcf_recos_df = view_recs1(sbcf_similarity, sbcf_interaction_matrix, prod_to_rec, NUM_RECOMMENDATIONS)
print(f'Recommendations for {prod_to_rec}:')
sbcf_recos_df

No sessions found with B06XKPB3GT in the training data
Recommendations for B06XKPB3GT:
