<a href="https://colab.research.google.com/github/JCherryA050/phase_4_project/blob/main/tech_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd

In [2]:
# Run for Google Colab environment
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq
!pip install mlflow

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.


In [3]:
import pyspark
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
# import org.apache.spark.sql.functions.col
# import org.apache.spark.sql.types.IntegerType
# import pyspark.sql.functions.col
from pyspark.sql.types import IntegerType

In [4]:
# Build (or reuse) the Spark session for this notebook; the driver host is
# set to localhost through the 'spark.driver.host' option.
spark = (
    SparkSession.builder
    .appName('anime_rec')
    .config('spark.driver.host', 'localhost')
    .getOrCreate()
)

In [5]:
# Load the ratings file; the first line of the CSV provides the column names.
rec_data = spark.read.option('header', 'true').csv('animelist.csv')

In [6]:

# Cast the three columns used by the recommender to integers, replacing each
# column in place so the column order is unchanged.
for column_name in ('rating', 'user_id', 'anime_id'):
    rec_data = rec_data.withColumn(column_name, rec_data[column_name].cast(IntegerType()))

In [7]:
# Show the (column name, type) pairs of rec_data to check the column types.
rec_data.dtypes

[('user_id', 'int'),
 ('anime_id', 'int'),
 ('rating', 'int'),
 ('watching_status', 'string'),
 ('watched_episodes', 'string')]

In [8]:
# Remove the two columns that are not used as user, item or rating inputs.
rec_data = rec_data.drop('watching_status', 'watched_episodes')

In [9]:
# Display the DataFrame summary (column names and types) after the drops.
rec_data

DataFrame[user_id: int, anime_id: int, rating: int]

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

# Hold out 20% of the ratings for the final evaluation; the split is seeded.
(training, test) = rec_data.randomSplit([0.8, 0.2], seed=1)

# ALS estimator on the user_id / anime_id / rating columns.
# - coldStartStrategy='drop' removes rows whose prediction would be NaN, so the
#   RMSE computed later stays defined.
# - nonnegative=True constrains the learned factors to be non-negative.
# - seed is set explicitly so the random factor initialisation does not depend
#   on a per-process default and reruns are comparable.
als = ALS(userCol='user_id', itemCol='anime_id', ratingCol='rating',
          coldStartStrategy='drop', nonnegative=True, seed=1)


In [11]:
# Hyper-parameter grid: 2 x 2 x 2 = 8 candidate ALS configurations.
# NOTE(review): a previous comment here said an earlier run found rank 10 to be
# best, but 10 is not among the ranks searched below -- confirm the intended ranks.
params = (
    ParamGridBuilder()
    .addGrid(als.regParam, [0.2, 0.3])
    .addGrid(als.maxIter, [30, 50])
    .addGrid(als.rank, [30, 40])
    .build()
)

# Candidates are compared by RMSE between the 'rating' and 'prediction' columns.
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# Single train/validation split over the grid; the seed fixes how the rows are
# divided so that repeated runs compare the candidates on the same split.
tvs = TrainValidationSplit(estimator=als,
                           estimatorParamMaps=params,
                           evaluator=evaluator,
                           seed=1)

In [12]:
# Fit every parameter combination from the grid on the training split.
# NOTE(review): the saved output of this cell is a Py4JJavaError, so the cells
# below that use best_model cannot run until this fit succeeds. The cause is
# not visible in the notebook -- TODO investigate (memory / iteration count?).
model = tvs.fit(training)

# Keep the model that scored best under the evaluator.
best_model = model.bestModel

Py4JJavaError: ignored

In [None]:
# Score the selected model on the held-out test split and report its settings.
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

# maxIter and regParam are read from the estimator that produced the model,
# reached through the model's underlying Java object.
best_estimator = best_model._java_obj.parent()

print('RMSE = {}'.format(rmse))
print('---Best Model---')
print(' Rank:', best_model.rank)
print(' MaxIter:', best_estimator.getMaxIter())
print(' RegParam:', best_estimator.getRegParam())

In [None]:
# Load the anime metadata file (header row supplies the column names) and
# preview the first five rows.
anime_titles = spark.read.option('header', True).csv('anime.csv')
anime_titles.head(5)

In [None]:
def name_retriever(anime_id, dataframe=anime_titles):
    """Return the display title for an anime id.

    The English title is preferred; when it is the placeholder string
    'Unknown', the value of the 'Name' column is returned instead.

    Args:
        anime_id: value to match against the 'MAL_ID' column.
        dataframe: Spark DataFrame with 'MAL_ID', 'English name' and 'Name'
            columns. Defaults to the notebook-level ``anime_titles``.

    Returns:
        The title taken from the first matching row.

    Raises:
        IndexError: if no row has the requested ``anime_id``.
    """
    # Query the DataFrame that was passed in (not the global) and fetch the
    # matching row once, instead of running the same lookup twice.
    rows = dataframe.where(dataframe['MAL_ID'] == anime_id).take(1)
    if not rows:
        raise IndexError('no anime with MAL_ID {!r} in the titles table'.format(anime_id))

    row = rows[0]
    name = row['English name']
    if name == 'Unknown':
        name = row['Name']
    return name

In [None]:
# Quick check of the helper: print the title looked up for anime id 1.
print(name_retriever(1, anime_titles))

In [None]:
# Take a single distinct user id from the ratings data (whichever one
# limit(1) returns).
users = rec_data.select(als.getUserCol()).distinct().limit(1)
# Ask best_model for 10 recommendations for that user.
userSubsetRecs = best_model.recommendForUserSubset(users, 10)
# Bring the (at most one) result row back to the driver as a list.
recs = userSubsetRecs.take(1)

In [None]:
# use indexing to obtain the movie id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

# use the name retriever function to get the values
name_retriever(first_recommendation,anime_titles)

In [None]:
# Ask best_model for 5 recommendations for every user ...
recommendations = best_model.recommendForAllUsers(5)
# ... and collect the row(s) belonging to user_id 3.
recommendations.where(recommendations.user_id == 3).collect()

In [None]:
def new_user_recs(user_id, new_ratings, rating_df, anime_title_df, num_recs,
                  max_iter=30, rank=30, reg_param=0.2, seed=None):
    """Print the top recommendations for a user after adding their ratings.

    The new ratings are appended to ``rating_df``, a fresh ALS model is fitted
    on the combined data, and up to ``num_recs`` recommended titles for
    ``user_id`` are printed, one per line, with their predicted scores.

    Args:
        user_id: id of the user to produce recommendations for.
        new_ratings: rows to append, laid out like the columns of
            ``rating_df`` (the notebook passes (user_id, anime_id, rating)
            tuples).
        rating_df: Spark DataFrame of existing ratings; the model reads its
            'user_id', 'anime_id' and 'rating' columns.
        anime_title_df: titles DataFrame handed to ``name_retriever``.
        num_recs: number of recommendations requested.
        max_iter: ALS ``maxIter``; the default is the value that used to be
            hard-coded.
        rank: ALS ``rank``; the default is the value that used to be hard-coded.
        reg_param: ALS ``regParam``; the default is the value that used to be
            hard-coded.
        seed: optional ALS seed; ``None`` keeps the library default.

    Returns:
        None. The recommendations are printed.
    """
    # turn the new ratings list into a spark DataFrame
    new_user_ratings = spark.createDataFrame(new_ratings, rating_df.columns)

    # combine the new ratings df with the rating_df
    anime_ratings_combined = rating_df.union(new_user_ratings)

    # create an ALS model and fit it
    als_estimator = ALS(maxIter=max_iter, rank=rank, regParam=reg_param,
                        userCol='user_id', itemCol='anime_id', ratingCol='rating',
                        coldStartStrategy='drop')
    if seed is not None:
        als_estimator.setSeed(seed)
    model = als_estimator.fit(anime_ratings_combined)

    # Only one user's recommendations are needed, so ask the model for that
    # user alone instead of ranking items for every user and filtering.
    target_user = (anime_ratings_combined
                   .where(anime_ratings_combined['user_id'] == user_id)
                   .select('user_id')
                   .distinct())
    recs_for_user = model.recommendForUserSubset(target_user, num_recs).take(1)

    # No result row for this user: report it rather than failing on an empty list.
    if not recs_for_user:
        print('No recommendations available for user {}'.format(user_id))
        return

    for ranking, (anime_id, rating) in enumerate(recs_for_user[0]['recommendations'], start=1):
        anime_string = name_retriever(anime_id, anime_title_df)
        print('Recommendation {}: {}  | predicted score: {}'.format(ranking, anime_string, rating))

In [None]:
# Demo run: attach six (anime_id, score) ratings to one user id and print
# ten recommendations for that user.
user_id = 1000000
user_ratings_1 = [
    (user_id, anime_id, score)
    for anime_id, score in (
        (1, 7),
        (2, 7),
        (30, 10),
        (32937, 10),
        (8625, 5),
        (203, 10),
    )
]
new_user_recs(user_id, user_ratings_1, rec_data, anime_titles, 10)