<a href="https://colab.research.google.com/github/KatharinaWiedmann/DataEngGroupProject/blob/master/Collaborative_filtering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Using ALS with Spark 

This basic Recommender System will use the Spark ALS and explore the effects of outcome of a cross-validation on the dataset. 
This Notebook requires the Movielens dataset to be present in Google Drive

In [0]:
#from other notebook 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark




# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"


import findspark
findspark.init()


!pip install pyspark
!pip install altair

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
#from other notebook 
import pyspark
# get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# get the context
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark)

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7fca8c97dc18>


In [0]:
import pandas as pd


In [0]:
from google.colab import files
!ls -l
#!rm spark-2.4.4-bin-hadoop2.7.tgz.1

!echo $JAVA_HOME/bin
!export PATH=$PATH:$JAVA_HOME/bin
!echo $PATH

total 681276
drwx------  4 root root      4096 Mar 19 12:45 drive
drwxr-xr-x  1 root root      4096 Mar  3 18:11 sample_data
drwxr-xr-x 13 1000 1000      4096 Feb  2 19:47 spark-2.4.5-bin-hadoop2.7
-rw-r--r--  1 root root 232530699 Feb  2 20:27 spark-2.4.5-bin-hadoop2.7.tgz
-rw-r--r--  1 root root 232530699 Feb  2 20:27 spark-2.4.5-bin-hadoop2.7.tgz.1
-rw-r--r--  1 root root 232530699 Feb  2 20:27 spark-2.4.5-bin-hadoop2.7.tgz.2
drwxr-xr-x  2 root root      4096 Mar 19 13:11 spark-warehouse
/usr/lib/jvm/java-8-openjdk-amd64/bin
/tensorflow-1.15.0/python3.6/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/opt/bin


check versions and dependencies, this may change with changes to Colab and Spark

In [0]:
!java -version
!python --version
!ls /usr/lib/jvm/
!echo $JAVA_HOME
spark.version
!ls /content
!echo $PATH

openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)
Python 3.6.9
default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64
/usr/lib/jvm/java-8-openjdk-amd64
drive			   spark-2.4.5-bin-hadoop2.7.tgz    spark-warehouse
sample_data		   spark-2.4.5-bin-hadoop2.7.tgz.1
spark-2.4.5-bin-hadoop2.7  spark-2.4.5-bin-hadoop2.7.tgz.2
/tensorflow-1.15.0/python3.6/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/opt/bin


## Step 1 - Load the Data
Read the data, split into tokens and create a structured DataFrame

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType
# the imports are used creating the data frame

#for ratings 



spark = SparkSession.builder.getOrCreate() # create a SparkSession 
# this gets us an RDD. (could also be done with RDD.textFile in this case)
lines = spark.read.text("/content/drive/My Drive/Colab Notebooks/data/movielens/u.data").rdd
# now split the lines at the '\t'
parts = lines.map(lambda row: row.value.split("\t"))

parts.collect()
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
ratings.createOrReplaceTempView('ratings') # register the DataFrame so that we can use it with Spark SQL.

#this is the whole data frame 
ratings.show(20)

(training, test) = ratings.randomSplit([0.8, 0.2]) # split into test and training set
training.printSchema() # just for testing, should show the four columns
print(training.count()) # just for testing, should be around 1200


# #This is the training data 
# training.show()


# #This is the test data 
# test.show()

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|    242|   3.0|881250949|   196|
|    302|   3.0|891717742|   186|
|    377|   1.0|878887116|    22|
|     51|   2.0|880606923|   244|
|    346|   1.0|886397596|   166|
|    474|   4.0|884182806|   298|
|    265|   2.0|881171488|   115|
|    465|   5.0|891628467|   253|
|    451|   3.0|886324817|   305|
|     86|   3.0|883603013|     6|
|    257|   2.0|879372434|    62|
|   1014|   5.0|879781125|   286|
|    222|   5.0|876042340|   200|
|     40|   3.0|891035994|   210|
|     29|   3.0|888104457|   224|
|    785|   3.0|879485318|   303|
|    387|   5.0|879270459|   122|
|    274|   2.0|879539794|   194|
|   1042|   4.0|874834944|   291|
|   1184|   2.0|892079237|   234|
+-------+------+---------+------+
only showing top 20 rows

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)

79

In [0]:
#checking min and max rating 
max_rating = spark.sql("SELECT max(rating) FROM ratings")
max_rating.show()


min_rating = spark.sql("SELECT min(rating) FROM ratings")
min_rating.show()

+-----------+
|max(rating)|
+-----------+
|        5.0|
+-----------+

+-----------+
|min(rating)|
+-----------+
|        1.0|
+-----------+



In [0]:
#for movie titles
m_cols = ['movie_id', 'title', 'release_date']
movies = pd.read_csv("/content/drive/My Drive/Colab Notebooks/data/movielens/u.item", sep="|", names=m_cols, usecols=range(2), encoding="iso-8859-1")


movies


FileNotFoundError: ignored

# Step 2 - Create a baseline

Now take a very simple estimate as the baseline: calculate the mean of all ratings.    

The average can be calculated with the SQL `AVG` command, within an SQL `SELECT` statement. 

Then calculate the MSE with respect to the average (as predictor)

Calculate the RMSE as a naive baseline to compare the trained model against



In [0]:
#Average rating in total 
SQL1 = 'SELECT AVG(rating) FROM ratings'
row = spark.sql(SQL1).collect()[0] # get the single row with the result
#row #average rating = 3.529

meanRating = row['avg(rating)'] # access Row as a map 
print('meanRating',meanRating)

#Average rating per movie ???
se_rdd = test.rdd.map(lambda row: Row(se = pow(row['rating']-meanRating,2)) ) 
se_df = spark.createDataFrame(se_rdd) 
# #se_df.show()
se_df.createOrReplaceTempView('se')
print('se_df',se_df)

#Would select average rating for each movie, but not what we need 
# SQL2 = 'SELECT * FROM se'
# row = spark.sql(SQL2).collect()
# row

#Get average rating for all movies altogether 
SQL2 = 'SELECT AVG(se) FROM se'
row = spark.sql(SQL2).collect()[0]
row

meanSE = row['avg(se)'] # access Row as a map 
print('RMSE',pow(meanSE,0.5))

meanRating 3.52986
se_df DataFrame[se: double]
RMSE 1.1295544091605558


## Step 3 - Train an ALS Estimator and perform CV 

Now create an ALS estimator and a parameter grid to explore different values for the `rank` and `regParam` parameter of the ALS. Then build a cross-validator to train the model.

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit #to cross validate & fine tune hyperparameters of our model 
from pyspark.ml.evaluation import RegressionEvaluator # to measure performance of ALS model 
from pyspark.ml.recommendation import ALS #import ALS algorithm 

#alternative 1: 
# # Build the recommendation model using ALS on the training data
# als = ALS(maxIter=3, 
#           rank=10, 
#           regParam=0.1, 
#           userCol="userId", 
#           itemCol="movieId", 
#           ratingCol="rating")


#Own alternative: 
als = ALS(userCol="userId",
          itemCol="movieId", 
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative = True) #don't want negative predictions

#Fine tune hyperparameters: rank, maxIter (here not given, would be max iterations between X, Y to minimize the error)
# Regularization Parameter to prevent ALS from overfitting to the data 
# rank of X, Y matrices
paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.03,0.1,0.3]).addGrid(als.rank, [3,10,30]).build()

#Define evaluator as RMSE
#tell ALS which is the label column and what we want to call the prediction column
regEval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")


#Build cross validation using CrossValidator  
crossVal = CrossValidator(
          estimator=als,
          estimatorParamMaps=paramGrid, 
          evaluator=regEval)


## Step 4 - Evaluate The Model

Take the trained cvModel and extract the best parameter values by inspecting the estimatorParameterMaps. 
Compare the RMSE value to that of the mean for different parameter settings.

In [0]:
#alternative 2: 

#fit ALS model to training data
model = crossVal.fit(training)

#Extract best model from tuning exercise using ParamGridBuilder 
best_model = model.bestModel 

#fitting the best model (from above) to the test data  --> generates predictions
predictions = best_model.transform(test)
#get RMSE 
rmse = regEval.evaluate(predictions)


#Print evaluation metrics and model parameters
print("RMSE= " + str(rmse))
print("***Best model***")
print("Rank: " + str(best_model.rank))
print("MaxIter: " + str(best_model._java_obj.parent().getMaxIter()))
print("RegParam: " + str(best_model._java_obj.parent().getRegParam())) 


#RMSE = 0.9366 --> reasonably reliable 

RMSE= 0.9278876830744898
***Best model***
Rank: 3
MaxIter: 10
RegParam: 0.1


In [0]:
#To see how far off our predictions are 
predictions.show()

+-------+------+---------+------+----------+
|movieId|rating|timestamp|userId|prediction|
+-------+------+---------+------+----------+
|    148|   3.0|879540276|   406| 2.6706975|
|    148|   3.0|891543129|    27| 2.8232176|
|    148|   2.0|880843892|   916| 2.4764628|
|    148|   4.0|889492989|   663| 3.1737113|
|    148|   4.0|876544781|   330| 4.0028377|
|    148|   5.0|893212730|   416| 3.6113138|
|    148|   3.0|884133284|   435|  2.875884|
|    148|   4.0|880387474|   923| 3.6291358|
|    148|   3.0|879110346|   455| 2.9570184|
|    148|   2.0|880167030|   880| 3.1370335|
|    148|   4.0|890862563|   834| 3.4727056|
|    148|   5.0|888817717|   532| 3.9184709|
|    148|   3.0|891228196|   234| 2.2788446|
|    148|   5.0|879563367|   459| 3.3802338|
|    148|   1.0|888907015|   293| 2.1969743|
|    148|   4.0|884646436|   396| 3.5099092|
|    148|   1.0|874951482|    21|  2.294134|
|    148|   3.0|880434755|   203|  3.164443|
|    148|   2.0|882608961|   592| 2.6451952|
|    148| 

In [0]:
#Now want to make actual recommendations: 
#Get top 5 recommendations for all users 
#predictions can be larger than 5, as they are simply predictions 

user_recs = best_model.recommendForAllUsers(5)
user_recs.show()

#Register as temporary view
user_recs.createOrReplaceTempView('user_recs') 


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[1155, 6.445314]...|
|   463|[[814, 4.503483],...|
|   833|[[1491, 5.538618]...|
|   496|[[1491, 4.4210076...|
|   148|[[1589, 5.4492717...|
|   540|[[1467, 5.0062723...|
|   392|[[814, 5.312068],...|
|   243|[[814, 4.9330974]...|
|   623|[[814, 4.871832],...|
|   737|[[1491, 6.254104]...|
|   897|[[1589, 5.479496]...|
|   858|[[1449, 4.8004155...|
|    31|[[1491, 6.3782377...|
|   516|[[814, 5.2516212]...|
|   580|[[1589, 5.566204]...|
|   251|[[814, 4.9980354]...|
|   451|[[1589, 6.1134114...|
|    85|[[814, 4.6332707]...|
|   137|[[1589, 6.3322535...|
|   808|[[814, 5.700057],...|
+------+--------------------+
only showing top 20 rows



In [0]:
#read in UserIds for recommendations
selected_user_ids = spark.read.text('/content/drive/My Drive/Colab Notebooks/data/movielens/user_predictions.csv').rdd

selected_user_ids = spark.createDataFrame(selected_user_ids)

# register the DataFrame so that we can use it with Spark SQL
selected_user_ids.createOrReplaceTempView('selected_user_ids')


selected_user_ids = spark.sql("SELECT value from selected_user_ids")

#rename column 
selected_user_ids = selected_user_ids.withColumnRenamed("value", "userId")

# Check that it is a PySpark DataFrame 
type(selected_user_ids)

selected_user_ids.show()


selected_user_ids.columns

# register the DataFrame so that we can use it with Spark SQL
selected_user_ids.createOrReplaceTempView('selected_user_ids')



+------+
|userId|
+------+
|   198|
|    11|
|   314|
|   184|
|   163|
|   710|
|   881|
|   504|
|   267|
|   653|
+------+



In [0]:
#Combine the two frames 
combined_frame = spark.sql("SELECT s.userId, u.recommendations FROM selected_user_ids as s LEFT JOIN user_recs AS u ON s.userId = u.userId")
combined_frame.show()




+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   881|[[1467, 4.5301466...|
|   163|[[814, 4.0768776]...|
|   504|[[814, 5.122557],...|
|   314|[[113, 5.4674745]...|
|   267|[[1467, 5.4449453...|
|   653|[[1467, 3.8005152...|
|   710|[[1491, 4.895296]...|
|    11|[[814, 4.9344387]...|
|   198|[[1449, 4.3232374...|
|   184|[[814, 4.9805717]...|
+------+--------------------+



In [0]:
#Convert into a Pandas DF for easy usability
user_recs_pandas = combined_frame.toPandas()

user_recs_pandas['userId']

user_recs_pandas['recommendations']


movie_rec_for_each_customer = []


for i in user_recs_pandas['recommendations']: 
    first_movie = (i[0][0])
    second_movie = (i[1][0])
    third_movie = (i[2][0])
    fourth_movie = (i[3][0])
    fifth_movie = (i[4][0])
    combined_movie_recs = [first_movie, second_movie, third_movie, fourth_movie, fifth_movie]
    movie_rec_for_each_customer.append(combined_movie_recs)

movie_rec_for_each_customer

recommended_movies = pd.DataFrame(movie_rec_for_each_customer, index=user_recs_pandas['userId'])

recommended_movies.columns = ['movie_1', 'movie_2', 'movie_3', 'movie_4', 'movie_5']

recommended_movies


Unnamed: 0_level_0,movie_1,movie_2,movie_3,movie_4,movie_5
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
881,1467,113,814,1589,1500
163,814,113,1662,867,1599
504,814,113,1662,1467,1599
314,113,1269,1589,1662,1192
267,1467,1500,1491,1367,1449
653,1467,1589,1500,1607,1397
710,1491,1449,1512,1367,814
11,814,1599,1449,1651,1650
198,1449,814,1491,1367,1512
184,814,1449,1512,1358,1599


In [0]:
movies



Unnamed: 0,movie_id,title,release_date
0,1,Toy Story (1995),01-Jan-1995
1,2,GoldenEye (1995),01-Jan-1995
2,3,Four Rooms (1995),01-Jan-1995
3,4,Get Shorty (1995),01-Jan-1995
4,5,Copycat (1995),01-Jan-1995
...,...,...,...
1677,1678,Mat' i syn (1997),06-Feb-1998
1678,1679,B. Monkey (1998),06-Feb-1998
1679,1680,Sliding Doors (1998),01-Jan-1998
1680,1681,You So Crazy (1994),01-Jan-1994


In [0]:
#get dictionary of ids & movies
rename_dict = movies.set_index('movie_id').to_dict()['title']
rename_dict

#replace values with titles 
recommended_movies = recommended_movies.replace(rename_dict)

recommended_movies

Unnamed: 0_level_0,movie_1,movie_2,movie_3,movie_4,movie_5
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
881,"Saint of Fort Washington, The (1993)","Horseman on the Roof, The (Hussard sur le toit...","Great Day in Harlem, A (1994)",Schizopolis (1996),Santa with Muscles (1996)
163,"Great Day in Harlem, A (1994)","Horseman on the Roof, The (Hussard sur le toit...",Rough Magic (1995),"Whole Wide World, The (1996)",Someone Else's America (1995)
504,"Great Day in Harlem, A (1994)","Horseman on the Roof, The (Hussard sur le toit...",Rough Magic (1995),"Saint of Fort Washington, The (1993)",Someone Else's America (1995)
314,"Horseman on the Roof, The (Hussard sur le toit...",Love in the Afternoon (1957),Schizopolis (1996),Rough Magic (1995),"Boys of St. Vincent, The (1993)"
267,"Saint of Fort Washington, The (1993)",Santa with Muscles (1996),Tough and Deadly (1995),Faust (1994),Pather Panchali (1955)
653,"Saint of Fort Washington, The (1993)",Schizopolis (1996),Santa with Muscles (1996),Hurricane Streets (1998),Of Human Bondage (1934)
710,Tough and Deadly (1995),Pather Panchali (1955),"World of Apu, The (Apur Sansar) (1959)",Faust (1994),"Great Day in Harlem, A (1994)"
11,"Great Day in Harlem, A (1994)",Someone Else's America (1995),Pather Panchali (1955),"Spanish Prisoner, The (1997)","Butcher Boy, The (1998)"
198,Pather Panchali (1955),"Great Day in Harlem, A (1994)",Tough and Deadly (1995),Faust (1994),"World of Apu, The (Apur Sansar) (1959)"
184,"Great Day in Harlem, A (1994)",Pather Panchali (1955),"World of Apu, The (Apur Sansar) (1959)",The Deadly Cure (1996),Someone Else's America (1995)
