### Recommendation Engine Implementation

In [None]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS
from pyspark.sql.functions import *

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Number of top-ranked items per user; passed as `k` to the ranking evaluation.
TOP_K = 10

# Dataset size selector, passed as `size=` to the data loader.
# Select MovieLens data size: 100k, 1m, 10m, or 20m
# NOTE(review): the value below ('10k') is not one of the sizes listed above —
# presumably a typo for '100k'; confirm against the loader before running.
PRODUCT_DATA_SIZE = '10k'
from pyspark.sql.types import StructType

In [None]:
# Bind `df` (presumably the ratings DataFrame) to the working name `data`.
# NOTE(review): `df` is not defined in any cell shown before this point — confirm where it is created.
data = df
from pyspark.sql.types import StructField
from pyspark.sql.types import IntegerType,FloatType,LongType

In [None]:
# Column layout of the ratings data: (user, product, rating, timestamp).
# The user column is named "user_Id" because that is the name the later
# cells select, join and evaluate on; "userId" would not match them.
# StructType is documented to take a *list* of StructField objects.
schema = StructType(
    [
        StructField("user_Id", IntegerType()),
        StructField("product_id", IntegerType()),
        StructField("Rating", FloatType()),
        StructField("Timestamp", LongType()),
    ]
)

# NOTE(review): `df` is not created in the cells visible above, and `schema`
# is not applied in this cell — confirm the intended data source.
data = df
# Bare expression as the last line of the cell: the notebook renders it.
data

In [None]:
from pyspark.sql.functions import split
import pyspark.sql.functions as f

# Randomly partition the ratings into 75% train / 25% test.
# `pyspark.sql.functions.split` (imported above) splits a *string column* by a
# regex pattern and takes no `ratio`/`seed` arguments, so it cannot partition
# a DataFrame; `DataFrame.randomSplit(weights, seed)` is the API for that.
# Assumes `data` is a Spark DataFrame — TODO confirm it is loaded before this cell runs.
train, test = data.randomSplit([0.75, 0.25], seed=123)

# cache() marks each split for reuse; count() is the action that materializes it.
print("N train", train.cache().count())
print("N test", test.cache().count())

In [None]:
# Ratings layout as (column name, Spark type) pairs; each type is instantiated
# when its StructField is built.
schema = StructType(
    tuple(
        StructField(column, spark_type())
        for column, spark_type in (
            ("user_Id", IntegerType),
            ("product_id", IntegerType),
            ("Rating", FloatType),
            ("Timestamp", LongType),
        )
    )
)

# Load the data with the schema above, then print a preview of the rows.
# NOTE(review): neither `df` nor `spark` is defined in the cells shown — confirm.
data = df.load_spark_df(spark, size=PRODUCT_DATA_SIZE, schema=schema)
data.show()

In [None]:
from timeit import Timer
# `timeit.Timer` benchmarks code snippets; it is not a context manager and has
# no `interval` attribute, so `with Timer() as ...` fails. Measure the elapsed
# wall-clock time with a monotonic clock instead (`time` is imported at the top).
# NOTE(review): `als` (the estimator being fitted) is not defined in the cells shown — confirm.
train_start = time.perf_counter()
model = als.fit(train)
train_time = time.perf_counter() - train_start  # elapsed seconds (float)

print("Took {} seconds for training.".format(train_time))

In [None]:
# `timeit.Timer` is not a context manager and exposes no `interval`, so the
# elapsed time is measured with time.perf_counter() instead.
test_start = time.perf_counter()

# Get the cross join of all user-item pairs seen in training and score them.
users = train.select('user_Id').distinct()
items = train.select('product_id').distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)

# Remove seen items: join the predictions with the training ratings on
# (user, product); rows with no training rating are the unseen pairs.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train.alias("train"),
    (dfs_pred['user_Id'] == train['user_Id']) & (dfs_pred['product_id'] == train['product_id']),
    how='outer'
)

top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
    .select('pred.user_Id', 'pred.product_id', 'pred.prediction')

# In Spark, transformations are lazily evaluated.
# Use an action (count) to force execution so the measured time is meaningful.
top_all.cache().count()

test_time = time.perf_counter() - test_start  # elapsed seconds (float)

print("Took {} seconds for prediction.".format(test_time))

In [None]:
# Print a preview of the predictions for user-product pairs not seen in training.
top_all.show()

In [None]:
# Ranking evaluator built from the held-out `test` ratings and the `top_all`
# predictions, with k=TOP_K and the column names used throughout the notebook.
# NOTE(review): SparkRankingEvaluation is not imported in the cells shown — confirm its source.
rank_eval = SparkRankingEvaluation(
    test,
    top_all,
    k=TOP_K,
    col_user="user_Id",
    col_item="product_id",
    col_rating="Rating",
    col_prediction="prediction",
    relevancy_method="top_k",
)

In [None]:
# Ranking-metric summary, one tab-separated "label<TAB>value" entry per line.
print("\n".join([
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(),
]))

In [None]:
# Score the held-out test pairs with the trained model.
prediction = model.transform(test)
# Mark the result for caching, then print a preview of its rows.
prediction.cache()
prediction.show()

In [None]:
def get_productId(df_product, fav_product_list):
    """
    return all productId(s) whose title contains one of the user's favorite products

    Parameters
    ----------
    df_product: spark Dataframe, products data; must have 'title' and
        'productId' columns

    fav_product_list: list, user's list of favorite products (titles or
        title fragments)

    Return
    ------
    productId_list: list of unique productId(s), in no particular order
    """
    productId_set = set()
    for product in fav_product_list:
        # SQL LIKE with a wildcard on both sides: substring match on the title.
        productIds = df_product \
            .filter(df_product['title'].like('%{}%'.format(product))) \
            .select('productId') \
            .rdd \
            .map(lambda r: r[0]) \
            .collect()
        # A title fragment can match several products, and several fragments
        # can match the same product; the set removes the duplicates.
        productId_set.update(productIds)
    return list(productId_set)


def add_new_user_to_data(train_data, productId_list, spark_context):
    """
    return the ratings data extended with one new user who gives the maximum
    observed rating to every product in productId_list

    Parameters
    ----------
    train_data: spark RDD, ratings rows; element 0 is read as the user id and
        element 2 as the rating (element 1 is presumably the product id —
        confirm against the caller)

    productId_list: list of productId(s) the new user is assumed to like

    spark_context: Spark Context object, used to build the new user's RDD

    Return
    ------
    spark RDD, union of train_data and the new user's rows
    """
    # get new user id: one past the largest existing user id
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # get max rating
    max_rating = train_data.map(lambda r: r[2]).max()
    # create new user rdd: one (user, product, rating) row per favorite product
    user_rows = [(new_id, productId, max_rating) for productId in productId_list]
    new_rdd = spark_context.parallelize(user_rows)
    # return new train data
    return train_data.union(new_rdd)


def get_inference_data(train_data, df_product, productId_list):
    """
    return (new user id, productId) pairs for every product that is not
    already in productId_list

    Parameters
    ----------
    train_data: spark RDD, ratings rows; element 0 is read as the user id

    df_product: spark Dataframe, products data; the first column of each row
        is used as the product id (presumably 'productId' — confirm)

    productId_list: list of productId(s) to exclude from the result

    Return
    ------
    spark RDD of (new_id, productId) tuples
    """
    # get new user id: one past the largest user id in train_data
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # set membership is O(1) per row, versus a scan of the list for each product
    excluded_ids = set(productId_list)
    # return inference rdd
    return df_product.rdd \
        .map(lambda r: r[0]) \
        .distinct() \
        .filter(lambda x: x not in excluded_ids) \
        .map(lambda x: (new_id, x))


def make_recommendation(best_model_params, ratings_data, df_product, 
                        fav_product_list, n_recommendations, spark_context):
    """
    return top n product recommendations based on user's input list of favorite products


    Parameters
    ----------
    best_model_params: dict, {'iterations': iter, 'rank': rank, 'lambda_': reg};
        a missing key is passed to ALS.train as None

    ratings_data: spark RDD, ratings data

    df_product: spark Dataframe, products data with 'productId' and 'title' columns

    fav_product_list: list, user's list of favorite products

    n_recommendations: int, top n recommendations

    spark_context: Spark Context object

    Return
    ------
    list of titles of the top n recommended products (in the order Spark
    returns the filtered rows, not sorted by predicted rating)
    """
    # modify train data by adding new user's rows
    productId_list = get_productId(df_product, fav_product_list)
    train_data = add_new_user_to_data(ratings_data, productId_list, spark_context)
    
    # train best ALS
    model = ALS.train(
        ratings=train_data,
        iterations=best_model_params.get('iterations', None),
        rank=best_model_params.get('rank', None),
        lambda_=best_model_params.get('lambda_', None),
        seed=99)
    
    # get inference rdd; built from the original ratings so that the new user
    # id it computes is the same one add_new_user_to_data assigned
    inference_rdd = get_inference_data(ratings_data, df_product, productId_list)
    
    # inference: keep elements 1 and 2 of each prediction row (used below as
    # the product id and the score to sort by)
    predictions = model.predictAll(inference_rdd).map(lambda r: (r[1], r[2]))
    
    # get top n productId
    topn_rows = predictions.sortBy(lambda r: r[1], ascending=False).take(n_recommendations)
    topn_ids = [r[0] for r in topn_rows]
    
    # return product titles
    return df_product.filter(df_product['productId'].isin(topn_ids)) \
                    .select('title') \
                    .rdd \
                    .map(lambda r: r[0]) \
                    .collect()