## Environment Setup

In [1]:
!pip install pyspark py4j
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=a945660f31bf13feb15a3d6cfeb1a9ec7b053943b2c1a9e5d04afe5f39c5db20
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


## Lodaing The Required Libraries

In [104]:
import numpy as np
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StringIndexerModel
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from scipy.sparse import coo_matrix
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col
from pyspark.sql.functions import count
from pyspark.sql import Window
from pyspark.sql.functions import col, desc, row_number, udf
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Loading The Dataset

In [4]:
filename = 'overall_df.csv'
overall_df = pd.read_csv(filename)
overall_df = overall_df.drop(columns=['Population', 'Post_Title', 'Upvotes'])
overall_df.rename(columns={'PageRank_Within_Community': 'PR_Community'}, inplace=True)
overall_df.head()

Unnamed: 0,Subreddit,Date,User_ID,Community_Label,PageRank,PR_Community,Category
0,AdviceAnimals,8/31/2022,Lost-My-Mind-,0,0.000407,0.001599,Humor
1,AdviceAnimals,8/31/2022,SgtDoughnut,0,0.000137,0.000528,Humor
2,AdviceAnimals,8/31/2022,JasonDJ,0,8.2e-05,0.000312,Humor
3,AdviceAnimals,8/31/2022,tacknosaddle,0,0.000199,0.000771,Humor
4,AdviceAnimals,8/31/2022,jezra,0,8.1e-05,0.000309,Humor


## Creating List Of Community Dataframes

- The aim of this section is to create a list of dataframes, where each dataframe represent a specific community. The reason behind doing this is: The collaborative filtering process will be focused on each community seperately.

In [5]:
def create_community_dataframes(df):
  community_dataframes = []

  # Grouping the nodes_df by the 'Community_Library' column
  grouped = df.groupby('Community_Label')

  # Iterating through each community and create separate DataFrames
  for community_id, community_nodes_df in grouped:
      # Removing the 'Community_Library' column from the DataFrame, as it's no longer needed
      community_nodes_df = community_nodes_df.drop(columns=['Community_Label'])

      # Appending the DataFrame to the list
      community_dataframes.append(community_nodes_df)

  return community_dataframes

In [6]:
community_dfs = create_community_dataframes(overall_df)
for df in community_dfs:
    df['Date'] = pd.to_datetime(df['Date'])
# Printing the first few nodes of a selected community DataFrame
print("\nCommunity X DataFrame:")
community_dfs[9].head(5)


Community X DataFrame:


Unnamed: 0,Subreddit,Date,User_ID,PageRank,PR_Community,Category
25616,AnimalsBeingDerps,2022-09-02,Sniflix,0.00018,0.002661,Animals and Pets
25618,AnimalsBeingDerps,2022-07-09,Sniflix,0.00018,0.002661,Animals and Pets
25634,AnimalsBeingDerps,2023-03-26,Sniflix,0.00018,0.002661,Animals and Pets
25700,AnimalsBeingDerps,2023-03-20,Sniflix,0.00018,0.002661,Animals and Pets
25941,AnimalsBeingDerps,2022-08-08,Sniflix,0.00018,0.002661,Animals and Pets


In [7]:
record_counts = [(i, len(df)) for i, df in enumerate(community_dfs)]
# Sort the list of tuples in descending order based on record counts
sorted_record_counts = sorted(record_counts, key=lambda x: x[1], reverse=True)
# Extract the top 3 DataFrames
top_3_dataframes = [community_dfs[i] for i, _ in sorted_record_counts[:5]]
# Print the number of records in each of the top 3 DataFrames
for i, count in sorted_record_counts[:3]:
    print(f"DataFrame {i}: {count} records")

DataFrame 0: 194345 records
DataFrame 4: 69365 records
DataFrame 15: 48567 records


In [8]:
communities_to_keep = [0, 4, 15]
community_dfs = [community_dfs[i] for i in communities_to_keep]
community_dfs[1].head(5)

Unnamed: 0,Subreddit,Date,User_ID,PageRank,PR_Community,Category
38,AdviceAnimals,2022-08-25,Technologhee,0.000191,0.001274,Humor
98,AdviceAnimals,2023-02-19,Technologhee,0.000191,0.001274,Humor
127,AdviceAnimals,2022-09-05,Technologhee,0.000191,0.001274,Humor
136,AdviceAnimals,2022-11-02,Technologhee,0.000191,0.001274,Humor
140,AdviceAnimals,2023-06-06,Technologhee,0.000191,0.001274,Humor


## Data Preprocessing

- The aim of this section I start figuring out how can I create a score/rate that best describes the user preference regarding a given subreddit.
- I first compute the weights of the following subreddit and category. Each one describes the following: On average how many categories/subreddits does a user view.
- I then compute the partial scores, which are engagement score for subreddit and for category. These partial scores describe or give more importance to the subreddits/categories that have been interacted with more by the user. For example if the user have performed an interaction in a subreddit X four times and in subreddit Y 2 times, then I give more importance to subreddit X than subreddit Y.  
- Finally, I compute the decay factor, where I give more importance to a subreddit that has been interacted with several times in a day by a given user than a product that has been viewed several times but across several days that are far apart by the same user. The time decay factor was calculated by followaing the log approach, in order to give importance to reecency of the view date. For instance if user x has viewed product y today and yesterday the value of the decay factor won't be the same as when user x viewed product y today and 3 months ago.

In [9]:
def calculate_subreddit_weight(df):
    # Grouping by 'User_ID' and count distinct 'Subreddit'
    grouped = df.groupby('User_ID')['Subreddit'].nunique()
    average_count = grouped.mean()
    return average_count

def calculate_category_weight(df):
    # Grouping by 'User_ID' and count distinct 'Category'
    grouped = df.groupby('User_ID')['Category'].nunique()
    average_count = grouped.mean()
    return average_count

In [10]:
def calculate_interaction_ratio_subreddit(df):
    subreddit_interactions = df.groupby(['User_ID', 'Subreddit']).size().reset_index(name='interactions')
    total_interactions = df.groupby('User_ID').size().reset_index(name='total_interactions')

    df = df.merge(subreddit_interactions, on=['User_ID', 'Subreddit'], how='left')
    df = df.merge(total_interactions, on='User_ID', how='left')

    df['Engagement_Score_Subreddit'] = df['interactions'] / df['total_interactions']
    df = df.drop(['interactions', 'total_interactions'], axis=1)
    return df

def calculate_interaction_ratio_category(df):
    category_interactions = df.groupby(['User_ID', 'Category']).size().reset_index(name='interactions')
    total_interactions = df.groupby('User_ID').size().reset_index(name='total_interactions')

    df = df.merge(category_interactions, on=['User_ID', 'Category'], how='left')
    df = df.merge(total_interactions, on='User_ID', how='left')

    df['Engagement_Score_Category'] = df['interactions'] / df['total_interactions']
    df = df.drop(['interactions', 'total_interactions'], axis=1)
    return df

In [11]:
def max_date(df):
    # Calculating the maximum date for each user-product combination
    max_date_df = df.groupby(['User_ID', 'Subreddit'])['Date'].max().reset_index()
    max_date_df.rename(columns={'Date': 'Last_Interaction_Date'}, inplace=True)

    # Joining the max_date_df with the original df
    df_with_max_date = df.merge(max_date_df, on=['User_ID', 'Subreddit'])
    return df_with_max_date

def calculate_decay_factor(row):
    half_life_days = 7
    days_diff = abs((pd.to_datetime(row['Date']) - pd.to_datetime(row['Last_Interaction_Date'])).days)
    if days_diff == 0:
        return 1
    return 1 / np.log1p(1 + (days_diff / half_life_days))

In [12]:
subreddit_weight = []
for df in community_dfs:
    subreddit_weight.append(calculate_subreddit_weight(df))

category_weight = []
for df in community_dfs:
    category_weight.append(calculate_category_weight(df))

In [13]:
temp_dfs_1 = []
for df in community_dfs:
    temp_dfs_1.append(calculate_interaction_ratio_subreddit(df))

temp_dfs_2 = []
for df in temp_dfs_1:
    temp_dfs_2.append(calculate_interaction_ratio_category(df))

In [14]:
max_date_dfs = []
for df in temp_dfs_2:
    max_date_dfs.append(max_date(df))
for df in max_date_dfs:
    df['Decay_Factor'] = df.apply(calculate_decay_factor, axis=1)
overall_dfs = max_date_dfs.copy()
overall_dfs[1].head()

Unnamed: 0,Subreddit,Date,User_ID,PageRank,PR_Community,Category,Engagement_Score_Subreddit,Engagement_Score_Category,Last_Interaction_Date,Decay_Factor
0,AdviceAnimals,2022-08-25,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.260768
1,AdviceAnimals,2023-02-19,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.329197
2,AdviceAnimals,2022-09-05,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.263138
3,AdviceAnimals,2022-11-02,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.278137
4,AdviceAnimals,2023-06-06,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.58219


## Score Calculation

- In this section I compute the implicit score by firstly computing the following equation: (subreddit weight * engagement score subreddit + category weight * engagement score category)*decay factor. The result of this equation is then used to compute the actual implicit score by integrating the pagerank score. Alpha is a parameter that controls the balance between the Total_Score and the PageRank scores. Higher alpha gives more weight to pagerank scores, making the recommendations more influenced by user importance in the community. I then compute the lower and upper bounds based on the interquartile range (IQR) and capping the Implicit_Score values that fall outside these bounds. This helps in mitigating the impact of outliers on your recommendations.

In [15]:
def calculate_implicit_score(df, subreddit_weight, category_weight, alpha):
    df['Total_Score'] = (subreddit_weight * df['Engagement_Score_Subreddit'] + category_weight * df['Engagement_Score_Category'])* df['Decay_Factor']
    df['Implicit_Score'] = (1 - alpha) * df['Total_Score'] + alpha * df['PR_Community']
    lower_quartile = df['Implicit_Score'].quantile(0.25)
    upper_quartile = df['Implicit_Score'].quantile(0.75)
    iqr = upper_quartile - lower_quartile

    # Calculating lower and upper bounds
    lower_bound = lower_quartile - 1.5 * iqr
    upper_bound = upper_quartile + 1.5 * iqr

    # Setting scores below lower bound to lower bound, and scores above upper bound to upper bound
    df['Implicit_Score'] = df['Implicit_Score'].apply(lambda x: lower_bound if x < lower_bound else x)
    df['Implicit_Score'] = df['Implicit_Score'].apply(lambda x: upper_bound if x > upper_bound else x)
    final_df = df.copy()
    return final_df

In [16]:
alpha = 0.7
main_dfs = []
for community_idx in range(len(overall_dfs)):
    community_df = overall_dfs[community_idx]
    community_subreddit_weight = subreddit_weight[community_idx]
    community_category_weight = category_weight[community_idx]
    implicit_score_df = calculate_implicit_score(community_df, community_subreddit_weight, community_category_weight, alpha)
    main_dfs.append(implicit_score_df)
main_dfs[1].head()

Unnamed: 0,Subreddit,Date,User_ID,PageRank,PR_Community,Category,Engagement_Score_Subreddit,Engagement_Score_Category,Last_Interaction_Date,Decay_Factor,Total_Score,Implicit_Score
0,AdviceAnimals,2022-08-25,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.260768,0.312415,0.094616
1,AdviceAnimals,2023-02-19,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.329197,0.394398,0.119211
2,AdviceAnimals,2022-09-05,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.263138,0.315255,0.095468
3,AdviceAnimals,2022-11-02,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.278137,0.333225,0.100859
4,AdviceAnimals,2023-06-06,Technologhee,0.000191,0.001274,Humor,0.5625,0.5625,2023-07-01,0.58219,0.6975,0.210142


## String To Index (User & Subreddit)

In [17]:
columns_to_select = ['Subreddit', 'Date', 'User_ID', 'Implicit_Score']
spark_dfs = []
for main_df in main_dfs:
    # Converting each Pandas DataFrame to a PySpark DataFrame
    spark_df = spark.createDataFrame(main_df[columns_to_select])
    spark_dfs.append(spark_df)
spark_dfs[1].show(5)

+-------------+-------------------+------------+-------------------+
|    Subreddit|               Date|     User_ID|     Implicit_Score|
+-------------+-------------------+------------+-------------------+
|AdviceAnimals|2022-08-25 00:00:00|Technologhee|0.09461639541283831|
|AdviceAnimals|2023-02-19 00:00:00|Technologhee|0.11921113788728874|
|AdviceAnimals|2022-09-05 00:00:00|Technologhee|0.09546824532435702|
|AdviceAnimals|2022-11-02 00:00:00|Technologhee|0.10085922324338634|
|AdviceAnimals|2023-06-06 00:00:00|Technologhee|0.21014163032513997|
+-------------+-------------------+------------+-------------------+
only showing top 5 rows



In [18]:
def string_to_index(dfs, val, val_date):
    dfs_indexed = []
    string_indexer_models_list = []
    for df in dfs:
      indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in df.columns if column not in {val, val_date}]
      pipeline = Pipeline(stages=indexers)
      pipeline_model = pipeline.fit(df)
      df_indexed = pipeline_model.transform(df)
      string_indexer_models = [stage for stage in pipeline_model.stages if isinstance(stage, StringIndexerModel)]
      dfs_indexed.append(df_indexed)
      string_indexer_models_list.append(string_indexer_models)
    print("StringToIndex completed")
    return dfs_indexed, string_indexer_models_list

In [19]:
dfs_idx = []
string_idx_models_list = []
dfs_idx, string_idx_models_list = string_to_index(spark_dfs, 'Date', 'Implicit_Score')
dfs_idx[1].show(5)

StringToIndex completed
+-------------+-------------------+------------+-------------------+---------------+-------------+
|    Subreddit|               Date|     User_ID|     Implicit_Score|Subreddit_index|User_ID_index|
+-------------+-------------------+------------+-------------------+---------------+-------------+
|AdviceAnimals|2022-08-25 00:00:00|Technologhee|0.09461639541283831|           13.0|        129.0|
|AdviceAnimals|2023-02-19 00:00:00|Technologhee|0.11921113788728874|           13.0|        129.0|
|AdviceAnimals|2022-09-05 00:00:00|Technologhee|0.09546824532435702|           13.0|        129.0|
|AdviceAnimals|2022-11-02 00:00:00|Technologhee|0.10085922324338634|           13.0|        129.0|
|AdviceAnimals|2023-06-06 00:00:00|Technologhee|0.21014163032513997|           13.0|        129.0|
+-------------+-------------------+------------+-------------------+---------------+-------------+
only showing top 5 rows



## Splitting The Data Into Train & Test Sets

In [81]:
def interactions_per_user(dfs, val):
    user_counts_list = []
    i = 0
    for df in dfs:
      # grouping by user_id and count number of interactions
      user_counts = df.groupBy('User_ID').agg(count('*').alias('interactions'))
      # filtering out users with less than 50 interactions
      user_counts = user_counts.filter(col('interactions') >= val)
      user_counts_list.append(user_counts)
      record_count = user_counts.count()
      print(f"Total record count {i}:", record_count)
      i= i+1
    return user_counts_list

In [88]:
def split_data(dfs, val_list):
    train_list = []
    test_list = []
    for df, val in zip(dfs, val_list):
        # Joining back to the original data to get recent interactions for each user
        recent_interactions = df.join(val, on='User_ID') \
                                .orderBy(['User_ID', 'Date'], ascending=[True, False]) \
                                .withColumn('rank', row_number().over(Window.partitionBy('User_ID').orderBy(desc('Date')))) \
                                .filter(col('rank') == 1) \
                                .select('User_ID', 'Subreddit', 'User_ID_index', 'Subreddit_index', 'Implicit_Score', 'Date')

        # Splitting data into train and test sets
        train = df.join(recent_interactions, on=['User_ID', 'Subreddit', 'User_ID_index', 'Subreddit_index', 'Implicit_Score', 'Date'], how='left_anti')
        test = recent_interactions.select('User_ID', 'Subreddit', 'User_ID_index', 'Subreddit_index', 'Implicit_Score', 'Date')

        train_list.append(train)
        test_list.append(test)

    print("split_data completed")
    return train_list, test_list

In [83]:
def set_size(df_list, df_train_list, df_test_list):
    for i, df in enumerate(df_list):
        df_size1 = df.count()
        print(f"Size of the entire dataset {i + 1}:", df_size1)

    for i, df_train in enumerate(df_train_list):
        df_size2 = df_train.count()
        print(f"Size of the train set {i + 1}:", df_size2)

    for i, df_test in enumerate(df_test_list):
        df_size3 = df_test.count()
        print(f"Size of the test set {i + 1}:", df_size3)

In [89]:
nb_of_users = []
train_dfs = []
test_dfs = []
nb_of_users = interactions_per_user(dfs_idx, 50)
train_dfs, test_dfs = split_data(dfs_idx, nb_of_users)
set_size(dfs_idx, train_dfs, test_dfs)

Total record count 0: 1352
Total record count 1: 577
Total record count 2: 456
split_data completed
Size of the entire dataset 1: 194345
Size of the entire dataset 2: 69365
Size of the entire dataset 3: 48567
Size of the train set 1: 192850
Size of the train set 2: 68708
Size of the train set 3: 48041
Size of the test set 1: 1352
Size of the test set 2: 577
Size of the test set 3: 456


## Normalizing The Scores

In [101]:
def normalize_score(dfs):
    score_normalized_dfs = []
    for df in dfs:
      min_score = df.agg({"Implicit_Score": "min"}).collect()[0][0]
      max_score = df.agg({"Implicit_Score": "max"}).collect()[0][0]

      # Defining the UDF for normalization
      normalize_udf = udf(lambda x: (x - min_score) / (max_score - min_score))
      score_normalized_df = df.withColumn("Score_Normalized", normalize_udf(col("Implicit_Score")))
      score_normalized_dfs.append(score_normalized_df)
    print("normalize_score completed")
    return score_normalized_dfs

In [102]:
train_norm_dfs = normalize_score(train_dfs)
test_norm_dfs = normalize_score(test_dfs)

normalize_score completed
normalize_score completed


In [105]:
for i in range(len(train_norm_dfs)):
    train_norm_dfs[i] = train_norm_dfs[i].withColumn("Score_Normalized", train_norm_dfs[i]["Score_Normalized"].cast(FloatType()))

for i in range(len(test_norm_dfs)):
    test_norm_dfs[i] = test_norm_dfs[i].withColumn("Score_Normalized", test_norm_dfs[i]["Score_Normalized"].cast(FloatType()))

## ALS Model Fitting

In [124]:
model_path = "/content/models_ALS"

In [107]:
def als_models(ranks, iterations, regularization, u_col, i_col, r_col):
    als_list = []
    idx = 0
    for k in ranks:
        for itr in iterations:
            for reg_p in regularization:
                    model_name = f"model_{idx}"
                    idx = idx + 1
                    als_model = (ALS(maxIter=itr,regParam = reg_p, rank = k, implicitPrefs = True, userCol= u_col,
                                     itemCol= i_col, ratingCol = r_col, coldStartStrategy="drop",nonnegative=True))
                    als_list.append((model_name, als_model))
    print("list_of_models_complete")
    return als_list

In [120]:
def model_fit(df_train, models, idx):
    for name, model in models:
        fitted = model.fit(df_train)
        fitted.save(f"{model_path}/train_df_{idx}/{name}")
        print(name, " complete")
    print("model_fit complete")

In [122]:
ranks = [3, 5, 10]
maxIters = [10, 20]
regParams = [0.05, 0.1]
models_list = []
for train_norm_df in train_norm_dfs:
  models = als_models(ranks, maxIters, regParams, "User_ID_index", "Subreddit_index", "Score_Normalized")
  models_list.append(models)

list_of_models_complete
list_of_models_complete
list_of_models_complete


In [125]:
idx = 0
for train_norm_df, models in zip(train_norm_dfs, models_list):
  model_fit(train_norm_df, models, idx)
  idx = idx + 1

model_0  complete
model_1  complete
model_2  complete
model_3  complete
model_4  complete
model_5  complete
model_6  complete
model_7  complete
model_8  complete
model_9  complete
model_10  complete
model_11  complete
model_fit complete
model_0  complete
model_1  complete
model_2  complete
model_3  complete
model_4  complete
model_5  complete
model_6  complete
model_7  complete
model_8  complete
model_9  complete
model_10  complete
model_11  complete
model_fit complete
model_0  complete
model_1  complete
model_2  complete
model_3  complete
model_4  complete
model_5  complete
model_6  complete
model_7  complete
model_8  complete
model_9  complete
model_10  complete
model_11  complete
model_fit complete
