**Using Google collab to solve question 4**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.6.tgz
!tar xf spark-2.4.5-bin-hadoop2.6.tgz
!pip install -q findspark

**Set environment variables:**

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.6"

**Creating spark session**

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Import Spark Libraries**

In [None]:
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

**Loading spark Context and set config parameters:**

In [None]:
sc = spark.sparkContext
spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')

**Mount drive**

In [79]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Reading ratings.csv**

In [None]:
import pandas as pd 
data=pd.read_csv('/content/drive/My Drive/ratings.csv')

In [81]:
data.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,296,5.0,1147880044
1,1,306,3.5,1147868817
2,1,307,5.0,1147868828
3,1,665,5.0,1147878820
4,1,899,3.5,1147868510


**Creating RDD**

In [82]:
data_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/content/drive/My Drive/ratings.csv")
data_df.withColumn('userId', data_df.userId.cast(IntegerType()))
data_df.withColumn('movieId', data_df.movieId.cast(IntegerType()))
data_df.withColumn('rating', data_df.rating.cast(FloatType()))
data_df.withColumn('timestamp', data_df.timestamp.cast(IntegerType()))

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

**Splitting training and test data in 9:1 ratio**

In [None]:
(training, test) = data_df.randomSplit([0.9, 0.1])

**Build the recommendation model using ALS on the training data**

In [None]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

**Fitting training data**

In [None]:
model = als.fit(training)

**Evaluate the model by computing the RMSE on the test data**

In [86]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8091085123503021


**Generate top 10 movie recommendations for each user**

In [93]:
userRecs = model.recommendForAllUsers(10)
userRecs.head()

Row(userId=148, recommendations=[Row(movieId=134037, rating=10.424752235412598), Row(movieId=178393, rating=9.718330383300781), Row(movieId=151410, rating=9.486729621887207), Row(movieId=153014, rating=9.359319686889648), Row(movieId=180851, rating=9.269782066345215), Row(movieId=205741, rating=9.269782066345215), Row(movieId=175173, rating=9.269782066345215), Row(movieId=148741, rating=9.269782066345215), Row(movieId=201280, rating=9.269782066345215), Row(movieId=174771, rating=9.269782066345215)])

**Generate top 10 user recommendations for each movie**

In [None]:
movieRecs = model.recommendForAllItems(10)
movieRecs.head()

**Generate top 10 movie recommendations for a specified set of users**

In [None]:
users = data_df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

**Generate top 10 user recommendations for a specified set of movies**

In [None]:
movies = data_df.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)