In [None]:
# Mount Google Drive in Colab so the dataset files under "My Drive" are readable at /content/drive.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# List the dataset folder on the mounted Drive to confirm ratings.csv is present.
!ls "/content/drive/My Drive/Movie Recommendation Data"

ratings.csv


In [None]:
#Setting up Spark environment
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
findspark.init()

In [None]:
# Create (or reuse) the Spark session shared by every later cell.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Movie')
    .getOrCreate()
)

In [None]:
# Load the ratings CSV: the header row names the columns, and Spark infers column types
# (inferSchema costs an extra pass over the file).
df = (
    spark.read
    .options(header=True, sep=',', inferSchema=True)
    .csv('/content/drive/My Drive/Movie Recommendation Data/ratings.csv')
)
df.show(3)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
+------+-------+------+----------+
only showing top 3 rows



In [None]:
# Total number of rating rows loaded.
df.count()

25000095

In [None]:
# ALS below only reads userId, movieId and rating, so the timestamp column is discarded.
df = df.drop('timestamp')
df.show(3)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
+------+-------+------+
only showing top 3 rows



In [None]:
# Summary statistics (count, mean, stddev, min, max) for each remaining column.
df.describe().show()

+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         25000095|          25000095|          25000095|
|   mean|81189.28115381162|21387.981943268616| 3.533854451353085|
| stddev|46791.71589745776| 39198.86210105973|1.0607439611423535|
|    min|                1|                 1|               0.5|
|    max|           162541|            209171|               5.0|
+-------+-----------------+------------------+------------------+



In [None]:
# Split the ratings 80/20 into training and test sets.
# A fixed seed makes the split, and therefore every metric computed from it, reproducible
# across kernel restarts.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop": a user or movie that appears only in the test split has no
# learned latent factors, so ALS would predict NaN for it and the RMSE would come out NaN.
# "drop" removes those rows from the output of transform().
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

In [None]:
# Score every held-out rating with the trained model and peek at a few rows.
predictions = model.transform(test)
predictions.show(n=10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
| 32855|    148|   4.0| 2.3492625|
|114572|    148|   2.0| 2.7763708|
| 38199|    148|   2.0| 2.5441282|
| 33354|    148|   3.0| 3.1871886|
|  5055|    148|   3.0| 3.3531446|
| 99010|    148|   3.0| 2.9363954|
| 38679|    148|   3.0|  2.598993|
| 99684|    148|   3.0| 2.9181907|
| 35969|    148|   2.0| 2.1641223|
| 29943|    148|   3.0| 2.9938407|
+------+-------+------+----------+
only showing top 10 rows



In [None]:
# Evaluate prediction quality with root-mean-square error.
# Rows with a NaN/null prediction (users or movies never seen in training) are excluded:
# a single NaN otherwise turns the whole RMSE into NaN.
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions.na.drop(subset=["prediction"]))
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = nan


In [None]:
# Inspect the test-split ratings of a single user (userId 8).
# show() prints at most 20 rows; this user may have more ratings in the test split.
# Note: these come from the random test split, not a dedicated per-user hold-out set.
single_user = test.filter(test['userId'] == 8).select('movieId', 'userId', 'rating')
single_user.show()

+-------+------+------+
|movieId|userId|rating|
+-------+------+------+
|     47|     8|   5.0|
|    104|     8|   5.0|
|    110|     8|   5.0|
|    161|     8|   5.0|
|    209|     8|   4.0|
|    223|     8|   2.0|
|    225|     8|   2.0|
|    237|     8|   4.0|
|    260|     8|   3.0|
|    344|     8|   5.0|
|    364|     8|   4.0|
|    377|     8|   5.0|
|    474|     8|   4.0|
|    524|     8|   4.0|
|    590|     8|   3.0|
|    593|     8|   4.0|
|    595|     8|   2.0|
|    799|     8|   4.0|
|    953|     8|   4.0|
|   1020|     8|   4.0|
+-------+------+------+
only showing top 20 rows



In [None]:
# Predict this user's held-out ratings and list them from highest to lowest predicted score.
from pyspark.sql.functions import desc
single_user_predictions = model.transform(single_user)
single_user_predictions.orderBy(desc('prediction')).show()

+-------+------+------+----------+
|movieId|userId|rating|prediction|
+-------+------+------+----------+
|    344|     8|   5.0| 4.7764993|
|    104|     8|   5.0| 4.6258297|
|     47|     8|   5.0| 4.3787174|
|    377|     8|   5.0|   4.03852|
|   1729|     8|   5.0| 3.8794904|
|    593|     8|   4.0| 3.8407557|
|    223|     8|   2.0| 3.7758756|
|   1385|     8|   5.0| 3.7702544|
|    364|     8|   4.0| 3.7370305|
|   1580|     8|   5.0| 3.7041347|
|    225|     8|   2.0| 3.5131419|
|    474|     8|   4.0| 3.5103621|
|   1653|     8|   2.0| 3.4900517|
|    110|     8|   5.0| 3.4288478|
|    237|     8|   4.0| 3.4210522|
|   1722|     8|   3.0| 3.4041586|
|    161|     8|   5.0|  3.361381|
|    953|     8|   4.0|  3.327233|
|   1407|     8|   5.0|  3.312661|
|   1597|     8|   4.0| 3.3049188|
+-------+------+------+----------+
only showing top 20 rows



In [None]:
# Inspect the test-split ratings of a second user (userId 23).
# show() prints at most 20 rows; this user may have more ratings in the test split.
# Note: these come from the random test split, not a dedicated per-user hold-out set.
single_user = test.filter(test['userId'] == 23).select('movieId', 'userId', 'rating')
single_user.show()

+-------+------+------+
|movieId|userId|rating|
+-------+------+------+
|     16|    23|   4.0|
|     25|    23|   4.0|
|     36|    23|   5.0|
|     39|    23|   3.0|
|     40|    23|   4.0|
|    165|    23|   3.0|
|    216|    23|   4.0|
|    232|    23|   4.0|
|    260|    23|   5.0|
|    281|    23|   4.0|
|    319|    23|   4.0|
|    369|    23|   4.0|
|    380|    23|   3.0|
|    440|    23|   4.0|
|    491|    23|   5.0|
|    529|    23|   4.0|
|    541|    23|   4.0|
|    608|    23|   4.0|
|    648|    23|   3.0|
|    733|    23|   4.0|
+-------+------+------+
only showing top 20 rows



In [None]:
# Predict this user's held-out ratings and list them from highest to lowest predicted score.
from pyspark.sql.functions import desc
single_user_predictions = model.transform(single_user)
single_user_predictions.orderBy(desc('prediction')).show()

+-------+------+------+----------+
|movieId|userId|rating|prediction|
+-------+------+------+----------+
|   1207|    23|   5.0| 4.9482827|
|   1704|    23|   5.0|  4.814985|
|   1132|    23|   3.0| 4.7642217|
|   1293|    23|   5.0| 4.7327785|
|   1252|    23|   5.0|  4.705429|
|   1784|    23|   4.0|   4.56531|
|   1208|    23|   5.0| 4.5606303|
|   1231|    23|   5.0|  4.535055|
|   2918|    23|   5.0| 4.5329037|
|    232|    23|   4.0| 4.5252237|
|   1228|    23|   5.0| 4.5129967|
|    608|    23|   4.0|  4.454386|
|    260|    23|   5.0|  4.446644|
|   1196|    23|   5.0| 4.4387074|
|   2571|    23|   1.0| 4.4121437|
|   1834|    23|   4.0| 4.3869414|
|    529|    23|   4.0|  4.367726|
|    281|    23|   4.0|  4.345355|
|   1968|    23|   5.0|  4.339189|
|   2352|    23|   5.0| 4.3029094|
+-------+------+------+----------+
only showing top 20 rows



In [None]:
# One-hot encode movieId into a sparse vector per rating row.
# dropLast=False keeps a slot for every category (an n-dimensional rather than an
# (n-1)-dimensional vector), so no movie is encoded as the all-zeros vector.
# movieId values are used directly as category indices, so the vector length is max(movieId) + 1.
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(
    inputCols=['movieId'],
    outputCols=['movie_vec'],
    dropLast=False,
)
dfvec = encoder.fit(df).transform(df)
dfvec.show()

+------+-------+------+--------------------+
|userId|movieId|rating|           movie_vec|
+------+-------+------+--------------------+
|     1|    296|   5.0|(209172,[296],[1.0])|
|     1|    306|   3.5|(209172,[306],[1.0])|
|     1|    307|   5.0|(209172,[307],[1.0])|
|     1|    665|   5.0|(209172,[665],[1.0])|
|     1|    899|   3.5|(209172,[899],[1.0])|
|     1|   1088|   4.0|(209172,[1088],[1...|
|     1|   1175|   3.5|(209172,[1175],[1...|
|     1|   1217|   3.5|(209172,[1217],[1...|
|     1|   1237|   5.0|(209172,[1237],[1...|
|     1|   1250|   4.0|(209172,[1250],[1...|
|     1|   1260|   3.5|(209172,[1260],[1...|
|     1|   1653|   4.0|(209172,[1653],[1...|
|     1|   2011|   2.5|(209172,[2011],[1...|
|     1|   2012|   2.5|(209172,[2012],[1...|
|     1|   2068|   2.5|(209172,[2068],[1...|
|     1|   2161|   3.5|(209172,[2161],[1...|
|     1|   2351|   4.5|(209172,[2351],[1...|
|     1|   2573|   4.0|(209172,[2573],[1...|
|     1|   2632|   5.0|(209172,[2632],[1...|
|     1|  

In [None]:
# Sanity check: encoding adds a column but should leave the row count unchanged.
dfvec.count()

25000095

## Finding similarities between users


Each user has rated many movies, so the per-rating one-hot vectors (`movie_vec`) must be aggregated into a single vector per user that marks every movie that user has rated.

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, VectorUDT

def vector_sum(vector_list, size=None):
  """Merge a list of sparse vectors into one SparseVector by summing them element-wise.

  Used on the per-user output of collect_list('movie_vec'): each input is a one-hot movie
  vector, so the result has a non-zero entry for every movie the user rated.

  Args:
    vector_list: iterable of SparseVector-like objects exposing .size, .indices and .values.
      None or empty yields an empty vector.
    size: dimension of the result. Defaults to the largest input .size, so the result always
      matches the dimension produced by the encoder rather than a hard-coded constant.

  Returns:
    SparseVector whose indices are strictly increasing and whose values stay paired with
    their indices; entries sharing an index are added together.
  """
  totals = {}
  dim = 0
  for vec in vector_list or []:
    dim = max(dim, int(vec.size))
    # Flatten each vector into scalar (index, value) pairs.
    for idx, val in zip(vec.indices, vec.values):
      key = int(idx)
      totals[key] = totals.get(key, 0.0) + float(val)
  if size is None:
    size = dim
  # Sort the indices and build the values in the same order, keeping each pair aligned.
  sorted_indices = sorted(totals)
  return SparseVector(size, sorted_indices, [totals[i] for i in sorted_indices])

# Expose vector_sum to Spark as a UDF that returns an ML vector column.
sum_udf = udf(vector_sum, VectorUDT())

# Gather each user's one-hot movie vectors, then merge them into one vector per user.
df_agg = (
    dfvec
    .groupby('userId')
    .agg(F.collect_list('movie_vec').alias('movie_vec'))
    .withColumn('movie_vec', sum_udf('movie_vec'))
)
df_agg.show(3)

+------+--------------------+
|userId|           movie_vec|
+------+--------------------+
|   148|(208940,[19,32,47...|
|   463|(208940,[3,6,7,32...|
|   471|(208940,[318,356,...|
+------+--------------------+
only showing top 3 rows



In [None]:
# Fit MinHash LSH on the per-user movie vectors (MinHash approximates Jaccard distance
# between the sets of non-zero indices) and show the resulting hash signatures.
from pyspark.ml.feature import MinHashLSH
lsh = MinHashLSH(
    numHashTables=3,
    inputCol='movie_vec',
    outputCol='hashes',
)
lsh_model = lsh.fit(df_agg)
lsh_model.transform(df_agg).show(3)

+------+--------------------+--------------------+
|userId|           movie_vec|              hashes|
+------+--------------------+--------------------+
|   148|(208940,[19,32,47...|[[3.9422642E7], [...|
|   463|(208940,[3,6,7,32...|[[4.6905538E7], [...|
|   471|(208940,[318,356,...|[[1.1399122E7], [...|
+------+--------------------+--------------------+
only showing top 3 rows



In [None]:
# Find pairs of users whose approximate Jaccard distance (over the sets of movies they rated)
# is below 0.6.
# A self-join matches every user with itself (distance 0) and reports each pair twice, as
# (A, B) and (B, A); keeping only userA < userB leaves each distinct pair exactly once.
# NOTE(review): the original run of this cell failed with a Py4JJavaError. A self-join over all
# users at this threshold may be too large for the Colab driver; consider a tighter threshold
# or running on a sample of df_agg.
import pyspark.sql.functions as F
(
    lsh_model.approxSimilarityJoin(df_agg, df_agg, 0.6, distCol='JaccardDistance')
    .filter(F.col('datasetA.userId') < F.col('datasetB.userId'))
    .select(
        F.col('datasetA.userId').alias('userA'),
        F.col('datasetB.userId').alias('userB'),
        'JaccardDistance',
    )
    .show()
)

Py4JJavaError: ignored