# ALS

In [1]:
# set the environment path to find Recommenders
import sys
import pandas as pd
from matplotlib import pyplot as plt
import numpy as np
import seaborn as sns
import sys
import pandas as pd

from functools import wraps
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.ml.tuning import CrossValidator
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType, StringType

from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation, SparkRatingEvaluation, SparkDiversityEvaluation
from recommenders.tuning.parameter_sweep import generate_param_grid
from recommenders.datasets.spark_splitters import spark_random_split

from pyspark.sql.window import Window

from recommenders.utils.timer import Timer

print("System version: {}".format(sys.version))
print("Pandas version: {}".format(pd.__version__))
print("PySpark version: {}".format(pyspark.__version__))

System version: 3.6.13 | packaged by conda-forge | (default, Sep 23 2021, 07:55:15) 
[GCC Clang 11.1.0]
Pandas version: 1.1.5
PySpark version: 2.4.8


In [2]:
#utils

# results table
cols = ["Data", "Algo", "K", "Train time (s)","Predicting time (s)", "RMSE", "MAE", "R2", "Explained Variance", "Recommending time (s)", "MAP", "nDCG@k", "Precision@k", "Recall@k","Diversity","Novelty","Distributional coverage","Catalog coverage"]
df_results = pd.DataFrame(columns=cols)

def generate_summary(data, algo, k, train_time, rating_time, rating_metrics, ranking_time, ranking_metrics, diversity_metrics):
    summary = {"Data": data, "Algo": algo, "K": k, "Train time (s)": train_time, "Predicting time (s)": rating_time, "Recommending time (s)":ranking_time}
    if rating_metrics is None:
        rating_metrics = {
            "RMSE": np.nan,
            "MAE": np.nan,
            "R2": np.nan,
            "Explained Variance": np.nan,
        }
    if ranking_metrics is None:
        ranking_metrics = {
            "MAP": np.nan,
            "nDCG@k": np.nan,
            "Precision@k": np.nan,
            "Recall@k": np.nan,
        }
    if diversity_metrics is None:
        diversity_metrics = {
        "Diversity": np.nan,
        "Novelty": np.nan,
        "Distributional coverage": np.nan,
        "Catalog coverage": np.nan,
    }
    summary.update(diversity_metrics)
    summary.update(rating_metrics)
    summary.update(ranking_metrics)

    return summary


def convert_timestamp(datetime):
    date_string = str(datetime)
    date = datetime.datetime.strptime(date_string, "%m/%d/%Y")
    timestamp = datetime.datetime.timestamp(date)
    return(timestamp)

def preprocess_data(df):
    # Convert the float precision to 32-bit in order to reduce memory consumption 
    df.loc[:, header["col_rating"]] = df[header["col_rating"]].astype(np.float32)
    # Convert to timestamp
    df[header["col_timestamp"]] = df[header["col_timestamp"]].apply(lambda x: int(pd.to_datetime(pd.Timestamp(x), unit='s').strftime('%s')))
    # map col_user to int 
    df[header["col_user"]] = df[header["col_user"]].map(map_customer)
    # map col_item to int
    df.loc[:, header["col_item"]] = df[header["col_item"]].astype(int)
    # select only relevant cols
    df = df[[header["col_user"],header["col_item"],header["col_rating"],header["col_timestamp"]]]
    return df 

def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        arg = args[0] if len(args)>=1 else "" 
        print('func:%r  took: %2.4f sec' % \
          (f.__name__, te-ts))
        return result
    return wrap


# 0. Config params

In [3]:
# table results 
algo = "als"
ranking_metrics = None
rating_metrics = None
diversity_metrics = None
train_time = np.nan
rating_time = np.nan
ranking_time = np.nan

# column name 
header = {
    "col_user": "customer_id",
    "col_item": "variant_id",
    "col_rating": "quantity",
    "col_timestamp": "order_date",
    "col_prediction": "prediction",
}

# top k
TOP_K = 10

################ TO MODIFY ################

# date size with 3 choices : "100k","1M" and "all"
data_size = "1M"
# load splitted data 
load_splitted_data = True 

################ TO MODIFY ################

# 1. Data

# 1.1 Load data 

In [4]:
########### TO MODIFY ###########
def load_data(data_size):
    path = ""
    if data_size=="100k":
        path = '../../data/transaction_100k_df.pkl'
    elif data_size=="1M":
        path = '../../data/transaction_1M_df.pkl'
    elif data_size=="all":
        path = '../../data/transaction_all_df.pkl'
    
    if path != "":
        return pd.read_pickle(path)
    else :
        print("Please choose between 100k, 1M and all")
########### TO MODIFY ###########

In [5]:
# 2 ways to load the data
if not load_splitted_data : 
    # data not splitted 
    data = load_data(data_size)
else :
    # or  use stored splitted data to make it faster
    train = pd.read_pickle(f"../../data/train_{data_size}_df.pkl")
    test = pd.read_pickle(f"../../data/test_{data_size}_df.pkl")
    train.shape[0], test.shape[0]

## 1.2 Split the data ( skip if load_splitted_data )

In [6]:
# chrono split but it is really slow ( +1h to split 8M data ) 
if not load_splitted_data :
    train, test = python_chrono_split(data,
                                      ratio=0.75,
                                      col_user=header["col_user"],
                                      col_item=header["col_item"],
                                      col_timestamp = header["col_timestamp"]
                                     )
    train.to_pickle(f"../../data/train_{data_size}_df.pkl")
    test.to_pickle(f"../../data/test_{data_size}_df.pkl")
    train.shape[0], test.shape[0]

In [7]:
print("""
Train:
Total Ratings: {train_total}
Unique Users: {train_users}
Unique Items: {train_items}

Test:
Total Ratings: {test_total}
Unique Users: {test_users}
Unique Items: {test_items}
""".format(
    train_total=len(train),
    train_users=len(train[header["col_user"]].unique()),
    train_items=len(train[header["col_item"]].unique()),
    test_total=len(test),
    test_users=len(test[header["col_user"]].unique()),
    test_items=len(test[header["col_item"]].unique()),
))


Train:
Total Ratings: 749444
Unique Users: 23079
Unique Items: 6733

Test:
Total Ratings: 250532
Unique Users: 23079
Unique Items: 5820



## 1.3 Process data

In [8]:
i=0
map_customer = {}
for customer in train[header["col_user"]]:
    if customer not in map_customer:
        map_customer[customer]=i
        i+=1
train = preprocess_data(train)
test = preprocess_data(test)

## 1.4 Transform into spark data

In [9]:
spark = start_or_get_spark("ALS Deep Dive", memory="16g")

mySchema = StructType(
    (
        StructField(header["col_user"], IntegerType()),
        StructField(header["col_item"], IntegerType()),
        StructField(header["col_rating"], FloatType()),
        StructField(header["col_timestamp"], LongType()),
    )
)

train_spark = spark.createDataFrame(train,schema=mySchema)
test_spark = spark.createDataFrame(test,schema=mySchema)

# 2. Model

## 2.1 Define model

In [85]:
als = ALS(
    maxIter=20, 
    rank=15,
    regParam=0.05, 
    userCol=header["col_user"], 
    itemCol=header["col_item"], 
    ratingCol=header["col_rating"], 
    coldStartStrategy="drop",
    implicitPrefs=True,
    nonnegative=False,
)

## 2.2 Train model

In [None]:
with Timer() as train_time:
    model = als.fit(train_spark)

## 2.3 Predict rating

In [88]:
with Timer() as raiting_time:
    predictions_spark = model.transform(test_spark).drop(header["col_rating"])

## 2.4  recommend k items

In [101]:
# Get the cross join of all user-item pairs and score them.
with Timer() as ranking_time:
    users = train_spark.select(header["col_user"]).distinct()
    items = train_spark.select(header["col_item"]).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train_spark.alias("train"),
        (dfs_pred[header["col_user"]] == train_spark[header["col_user"]]) & (dfs_pred[header["col_item"]] == train_spark[header["col_item"]]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train." + header["col_rating"]].isNull()) \
        .select('pred.' + header["col_user"], 'pred.' + header["col_item"], 'pred.' + header["col_prediction"])

    window = Window.partitionBy(header["col_user"]).orderBy(F.col(header["col_prediction"]).desc())    
    top_k = top_all.select("*", F.row_number().over(window).alias("rank")).filter(F.col("rank") <= TOP_K).drop("rank")

# 3. Evaluate

## 3.1 Raiting metrics

In [89]:
evaluations = SparkRatingEvaluation(
    test_spark, 
    predictions_spark,
    col_user=header["col_user"],
    col_item=header["col_item"],
    col_rating=header["col_rating"],
    col_prediction=header["col_prediction"]
)

eval_rmse = evaluations.rmse()
eval_mae = evaluations.mae()
eval_rsquared = evaluations.rsquared()
eval_exp_var = evaluations.exp_var()

rating_metrics = {
    "RMSE": eval_rmse,
    "MAE": eval_mae,
    "R2": eval_rsquared,
    "Explained Variance": eval_exp_var,
}

print("RMSE:\t\t%f" % eval_rmse,
      "MAE:\t\t%f" % eval_mae,
      "rsquared:\t%f" % eval_rsquared,
      "exp var:\t%f" % eval_exp_var, sep='\n')

RMSE:		6.092747
MAE:		2.048586
rsquared:	-0.114596
exp var:	0.011413


## 3.2 Ranking metrics 

In [97]:
evaluations = SparkRankingEvaluation(
    test_spark, 
    top_k,
    col_user=header["col_user"],
    col_item=header["col_item"],
    col_rating=header["col_rating"],
    col_prediction=header["col_prediction"],
    k=TOP_K
)

eval_map = evaluations.map_at_k()
eval_ndcg = evaluations.ndcg_at_k()
eval_precision = evaluations.precision_at_k()
eval_recall = evaluations.recall_at_k()

ranking_metrics = {
    "MAP": eval_map,
    "nDCG@k": eval_ndcg,
    "Precision@k": eval_precision,
    "Recall@k": eval_recall,
}

print(f"Model:",
      f"Top K:\t\t {TOP_K}",
      f"MAP:\t\t {eval_map:f}",
      f"NDCG:\t\t {eval_ndcg:f}",
      f"Precision@K:\t {eval_precision:f}",
      f"Recall@K:\t {eval_recall:f}",sep='\n')

Model:
Top K:		 10
MAP:		 0.010692
NDCG:		 0.084935
Precision@K:	 0.075278
Recall@K:	 0.021415


## 3.2 Diversity metrics

In [102]:
evaluations = SparkDiversityEvaluation(
    train_spark, 
    top_k,
    col_user=header["col_user"],
    col_item=header["col_item"]
)

eval_diversity = evaluations.diversity()
eval_novelty = evaluations.novelty()
eval_distributional_coverage = evaluations.distributional_coverage()
eval_catalog_coverage = evaluations.catalog_coverage()

diversity_metrics = {
    "Diversity": eval_diversity,
    "Novelty": eval_novelty,
    "Distributional coverage": eval_distributional_coverage,
    "Catalog coverage": eval_catalog_coverage,
}
        
print(f"Model:",
      f"Diversity :\t {eval_diversity}",
      f"Novelty:\t {eval_novelty:f}",
      f"Distributional Coverage:\t {eval_distributional_coverage:f}", 
      f"Catalog Coverage:\t {eval_catalog_coverage:f}", sep='\n')

Model:
Diversity :	 0.7790931834618018
Novelty:	 9.659503
Distributional Coverage:	 7.509037
Catalog Coverage:	 0.073672


# 4 Summary

In [103]:
summary = generate_summary(data_size,
                           algo,
                           TOP_K,
                           train_time, 
                           rating_time,
                           rating_metrics,
                           ranking_time,
                           ranking_metrics,
                           diversity_metrics)
df_results.loc[df_results.shape[0] + 1] = summary
df_results

Unnamed: 0,Data,Algo,K,Train time (s),Predicting time (s),RMSE,MAE,R2,Explained Variance,Recommending time (s),MAP,nDCG@k,Precision@k,Recall@k,Diversity,Novelty,Distributional coverage,Catalog coverage
1,100k,t_svd,10,,,6.092747,2.048586,-0.114596,0.011413,,0.010692,0.084935,0.075278,0.021415,0.779093,9.659503,7.509037,0.073672


# 5 Search for best params

In [13]:
# search for best param
def search_param(alpha):
    param_dict = {
        "rank": [5, 10, 20, 30,50,100],
        "regParam": [0.001,0.01,0.1],
    }
    param_grid = generate_param_grid(param_dict)
    score = []
    
    users = train_spark.select(header["col_user"]).distinct()
    items = train_spark.select(header["col_item"]).distinct()
    user_item = users.crossJoin(items)

    for g in tqdm(param_grid):
        als = ALS(        
            userCol=header["col_user"], 
            itemCol=header["col_item"], 
            ratingCol=header["col_rating"], 
            coldStartStrategy="drop",
            implicitPrefs=True,
            nonnegative=False,
            alpha=alpha,
            maxIter=40, 
            **g
        )

        model = als.fit(train_spark)
        dfs_pred = model.transform(user_item)

        # Remove seen items.
        dfs_pred_exclude_train = dfs_pred.alias("pred").join(
            train_spark.alias("train"),
            (dfs_pred[header["col_user"]] == train_spark[header["col_user"]]) & (dfs_pred[header["col_item"]] == train_spark[header["col_item"]]),
            how='outer'
        )

        top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train." + header["col_rating"]].isNull()) \
            .select('pred.' + header["col_user"], 'pred.' + header["col_item"], 'pred.' + header["col_prediction"])

        window = Window.partitionBy(header["col_user"]).orderBy(F.col(header["col_prediction"]).desc())    
        top_k = top_all.select("*", F.row_number().over(window).alias("rank")).filter(F.col("rank") <= TOP_K).drop("rank")

        evaluations = SparkRankingEvaluation(
            test_spark, 
            top_k,
            col_user=header["col_user"],
            col_item=header["col_item"],
            col_rating=header["col_rating"],
            col_prediction=header["col_prediction"],
            k=TOP_K
        )

        score.append(evaluations.precision_at_k())
    return score


def get_score_plot(score):
    param_dict = {
        "rank": [5, 10, 15, 20, 25, 30],
        "regParam": [0.0005,0.001,0.005,0.01,0.05,0.1,0.5,1.0],
    }
    score = [float('%.4f' % x) for x in score]
    score_array = np.reshape(score, (len(param_dict["rank"]), len(param_dict["regParam"]))) 
    score_df = pd.DataFrame(data=score_array, index=pd.Index(param_dict["rank"], name="rank"), 
                           columns=pd.Index(param_dict["regParam"], name="reg. parameter"))
    fig, ax = plt.subplots()
    sns.heatmap(score_df, cbar=False, annot=True, fmt=".4g")

In [14]:
score = search_param(1)

  0%|          | 0/18 [00:00<?, ?it/s]

----------------------------------------ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/anaconda3/envs/rec/lib/python3.6/site-packages/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/envs/rec/lib/python3.6/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/opt/anaconda3/envs/rec/lib/python3.6/site-packages/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:61782)
Traceback (most recent call last):
  File "/opt/anaconda3/envs

Py4JError: An error occurred while calling o89.fit

In [None]:
get_score_plot(score)