In [None]:
# The main purpose of this .ipynb file is to train an ALS model to return the top X products
# for each user in our population of interest. A pickle file containing all user_id & product_id
# will be returned.

In [1]:
import numpy as np
import pandas as pd
import sys
import subprocess
import random
import time
import gc
import pickle

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.recommendation import ALS
from pyspark.mllib.recommendation import Rating
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from collections import defaultdict
from pyspark.ml.feature import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SQLContext
from pyspark import SparkContext


#Create PySpark SparkSession
SparkContext.setSystemProperty('spark.executor.memory', '64g')
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .config("spark.driver.memory", "64g")\
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
sc = SparkContext.getOrCreate() 
sqlContext = SQLContext(sc)
sc.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/10 03:43:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Saving and Loading 0-based User Index

In [None]:
# # convert user_id from a 1-based index to a 0-based index. dictionary is saved
# # for future use.

# orders = pd.read_csv("orders.csv")
# all_users = orders.user_id.unique()
# all_users.sort()
# user_study = np.random.choice(all_users, len(all_users)//2, replace=False)
# user_study.sort()
# print (len(user_study))
# dict_user_study = dict(zip(user_study, range(len(user_study))))

# file_name = "user_study_dict_100k.pkl"
# open_file = open(file_name, "wb")
# pickle.dump(dict_user_study, open_file)
# open_file.close()

In [2]:
#load in 0-based user index map

# file_name = 'user_study_dict_ALLUSERS.pkl'

# open_file = open(file_name, "rb")
# user_study_dict = pickle.load(open_file)
# open_file.close()
# study_indices = list(user_study_dict.keys())

# file_name = 'user_study_dict_40k.pkl'

# open_file = open(file_name, "rb")
# user_study_dict = pickle.load(open_file)
# open_file.close()
# study_indices = list(user_study_dict.keys())

# file_name = 'user_study_dict_100k.pkl'

# open_file = open(file_name, "rb")
# user_study_dict = pickle.load(open_file)
# open_file.close()
# study_indices = list(user_study_dict.keys())

file_name = 'user_study_dict_20k.pkl'

open_file = open(file_name, "rb")
user_study_dict = pickle.load(open_file)
open_file.close()
study_indices = list(user_study_dict.keys())

In [4]:
# create a pyspark dataframe to limit the sample of users we are considering for reccommendations

study_indices = [str(x) for x in list(user_study_dict.keys())]
R = Row('user_id')
study_indices_df = spark.createDataFrame([R(x) for x in study_indices])

#### Loading Historical Data

In [5]:
# loading in all order history from user population

# df_spark_orders_full = sqlContext.read.csv("orders.csv", header=True).withColumn("order_number", 
#                                                             col("order_number").cast("int"))

df_spark_previous = sqlContext.read.csv("order_products__prior.csv", header=True)
df_spark_train = sqlContext.read.csv("order_products__train.csv", header=True)
df_spark_orders = sqlContext.read.csv("orders.csv", header=True).select('order_id', 'user_id')


df_spark_previous_joined = df_spark_previous.join(df_spark_orders, 'order_id', 'left')
df_spark_previous_joined = df_spark_previous_joined.join(study_indices_df, how='inner', on='user_id')

df_spark_train_joined = df_spark_train.join(df_spark_orders, 'order_id', 'left')
df_spark_train_joined = df_spark_train_joined.join(study_indices_df, how='inner', on='user_id')

print ('done reading data')

done reading data


In [6]:
# Grouping all product purchases by user_id. These dataframes will train and test 
# the ALS model.

df_train_reorder_count = df_spark_train_joined.groupBy("user_id", "product_id").agg(F.count('reordered').alias('reorder_count'))
df_previous_reorder_count = df_spark_previous_joined.groupBy("user_id", "product_id").agg(F.count('reordered').alias('reorder_count'))

In [7]:
# Creating a StringIndexer for user_id and product_id. Converting all data to use new indexes. handleInvalid=keep
# StringIndexer encodes a string column of labels to a column of label indices. If the input column is numeric, 
# we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by 
# label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType.

# .setHandleInvalid("keep") option adds new indexes when it sees new labels.

indexer_user = StringIndexer(inputCol="user_id", outputCol="user_index", handleInvalid='keep').fit(df_previous_reorder_count)
indexer_product = StringIndexer(inputCol="product_id", outputCol="product_index", handleInvalid='keep').fit(df_previous_reorder_count)

df_previous_reorder_count_indexed = indexer_product.transform(indexer_user.transform(df_previous_reorder_count))
df_train_reorder_count_indexed = indexer_product.transform(indexer_user.transform(df_train_reorder_count))

# creating a map of the user_id and what they actually purchased in their last order
true_product_list_train = df_train_reorder_count_indexed.select("user_index", "product_index").groupby("user_index").agg(F.expr('collect_list(product_index) AS true_priority'))

# saving user_index for train, validation sets for later use
train_users = df_train_reorder_count_indexed.select("user_index").distinct()
prior_users = df_previous_reorder_count_indexed.select("user_index").distinct()

# saving product_index for later use
all_products = df_previous_reorder_count_indexed.select("product_index").distinct()

                                                                                

In [8]:
# saving index map to convert StringIndexer index back to original index later. Creating 3 dataframes to store the maps for
# train users, prior users and all products

user_labels = indexer_user.labels
product_labels = indexer_product.labels

user_id_to_label = IndexToString(inputCol='user_index', outputCol='user_id', labels=user_labels)
product_id_to_label = IndexToString(inputCol='product_index', outputCol='product_id', labels=product_labels)

# train users

train_users_with_labels = user_id_to_label.transform(train_users)
df_train_users = train_users_with_labels.toPandas()

# prior users

prior_users_with_labels = user_id_to_label.transform(prior_users)
df_prior_users = prior_users_with_labels.toPandas()

# all products

products_with_labels = product_id_to_label.transform(all_products)
df_products_with_labels = products_with_labels.toPandas()

                                                                                

#### Run if Training and Model Save is Needed

In [9]:
rank = 250
alpha = 5.0
regParam = 0.5

maxIter = 2
numUserBlocks = 10
numItemBlocks = 10
implicitPrefs = True
nonnegative = False
userCol = "user_index"
itemCol = "product_index"
ratingCol = "reorder_count"

als = ALS(maxIter=maxIter,numUserBlocks=numUserBlocks, numItemBlocks=numItemBlocks,userCol=userCol, itemCol=itemCol, ratingCol=ratingCol, 
implicitPrefs=implicitPrefs, rank=rank, alpha=alpha, regParam=regParam)
model = als.fit(df_previous_reorder_count_indexed)

model.write().overwrite().save("ALS_model_rank250_20k.model") #10k sample

                                                                                

#### Load Model "ALS_model_rank250_count_comparison.model"

In [None]:
# model = ALSModel.load("ALS_model_rank350_allusers.model")

#### Extracting User and Item Latent Factors

In [None]:
# df_itemFactors = (model.itemFactors).toPandas()
# print (df_itemFactors.iloc[0]['features'].shape)
# print (df_itemFactors.shape)

# df_userFactors = model.userFactors.toPandas()
# print (df_userFactors.iloc[0]['features'].shape)
# print (df_userFactors.shape)

#### Top 15 Recommendations per User (Validation Set)

In [10]:
recommendations_train = model.recommendForUserSubset(train_users, 15)
predicted_priority_train = recommendations_train.select('user_index', 'recommendations.product_index')
predicted_priority_train = predicted_priority_train.join(true_product_list_train, ['user_index'], "left")
truth_and_prediction_train = predicted_priority_train.rdd.map(lambda row: (row[1], row[2]))
rank_metrics_train = RankingMetrics(truth_and_prediction_train)

print ('Train (MAP 15): ', rank_metrics_train.meanAveragePrecisionAt(15))
# print ('Train (MAP 100): ', rank_metrics_train.meanAveragePrecisionAt(100))

print ('Train (ndcg 15): ', rank_metrics_train.ndcgAt(15))
# print ('Train (ndcg 100): ', rank_metrics_train.ndcgAt(100))

                                                                                

Train (MAP 15):  0.17986596815863914


[Stage 277:>                                                        (0 + 1) / 1]

Train (ndcg 15):  0.3045305556835871


                                                                                

#### Top (50, 100, 150) per User for all Users

In [11]:
recommendations_train_cg = model.recommendForUserSubset(prior_users, 100)
predicted_priority_train_cg = recommendations_train_cg.select('user_index', 'recommendations.product_index')

predicted_priority_train_cg = predicted_priority_train_cg.join(true_product_list_train, ['user_index'], "left")
truth_and_prediction_train_cg = predicted_priority_train_cg.rdd.map(lambda row: (row[1], row[2]))
rank_metrics_train_cg = RankingMetrics(truth_and_prediction_train_cg)

#### Bringing back Original Index for Each User

predicted_priority_train_cg = user_id_to_label.transform(predicted_priority_train_cg)

df_predicted_priority_train_cg = predicted_priority_train_cg.toPandas()

                                                                                

In [None]:
# df_predicted_priority_train_cg.head()

In [None]:
# df_predicted_priority_train_cg_exploded = df_predicted_priority_train_cg.explode('product_index')

In [None]:
# df_predicted_priority_train_cg_exploded[df_predicted_priority_train_cg_exploded.user_index == 0]

#### Exploding Top X Predictions into Row-Level Data

#### Bringing back Original Index for Each Product

In [12]:
# expanding out the predictions, seen in the 'product_index' column, to become row-wise
df_predicted_priority_train_cg_exploded = df_predicted_priority_train_cg.explode('product_index')

print (df_predicted_priority_train_cg_exploded.shape, df_predicted_priority_train_cg.shape)

# df_predicted_priority_train_cg_exploded.head()

df_products_with_labels.product_index = df_products_with_labels.product_index.astype(int)

# df_products_with_labels[df_products_with_labels.product_index == 8596]

df_predicted_priority_train_cg_exploded = pd.merge(df_predicted_priority_train_cg_exploded, df_products_with_labels, how='left', on='product_index')

(2062000, 4) (20620, 4)


In [13]:
cg_exploded = df_predicted_priority_train_cg_exploded[["user_id", "product_id"]].copy()
cg_exploded.user_id = cg_exploded.user_id.astype(np.int64) 
cg_exploded.product_id = cg_exploded.product_id.astype(np.int64) 

In [14]:
# converting user_id and product_id back to 0-based index
cg_exploded.user_id = cg_exploded.user_id.map(user_study_dict)
cg_exploded.product_id = cg_exploded.product_id - 1

# format of this data is unique combinations of user_id, product_id
cg_exploded.to_pickle('cg_exploded_rank250_20k_100.pkl')

### Tuning

In [None]:
# need to time out rank 40 and rank 250 for all users

possible_ranks = [40, 250]

for rank_var in possible_ranks:
    rank = rank_var
    alpha = 5.0
    regParam = 0.5
    
    maxIter = 2
    numUserBlocks = 10
    numItemBlocks = 10
    implicitPrefs = True
    nonnegative = False
    userCol = "user_index"
    itemCol = "product_index"
    ratingCol = "reorder_count"
    tic1 = time.perf_counter()
    als = ALS(maxIter=maxIter,numUserBlocks=numUserBlocks, numItemBlocks=numItemBlocks,userCol=userCol, itemCol=itemCol, ratingCol=ratingCol, 
    implicitPrefs=implicitPrefs, rank=rank, alpha=alpha, regParam=regParam)
    model = als.fit(df_previous_reorder_count_indexed)
    toc1 = time.perf_counter()
    tic2 = time.perf_counter()
    recommendations_train = model.recommendForUserSubset(train_users, 15)
    toc2 = time.perf_counter()
    
    predicted_priority_train = recommendations_train.select('user_index', 'recommendations.product_index')
    predicted_priority_train = predicted_priority_train.join(true_product_list_train, ['user_index'], "left")
    truth_and_prediction_train = predicted_priority_train.rdd.map(lambda row: (row[1], row[2]))
    rank_metrics_train = RankingMetrics(truth_and_prediction_train)

    print ('Train (MAP 15): ', rank_metrics_train.meanAveragePrecisionAt(15))
    # print ('Train (MAP 100): ', rank_metrics_train.meanAveragePrecisionAt(100))

    print ('Train (ndcg 15): ', rank_metrics_train.ndcgAt(15))
    # print ('Train (ndcg 100): ', rank_metrics_train.ndcgAt(100))
    
    model.write().overwrite().save("ALS_model_rank" + str(rank_var) + "_allusers.model") #10k sample
    file1 = open("ALS_Rank_TimeTest.txt","a")
    L = ["rank: " + str(rank_var) + "\n",
        "Train Val MAP 15: " + str(rank_metrics_train.meanAveragePrecisionAt(15)) + "\n",
        "Train Val NDCG 15: " + str(rank_metrics_train.ndcgAt(15)) + "\n",
        f"Model trained in {toc1 - tic1:0.4f} seconds", 
         f"Predictions made in {toc2 - tic2:0.4f}", "\n \n"]
    file1.writelines(L)
    file1.close() #to change file access modes
#     model = None
#     del model
#     gc.collect()