# MODEL 1: ALS Estimator Using Spark

In [3]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

!pip install -q findspark
#!pip install pyspark

# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/My Drive/spark-2.4.5-bin-hadoop2.7"

!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!java -version
!python --version

/bin/sh: apt-get: command not found
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java has not been configured as an alternative for java
openjdk version "1.8.0_152-release"
OpenJDK Runtime Environment (build 1.8.0_152-release-1056-b12)
OpenJDK 64-Bit Server VM (build 25.152-b12, mixed mode)
Python 3.6.5 :: Anaconda, Inc.


In [2]:
#install spark
#!pip install pyspark
import pyspark
 # get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# and a spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark)
spark.version

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7f05a20c0160>


'2.3.2'

In [18]:
import findspark
findspark.init("spark-2.4.5-bin-hadoop2.7")

In [25]:
#step 2: load data
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import *
import pandas as pd
# the imports are used creating the data frame

# create a SparkSession
spark = SparkSession.builder.getOrCreate()  


data_movie=pd.read_csv("movies.csv")
df_movie=spark.createDataFrame(data_movie)
df_movie.show()

data_ratings=pd.read_csv("ratings.csv")
df_ratings=spark.createDataFrame(data_ratings)
df_ratings.show()

df_ratings.createOrReplaceTempView('ratings') 
df_movie.createOrReplaceTempView('movie') 


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [28]:
(training, test) = df_ratings.randomSplit([0.8, 0.2]) # split into test and training set
training.printSchema() # just for testing, should show the four columns
print(training.count()) # just for testing, should be around 1200
print(test.count())

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

80692
20144


In [29]:
##step2: create a baseline
SQL1 = 'SELECT AVG(rating) FROM ratings'
row = spark.sql(SQL1).collect()[0] # get the single row with the result

meanRating = row['avg(rating)'] # access Row as a map 
print('meanRating',meanRating)

se_rdd = test.rdd.map(lambda row: Row(se = pow(row['rating']-meanRating,2)) ) 
se_df = spark.createDataFrame(se_rdd) 
se_df.createOrReplaceTempView('se')
print('se_df',se_df)
SQL2 = 'SELECT AVG(se) FROM se'
row = spark.sql(SQL2).collect()[0]
meanSE = row['avg(se)'] # access Row as a map 
print('RMSE',pow(meanSE,0.5))

meanRating 3.501556983616962
se_df DataFrame[se: double]
RMSE 1.042255274433242


In [32]:
##step3:train ALS estimator and perform CV
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=3, rank=10, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")

paramGrid = ParamGridBuilder() \
  .addGrid(als.regParam, [0.03,0.1,0.3]) \
  .addGrid(als.rank, [3,10,30]).build()

regEval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

crossVal = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=regEval)
print('starting cross-validation')
cvModel = crossVal.fit(training)
print('finished cross-validation')

starting cross-validation
finished cross-validation


In [33]:
print(cvModel.avgMetrics) # the metrics form the CrossValidation
print(cvModel.getEstimatorParamMaps()) # gives you the parameter combinations
paramMap = list(zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics))
paramMin = min(paramMap, key=lambda x: x[1])
print(paramMin)

# Evaluate the model by computing the RMSE on the test data
predictions = cvModel.transform(test)
rmse = regEval.evaluate(predictions)
print("RMSE = " + str(rmse))

[1.0407429967801038, 1.1286802586942164, 1.2243138199626695, 0.9565689384360606, 0.9675925202230801, 0.9511332209407668, 0.9468515116188687, 0.9542613945302745, 0.960822201414864]
[{Param(parent='ALS_43eb84313eccbbc282f9', name='regParam', doc='regularization parameter (>= 0).'): 0.03, Param(parent='ALS_43eb84313eccbbc282f9', name='rank', doc='rank of the factorization'): 3}, {Param(parent='ALS_43eb84313eccbbc282f9', name='regParam', doc='regularization parameter (>= 0).'): 0.03, Param(parent='ALS_43eb84313eccbbc282f9', name='rank', doc='rank of the factorization'): 10}, {Param(parent='ALS_43eb84313eccbbc282f9', name='regParam', doc='regularization parameter (>= 0).'): 0.03, Param(parent='ALS_43eb84313eccbbc282f9', name='rank', doc='rank of the factorization'): 30}, {Param(parent='ALS_43eb84313eccbbc282f9', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='ALS_43eb84313eccbbc282f9', name='rank', doc='rank of the factorization'): 3}, {Param(parent='ALS_43eb843

In [40]:
predictions.show()
predictions.createOrReplaceTempView('predictions') 

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   372|    471|   3.0| 874415126| 3.1767607|
|   462|    471|   2.5|1123890831| 2.5841093|
|   287|    471|   4.5|1110231536| 2.4000242|
|    32|    471|   3.0| 856737165| 3.6538186|
|   414|    471|   5.0| 961514069| 3.3663368|
|   608|    471|   1.5|1117161794| 3.1839418|
|   426|    471|   5.0|1451081135| 3.4786158|
|   608|    833|   0.5|1117506344|  2.160833|
|    20|   1088|   4.5|1054147512| 3.4739363|
|    64|   1088|   4.0|1161559902| 3.4067392|
|   583|   1088|   3.5|1481474480|  3.046348|
|   555|   1088|   4.0| 978822670| 3.2257967|
|   226|   1088|   1.0|1096420160| 3.2811496|
|   483|   1088|   3.0|1215895737|  3.340597|
|   517|   1088|   1.0|1487958398| 2.2894535|
|   593|   1580|   1.5|1181007882|  2.870765|
|    34|   1580|   2.5|1162048827|  3.187867|
|   587|   1580|   4.0| 953138475| 3.5870633|
|   577|   1580|   3.0| 945965825|

In [35]:
###for a specific user

user_history = training.filter(training['userId']==11)
user_history.show()


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|    11|      6|   5.0|902154266|
|    11|     10|   3.0|902154316|
|    11|     36|   4.0|902155135|
|    11|     44|   2.0|902154593|
|    11|     95|   3.0|902154458|
|    11|    110|   5.0|902154266|
|    11|    150|   5.0|902154266|
|    11|    165|   3.0|902154567|
|    11|    170|   4.0|902154621|
|    11|    208|   3.0|902154706|
|    11|    292|   4.0|902154383|
|    11|    318|   4.0|902155070|
|    11|    349|   5.0|902154342|
|    11|    368|   3.0|904510567|
|    11|    376|   2.0|902154684|
|    11|    377|   3.0|902154431|
|    11|    434|   3.0|902154685|
|    11|    457|   5.0|902154316|
|    11|    466|   3.0|902154805|
|    11|    474|   4.0|902154431|
+------+-------+------+---------+
only showing top 20 rows



In [36]:
# a list of movies we are thinking to offer
user_suggest = test.filter(training['userId']==11).select(['movieId', 'userId'])
user_suggest.show()

+-------+------+
|movieId|userId|
+-------+------+
|    153|    11|
|    356|    11|
|    380|    11|
|    593|    11|
|   1370|    11|
|   1408|    11|
|   1552|    11|
|   1584|    11|
|   1693|    11|
|   1918|    11|
|   2027|    11|
|   2028|    11|
+-------+------+



In [37]:
# offer movies with a high predicted rating
user_offer = cvModel.transform(user_suggest)
user_offer.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    593|    11|  4.205923|
|    356|    11|  4.087714|
|   2028|    11| 3.9526556|
|   1584|    11| 3.6363184|
|   1408|    11| 3.6256447|
|   1370|    11|  3.483639|
|    380|    11| 3.4523625|
|   1693|    11| 3.3422816|
|   1552|    11| 3.0669956|
|    153|    11| 2.8908749|
|   1918|    11|   2.69265|
|   2027|    11| 1.8627541|
+-------+------+----------+



In [81]:
training.createOrReplaceTempView('training') 
df_movie.createOrReplaceTempView('movies') 

rec_table = '''SELECT C.userId,predictions.movieId,C.maxpred FROM predictions,
        (SELECT MAX(prediction) AS maxpred,userId FROM predictions WHERE userId IN (SELECT userId FROM training) GROUP BY userId) C
        WHERE predictions.prediction=C.maxpred'''

spark.sql(pred_table).createOrReplaceTempView('rec_table') 

rec_with_title='''SELECT P.userId,P.movieId,M.title,P.maxpred 
                FROM rec_table P,movies M WHERE M.movieId=P.movieId
                ORDER BY P.userId ASC '''
spark.sql(rec_with_title).show()
spark.sql(rec_with_title).createOrReplaceTempView('rec_table_with_title') 

+------+-------+--------------------+---------+
|userId|movieId|               title|  maxpred|
+------+-------+--------------------+---------+
|     1|   1927|All Quiet on the ...|   4.7581|
|     2|  48516|Departed, The (2006)|3.8582234|
|     3|   5048|    Snow Dogs (2002)|2.0995681|
|     4|    599|Wild Bunch, The (...| 3.780693|
|     5|    608|        Fargo (1996)|3.8461504|
|     6|     47|Seven (a.k.a. Se7...|3.7696934|
|     7|  38388|Goal! The Dream B...|3.8658383|
|     8|    318|Shawshank Redempt...|  4.05491|
|     9|   5965|Duellists, The (1...|3.9293904|
|    10|   2959|   Fight Club (1999)|3.5206406|
|    11|    593|Silence of the La...| 4.205923|
|    12|    222|Circle of Friends...|4.5654244|
|    13|     47|Seven (a.k.a. Se7...| 3.706434|
|    14|    356| Forrest Gump (1994)|3.8502324|
|    15|    527|Schindler's List ...|3.5272605|
|    16|   3022| General, The (1926)|  4.09401|
|    17|   1201|Good, the Bad and...|4.2333884|
|    18|   6300|Flickering Lights...|4.1

# Model 2: AWS SageMaker Factorization Machines 

In [82]:
!wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip

--2020-04-12 21:10:12--  http://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip’


2020-04-12 21:10:13 (11.1 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029]

Archive:  ml-100k.zip
   creating: ml-100k/
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base    

In [83]:
%cd ml-100k
!shuf ua.base -o ua.base.shuffled
!head -10 ua.base.shuffled

/home/ec2-user/SageMaker/ml-100k
189	661	4	893265569
374	38	4	880937876
83	692	4	880307979
249	456	3	879640549
327	523	4	887818800
479	177	4	889125665
472	780	4	875982922
805	831	4	881695040
846	601	5	883947500
144	961	3	888106106


In [84]:
!head -10 ua.test

1	20	4	887431883
1	33	4	878542699
1	61	4	878542420
1	117	3	874965739
1	155	2	878542201
1	160	4	875072547
1	171	5	889751711
1	189	3	888732928
1	202	5	875072442
1	265	4	878542441


In [85]:
import sagemaker
import sagemaker.amazon.common as smac
from sagemaker import get_execution_role
from sagemaker.predictor import json_deserializer

import boto3, csv, io, json
import numpy as np
from scipy.sparse import lil_matrix