**Initial Setup**

1. First, set up your Colab environment by running the cell below.

In [5]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=a53682ab22334f8661f0268e78ec64a360ac134edb60ca7596c868af3ae2e423
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m
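
PySpark needs a Java runtime, which is why the setup cell points `JAVA_HOME` at the installed JDK. If Spark later fails to start, a quick sanity check like the sketch below can help. `java_home_is_valid` is a hypothetical helper written for this notebook, not part of PySpark, and the fallback path is simply the one used in the setup cell.

```python
import os


def java_home_is_valid(java_home):
    """Return True if java_home looks like a Java install (contains bin/java)."""
    return os.path.isfile(os.path.join(java_home, "bin", "java"))


# Path taken from the setup cell; adjust it if your runtime installs Java elsewhere.
java_home = os.environ.get("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk-amd64")
print("JAVA_HOME usable:", java_home_is_valid(java_home))
```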

Next, we authenticate a Google Drive client so that we can download the files we will process in our Spark job.

**Make sure to follow the interactive instructions.**

In [1]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

Download both anime.csv and rating.csv, and store them in your Google Drive. It is advisable to create a separate project folder where you can keep this dataset and your code.

The script below lists the files in that folder and prints the ID of each one.

In [2]:
file_list = drive.ListFile({'q': "'1Oi8cMnAfJVZH9-FyXGxwOrGGCIkkB7uy' in parents"}).GetList()
for f in file_list:
  print('title: %s, id: %s' % (f['title'], f['id']))

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 33, in <module>
    from oauth2client.contrib.locked_file import LockedFile
ModuleNotFoundError: No module named 'oauth2client.contrib.locked_file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 37, in <module>
    from oauth2client.locked_file import LockedFile
ModuleNotFoundError: No module named 'oauth2client.locked_file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 44, in autodetect
    from . import file_cache
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
    "file_cach

title: rating.csv, id: 1f76dQZxRB1fNaReBv_DnUDVkIXNm7mw9
title: anime.csv, id: 1TppJoj4QVJlc_HML20xmH847Brrw0Zfc


After you run the cell below, the dataset files needed for this Colab should appear under the "Files" tab on the left panel.

In [3]:
# Change the IDs if they differ from the ones printed above.
anime_file_id = '1TppJoj4QVJlc_HML20xmH847Brrw0Zfc'
downloaded = drive.CreateFile({'id': anime_file_id})
downloaded.GetContentFile('anime.csv')

rating_file_id = '1f76dQZxRB1fNaReBv_DnUDVkIXNm7mw9'
downloaded = drive.CreateFile({'id': rating_file_id})
downloaded.GetContentFile('rating.csv')
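
Before handing the files to Spark, it can be worth confirming that both downloads landed in the working directory and have the expected header row. The sketch below uses only the standard library; `csv_header` is a hypothetical helper, and the file names are the ones used in the download cell.

```python
import csv
import os


def csv_header(path):
    """Return the header row of a CSV file, or None if the file is missing or empty."""
    if not os.path.isfile(path):
        return None
    with open(path, newline="", encoding="utf-8") as f:
        return next(csv.reader(f), None)


for name in ("anime.csv", "rating.csv"):
    print(name, "->", csv_header(name))
```

A result of `None` means the file was not downloaded (or is empty), in which case the file IDs above are the first thing to check.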

Here is a list of packages that might be useful to you. 

**Student Activity: Add the packages you need to carry out your analysis here** 

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Student Activity: Add your packages here.
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, avg, col, lit, struct
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.ml.recommendation import ALS

**This step initializes the Spark context.**

In [7]:
# create the Spark configuration
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then the session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link to the web interface. In the Spark UI, you can monitor the progress of your job and debug performance bottlenecks (if your Colab is running with a local runtime).

In [None]:
spark

## **From this point onwards, you are expected to write the code yourself. Follow the steps below, adding your code in the appropriate place.**

**1. Student Activity: Read the datasets here. You must write the script for the first question and explore both files here.**

Q1. Identify the number of columns in each of the two dataset files and describe them.

In [8]:
anime = spark.read.csv('/content/anime.csv',header=True)
rating = spark.read.csv('/content/rating.csv',header=True)

In [9]:
# print the columns in the anime dataset
print('There are %s columns in the anime dataset'% len(anime.columns))
anime.columns

There are 7 columns in the anime dataset


['anime_id', 'name', 'genre', 'type', 'episodes', 'rating', 'members']

In [10]:
# print the columns in the rating dataset
print('There are %s columns in the rating dataset'% len(rating.columns))
rating.columns

There are 3 columns in the rating dataset


['user_id', 'anime_id', 'rating']

In [11]:
# To avoid ambiguity after joining, rename the rating column in each dataset
anime = anime.withColumnRenamed('rating', 'anime_rating')
rating = rating.withColumnRenamed('rating', 'user_rating')


**2. Student Activity: Preprocess the datasets here. You must write the script for the second question here. Make sure to check that the script runs correctly.**

Q2. Merge/join/combine the two datasets and identify the common key column on which you performed the join.

In [12]:
# perform an inner join on the datasets, using anime_id as the key
data = anime.join(rating, on='anime_id', how='inner')
data.show()

+--------+--------------------+--------------------+-----+--------+------------+-------+-------+-----------+
|anime_id|                name|               genre| type|episodes|anime_rating|members|user_id|user_rating|
+--------+--------------------+--------------------+-----+--------+------------+-------+-------+-----------+
|      20|              Naruto|Action, Comedy, M...|   TV|     220|        7.81| 683297|      1|         -1|
|      24|       School Rumble|Comedy, Romance, ...|   TV|      26|        8.06| 178553|      1|         -1|
|      79|            Shuffle!|Comedy, Drama, Ec...|   TV|      24|        7.31| 158772|      1|         -1|
|     226|          Elfen Lied|Action, Drama, Ho...|   TV|      13|        7.85| 623511|      1|         -1|
|     241|Girls Bravo: Firs...|Comedy, Ecchi, Fa...|   TV|      11|        6.69|  84395|      1|         -1|
|     355|   Shakugan no Shana|Action, Drama, Fa...|   TV|      24|        7.74| 297058|      1|         -1|
|     356|     Fate
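
An inner join on `anime_id` keeps one output row per matching (anime, rating) pair and drops any anime that nobody rated, as well as any rating whose `anime_id` is missing from anime.csv. The plain-Python sketch below illustrates that behaviour on a tiny sample. `inner_join` is a hypothetical helper, and the rows marked as hypothetical are invented for illustration, not taken from the dataset.

```python
anime_rows = [
    {"anime_id": "20", "name": "Naruto"},
    {"anime_id": "24", "name": "School Rumble"},
    {"anime_id": "99999", "name": "Title with no ratings"},  # hypothetical row
]
rating_rows = [
    {"user_id": "1", "anime_id": "20", "user_rating": "-1"},
    {"user_id": "2", "anime_id": "20", "user_rating": "8"},  # hypothetical row
    {"user_id": "1", "anime_id": "24", "user_rating": "-1"},
]


def inner_join(left, right, key):
    """Join two lists of dicts, keeping only rows whose key appears on both sides."""
    index = {}
    for row in left:
        index.setdefault(row[key], []).append(row)
    joined = []
    for right_row in right:
        for left_row in index.get(right_row[key], []):
            joined.append({**left_row, **right_row})
    return joined


joined = inner_join(anime_rows, rating_rows, "anime_id")
for row in joined:
    print(row)
```

The unrated title does not appear in the result, and "Naruto" appears twice because two users have a row for it.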

**3. Student Activity: Now do some exploratory analysis. You must write the scripts for the third and fourth questions here. Make sure to check that the scripts run correctly.**

Q3. Find the top 10 anime based on rating. Use tabular/graphical presentation to provide evidence of your analysis.

Q4. Find the top 10 anime with the most episodes. Use tabular/graphical presentation to provide evidence of your analysis.

In [14]:
# top 10 anime based on average user rating
data_sort_rating = data.withColumn('user_rating_', data['user_rating'].cast('int')).groupBy('name').avg('user_rating_')
# show() only prints and returns None, so its result is not assigned
data_sort_rating.sort(data_sort_rating['avg(user_rating_)'].desc()).show(10, truncate=200)


+----------------------------------------------------------------+-----------------+
|                                                            name|avg(user_rating_)|
+----------------------------------------------------------------+-----------------+
|                               Warui no wo Taose!! Salaryman Man|             10.0|
|                                                      Shiroi Zou|             10.0|
|                                              Choegang Top Plate|             10.0|
|                                      STAR BEAT!: Hoshi no Kodou|             10.0|
|                                                      Shiranpuri|              9.0|
|Yakushiji Ryouko no Kaiki Jikenbo: Hamachou, Voice &amp; Fiction|              9.0|
|                                               Tang Lang Bu Chan|              9.0|
|                                                      Doukyuusei|              9.0|
|                                                   Steins;Gate 0
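
The averages of exactly 10.0 and 9.0 at the top of this table suggest titles with very few ratings, and the `-1` values visible in the joined data are averaged in as if they were scores. The sketch below shows one way to make the ranking more robust, in plain Python for clarity. It assumes that `-1` marks "watched but not rated" (check the dataset description before relying on this), and `top_rated` and its `min_count` threshold are hypothetical.

```python
from collections import defaultdict


def top_rated(ratings, min_count=2, limit=10):
    """Rank titles by mean rating, skipping -1 values and rarely rated titles.

    ratings: iterable of (name, rating) pairs.
    Returns a list of (name, mean_rating, rating_count) tuples.
    """
    totals = defaultdict(lambda: [0, 0])  # name -> [sum, count]
    for name, rating in ratings:
        if rating < 0:  # assumption: -1 means the user did not give a score
            continue
        totals[name][0] += rating
        totals[name][1] += 1
    rows = [(name, s / n, n) for name, (s, n) in totals.items() if n >= min_count]
    rows.sort(key=lambda row: (-row[1], row[0]))
    return rows[:limit]


sample = [("A", 10), ("B", 9), ("B", 8), ("B", -1), ("C", 7), ("C", 9)]
print(top_rated(sample))  # [('B', 8.5, 2), ('C', 8.0, 2)]
```

In PySpark the same idea would be a filter on the rating column followed by a `groupBy` that computes both the average and the count, keeping only groups above the chosen count.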

In [15]:
# top 10 anime with the most episodes
data_sort_episodes = data.withColumn('episodes_', data['episodes'].cast('int')).groupBy('name').max('episodes_')
# show() only prints and returns None, so its result is not assigned
data_sort_episodes.sort(data_sort_episodes['max(episodes_)'].desc()).show(10, truncate=200)

+-------------------------------------+--------------+
|                                 name|max(episodes_)|
+-------------------------------------+--------------+
|                           Oyako Club|          1818|
|                      Doraemon (1979)|          1787|
|               Kirin Monoshiri Yakata|          1565|
|   Manga Nippon Mukashibanashi (1976)|          1471|
|                     Hoka Hoka Kazoku|          1428|
|Monoshiri Daigaku: Ashita no Calendar|          1274|
|                Sekai Monoshiri Ryoko|          1006|
|                       Kotowaza House|           773|
|       Shima Shima Tora no Shimajirou|           726|
|                    Ninja Hattori-kun|           694|
+-------------------------------------+--------------+
only showing top 10 rows
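
Because the CSV files were read with `header=True` and no schema inference, every column is a string, which is why the cells above cast `episodes` and `user_rating` before aggregating. In Spark's default (non-ANSI) mode, a string that cannot be parsed as a number becomes null when cast, and aggregates such as `max` skip nulls. The sketch below is a simplified plain-Python analogy of that behaviour, not Spark's exact casting rules. The helper names are hypothetical, and `"Unknown"` is an assumed example of a non-numeric placeholder.

```python
def to_int_or_none(value):
    """Parse an integer string, returning None for anything that is not one."""
    try:
        return int(value.strip())
    except (AttributeError, ValueError):
        return None


def max_ignoring_missing(values):
    """Return the largest parsed value, ignoring entries that failed to parse."""
    parsed = [v for v in map(to_int_or_none, values) if v is not None]
    return max(parsed) if parsed else None


print(max_ignoring_missing(["220", "Unknown", "26"]))  # 220
```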



In [16]:
# top 10 genre combinations based on average user rating
data_sort_rating_genre = data.withColumn('user_rating_', data['user_rating'].cast('int')).groupBy('genre').avg('user_rating_')
# show() only prints and returns None, so its result is not assigned
data_sort_rating_genre.sort(data_sort_rating_genre['avg(user_rating_)'].desc()).show(10, truncate=200)


+---------------------------------------------------------------+------------------+
|                                                          genre| avg(user_rating_)|
+---------------------------------------------------------------+------------------+
|                                       Action, Historical, Kids|              10.0|
|    Action, Adventure, Drama, Fantasy, Magic, Military, Shounen| 8.028933018637584|
|                            Action, Comedy, Historical, Samurai|               8.0|
|           Drama, Fantasy, Romance, Slice of Life, Supernatural| 7.835275008401479|
|Drama, Horror, Mystery, Police, Psychological, Seinen, Thriller| 7.809098824553765|
|            Action, Drama, Mecha, Military, Sci-Fi, Super Power|  7.76594340400957|
|                         Drama, Music, Romance, School, Shounen| 7.740262489415749|
|                            Drama, Historical, Seinen, Thriller| 7.713305001634521|
|                                               Sci-Fi, Thriller|
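
Note that the `genre` column holds a comma-separated list, so grouping on it ranks genre combinations rather than individual genres. To rank single genres, each list has to be split first. The plain-Python sketch below shows the idea on made-up sample rows; `mean_rating_by_genre` is a hypothetical helper.

```python
from collections import defaultdict


def mean_rating_by_genre(rows):
    """Average the rating per individual genre.

    rows: iterable of (genre_string, rating), where genre_string is a
    comma-separated list such as "Action, Comedy".
    """
    totals = defaultdict(lambda: [0.0, 0])  # genre -> [sum, count]
    for genre_string, rating in rows:
        for genre in (g.strip() for g in genre_string.split(",")):
            if genre:
                totals[genre][0] += rating
                totals[genre][1] += 1
    return {genre: s / n for genre, (s, n) in totals.items()}


sample = [("Action, Comedy", 8), ("Action, Drama", 6), ("Drama", 9)]
print(mean_rating_by_genre(sample))  # {'Action': 7.0, 'Comedy': 8.0, 'Drama': 7.5}
```

In PySpark, `split` and `explode` from `pyspark.sql.functions` can produce one row per genre before the `groupBy`.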

**4. Student Activity: Design the recommendation system. Remember to split the dataset into training and test sets to validate your recommendation model. This section will help you answer question 5.**

Q5. Design a recommendation system based on collaborative filtering.

In [17]:
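# keep only the columns needed for ALS and cast them to integers
data2 = data.select('user_id', 'name', 'user_rating')
data2 = data2.withColumn('user_rating', col('user_rating').cast('int'))
data2 = data2.withColumn('user_id', col('user_id').cast('int'))
# one row per (anime, user) pair; the rating column becomes 'avg(user_rating)'
data2 = data2.groupBy('name', 'user_id').avg('user_rating')
# ALS needs numeric item IDs, so map each anime name to a numeric index
indexer = StringIndexer(inputCol="name", outputCol="names")
indexer = indexer.fit(data2)
data2 = indexer.transform(data2)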
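
By default, `StringIndexer` orders labels by descending frequency, so the most frequently rated anime gets index 0, and the fitted indexer's `labels` list maps each index back to its name. The plain-Python sketch below imitates that ordering on a small made-up sample. `frequency_desc_labels` is a hypothetical helper, and breaking ties alphabetically is an assumption about how equal counts are ordered.

```python
from collections import Counter


def frequency_desc_labels(values):
    """Return the distinct values, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


names = ["Naruto", "Death Note", "Naruto", "Bleach", "Death Note", "Naruto"]
labels = frequency_desc_labels(names)
index_of = {label: i for i, label in enumerate(labels)}  # name -> index
name_of = dict(enumerate(labels))                        # index -> name

print(labels)       # ['Naruto', 'Death Note', 'Bleach']
print(index_of["Death Note"], name_of[0])  # 1 Naruto
```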


In [18]:

# split the dataset into training and test sets (80/20)
(training, test) = data2.randomSplit([0.8, 0.2])
# train the ALS model; implicitPrefs=True treats the ratings as implicit feedback
als = ALS(maxIter=5, implicitPrefs=True, userCol="user_id", itemCol="names",
          ratingCol='avg(user_rating)', coldStartStrategy="drop")
model = als.fit(training)

In [19]:
# predict using the test dataset
predictions = model.transform(test)
predictions.show()

+--------------------+-------+----------------+------+-----------+
|                name|user_id|avg(user_rating)| names| prediction|
+--------------------+-------+----------------+------+-----------+
|       &quot;0&quot;|  24548|            -1.0|6591.0|        0.0|
|       &quot;0&quot;|  34232|            -1.0|6591.0|0.011608885|
|       &quot;0&quot;|  35199|             5.0|6591.0| 0.02506883|
|&quot;Bungaku Sho...|   1313|             8.0|1995.0| 0.10274658|
|&quot;Bungaku Sho...|   1866|             8.0|1995.0|0.072162464|
|&quot;Bungaku Sho...|   5392|             8.0|1995.0|  0.3995214|
|&quot;Bungaku Sho...|   5504|             7.0|1995.0| 0.57809544|
|&quot;Bungaku Sho...|   6814|             5.0|1995.0| 0.51406324|
|&quot;Bungaku Sho...|   7305|             7.0|1995.0|   0.224262|
|&quot;Bungaku Sho...|   7340|             6.0|1995.0| 0.42350265|
|&quot;Bungaku Sho...|   7486|             9.0|1995.0| 0.20218632|
|&quot;Bungaku Sho...|   8087|             5.0|1995.0|  0.5044
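
Because the model was trained with `implicitPrefs=True`, the `prediction` column is a preference score rather than a predicted rating, which is why the values above sit mostly between 0 and 1 while the ratings go up to 10. Comparing the two directly (for example with RMSE) would therefore be misleading. A ranking metric is one common alternative. The sketch below computes precision@k in plain Python; `precision_at_k` is a hypothetical helper, and deciding which items count as "relevant" (for instance, those the user rated 7 or higher) is an assumption you would need to justify.

```python
def precision_at_k(scored_items, relevant_items, k=3):
    """Fraction of the k highest-scored items that are in relevant_items.

    scored_items: list of (item, score) pairs for one user.
    relevant_items: set of items the user is known to like.
    """
    if k <= 0:
        raise ValueError("k must be positive")
    ranked = sorted(scored_items, key=lambda pair: pair[1], reverse=True)
    top_k = [item for item, _ in ranked[:k]]
    hits = sum(1 for item in top_k if item in relevant_items)
    return hits / k


scored = [("a", 0.9), ("b", 0.5), ("c", 0.4), ("d", 0.1)]
relevant = {"a", "c", "z"}
print(precision_at_k(scored, relevant, k=3))
```

Here the top three items are `a`, `b` and `c`, two of which are relevant, so the result is 2/3.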

**Student Activity: Analyse the output of the test dataset here.**

Q6. Give examples of the best three anime recommendations for a minimum of 10 users.

In [20]:
name_labels = indexer.labels

id_name={x:y for x,y in enumerate(list(name_labels))}


In [21]:
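def recommended_anime(user_id, limit=3):
    """Return the names of the top `limit` anime recommended for `user_id`."""
    # Note: recommendForAllUsers scores every user on each call, which is slow;
    # ALSModel.recommendForUserSubset can restrict the work to selected users.
    rows = (model.recommendForAllUsers(limit)
                 .filter(col('user_id') == user_id)
                 .select('recommendations')
                 .collect())
    if not rows:
        # the user was not in the training data, so there is nothing to recommend
        return []
    return [id_name[item.names] for item in rows[0][0]]


In [22]:
for _ in range(10):
    # user IDs in rating.csv appear to start at 1, so 0 is excluded from the draw
    id_to_predict = np.random.randint(1, 1000)
    recommended = recommended_anime(id_to_predict)
    print('*' * 100)
    print("These are the recommended  anime for user %s" % id_to_predict)
    print(recommended)
    print('*' * 100)
    print('')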



****************************************************************************************************
These are the recommended  anime for user 364
['Fullmetal Alchemist: Brotherhood', 'Death Note', 'Sword Art Online']
****************************************************************************************************

****************************************************************************************************
These are the recommended  anime for user 648
['Shingeki no Kyojin', 'Death Note', 'Sword Art Online']
****************************************************************************************************

****************************************************************************************************
These are the recommended  anime for user 215
['Death Note', 'Shingeki no Kyojin', 'Sword Art Online']
****************************************************************************************************

********************************************************************