<a href="https://colab.research.google.com/github/amiller411/uniwork/blob/big_data_cw3/b000633497_andrew_miller.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Initial Setup**

1. First, set up your Colab environment. Run the cell below.

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

Download both anime.csv and rating.csv, and store them in your Google Drive. It is advisable to create a separate project folder, where you can store this dataset and also your code.

The script below lists the IDs of the two files in your Drive folder.

In [3]:
file_list = drive.ListFile({'q': "'1Oi8cMnAfJVZH9-FyXGxwOrGGCIkkB7uy' in parents"}).GetList()
for f in file_list:
  print('title: %s, id: %s' % (f['title'], f['id']))

title: rating.csv, id: 1f76dQZxRB1fNaReBv_DnUDVkIXNm7mw9
title: anime.csv, id: 1TppJoj4QVJlc_HML20xmH847Brrw0Zfc


Once you have executed the cell below, the datasets needed for this Colab should appear under the "Files" tab on the left panel.

In [4]:
# Change the id, if it differs from the one below.
id='1TppJoj4QVJlc_HML20xmH847Brrw0Zfc'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('anime.csv')

id='1f76dQZxRB1fNaReBv_DnUDVkIXNm7mw9'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('rating.csv')

Here is a list of packages that might be useful to you. 

**Student Activity: Add the packages you need to carry out your analysis here** 

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Student Activity: Add your packages here.
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# create function to calculate mean of an array (each row)
array_mean = udf(lambda x: float(np.mean(x)), FloatType())


**This step initializes the Spark context.**

In [6]:
# configure Spark (port for the web UI)
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then the session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link of the web interface. In the Spark UI, you can monitor the progress of your job and debug the performance bottlenecks (if your Colab is running with a local runtime).

In [7]:
spark

## **From this point onwards, you are supposed to do the coding yourself. Follow the steps as mentioned below in its appropriate place.**

**1. Student Activity: Read the datasets here. You must write the script for the first question and explore both the files here.**

Q1. Identify and describe the number of columns in the two dataset files.

In [7]:
df_anime = spark.read.csv("anime.csv", header=True, inferSchema = True)
df_rat = spark.read.csv("rating.csv", header=True, inferSchema = True)

In [9]:
# examine schema - column names and data types
df_anime.printSchema()
df_rat.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



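The Anime dataset contains 7 columns (anime_id, name, genre, type, episodes, rating and members). anime_id and members are integers, rating is a double, and the remaining columns are strings. episodes is read as a string because some entries are non-numeric (for example "Unknown").

The Rating dataset contains 3 columns (user_id, anime_id and rating), all of which are integers.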

**2. Student Activity: Preprocess the datasets here. You must write the script for the second question here. Make sure to check whether the script is running correctly.**

Q2. Merge/Join/Combine the two datasets and identify the key common column on which you performed the join.

Key column = anime_id, full outer join

In [11]:
df_anime.show()

+--------+--------------------+--------------------+-----+--------+------+-------+
|anime_id|                name|               genre| type|episodes|rating|members|
+--------+--------------------+--------------------+-----+--------+------+-------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|  9.26| 793665|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|  9.25| 114262|
|    9253|         Steins;Gate|    Sci-Fi, Thriller|   TV|      24|  9.17| 673572|
|    9969|       Gintama&#039;|Action, Comedy, H...|   TV|      51|  9.16| 151266|
|   32935|Haikyuu!!: Karasu...|Comedy, Drama, Sc...|   TV|      10|  9.15|  93351|
|   11061|Hunter x Hunter (...|Action, Adventure...|   TV|     148|  9.13| 425855|
|     820|Ginga Eiyuu Densetsu|Drama, Military, ...|  OVA|     110|  9.11|  80679|
|   15335|Gintama Movie: Ka...|Action, Comedy, H...|Movie|       1|   9.1|  72534|
|   

In [37]:
# generate some descriptive stats on df_anime
df_anime.describe().show()

+-------+------------------+-----------------+------+-----+------------------+-----------------+-----------------+
|summary|          anime_id|             name| genre| type|          episodes|           rating|          members|
+-------+------------------+-----------------+------+-----+------------------+-----------------+-----------------+
|  count|             12294|            12294| 12232|12269|             12294|            12064|            12294|
|   mean|14058.221652838783|         166778.5|  null| null|12.382549774134182|6.473901690981445|18071.33886448674|
| stddev|11455.294700988177|330891.6746051493|  null| null| 46.86535196440979|1.026746306898068|54820.67692490701|
|    min|                 1|    &quot;0&quot;|Action|Movie|                 1|             1.67|                5|
|    max|             34527|                ◯|  Yaoi|   TV|           Unknown|             10.0|          1013917|
+-------+------------------+-----------------+------+-----+------------------+--

In [36]:
# repeat for ratings csv
df_rat.describe().show()

+-------+------------------+-----------------+------------------+
|summary|           user_id|         anime_id|            rating|
+-------+------------------+-----------------+------------------+
|  count|           7813737|          7813737|           7813737|
|   mean|36727.956744640884|8909.072104295294| 6.144029546937656|
| stddev|20997.946118973723| 8883.94963588107|3.7278004201098067|
|    min|                 1|                1|                -1|
|    max|             73516|            34519|                10|
+-------+------------------+-----------------+------------------+



In [14]:
# check for nan ratings
df_rat.select([count(when(isnan('rating'),True))]).show()

+--------------------------------------------+
|count(CASE WHEN isnan(rating) THEN true END)|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+



In [10]:
# repeat the NaN check for the anime.csv ratings (isnan does not count null values)
df_anime.select([count(when(isnan('rating'),True))]).show()

+--------------------------------------------+
|count(CASE WHEN isnan(rating) THEN true END)|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+
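Both checks return 0, but `isnan` only matches floating-point NaN values; missing entries are a separate case. The `describe()` output above reports 12064 ratings for 12294 rows of anime.csv, which suggests 230 missing values that the NaN check does not count. The plain-Python sketch below uses toy values (not the real data) to illustrate the distinction; in PySpark the corresponding test for missing values would be `Column.isNull()`.

```python
import math

# Toy column: two valid ratings, one missing value (None) and one NaN.
ratings = [9.37, None, float("nan"), 6.5]

# NaN check: only true floating-point NaN values match; None is skipped.
nan_count = sum(1 for r in ratings if r is not None and math.isnan(r))

# Missing-value check: counts entries that are absent altogether.
null_count = sum(1 for r in ratings if r is None)

print(nan_count, null_count)
```

A column can therefore pass the NaN check and still contain gaps, so it is worth running both checks before aggregating.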



In [None]:
# count number of rows prior to dropping
df_rat.count()

In [8]:
# drop -1 ratings, observed from descriptive stats (rating is an integer column, so compare with an integer)
df_rating_dropped = df_rat.filter(df_rat.rating != -1)

In [12]:
# confirm rows were dropped
df_rating_dropped.count()

6337241
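As a quick sanity check on the filter, the two row counts reported in this notebook can be compared directly. The snippet below is plain arithmetic on those two figures and does not touch the data.

```python
rows_before = 7_813_737   # count reported by df_rat.describe()
rows_after = 6_337_241    # count after filtering out rating == -1

dropped = rows_before - rows_after
share = dropped / rows_before
print(dropped, f"{share:.1%}")
```

Just under a fifth of the rows carried the -1 placeholder, so the filter removes a substantial part of the file.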

In [9]:
# change column names to avoid ambiguous naming errors after merging
df_rating_dropped = df_rating_dropped.selectExpr("user_id as user_id","anime_id as anime_id_rating", "rating as rating_rating")


In [10]:
# full outer join of anime_id (anime.csv) with anime_id_rating (rating.csv), so no rows are lost at the join stage
outer_join_df = df_anime.join(df_rating_dropped, df_anime.anime_id == df_rating_dropped.anime_id_rating, "fullouter")
# filter out rows with no matching anime, i.e. a null anime_id (2)
outer_join_df_filtered = outer_join_df.filter(outer_join_df.anime_id.isNotNull())
# cast the user rating to double so both rating columns have the same type
outer_join_df = outer_join_df_filtered.withColumn("rating_rating", outer_join_df_filtered.rating_rating.cast('double'))
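The cell above performs a full outer join and then removes rows whose anime-side key is null. The plain-Python sketch below mimics those two steps on toy rows to show that the combined effect is that of a left outer join from the anime table. "Cowboy Bebop" with id 1 comes from the notebook output; the other rows and names are hypothetical.

```python
# Toy stand-ins for the two tables (mostly hypothetical rows, not the real data).
anime = {1: "Cowboy Bebop", 2: "Unrated Show"}      # anime_id -> name
ratings = [(19, 1, 10), (21, 1, 9), (30, 999, 7)]   # (user_id, anime_id, rating)

# Full outer join on anime_id: keep every row from both sides.
full_outer = []
for user_id, anime_id, rating in ratings:
    key = anime_id if anime_id in anime else None   # anime-side key, None if unmatched
    full_outer.append((key, anime.get(anime_id), user_id, rating))
rated_ids = {anime_id for _, anime_id, _ in ratings}
for anime_id, name in anime.items():
    if anime_id not in rated_ids:
        full_outer.append((anime_id, name, None, None))

# Dropping rows whose anime-side key is missing leaves a left outer join.
filtered = [row for row in full_outer if row[0] is not None]
print(filtered)
```

The rating for the unknown id 999 disappears, while the anime without any ratings is kept with empty rating fields.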

In [14]:
outer_join_df.show()

+--------+------------+--------------------+----+--------+------+-------+-------+---------------+-------------+
|anime_id|        name|               genre|type|episodes|rating|members|user_id|anime_id_rating|rating_rating|
+--------+------------+--------------------+----+--------+------+-------+-------+---------------+-------------+
|       1|Cowboy Bebop|Action, Adventure...|  TV|      26|  8.82| 486824|     19|              1|           10|
|       1|Cowboy Bebop|Action, Adventure...|  TV|      26|  8.82| 486824|     21|              1|            9|
|       1|Cowboy Bebop|Action, Adventure...|  TV|      26|  8.82| 486824|     23|              1|            9|
|       1|Cowboy Bebop|Action, Adventure...|  TV|      26|  8.82| 486824|     32|              1|           10|
|       1|Cowboy Bebop|Action, Adventure...|  TV|      26|  8.82| 486824|     34|              1|            7|
|       1|Cowboy Bebop|Action, Adventure...|  TV|      26|  8.82| 486824|     43|              1|       

**3. Student Activity: Now do some exploratory analysis. You must write the script for the third and fourth question here. Make sure to check whether the script is running correctly.**

Q3. Find the top 10 anime based on user rating. Use tabular/graphical presentation to provide evidence of your analysis.

Q4. Find the top 10 genre based on user rating. Use tabular/graphical presentation to provide evidence of your analysis.

I will display the top 10 anime twice: first using only the ratings listed in anime.csv, and then using only the user ratings from rating.csv.

In [18]:
outer_join_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- anime_id_rating: integer (nullable = true)
 |-- rating_rating: double (nullable = true)



Finding the top 10 anime using the anime.csv ratings only.

In [23]:
# create a new df consisting of anime_ids and a corresponding array containing all the ratings for that anime_id
grouped_ratings = outer_join_df_filtered.groupBy('anime_id').agg({'rating': 'collect_list'}).toDF('anime_id', 'rating')

In [24]:
# add new column to the new df of the average of the aggregated ratings using the udf created above
grouped_ratings_anime_rating = grouped_ratings.withColumn("avg_rating", array_mean("rating"))

In [25]:
# create a new column 'size' indicating the size of the rating array and therefore the number of ratings, filter out anime with fewer than 20 ratings
grouped_ratings_anime_rating = grouped_ratings_anime_rating.withColumn("size", F.size(F.col('rating')))
grouped_ratings_anime_rating_filtered = grouped_ratings_anime_rating.filter(F.col("size") >= 20)

The table below shows the top 10 anime based on the ratings listed in anime.csv, restricted to anime with at least 20 user ratings.

In [28]:
# show top 10 highest average ratings in descending order (sort on the average, not on the array column)
grouped_ratings_anime_rating_filtered.sort(col("avg_rating").desc()).show(n=10)

+--------+--------------------+----------+-----+
|anime_id|              rating|avg_rating| size|
+--------+--------------------+----------+-----+
|   32281|[9.37, 9.37, 9.37...|      9.37| 1961|
|    5114|[9.26, 9.26, 9.26...|      9.26|21494|
|   28977|[9.25, 9.25, 9.25...|      9.25| 1188|
|    9253|[9.17, 9.17, 9.17...|      9.17|17151|
|    9969|[9.16, 9.16, 9.16...|      9.16| 3115|
|   32935|[9.15, 9.15, 9.15...|      9.15| 1038|
|   11061|[9.13, 9.13, 9.13...|      9.13| 7477|
|   15417|[9.11, 9.11, 9.11...|      9.11| 2126|
|     820|[9.11, 9.11, 9.11...|      9.11|  803|
|   15335|[9.1, 9.1, 9.1, 9...|       9.1| 2147|
+--------+--------------------+----------+-----+
only showing top 10 rows



Finding the top 10 anime using the rating.csv user ratings only.

In [19]:
# repeat the steps above for the user ratings from rating.csv
grouped_ratings_ratings = outer_join_df_filtered.groupBy('anime_id').agg({'rating_rating': 'collect_list'}).toDF('anime_id', 'rating_rating')
grouped_ratings_ratings = grouped_ratings_ratings.withColumn("avg_rating", array_mean("rating_rating"))
grouped_ratings_ratings = grouped_ratings_ratings.withColumn("size", F.size(F.col('rating_rating')))
grouped_ratings_ratings_filtered = grouped_ratings_ratings.filter(F.col("size") >= 20)

The table below shows the top 10 anime based on the user ratings in rating.csv, restricted to anime with at least 20 ratings.

In [39]:
grouped_ratings_ratings_filtered.sort(col("avg_rating").desc()).show(n=10)

+--------+--------------------+-----+----------+
|anime_id|       rating_rating| size|avg_rating|
+--------+--------------------+-----+----------+
|   28977|[10, 10, 7, 10, 8...| 1188|  9.449495|
|   32281|[9, 5, 8, 10, 10,...| 1961|  9.426313|
|     820|[10, 10, 10, 6, 9...|  803|  9.389789|
|    5114|[10, 10, 9, 10, 1...|21494| 9.3227415|
|    9969|[6, 9, 7, 8, 10, ...| 3115| 9.2725525|
|    9253|[9, 7, 10, 9, 10,...|17151|  9.261326|
|     918|[9, 7, 9, 10, 10,...| 4264|  9.236398|
|   11061|[9, 9, 8, 10, 9, ...| 7477|  9.234586|
|   15417|[6, 8, 10, 8, 10,...| 2126|  9.202258|
|   15335|[6, 10, 10, 9, 8,...| 2147|   9.19143|
+--------+--------------------+-----+----------+
only showing top 10 rows
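Both rankings above follow the same pattern: group the ratings by anime, average them, keep only anime with at least 20 ratings, and sort in descending order. The sketch below restates that pattern in plain Python on toy data with hypothetical ids; `top_n_by_mean` is an illustrative helper, not part of the notebook. In PySpark the same result could likely be obtained without a UDF by combining `groupBy` with `F.avg` and `F.count`.

```python
from collections import defaultdict

def top_n_by_mean(rows, min_count=20, n=10):
    """rows: iterable of (anime_id, rating); returns [(anime_id, mean, count)]."""
    totals = defaultdict(lambda: [0.0, 0])   # anime_id -> [sum, count]
    for anime_id, rating in rows:
        totals[anime_id][0] += rating
        totals[anime_id][1] += 1
    # Keep only groups with enough ratings, then rank by mean rating.
    stats = [(aid, s / c, c) for aid, (s, c) in totals.items() if c >= min_count]
    return sorted(stats, key=lambda t: t[1], reverse=True)[:n]

# Toy data with hypothetical ids; min_count is lowered so the example stays small.
rows = [(1, 10), (1, 8), (2, 9), (2, 10), (2, 10), (3, 10)]
result = top_n_by_mean(rows, min_count=2, n=2)
print(result)
```

The anime with a single rating of 10 is excluded by the minimum-count filter, which is the reason for the threshold: a handful of enthusiastic ratings would otherwise dominate the ranking.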



Now to work out the most highly rated genres.

In [30]:
outer_join_df_filtered.show(n=5)

+--------+--------------------+--------------------+----+--------+------+-------+-------+---------------+-------------+
|anime_id|                name|               genre|type|episodes|rating|members|user_id|anime_id_rating|rating_rating|
+--------+--------------------+--------------------+----+--------+------+-------+-------+---------------+-------------+
|    5114|Fullmetal Alchemi...|Action, Adventure...|  TV|      64|  9.26| 793665|      3|           5114|           10|
|    5114|Fullmetal Alchemi...|Action, Adventure...|  TV|      64|  9.26| 793665|     10|           5114|           10|
|    5114|Fullmetal Alchemi...|Action, Adventure...|  TV|      64|  9.26| 793665|     11|           5114|            8|
|    5114|Fullmetal Alchemi...|Action, Adventure...|  TV|      64|  9.26| 793665|     12|           5114|            9|
|    5114|Fullmetal Alchemi...|Action, Adventure...|  TV|      64|  9.26| 793665|     17|           5114|           10|
+--------+--------------------+---------

In [11]:
# using user ratings (rating_rating) create a dataframe with an aggregated collection of ratings for each genre
genre_df = outer_join_df_filtered.groupBy('genre').agg({'rating_rating': 'collect_list'}).toDF('genre', 'rating_rating')

In [12]:
# show current state of genre_df info
genre_df.show()

+--------------------+--------------------+
|               genre|       rating_rating|
+--------------------+--------------------+
|                null|[6, 5, 7, 6, 6, 4...|
|   Action, Adventure|[7, 5, 5, 5, 6, 4...|
|Action, Adventure...|[9, 8, 9, 5, 7, 6...|
|Action, Adventure...|[5, 6, 7, 6, 7, 3...|
|Action, Adventure...|[9, 6, 7, 8, 7, 1...|
|Action, Adventure...|[1, 4, 8, 8, 9, 1...|
|Action, Adventure...|                  []|
|Action, Adventure...|[8, 8, 8, 6, 5, 8...|
|Action, Adventure...|[5, 7, 7, 7, 7, 8...|
|Action, Adventure...|[7, 7, 4, 6, 8, 8...|
|Action, Adventure...|[8, 7, 3, 7, 8, 7...|
|Action, Adventure...|[8, 8, 9, 8, 7, 1...|
|Action, Adventure...|[9, 7, 6, 8, 10, ...|
|Action, Adventure...|                  []|
|Action, Adventure...|[6, 7, 9, 8, 9, 8...|
|Action, Adventure...|[9, 9, 7, 7, 7, 7...|
|Action, Adventure...|[7, 6, 8, 8, 8, 9...|
|Action, Adventure...|[4, 9, 9, 5, 7, 7...|
|Action, Adventure...|[5, 9, 6, 5, 5, 8...|
|Action, Adventure...|[8, 6, 5, 

In [13]:
# explode dataframe so all ratings are applied to each genre for rows with multiple genres
# note: splitting on "," alone keeps the space after each comma, so " Romance" and "Romance" become separate genres
df_genre_expl = genre_df.withColumn("genre", explode(split("genre", "[,]")))
df_genre_expl.show()

+----------+--------------------+
|     genre|       rating_rating|
+----------+--------------------+
|    Action|[7, 5, 5, 5, 6, 4...|
| Adventure|[7, 5, 5, 5, 6, 4...|
|    Action|[9, 8, 9, 5, 7, 6...|
| Adventure|[9, 8, 9, 5, 7, 6...|
|      Cars|[9, 8, 9, 5, 7, 6...|
|    Sci-Fi|[9, 8, 9, 5, 7, 6...|
|    Action|[5, 6, 7, 6, 7, 3...|
| Adventure|[5, 6, 7, 6, 7, 3...|
|    Comedy|[5, 6, 7, 6, 7, 3...|
|    Action|[9, 6, 7, 8, 7, 1...|
| Adventure|[9, 6, 7, 8, 7, 1...|
|    Comedy|[9, 6, 7, 8, 7, 1...|
|    Demons|[9, 6, 7, 8, 7, 1...|
|     Drama|[9, 6, 7, 8, 7, 1...|
|     Ecchi|[9, 6, 7, 8, 7, 1...|
|    Horror|[9, 6, 7, 8, 7, 1...|
|   Mystery|[9, 6, 7, 8, 7, 1...|
|   Romance|[9, 6, 7, 8, 7, 1...|
|    Sci-Fi|[9, 6, 7, 8, 7, 1...|
|    Action|[1, 4, 8, 8, 9, 1...|
+----------+--------------------+
only showing top 20 rows



In [15]:
# collect the ratings again for each genre, so now each genre contains all ratings assigned to it either in isolation or as part of a multi-genre anime
genres_all_ratings = df_genre_expl.groupBy('genre').agg({'rating_rating': 'collect_list'}).toDF('genre', 'rating_rating')
genres_all_ratings.show()

+--------------+--------------------+
|         genre|       rating_rating|
+--------------+--------------------+
|        Seinen|[[6], [6, 8, 7, 5...|
|        Sports|[[5, 9, 6, 5, 5, ...|
|         Harem|[[5, 7, 7, 9, 7, ...|
|    Shounen Ai|[[5, 8, 7, 6, 6, ...|
|         Ecchi|[[9, 6, 7, 8, 7, ...|
|      Military|[[7, 7, 4, 6, 8, ...|
|      Dementia|[[3, 1, 2, 6, 1, ...|
|       Romance|[[6, 9, 3, 3, 3, ...|
|    Historical|[[6, 9, 10, 8, 7,...|
|         Magic|[[6, 7, 8, 3, 8, ...|
|      Thriller|[[7, 7, 8, 6, 8, ...|
| Psychological|[[8], [4, 8, 8, 6...|
|         Josei|[[7, 8, 9, 9, 10,...|
|       Romance|[[9, 6, 7, 8, 7, ...|
|        Demons|[[9, 6, 7, 8, 7, ...|
|          Yuri|[[4, 3], [10, 6, ...|
|     Adventure|[[9, 7, 8, 4, 4, ...|
|   Super Power|[[5, 6, 5, 5, 3, ...|
|  Martial Arts|[[8, 6, 5, 6, 8, ...|
|        Sports|[[5, 6, 8, 4, 6, ...|
+--------------+--------------------+
only showing top 20 rows



In [16]:
# flatten the nested array so average can be calculated
df_genre_flat = genres_all_ratings.select(genres_all_ratings.genre,flatten(genres_all_ratings.rating_rating))
df_genre_flat.show()

+--------------+----------------------+
|         genre|flatten(rating_rating)|
+--------------+----------------------+
|        Seinen|  [6, 6, 8, 7, 5, 6...|
|        Sports|  [5, 9, 6, 5, 5, 8...|
|         Harem|  [5, 7, 7, 9, 7, 7...|
|    Shounen Ai|  [5, 8, 7, 6, 6, 6...|
|         Ecchi|  [9, 6, 7, 8, 7, 1...|
|      Military|  [7, 7, 4, 6, 8, 8...|
|      Dementia|  [3, 1, 2, 6, 1, 4...|
|       Romance|  [6, 9, 3, 3, 3, 6...|
|    Historical|  [6, 9, 10, 8, 7, ...|
|         Magic|  [6, 7, 8, 3, 8, 6...|
|      Thriller|  [7, 7, 8, 6, 8, 5...|
| Psychological|  [8, 4, 8, 8, 6, 5...|
|         Josei|  [7, 8, 9, 9, 10, ...|
|       Romance|  [9, 6, 7, 8, 7, 1...|
|        Demons|  [9, 6, 7, 8, 7, 1...|
|          Yuri|  [4, 3, 10, 6, 6, ...|
|     Adventure|  [9, 7, 8, 4, 4, 7...|
|   Super Power|  [5, 6, 5, 5, 3, 2...|
|  Martial Arts|  [8, 6, 5, 6, 8, 8...|
|        Sports|  [5, 6, 8, 4, 6, 5...|
+--------------+----------------------+
only showing top 20 rows



In [17]:
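# calculate the average with the udf created earlier and show the top genres by average rating
genres_avg = df_genre_flat.withColumn("avg_rating", array_mean("flatten(rating_rating)"))
# note: isNotNull() does not remove NaN, so a genre with an empty rating list (mean = NaN) still sorts first
genres_avg_filtered = genres_avg.filter(genres_avg.avg_rating.isNotNull())
# show 11 rows so that 10 genres with a valid average are visible below the NaN row
genres_avg_filtered.sort(col("avg_rating").desc()).show(n=11)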

+--------------+----------------------+----------+
|         genre|flatten(rating_rating)|avg_rating|
+--------------+----------------------+----------+
|         Space|                    []|       NaN|
|         Josei|  [7, 8, 9, 9, 10, ...|  8.574034|
|        Sci-Fi|  [7, 7, 6, 8, 3, 5...|  8.502633|
|      Thriller|  [7, 8, 10, 9, 5, ...|  8.381566|
|       Mystery|  [7, 8, 7, 10, 9, ...|  8.355527|
| Psychological|  [4, 7, 2, 5, 7, 1...|  8.327118|
|        Police|  [7, 6, 6, 7, 6, 8...| 8.1723175|
|      Military|  [7, 7, 4, 6, 8, 8...|  8.107572|
| Psychological|  [8, 4, 8, 8, 6, 5...|  8.104771|
|       Samurai|  [7, 6, 7, 9, 9, 8...|   8.09543|
|    Historical|  [6, 9, 10, 8, 7, ...|  8.080772|
+--------------+----------------------+----------+
only showing top 11 rows
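Two details in the table above deserve a closer look: Psychological appears twice, and Space has an average of NaN. A likely explanation for the first is that splitting "Drama, Romance" on a bare comma yields "Drama" and " Romance" (with a leading space), which Spark treats as a different key from "Romance"; the right-aligned output hides the space. The second arises because the mean of an empty list is NaN rather than null. The plain-Python sketch below reproduces both effects on toy rows with hypothetical values; `mean_by_genre` is an illustrative helper. In PySpark, trimming could be done with `F.trim` or by splitting on the pattern `",\\s*"`.

```python
from collections import defaultdict

# Toy rows (genre string, user rating); hypothetical values for illustration.
rows = [("Drama, Romance", 9), ("Romance", 7), ("Action, Space", None)]

def mean_by_genre(rows, strip):
    groups = defaultdict(list)
    for genres, rating in rows:
        for g in genres.split(","):
            key = g.strip() if strip else g
            bucket = groups[key]          # registers the genre even without ratings
            if rating is not None:
                bucket.append(rating)
    # A genre with no ratings gets None instead of NaN, so it is easy to filter.
    return {g: (sum(r) / len(r) if r else None) for g, r in groups.items()}

raw = mean_by_genre(rows, strip=False)    # ' Romance' and 'Romance' stay separate
clean = mean_by_genre(rows, strip=True)   # a single 'Romance' entry
print(raw)
print(clean)
```

With trimming, each genre appears once and its average covers all of its ratings, so the top 10 would no longer contain duplicate names.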



**4. Student Activity: Design the recommendation system. Remember to split the dataset into training and testing to validate your recommendation model. This section would help you in answering question 5**

Q5. Design a collaborative filter-based recommendation system. 

In [11]:
# train/test split; df_rating_dropped has the -1 ratings removed
# note: randomSplit normalises its weights, so [0.6, 0.8] gives roughly 43% train and 57% test
seed = 100
(df_train_60, df_test_20) = df_rating_dropped.randomSplit([0.6,0.8],seed)

# Let's cache for performance
train_df = df_train_60.cache()
test_df = df_test_20.cache()

train_df.show(3)
test_df.show(3)

+-------+---------------+-------------+
|user_id|anime_id_rating|rating_rating|
+-------+---------------+-------------+
|      1|          11617|           10|
|      2|          11771|           10|
|      3|             20|            8|
+-------+---------------+-------------+
only showing top 3 rows

+-------+---------------+-------------+
|user_id|anime_id_rating|rating_rating|
+-------+---------------+-------------+
|      1|           8074|           10|
|      1|          11757|           10|
|      1|          15451|           10|
+-------+---------------+-------------+
only showing top 3 rows
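The weights passed to `randomSplit` above do not sum to 1, and Spark normalises such weights before sampling. The arithmetic below shows the fractions that actually result from `[0.6, 0.8]` and the approximate row counts they imply for the 6337241 rated rows; the exact counts vary because the split is random.

```python
weights = [0.6, 0.8]                      # as passed to randomSplit
total = sum(weights)
fractions = [w / total for w in weights]  # weights are rescaled to sum to 1
print([round(f, 3) for f in fractions])

rows = 6_337_241                          # rated rows after dropping -1
expected = [round(rows * f) for f in fractions]
print(expected)
```

The test set therefore ends up larger than the training set. Weights such as `[0.8, 0.2]` would give the more conventional split, at the cost of changing the RMSE reported below.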



In [12]:
# Alternating Least Squares

# initialize ALS learner 
als = ALS(maxIter=5, seed=seed, regParam=0.1, userCol="user_id", itemCol="anime_id_rating", ratingCol="rating_rating", coldStartStrategy="drop")

model = als.fit(train_df)
predictions = model.transform(test_df)

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(metricName="rmse", predictionCol="prediction", labelCol="rating_rating")

rmse = reg_eval.evaluate(predictions)
print("RMSE = " + str(rmse))

RMSE = 1.2451447506786457
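The RMSE reported above is the square root of the mean squared difference between the predicted and the actual ratings, so a value of about 1.25 means predictions are typically off by a little over one point on the rating scale. A minimal plain-Python version of the metric, using hypothetical toy values, looks like this:

```python
import math

def rmse(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

# Toy values (hypothetical): errors of 1, -1 and 2 rating points.
print(rmse([10, 8, 7], [9.0, 9.0, 5.0]))
```

Because the errors are squared before averaging, the single two-point miss contributes more to the result than the two one-point misses combined.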


**Student Activity: Analyse the output of the test dataset here.**

Q6. Give examples of the best three anime recommendations for a minimum of 10 users.

In [13]:
# top 3 anime recommendations for 10 users
users = df_rating_dropped.select(als.getUserCol()).distinct().limit(10)
userSubsetRecs = model.recommendForUserSubset(users, 3)
userSubsetRecs.show(truncate=False)



+-------+-----------------------------------------------------------+
|user_id|recommendations                                            |
+-------+-----------------------------------------------------------+
|1580   |[{9077, 13.942918}, {32400, 11.879543}, {8353, 11.774237}] |
|471    |[{9077, 10.239969}, {8353, 9.373256}, {6733, 9.369171}]    |
|1591   |[{2651, 10.178323}, {31686, 10.013484}, {23639, 9.906234}] |
|1342   |[{3676, 11.724735}, {9558, 11.564286}, {9979, 11.564121}]  |
|463    |[{32400, 11.398967}, {8353, 11.234302}, {31299, 11.035712}]|
|833    |[{8353, 11.288126}, {32400, 11.075176}, {31299, 10.917287}]|
|496    |[{32400, 11.08476}, {8353, 11.030807}, {31299, 10.561193}] |
|148    |[{32400, 10.473956}, {8353, 9.625719}, {31299, 9.509622}]  |
|1088   |[{16474, 12.54911}, {7264, 12.269246}, {4821, 11.90329}]   |
|1238   |[{8353, 11.367343}, {32400, 10.702857}, {31299, 10.557213}]|
+-------+-----------------------------------------------------------+
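Several predicted scores in the table exceed 10, the highest rating in the data. ALS predictions are products of learned factor vectors and are not constrained to the rating scale, so values outside it can occur. One possible post-processing step, shown below as a sketch that assumes a scale of 1 to 10, is to clip each score into that range; `clip_score` is an illustrative helper, not a Spark function.

```python
def clip_score(score, low=1.0, high=10.0):
    """Clamp a predicted score to the rating scale [low, high] (assumed 1 to 10)."""
    return max(low, min(high, score))

# (anime_id, predicted score) pairs copied from the row for user 1580 above.
recs_1580 = [(9077, 13.942918), (32400, 11.879543), (8353, 11.774237)]
clipped = [(anime_id, clip_score(s)) for anime_id, s in recs_1580]
print(clipped)
```

Clipping changes the displayed scores only; the ranking should be taken from the unclipped values, since all three scores here collapse to the same maximum.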



In [None]:
spark.stop()