<a href="https://colab.research.google.com/github/aryalkoshish/big_data/blob/main/AnimeRecommendationPyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark





In [None]:
# importing the required pyspark library
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum
# sparkml libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Start (or reuse) the Spark session that every later cell runs against.
session_builder = SparkSession.builder.appName('Anime_Recommender')
spark = session_builder.getOrCreate()


In [None]:
# Optional explicit schema, for when specific data types must be enforced while
# loading the dataset (pass schema=schema to the CSV reader instead of inferring).
# Field names and order mirror the CSV header (user_id, Username, anime_id,
# Anime Title, rating) so that every column lines up when the schema is applied.
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("Username", StringType(), True),
    StructField("anime_id", IntegerType(), True),
    StructField("Anime Title", StringType(), True),
    StructField("rating", IntegerType(), True),
])

In [None]:
# Candidate locations of the ratings CSV.
# Copy in the working directory (local runs):
LOCAL_INPUT_PATH = 'users-score-2023.csv'
# Copy in the GCP bucket:
GCS_INPUT_PATH = 'gs://dataproc-staging-us-central1-441320292389-riaohl70/pyspark_retailstore_analysis/data/users-score-2023.csv'

# Path actually read by the loading cell. Switch to LOCAL_INPUT_PATH for a
# local run; selecting explicitly avoids one assignment silently overwriting the other.
input_path = GCS_INPUT_PATH

In [None]:
# Load the ratings CSV from input_path: the first row is treated as the header
# and Spark infers the column types.
# (To enforce explicit types instead, supply schema=schema to csv().)
csv_reader = spark.read.option("header", True).option("inferSchema", True)
data = csv_reader.csv(input_path)

# Peek at the first five rows.
data.show(5)

+-------+--------+--------+--------------------+------+
|user_id|Username|anime_id|         Anime Title|rating|
+-------+--------+--------+--------------------+------+
|      1|   Xinil|      21|           One Piece|     9|
|      1|   Xinil|      48|         .hack//Sign|     7|
|      1|   Xinil|     320|              A Kite|     5|
|      1|   Xinil|      49|    Aa! Megami-sama!|     8|
|      1|   Xinil|     304|Aa! Megami-sama! ...|     8|
+-------+--------+--------+--------------------+------+
only showing top 5 rows



In [None]:
# Count the rows in the dataset (about 22 million records).
data.count()


22322428

In [None]:
# Inspect the data type Spark assigned to each column.
data.dtypes

[('user_id', 'int'),
 ('Username', 'string'),
 ('anime_id', 'int'),
 ('Anime Title', 'string'),
 ('rating', 'string')]

In [None]:
# rating was loaded as a string column; convert it to an integer column.
# NOTE(review): values that are not valid integers appear to end up as nulls
# after the cast (see the per-column null counts computed later) - confirm.
rating_as_int = col("rating").cast("integer")
data = data.withColumn("rating", rating_as_int)
# Display the dtypes again to confirm the conversion.
data.dtypes

[('user_id', 'int'),
 ('Username', 'string'),
 ('anime_id', 'int'),
 ('Anime Title', 'string'),
 ('rating', 'int')]

In [None]:
# Count the nulls in every column. For each column, isNull() flags the rows,
# the flag is cast to 0/1, and `sum` adds the flags up. Here `sum` is the Spark
# aggregate imported from pyspark.sql.functions, not Python's builtin.
null_count_exprs = []
for column_name in data.columns:
    is_null_flag = col(column_name).isNull().cast("int")
    null_count_exprs.append(sum(is_null_flag).alias(column_name))
null_values = data.select(null_count_exprs)


In [None]:
# Show the per-column null counts: anime_id and Anime Title each have 1 null,
# and rating has the most, with 2866 nulls.
null_values.show()

+-------+--------+--------+-----------+------+
|user_id|Username|anime_id|Anime Title|rating|
+-------+--------+--------+-----------+------+
|      0|       0|       1|          1|  2866|
+-------+--------+--------+-----------+------+



In [None]:
# ALS cannot be fitted on null values, and it treats every rating it receives
# as real feedback. Filling the nulls with 0 would therefore invent "rated 0"
# interactions (and an anime with id 0), which distorts training and the RMSE.
# Rows missing any of the columns the model uses are dropped instead.
data = data.na.drop(subset=["user_id", "anime_id", "rating"])

In [None]:
# Recompute the per-column null counts after the cleanup step.
null_counts = data.select(
    *(sum(col(name).isNull().cast("int")).alias(name) for name in data.columns)
)
# A null remaining in Anime Title is acceptable: that column is not used as a
# feature when building the model.
null_counts.show()

+-------+--------+--------+-----------+------+
|user_id|Username|anime_id|Anime Title|rating|
+-------+--------+--------+-----------+------+
|      0|       0|       0|          1|     0|
+-------+--------+--------+-----------+------+



In [None]:
# Statistical summary (count, mean, stddev, min, max) of every column.
data.describe().show()


+-------+-----------------+----------+-----------------+------------------+------------------+
|summary|          user_id|  Username|         anime_id|       Anime Title|            rating|
+-------+-----------------+----------+-----------------+------------------+------------------+
|  count|         22322428|  22322428|         22322428|          22322427|          22322428|
|   mean|370442.4563417564|       NaN|9530.012213053167|18677.971851610804|7.6184140452821705|
| stddev|295264.9040922144|       NaN|11993.32492562087|109357.40231008055|1.6625702600454617|
|    min|                1|   -------|                0|      !NVADE SHOW!|                 0|
|    max|          1141579|zzzyeknom0|            56085|                 ◯|                10|
+-------+-----------------+----------+-----------------+------------------+------------------+



In [None]:

# Randomly split the data: roughly 80% goes to train_data for fitting the model
# and roughly 20% to test_data for evaluating it.
# A fixed seed keeps the split - and every metric computed from it - repeatable
# across runs.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# ALS (Alternating Least Squares) is a matrix factorization algorithm commonly
# used for collaborative filtering in recommendation systems (pyspark.ml).
# maxIter and regParam are starting values that can be tuned later with
# hyperparameter optimization.
als = ALS(maxIter=10,
          regParam=0.01,
          userCol="user_id",
          itemCol="anime_id",
          ratingCol="rating",
          # Users or anime absent from the training split have no learned
          # factors; "drop" removes those rows at prediction time instead of
          # emitting NaN predictions that would poison the evaluation metric.
          coldStartStrategy="drop",
          # Fixed seed so the random factor initialisation is repeatable.
          seed=42)

# Fit the configured estimator on the training split.
model = als.fit(train_data)

In [None]:
# Score the held-out test set with the fitted model; transform() appends a
# "prediction" column next to the actual rating.
predictions = model.transform(test_data)

# Display the first rows of the predictions.
predictions.show()

+-------+--------+--------+--------------------+------+----------+
|user_id|Username|anime_id|         Anime Title|rating|prediction|
+-------+--------+--------+--------------------+------+----------+
|      1|   Xinil|     193|            Maburaho|     7| 6.1814594|
|      1|   Xinil|     210|             Ranma ½|     7| 7.5945907|
|      1|   Xinil|     192|Love Hina Haru Sp...|     7| 6.8603435|
|      1|   Xinil|      22| Tennis no Ouji-sama|     7|  7.145653|
|      1|   Xinil|     122|Full Moon wo Saga...|     9| 7.6245384|
|      1|   Xinil|     157|Mahou Sensei Negima!|     6|  6.445051|
|      1|   Xinil|     190|     Love Hina Again|     8|  6.962368|
|      1|   Xinil|      43|    Koukaku Kidoutai|     8|  8.101777|
|      1|   Xinil|     165|           RahXephon|     8| 7.6812234|
|      1|   Xinil|     127|        Gate Keepers|     5|  6.716782|
|      1|   Xinil|      17|Hungry Heart: Wil...|     7| 7.1544356|
|      1|   Xinil|     194|        Macross Zero|     8|  7.734

In [None]:
# Remove rows the evaluator cannot use: those with a null/NaN prediction (for
# example users or anime unseen during training) or a missing rating.
# The check is limited to these two columns so that a null in an unrelated
# column such as Anime Title does not discard an otherwise valid prediction.
predictions = predictions.na.drop(subset=["rating", "prediction"])


In [None]:
# Compute the RMSE of the predictions against the actual ratings with
# RegressionEvaluator from pyspark.ml.evaluation.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse_value = evaluator.evaluate(predictions)

# A lower RMSE means better predictive accuracy: the predicted ratings are
# closer to the actual ones, and 0 would mean no error at all.
# (Label spelling corrected from "Sqaure" to "Square".)
print(f"Root Mean Square Error \n{rmse_value}")


Root Mean Sqaure Error 
1.2562565775325762


In [None]:
# Pick the test-set rows of one specific user (id 5615), keeping only the two
# columns the model needs for scoring.
is_target_user = col("user_id") == 5615
user = test_data.where(is_target_user).select("anime_id", "user_id")

user.show()

+--------+-------+
|anime_id|user_id|
+--------+-------+
|      16|   5615|
|      19|   5615|
|      43|   5615|
|      47|   5615|
|     177|   5615|
|     225|   5615|
|     232|   5615|
|     317|   5615|
|     543|   5615|
|     572|   5615|
|     813|   5615|
|     861|   5615|
|    1735|   5615|
|    2001|   5615|
|    2236|   5615|
|    2476|   5615|
|    2904|   5615|
|    3091|   5615|
|    9253|   5615|
|    9756|   5615|
+--------+-------+
only showing top 20 rows



In [None]:

# Score the selected user's test-set anime with the already fitted model
# (transform only predicts; nothing is retrained here).
recommended_anime = model.transform(user)

# List the predictions from highest to lowest score, so the anime the model
# expects this user to rate best appear first.
ranked_anime = recommended_anime.orderBy(col("prediction").desc())
ranked_anime.show()

+--------+-------+----------+
|anime_id|user_id|prediction|
+--------+-------+----------+
|      43|   5615|  9.105129|
|    2236|   5615|  8.892427|
|    9253|   5615|  8.773878|
|    3091|   5615|  8.688642|
|      47|   5615|  8.681513|
|    2904|   5615|  8.665394|
|      19|   5615|  8.630951|
|     572|   5615|  8.500926|
|    2001|   5615|  8.487847|
|     861|   5615|  8.335785|
|      16|   5615|  8.305287|
|   11061|   5615|  8.263053|
|    9756|   5615|  8.239414|
|     317|   5615|  8.006407|
|   12431|   5615|  7.889063|
|     813|   5615| 7.8830805|
|     543|   5615| 7.8369055|
|   22297|   5615| 7.7805305|
|   23283|   5615|   7.71803|
|     232|   5615|  7.366153|
+--------+-------+----------+
only showing top 20 rows



In [None]:
# Stop the Spark session now that the analysis is finished.
spark.stop()
