## Imports

In [16]:
import os
import time
import load_recommender_data

from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS

import math
import numpy as np
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

## Load data

In [17]:
path_in_str = 'user_data1.0.json'
df_merge, anime_df, user_df = load_recommender_data.load_recommender_data(path_in_str)

## Data preprocessing

In [18]:
anime_df = anime_df[['anime_id', 'title', 'genres_name']]
anime_df = anime_df.rename({'genres_name': 'genres'}, axis=1)

In [19]:
anime_df = anime_df.astype({'anime_id': int, 'title': str, 'genres': str})

In [20]:
df_ratings = df_merge[['user_id', 'anime_id', 'rating_y']].copy()
df_ratings = df_ratings.rename({'rating_y': 'rating'}, axis=1)
df_ratings = df_ratings.astype({'anime_id': int, 'user_id': int, 'rating': float})
df_ratings.reset_index(inplace=True)
df_ratings['rating'] = ((df_ratings['rating'] / 10.0)*4.0) + 1
df_ratings = df_ratings[['user_id', 'anime_id', 'rating']]

In [21]:
df_ratings.to_csv('ratings.csv', index=False)
anime_df.to_csv('animes.csv', index=False)

## Setup Spark

In [22]:
spark = SparkSession \
    .builder \
    .appName("anime recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
sc = spark.sparkContext

In [23]:
animes = spark.read.load('animes.csv', format='csv', header=True, inferSchema=True)

In [24]:
anime_rating = sc.textFile('ratings.csv')
header = anime_rating.take(1)[0]
rating_data = anime_rating \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()
rating_data.take(3)

[(1, 1, 1.0), (2, 1, 3.8), (3, 1, 5.0)]

## Train ALS

In [25]:
train, validation, test = rating_data.randomSplit([6, 2, 2], seed=99)
train.cache()
validation.cache()
test.cache()

PythonRDD[546] at RDD at PythonRDD.scala:53

In [26]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=99)
            valid_data = validation_data.map(lambda p: (p[0], p[1]))
            predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
            ratesAndPreds = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            error = math.sqrt(MSE)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [27]:
num_iterations = 10
# ranks = [8, 10, 12, 14, 16, 18, 20]
ranks = [8]
# reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]
reg_params = [0.05]
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))



8 latent factors and regularization = 0.05: validation RMSE is 1.583840405551869

The best model has 8 latent factors and regularization = 0.05
Total Runtime: 10.77 seconds




In [28]:
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))



The out-of-sample RMSE of rating predictions is 1.5753




## Recommender helper functions

In [29]:
def get_anime_id(df_animes, fav_anime_list):
    anime_id_list = []
    for anime in fav_anime_list:
        anime_ids = df_animes \
            .filter(animes.title.like('%{}%'.format(anime))) \
            .select('anime_id') \
            .rdd \
            .map(lambda r: r[0]) \
            .collect()
        anime_id_list.extend(anime_ids)
    return list(set(anime_id_list))


def add_new_user_to_data(train_data, anime_id_list, spark_context):
    new_id = train_data.map(lambda r: r[0]).max() + 1
    max_rating = train_data.map(lambda r: r[2]).max()
    user_rows = [(new_id, anime_id, max_rating) for anime_id in anime_id_list]
    new_rdd = spark_context.parallelize(user_rows)
    return train_data.union(new_rdd)


def get_inference_data(train_data, df_animes, anime_id_list):
    new_id = train_data.map(lambda r: r[0]).max() + 1
    return df_animes.rdd \
        .map(lambda r: r[0]) \
        .distinct() \
        .filter(lambda x: x not in anime_id_list) \
        .map(lambda x: (new_id, x))


def make_recommendation(best_model_params, ratings_data, df_animes, 
                        fav_anime_list, n_recommendations, spark_context):

    anime_id_list = get_anime_id(df_animes, fav_anime_list)
    train_data = add_new_user_to_data(ratings_data, anime_id_list, spark_context)
    
    model = ALS.train(
        ratings=train_data,
        iterations=best_model_params.get('iterations', None),
        rank=best_model_params.get('rank', None),
        lambda_=best_model_params.get('lambda_', None),
        seed=99)
    
    inference_rdd = get_inference_data(ratings_data, df_animes, anime_id_list)
    
    predictions = model.predictAll(inference_rdd).map(lambda r: (r[1], r[2]))
    
    topn_rows = predictions.sortBy(lambda r: r[1], ascending=False).take(n_recommendations)
    topn_ids = [r[0] for r in topn_rows]
    
    return df_animes.filter(animes.anime_id.isin(topn_ids)) \
                    .select('title') \
                    .rdd \
                    .map(lambda r: r[0]) \
                    .collect()

## Make recommendations
Enter your anime name in my_anime_list, and the recommender will generate recommendations

In [30]:
my_anime_list = ['Cowboy Bebop']

recommends = make_recommendation(
    best_model_params={'iterations': 10, 'rank': 20, 'lambda_': 0.05}, 
    ratings_data=rating_data, 
    df_animes=animes, 
    fav_anime_list=my_anime_list, 
    n_recommendations=10, 
    spark_context=sc)

print('Recommendations for {}:'.format(my_anime_list[0]))
for i, title in enumerate(recommends):
    print('{0}: {1}'.format(i+1, title))



Recommendations for Cowboy Bebop:
1: Fullmetal Alchemist: Brotherhood
2: Hunter x Hunter (2011)
3: Shingeki no Kyojin Season 3
4: Shokugeki no Souma: San no Sara
5: "Violet Evergarden: Kitto ""Ai"" wo Shiru Hi ga Kuru no Darou"
6: Seishun Buta Yarou wa Yumemiru Shoujo no Yume wo Minai
7: Death Note
8: Kimi no Na wa.
9: Saiki Kusuo no Ψ-nan 2
10: Shingeki no Kyojin Season 3 Part 2
