# Author: BENAZZOU Adnane
### ESI ICD - GDV

# Setting up PySpark

## Mounting Drive

In [None]:
from google.colab import drive
# Mount Google Drive so the MovieLens 100k files under /content/drive/MyDrive/ml-100k can be read.
drive.mount('/content/drive')

Mounted at /content/drive


## Installing PySpark and JDK 8

In [None]:
# Refresh the apt index, then quietly install a headless JDK 8 (-qq, output discarded).
# JAVA_HOME is pointed at /usr/lib/jvm/java-8-openjdk-amd64 further down.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:11 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [73.9 kB]
Hit:12 http://ppa.launchpad.net/cran/

In [None]:
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz


In [None]:
!rm -r "/content/spark-2.4.8-bin-hadoop2.7"

rm: cannot remove '/content/spark-2.4.8-bin-hadoop2.7': No such file or directory


In [None]:
# Unpack the archive into /content/spark-2.4.8-bin-hadoop2.7, the directory SPARK_HOME is set to below.
!tar xf spark-2.4.8-bin-hadoop2.7.tgz

In [None]:
# Install findspark (used below before importing pyspark) and check what is in /content.
!pip install -q findspark
!ls "/content"

drive  sample_data  spark-2.4.8-bin-hadoop2.7  spark-2.4.8-bin-hadoop2.7.tgz


In [None]:
import os

# Point the JVM and Spark lookups at the JDK and Spark installed above.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.8-bin-hadoop2.7",
})

## Initializing PySpark

In [None]:
import findspark
# Make the unpacked PySpark importable before `from pyspark...` below.
# NOTE(review): with no arguments this is expected to locate Spark via the SPARK_HOME
# environment variable set in the previous cell — confirm.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Local Spark session for the whole notebook.
# "local[*]" uses one worker thread per available core; the former "local" ran
# everything on a single thread, which made the ALS grid searches very slow.
# The UI port is moved to 4050.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [None]:
# Sanity check: list the MovieLens 100k files on Drive (u.data, u.item and u.user are read below).
!ls "/content/drive/MyDrive/ml-100k"

 allbut.pl  'u (1).user.gdoc'   u4.base   ua.test   u.info
 mku.sh      u2.base	        u4.test   ub.base   u.item
 README      u2.test	        u5.base   ub.test   u.occupation
 u1.base     u3.base	        u5.test   u.data    u.user
 u1.test     u3.test	        ua.base   u.genre   u.user.gdoc


# Loading Ratings Data

In [None]:
# u.data is tab-separated: user id, movie id, rating, timestamp.
lines = sc.textFile("/content/drive/MyDrive/ml-100k/u.data")
parts = lines.map(lambda line: line.split("\t"))
print(parts.first())
ratingsRDD = parts.map(
    lambda fields: Row(userId=int(fields[0]),
                       movieId=int(fields[1]),
                       rating=float(fields[2]),
                       timestamp=int(fields[3])))
ratings = spark.createDataFrame(ratingsRDD)
print(ratings.head())

['196', '242', '3', '881250949']
Row(movieId=242, rating=3.0, timestamp=881250949, userId=196)


# Initial/Given ALS Model

## Training

In [None]:
# Baseline: tune maxIter x regParam for a rank-51 ALS on the raw ratings, using a
# train/validation split inside the 80% training portion.
# Fixed seeds make the split, the ALS initialisation and the validation split
# reproducible across re-runs (previously every run drew new random splits).
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank=51,
          coldStartStrategy="drop", nonnegative=True, seed=42)
param_grid = (ParamGridBuilder()
              .addGrid(als.maxIter, [10, 20])
              .addGrid(als.regParam, [.1, .01, .001])
              .build())
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,
                           evaluator=evaluator, seed=42)
print("start training")
model = tvs.fit(training)

start training


KeyboardInterrupt: ignored

## Evaluating

In [None]:
# Score the best model from the grid search on the held-out test split.
best_model = model.bestModel
print("start testing")
predictions = best_model.transform(test)
print("start evaluation")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = ", str(rmse))
print("**bestModel**")
# rank is read from the fitted model; maxIter/regParam come from its parent
# estimator through the private `_java_obj` handle.
best_als = best_model._java_obj.parent()
for label, value in ((" rank:", best_model.rank),
                     (" maxIter:", best_als.getMaxIter()),
                     (" regParam:", best_als.getRegParam())):
  print(label, value)
predictions.sort("userId", "rating").show()

# Exploring the Movies Dataset

In [None]:
# u.item is '|'-separated: id, title, release date, (video release date), IMDb url, genre flags...
movies_raw = sc.textFile("/content/drive/MyDrive/ml-100k/u.item")
movies = movies_raw.map(lambda line: line.split("|"))
print(movies.first())
moviesRDD = movies.map(
    lambda fields: Row(movieId=int(fields[0]),
                       movieName=fields[1],
                       releaseDate=fields[2],
                       url=fields[4]))
moviesDF = spark.createDataFrame(moviesRDD)
moviesDF.show()

['1', 'Toy Story (1995)', '01-Jan-1995', '', 'http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)', '0', '0', '0', '1', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0']
+-------+--------------------+-----------+--------------------+
|movieId|           movieName|releaseDate|                 url|
+-------+--------------------+-----------+--------------------+
|      1|    Toy Story (1995)|01-Jan-1995|http://us.imdb.co...|
|      2|    GoldenEye (1995)|01-Jan-1995|http://us.imdb.co...|
|      3|   Four Rooms (1995)|01-Jan-1995|http://us.imdb.co...|
|      4|   Get Shorty (1995)|01-Jan-1995|http://us.imdb.co...|
|      5|      Copycat (1995)|01-Jan-1995|http://us.imdb.co...|
|      6|Shanghai Triad (Y...|01-Jan-1995|http://us.imdb.co...|
|      7|Twelve Monkeys (1...|01-Jan-1995|http://us.imdb.co...|
|      8|         Babe (1995)|01-Jan-1995|http://us.imdb.co...|
|      9|Dead Man Walking ...|01-Jan-1995|http://us.imdb.co...|
|     10|  Richard III (1995)|22-Ja

In [None]:
# Number of movies in the catalogue (one Row per u.item line). The former
# map(lambda x: x[0]) before count() did no useful work.
r = moviesRDD.count()
print(r)

1682


# Joining Users with Movies, and an Occupation-Based Approach

In [None]:
# u.user is '|'-separated: id, age, gender, occupation, zip code.
users_raw = sc.textFile("/content/drive/MyDrive/ml-100k/u.user")
users = users_raw.map(lambda line: line.split("|"))
print(users.first())
usersRDD = users.map(
    lambda fields: Row(userId=int(fields[0]),
                       userAge=int(fields[1]),
                       userGender=fields[2],
                       userOccupation=fields[3]))
usersDF = spark.createDataFrame(usersRDD)
print(usersDF.head())

['1', '24', 'M', 'technician', '85711']
Row(userAge=24, userGender='M', userId=1, userOccupation='technician')


In [None]:
# Attach movie metadata to every rating, then the rating user's profile.
movies_rated = ratings.join(moviesDF, on=['movieId'], how='inner')
user_movies_rated = movies_rated.join(usersDF, on=['userId'], how='inner')
user_movies_rated.show()

+------+-------+------+---------+--------------------+-----------+--------------------+-------+----------+--------------+
|userId|movieId|rating|timestamp|           movieName|releaseDate|                 url|userAge|userGender|userOccupation|
+------+-------+------+---------+--------------------+-----------+--------------------+-------+----------+--------------+
|    26|   1010|   2.0|891377609|     Basquiat (1996)|16-Aug-1996|http://us.imdb.co...|     49|         M|      engineer|
|    26|    222|   3.0|891371369|Star Trek: First ...|22-Nov-1996|http://us.imdb.co...|     49|         M|      engineer|
|    26|    293|   3.0|891371369|Donnie Brasco (1997)|28-Feb-1997|http://us.imdb.co...|     49|         M|      engineer|
|    26|    926|   2.0|891385692|Down Periscope (1...|01-Mar-1996|http://us.imdb.co...|     49|         M|      engineer|
|    26|    831|   2.0|891379753|Escape from L.A. ...|09-Aug-1996|http://us.imdb.co...|     49|         M|      engineer|
|    26|    237|   3.0|8

## Occupation-Based Approach

In [None]:
# Future exploration: mean rating that users sharing an occupation gave each movie.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Spark's built-in avg replaces the former Python UDF over collect_list. That UDF
# referenced `np` and `F` before either was imported in this notebook (NameError when
# run top to bottom), shipped every rating list to Python, and rounded the means to
# 32-bit floats. The output column is still called 'ratings' for downstream cells.
df_group = (user_movies_rated
            .groupBy('userOccupation', 'movieId')
            .agg(F.avg('rating').alias('ratings')))
df_group.show()

+--------------+-------+---------+
|userOccupation|movieId|  ratings|
+--------------+-------+---------+
|      engineer|    257|3.8148148|
|     executive|    481|      4.5|
|     executive|   1011|      3.5|
|       student|     15|     3.85|
|       student|    560|      3.0|
|    programmer|    191|     3.92|
|        writer|     27|2.3333333|
| administrator|     56| 4.032258|
| administrator|    147|3.7333333|
|         other|    326|      3.6|
|       student|    969|3.9444444|
|      educator|     38|3.4285715|
|     executive|    876|1.6666666|
|        artist|     48|      4.0|
|        doctor|    292|      2.0|
|      educator|    520|     3.95|
|      engineer|     42|3.6923077|
| administrator|    520| 4.090909|
| administrator|    595|2.6666667|
|    healthcare|    257| 3.142857|
+--------------+-------+---------+
only showing top 20 rows



In [None]:
# Gather the (movieId, occupation, mean rating) triples used to fill in ratings for
# users whose occupation has a mean for a movie they did not rate.
# df_group is collected ONCE and split into three aligned lists. Three separate
# collect() calls each re-run the groupBy, and the row order of a shuffled result is
# not guaranteed to be identical between runs, so the i-th entries of the lists could
# come from different groups.
group_rows = df_group.select("movieId", "userOccupation", "ratings").collect()
existing_movie_ids = [r["movieId"] for r in group_rows]
existing_user_occupations = [r["userOccupation"] for r in group_rows]
existing_ratings = [r["ratings"] for r in group_rows]

# Next: build a rating vector per user and, for movies the user has not rated,
# assign the mean of their occupation where one exists.

In [None]:
def fill_some_means(usersRDD, ratings, depth):#usersRDD and ratings DF
  usersPrep = spark.createDataFrame(usersRDD)
  #usersPrep.show()
  prepared_DF = usersPrep.join(ratings, ['userId'], how='inner')
  df1 = prepared_DF.groupBy('userId', 'userOccupation').agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings'))
  #df1.show()
  movies = df1.rdd.map(lambda x: x[2]).collect()
  ratings = df1.rdd.map(lambda x: x[3]).collect()
  users = df1.rdd.map(lambda x: x[0]).collect()
  occupations = df1.rdd.map(lambda x: x[1]).collect()
  user_counter = 0
  final_list = []
  for movie_list in movies:
    tmpVector = [0.0] * 1682
    for j in range(len(movie_list)):
      tmpVector[movie_list[j] - 1] = ratings[user_counter][j] #0 based but movie index 1 based
    final_list.append((users[user_counter], occupations[user_counter], tmpVector))
    user_counter += 1
  final_df =spark.createDataFrame(data = final_list, schema = ["userId","occupation", "movies_rating"])
  #final_df.show()
  final_rdd = final_df.rdd
  rows = final_rdd.map(lambda x: (x[0], x[1], np.array(x[2]))).collect()
  #print(rows[0])
  for i in range(len(rows)):
    if rows[i][1] in existing_user_occupations:
      #we have a mean for some movies
      #use some of these movies to replenish data
      indices = [k for k, x in enumerate(existing_user_occupations) if x == rows[i][1]]
      #####set max depth for each iteration to avoid overflow
      if(depth > len(indices)):
        depth = len(indices)
      #get indices for the movie with such occupation
      for j in range(depth):
        if rows[i][2][existing_movie_ids[indices[j]] - 1] == 0:#movies 0 based
          rows[i][2][existing_movie_ids[indices[j]] - 1] = existing_ratings[indices[j]]
          #if the movie with such occupation is null assign the mean to it
  arr = []
  for row in rows:
    for movie in range(len(row[2])):
      if (row[2][movie] != 0):#non null rating
        arr.append((row[0], movie + 1, float(row[2][movie])))#ignore occupation as we re done using it
  final_prediction_df = spark.createDataFrame(data = arr, schema = ["userId", "movieId", "rating"])
  return final_prediction_df



# Keep only id and occupation per user; fill_some_means then adds at most 500
# occupation-mean ratings per user on top of the real ones.
usersRDD2 = users.map(lambda fields: Row(userId=int(fields[0]), userOccupation=fields[3]))
t = fill_some_means(usersRDD2, ratings, depth=500)
t.show()


+------+-------+------------------+
|userId|movieId|            rating|
+------+-------+------------------+
|    26|      1|               3.0|
|    26|      2| 4.166666507720947|
|    26|      3|3.3333332538604736|
|    26|      4|               3.0|
|    26|      7|               3.0|
|    26|      9|               4.0|
|    26|     12|               5.0|
|    26|     13|               3.0|
|    26|     14|               3.0|
|    26|     15|               4.0|
|    26|     16|             3.125|
|    26|     19|              3.75|
|    26|     21| 3.799999952316284|
|    26|     23|            4.1875|
|    26|     24|               3.0|
|    26|     25|               3.0|
|    26|     29|              2.25|
|    26|     30|               3.5|
|    26|     32|               5.0|
|    26|     42|               2.0|
+------+-------+------------------+
only showing top 20 rows



In [None]:
#we filled values according to occupation
print(t.count())
#now lets predict again
(training, test) = t.randomSplit([0.8, 0.2])
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank = 51, coldStartStrategy="drop", nonnegative= True)
param_grid=ParamGridBuilder().addGrid(als.maxIter,[10, 20]).addGrid(als.regParam,[.1,.01, .001]).build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
tvs= TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,evaluator=evaluator)
print ("start training")
model = tvs.fit(training)

336260
start training


In [None]:
# Score the best model from the grid search on the held-out test split.
best_model = model.bestModel
print("start testing")
predictions = best_model.transform(test)
print("start evaluation")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = ", str(rmse))
print("**bestModel**")
# rank is read from the fitted model; maxIter/regParam come from its parent
# estimator through the private `_java_obj` handle.
best_als = best_model._java_obj.parent()
for label, value in ((" rank:", best_model.rank),
                     (" maxIter:", best_als.getMaxIter()),
                     (" regParam:", best_als.getRegParam())):
  print(label, value)
predictions.sort("userId", "rating").show()

start testing
start evaluation
Root-mean-square error =  0.6280715945937412
**bestModel**
 rank: 51
 maxIter: 20
 regParam: 0.1
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|   1057|   1.0|    2.0165|
|     1|    302|   1.0| 3.0318727|
|     1|    243|   1.0| 2.4286635|
|     1|    862|   1.0| 2.2939937|
|     1|    112|   1.0| 2.4761884|
|     1|    103|   1.0| 2.7776766|
|     1|    231|   1.0| 2.8484313|
|     1|    104|   1.0| 1.0534246|
|     1|    252|   2.0| 3.0114105|
|     1|    145|   2.0| 2.6335301|
|     1|     11|   2.0|  3.670374|
|     1|    101|   2.0|  3.131624|
|     1|    264|   2.0| 2.9067798|
|     1|   1053|   2.0| 3.0110815|
|     1|   1040|   2.0|  2.453685|
|     1|    244|   2.0| 3.2660897|
|     1|     34|   2.0| 2.1266098|
|     1|    929|   2.0| 2.3323271|
|     1|    237|   2.0| 3.5575495|
|     1|    225|   2.0| 3.1660943|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Distinct movies that received an occupation mean vs. movies in the catalogue.
# moviesDF (built from u.item) is used here; `movies_df` is only defined further down,
# in the genre section, so this cell raised NameError when run top to bottom.
print(df_group.select("movieId").distinct().count())
print(moviesDF.count())
# Every movie got a mean, so filling with these means covers everything -> likely overfit.

1682
1682


# Exploring Cosine Similarity Based on Movie Genres

In [None]:
vectors = movies.take(5)
# Genre flags start at field 5 of a u.item line; compare the first and fourth movies.
vector_1, vector_2 = vectors[0][5:], vectors[3][5:]
vector1, vector2 = ([int(flag) for flag in flags] for flags in (vector_1, vector_2))
print(vector1)
print(vector2)

[0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [None]:
import numpy as np
# NumPy versions of the two sample genre vectors.
v1, v2 = np.array(vector1), np.array(vector2)
def cosin_sim(v1, v2):
  return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
print(cosin_sim(v1, v2))

# Genre flags of every movie: the u.item fields from index 5 onward, cast to int.
vectorsRDD = movies.map(lambda fields: fields[5:])
vectorsList = vectorsRDD.collect()
genres_result = [[int(flag) for flag in flags] for flags in vectorsList]
print(genres_result[0])


movies_id = movies.map(lambda fields: int(fields[0])).collect()
# Pair each movie id with its genre vector.
result = list(zip(movies_id, genres_result))
movies_df = spark.createDataFrame(data=result, schema=["movies_id", "genre"])
movies_df.show()
# 1682 movies: each user's rating vector below has one slot per movie.
len(movies_id)

0.33333333333333337
[0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+---------+--------------------+
|movies_id|               genre|
+---------+--------------------+
|        1|[0, 0, 0, 1, 1, 1...|
|        2|[0, 1, 1, 0, 0, 0...|
|        3|[0, 0, 0, 0, 0, 0...|
|        4|[0, 1, 0, 0, 0, 1...|
|        5|[0, 0, 0, 0, 0, 0...|
|        6|[0, 0, 0, 0, 0, 0...|
|        7|[0, 0, 0, 0, 0, 0...|
|        8|[0, 0, 0, 0, 1, 1...|
|        9|[0, 0, 0, 0, 0, 0...|
|       10|[0, 0, 0, 0, 0, 0...|
|       11|[0, 0, 0, 0, 0, 0...|
|       12|[0, 0, 0, 0, 0, 0...|
|       13|[0, 0, 0, 0, 0, 1...|
|       14|[0, 0, 0, 0, 0, 0...|
|       15|[0, 0, 0, 0, 0, 0...|
|       16|[0, 0, 0, 0, 0, 1...|
|       17|[0, 1, 0, 0, 0, 1...|
|       18|[0, 0, 0, 0, 0, 0...|
|       19|[0, 0, 0, 0, 0, 0...|
|       20|[0, 0, 0, 0, 0, 0...|
+---------+--------------------+
only showing top 20 rows



1682

## Issue with the genre-based approach:
Suppose we have 1000 movies that are equally similar genre-wise.
If user 1 rated one of them 5.0, we would then assign that same 5.0 rating to the other 999 movies and end up with 999 recommendations that the user might hate. Would you like 999 comedy movies just because you liked one? Of course not: many other factors matter, including, for example, the actors in the movie.


# Cosine Similarity Based on Movie Ratings

In [None]:
# Peek at the first 20 raw ratings.
ratings.show(n=20)

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|    242|   3.0|881250949|   196|
|    302|   3.0|891717742|   186|
|    377|   1.0|878887116|    22|
|     51|   2.0|880606923|   244|
|    346|   1.0|886397596|   166|
|    474|   4.0|884182806|   298|
|    265|   2.0|881171488|   115|
|    465|   5.0|891628467|   253|
|    451|   3.0|886324817|   305|
|     86|   3.0|883603013|     6|
|    257|   2.0|879372434|    62|
|   1014|   5.0|879781125|   286|
|    222|   5.0|876042340|   200|
|     40|   3.0|891035994|   210|
|     29|   3.0|888104457|   224|
|    785|   3.0|879485318|   303|
|    387|   5.0|879270459|   122|
|    274|   2.0|879539794|   194|
|   1042|   4.0|874834944|   291|
|   1184|   2.0|892079237|   234|
+-------+------+---------+------+
only showing top 20 rows



In [None]:
from pyspark.sql import functions as F

# One Row per user carrying only the id.
usersRDD = users.map(lambda fields: Row(userId=int(fields[0])))
usersRDD.first()
usersPrep = spark.createDataFrame(usersRDD)
# Collect each user's movie ids and ratings into lists (to become a sparse vector per user).
prepared_DF = usersPrep.join(ratings, ['userId'], how='inner')
df1 = prepared_DF.groupBy('userId').agg(
    F.collect_list('movieId').alias('movieIds'),
    F.collect_list('rating').alias('ratings'))
df1.show()

+------+--------------------+--------------------+
|userId|            movieIds|             ratings|
+------+--------------------+--------------------+
|    26|[258, 930, 1015, ...|[3.0, 2.0, 3.0, 4...|
|    29|[245, 189, 332, 5...|[3.0, 4.0, 4.0, 2...|
|   474|[8, 602, 410, 480...|[5.0, 3.0, 2.0, 5...|
|    65|[47, 806, 125, 11...|[2.0, 4.0, 4.0, 4...|
|   191|[302, 286, 751, 7...|[4.0, 4.0, 3.0, 3...|
|   418|[362, 313, 301, 3...|[1.0, 3.0, 2.0, 4...|
|   541|[756, 654, 28, 21...|[4.0, 3.0, 4.0, 5...|
|   558|[116, 936, 124, 5...|[5.0, 5.0, 4.0, 5...|
|   222|[366, 750, 755, 1...|[4.0, 5.0, 4.0, 4...|
|   270|[452, 25, 98, 558...|[4.0, 5.0, 5.0, 5...|
|   293|[5, 685, 471, 126...|[3.0, 3.0, 3.0, 3...|
|   730|[298, 332, 50, 74...|[4.0, 3.0, 4.0, 4...|
|   938|[864, 122, 313, 2...|[4.0, 1.0, 5.0, 2...|
|   243|[15, 1039, 208, 6...|[3.0, 4.0, 4.0, 4...|
|   278|[603, 311, 301, 9...|[5.0, 4.0, 2.0, 4...|
|   367|[1012, 250, 246, ...|[4.0, 5.0, 4.0, 4...|
|   442|[401, 385, 31, 18...|[2

In [None]:
# Turn each user's (movieIds, ratings) lists into a dense 1682-slot rating vector:
# slot k holds the rating of movie k + 1, and 0.0 means "not rated".
# df1 is collected ONCE and each user's ids and ratings are read from the same Row.
# Collecting userIds, movieIds and ratings in three separate Spark jobs (as before)
# re-runs the groupBy each time, and neither group order nor collect_list order is
# guaranteed to match between runs, so the lists could be misaligned.
# Distinct names also stop this cell from overwriting the `movies` RDD, the `ratings`
# DataFrame and the `users` RDD with plain Python lists.
user_rows = df1.collect()
final_list = []
for user_row in user_rows:
  tmpVector = [0.0] * 1682
  for movie_id, rating in zip(user_row['movieIds'], user_row['ratings']):
    tmpVector[movie_id - 1] = rating  # movie ids are 1-based, list slots 0-based
  final_list.append((user_row['userId'], tmpVector))
print(final_list[0])

(26, [3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 4.0, 0.0, 0.0, 0.0, 3.0, 3.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0, 2.0, 3.0, 3.0, 0.0, 0.0, 3.0, 1.0, 0.0, 0.0, 4.0, 4.0, 5.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 3.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,

In [None]:
# One row per user: userId plus the dense 1682-slot rating vector.
final_df = spark.createDataFrame(final_list, ["userId", "movies_rating"])
final_df.show()

+------+--------------------+
|userId|       movies_rating|
+------+--------------------+
|    26|[3.0, 0.0, 0.0, 0...|
|    29|[0.0, 0.0, 0.0, 0...|
|   474|[0.0, 0.0, 0.0, 5...|
|    65|[3.0, 0.0, 0.0, 0...|
|   191|[0.0, 0.0, 0.0, 0...|
|   418|[0.0, 0.0, 0.0, 0...|
|   541|[4.0, 0.0, 0.0, 0...|
|   558|[0.0, 0.0, 0.0, 0...|
|   222|[4.0, 3.0, 0.0, 3...|
|   270|[0.0, 0.0, 0.0, 0...|
|   293|[2.0, 3.0, 2.0, 4...|
|   730|[4.0, 0.0, 0.0, 0...|
|   938|[4.0, 0.0, 0.0, 0...|
|   243|[4.0, 0.0, 0.0, 0...|
|   278|[0.0, 0.0, 0.0, 0...|
|   367|[0.0, 0.0, 0.0, 0...|
|   442|[0.0, 3.0, 0.0, 0...|
|   705|[5.0, 3.0, 0.0, 0...|
|   720|[0.0, 0.0, 0.0, 0...|
|    19|[0.0, 0.0, 0.0, 4...|
+------+--------------------+
only showing top 20 rows



In [None]:
# Switch to the RDD API for the per-user similarity work below.
final_rdd = final_df.rdd
final_rdd.take(1)[0]

Row(userId=26, movies_rating=[3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 4.0, 0.0, 0.0, 0.0, 3.0, 3.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0, 2.0, 3.0, 3.0, 0.0, 0.0, 3.0, 1.0, 0.0, 0.0, 4.0, 4.0, 5.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 3.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 

In [None]:
def cosin_sim(v1, v2):
  """Return the cosine similarity of two equal-length numeric vectors (lists or arrays).

  Returns 0.0 when either vector has zero norm, where the cosine is undefined.
  The previous version divided by zero there, producing nan and a RuntimeWarning.
  """
  v1 = np.asarray(v1, dtype=float)
  v2 = np.asarray(v2, dtype=float)
  denom = np.linalg.norm(v1) * np.linalg.norm(v2)
  if denom == 0:
    return 0.0
  return float(np.dot(v1, v2) / denom)
vectors = final_rdd.take(4)
vv1, vv2, vv3, vv4 = (vector[1] for vector in vectors)
# Similarity of the first user to the next three, then to itself (sanity check: 1.0).
for other in (vv2, vv3, vv4, vv1):
  print(cosin_sim(vv1, other))
# Plan: for every user, find the most similar other user and copy that user's
# ratings into this user's missing slots.


0.13273664859903903
0.304158782650116
0.18001636239669339
1.0


In [None]:
# Nearest-neighbour fill. For every user (in collection order), find the OTHER user
# with the highest cosine similarity of rating vectors and copy that user's ratings
# into this user's unrated (0.0) slots. Vectors are updated in place, so users
# processed later are compared against the already-filled vectors of earlier users.
# Changes from the previous version:
# - the similarity search is one matrix-vector product per user instead of a
#   pure-Python double loop;
# - the builtin `max` is no longer rebound at notebook scope.
# The selection rule is unchanged: the first strictly-best positive similarity wins,
# and with no positive similarity the user is left as is.
collected_rows = final_rdd.collect()
user_matrix = np.array([list(r[1]) for r in collected_rows], dtype=float)
user_norms = np.linalg.norm(user_matrix, axis=1)
for i in range(user_matrix.shape[0]):
  denom = user_norms * user_norms[i]
  # Zero-norm pairs get similarity 0 instead of nan.
  sims = np.divide(user_matrix @ user_matrix[i], denom,
                   out=np.zeros(user_matrix.shape[0]), where=denom > 0)
  sims[i] = 0.0  # a user is never its own neighbour
  best = int(np.argmax(sims))  # first index of the maximum
  if sims[best] > 0:
    missing = (user_matrix[i] == 0) & (user_matrix[best] != 0)
    user_matrix[i, missing] = user_matrix[best, missing]
    user_norms[i] = np.linalg.norm(user_matrix[i])  # keep in sync with the filled row
# Same shape as before: list of (userId, numpy rating vector). The vectors are views
# into user_matrix, so they carry the filled values.
rows = [(r[0], user_matrix[k]) for k, r in enumerate(collected_rows)]
# Next: turn these rows back into a (userId, movieId, rating) DataFrame.

In [None]:
# Flatten the filled rating vectors back into a long (userId, movieId, rating)
# DataFrame, keeping only rated (non-zero) slots; slot k is movie k + 1.
# Ratings are kept as float, consistent with the original `ratings` schema and with
# fill_some_values, instead of being truncated by int().
arr = [(user_id, int(slot) + 1, float(vec[slot]))
       for user_id, vec in rows
       for slot in np.flatnonzero(vec)]
final_prediction_df = spark.createDataFrame(data=arr, schema=["userId", "movieId", "rating"])
final_prediction_df.show()


+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    26|      1|     3|
|    26|      3|     3|
|    26|      7|     3|
|    26|      9|     4|
|    26|     13|     3|
|    26|     14|     3|
|    26|     15|     4|
|    26|     24|     3|
|    26|     25|     3|
|    26|     50|     4|
|    26|     93|     2|
|    26|    100|     5|
|    26|    108|     3|
|    26|    109|     3|
|    26|    111|     3|
|    26|    116|     2|
|    26|    117|     3|
|    26|    118|     3|
|    26|    121|     3|
|    26|    122|     1|
+------+-------+------+
only showing top 20 rows



# ALS Model on the Initially Filled DataFrame

In [None]:
# Re-train the same ALS grid on the nearest-neighbour-filled ratings.
# NOTE(review): the test split is drawn from final_prediction_df, which contains
# ratings copied from similar users, so this RMSE is partly measured on filled values.
# Fixed seeds make the split, ALS initialisation and validation split reproducible.
(training, test) = final_prediction_df.randomSplit([0.8, 0.2], seed=42)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank=51,
          coldStartStrategy="drop", nonnegative=True, seed=42)
param_grid = (ParamGridBuilder()
              .addGrid(als.maxIter, [10, 20])
              .addGrid(als.regParam, [.1, .01, .001])
              .build())
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,
                           evaluator=evaluator, seed=42)
print("start training")
model = tvs.fit(training)

start training


In [None]:
# Score the best model from the grid search on the held-out test split.
best_model = model.bestModel
print("start testing")
predictions = best_model.transform(test)
print("start evaluation")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = ", str(rmse))
print("**bestModel**")
# rank is read from the fitted model; maxIter/regParam come from its parent
# estimator through the private `_java_obj` handle.
best_als = best_model._java_obj.parent()
for label, value in ((" rank:", best_model.rank),
                     (" maxIter:", best_als.getMaxIter()),
                     (" regParam:", best_als.getRegParam())):
  print(label, value)
predictions.sort("userId", "rating").show()

start testing
start evaluation
Root-mean-square error =  0.8464401195192319
**bestModel**
 rank: 51
 maxIter: 20
 regParam: 0.1
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|     35|     1| 1.0291553|
|     1|    720|     1| 2.0341368|
|     1|    219|     1| 2.9014978|
|     1|    809|     1| 2.4156835|
|     1|   1188|     2| 2.3812087|
|     1|    895|     2| 2.7590194|
|     1|    559|     2| 2.8290858|
|     1|    244|     2|  3.170313|
|     1|     63|     2|  2.628068|
|     1|    948|     2|  1.736599|
|     1|    367|     2|  3.055051|
|     1|    271|     2| 2.7403016|
|     1|    940|     2| 2.1313043|
|     1|    218|     3| 3.1721108|
|     1|     10|     3|  3.792191|
|     1|     62|     3|  2.939352|
|     1|     97|     3| 3.8350506|
|     1|    179|     3| 4.2789955|
|     1|     69|     3|  3.547982|
|     1|    117|     3| 3.2081223|
+------+-------+------+----------+
only showing top 20 rows



# Recursively Filling All Possible Similarities

In [None]:
def fill_some_values(usersRDD, ratings):#usersRDD and ratings DF
  usersPrep = spark.createDataFrame(usersRDD)
  prepared_DF = usersPrep.join(ratings, ['userId'], how='inner')
  df1 = prepared_DF.groupBy('userId').agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings'))
  movies = df1.rdd.map(lambda x: x[1]).collect()
  ratings = df1.rdd.map(lambda x: x[2]).collect()
  users = df1.rdd.map(lambda x: x[0]).collect()
  user_counter = 0
  final_list = []
  for movie_list in movies:
    tmpVector = [0.0] * 1682
    for j in range(len(movie_list)):
      tmpVector[movie_list[j] - 1] = ratings[user_counter][j] #0 based but movie index 1 based
    final_list.append((users[user_counter], tmpVector))
    user_counter += 1
  final_df =spark.createDataFrame(data = final_list, schema = ["userId", "movies_rating"])
  final_rdd = final_df.rdd
  rows = final_rdd.map(lambda x: (x[0], np.array(x[1]))).collect()
  for i in range(len(rows)):
    max = 0
    maxId = i
    for j in range(len(rows)):
      if i != j:
        tmp = cosin_sim(rows[i][1], rows[j][1])
        if (tmp > max):
          max = tmp
          maxId = j
    update_vals = np.nonzero(rows[maxId][1])
    for val in update_vals:
      for idx in val:
        if rows[i][1][idx] == 0:
          rows[i][1][idx] = rows[maxId][1][idx]
  arr = []
  for row in rows:
    for movie in range(len(row[1])):
      if (row[1][movie] != 0):
        arr.append((row[0], movie + 1, float(row[1][movie])))
  final_prediction_df = spark.createDataFrame(data = arr, schema = ["userId", "movieId", "rating"])
  return final_prediction_df


In [None]:
# Rebuild the ratings DataFrame from the parsed RDD, then run a single similarity-fill pass.
ratings = spark.createDataFrame(ratingsRDD)
first_fill_df = fill_some_values(usersRDD=usersRDD, ratings=ratings)
first_fill_df.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    26|      1|   3.0|
|    26|      3|   3.0|
|    26|      7|   3.0|
|    26|      9|   4.0|
|    26|     13|   3.0|
|    26|     14|   3.0|
|    26|     15|   4.0|
|    26|     24|   3.0|
|    26|     25|   3.0|
|    26|     50|   4.0|
|    26|     93|   2.0|
|    26|    100|   5.0|
|    26|    108|   3.0|
|    26|    109|   3.0|
|    26|    111|   3.0|
|    26|    116|   2.0|
|    26|    117|   3.0|
|    26|    118|   3.0|
|    26|    121|   3.0|
|    26|    122|   1.0|
+------+-------+------+
only showing top 20 rows



In [None]:
# The fill output has no timestamp, so drop it before comparing with subtract() later.
r2 = ratings.drop(ratings.timestamp)

In [None]:
def recursive_till_0(usersRDD, ratings_DF, diff):
  """Apply fill_some_values repeatedly until a pass adds no new rating.

  Despite the name, this is a loop, so many passes cannot hit Python's
  recursion limit. The return value is the same as the recursive version's.

  Args:
    usersRDD: RDD of Rows with a ``userId`` field (passed to fill_some_values).
    ratings_DF: starting DataFrame with ``userId``, ``movieId`` and ``rating``
      columns (extra columns are ignored in the comparison).
    diff: non-zero to force at least one pass; 0 returns ``ratings_DF`` as is.

  Returns:
    The last filled DataFrame, i.e. the first one a further pass did not change.
  """
  columns = ["userId", "movieId", "rating"]
  while diff != 0:
    filled_DF = fill_some_values(usersRDD, ratings_DF)
    # subtract() matches columns by POSITION. r2 is ordered (movieId, rating, userId)
    # while fill output is (userId, movieId, rating), so select both by name first.
    diff = filled_DF.select(columns).subtract(ratings_DF.select(columns)).count()
    ratings_DF = filled_DF
  return ratings_DF

In [None]:
# diff=1 forces at least one pass; keep filling until a pass adds nothing new.
similiarity_overfill = recursive_till_0(usersRDD, r2, diff=1)
similiarity_overfill.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    26|      1|   3.0|
|    26|      3|   3.0|
|    26|      7|   3.0|
|    26|      9|   4.0|
|    26|     13|   3.0|
|    26|     14|   3.0|
|    26|     15|   4.0|
|    26|     24|   3.0|
|    26|     25|   3.0|
|    26|     50|   4.0|
|    26|     93|   2.0|
|    26|    100|   5.0|
|    26|    108|   3.0|
|    26|    109|   3.0|
|    26|    111|   3.0|
|    26|    116|   2.0|
|    26|    117|   3.0|
|    26|    118|   3.0|
|    26|    121|   3.0|
|    26|    122|   1.0|
+------+-------+------+
only showing top 20 rows



# Final ALS Model

In [None]:
# Final model: the same ALS grid on the repeatedly similarity-filled ratings.
# NOTE(review): the test split is drawn from similiarity_overfill, which contains
# ratings copied from similar users, so this RMSE is partly measured on filled values.
# Fixed seeds make the split, ALS initialisation and validation split reproducible.
(training, test) = similiarity_overfill.randomSplit([0.8, 0.2], seed=42)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", rank=51,
          coldStartStrategy="drop", nonnegative=True, seed=42)
param_grid = (ParamGridBuilder()
              .addGrid(als.maxIter, [10, 20])
              .addGrid(als.regParam, [.1, .01, .001])
              .build())
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,
                           evaluator=evaluator, seed=42)
print("start training")
model = tvs.fit(training)

start training


In [None]:
# Score the best model from the grid search on the held-out test split.
best_model = model.bestModel
print("start testing")
predictions = best_model.transform(test)
print("start evaluation")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = ", str(rmse))
print("**bestModel**")
# rank is read from the fitted model; maxIter/regParam come from its parent
# estimator through the private `_java_obj` handle.
best_als = best_model._java_obj.parent()
for label, value in ((" rank:", best_model.rank),
                     (" maxIter:", best_als.getMaxIter()),
                     (" regParam:", best_als.getRegParam())):
  print(label, value)
predictions.sort("userId", "rating").show()

start testing
start evaluation
Root-mean-square error =  0.8473281966054302
**bestModel**
 rank: 51
 maxIter: 20
 regParam: 0.1
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|     29|   1.0| 2.3001206|
|     1|    112|   1.0| 1.2614967|
|     1|    243|   1.0| 1.4428204|
|     1|    568|   1.0| 3.2895882|
|     1|    247|   1.0|0.77122325|
|     1|    138|   1.0| 1.5021313|
|     1|    244|   2.0| 3.2144177|
|     1|    213|   2.0| 3.7669806|
|     1|    167|   2.0| 2.5716395|
|     1|     63|   2.0|  2.429794|
|     1|    288|   2.0| 2.8325233|
|     1|    855|   2.0| 3.5927796|
|     1|    233|   2.0|  3.198913|
|     1|    142|   2.0| 2.4178603|
|     1|    225|   2.0| 2.5224266|
|     1|    164|   3.0| 3.4547267|
|     1|     24|   3.0| 3.3763232|
|     1|    262|   3.0| 3.7714942|
|     1|    211|   3.0| 4.1020403|
|     1|     31|   3.0| 3.4506667|
+------+-------+------+----------+
only showing top 20 rows

