In [1]:
#import the packages 
import boto3 
import numpy as np
import pandas as pd
from io import StringIO, BytesIO

In [2]:
# Bucket that holds the input data for this notebook.
bucket_name = "zyyaphet"

# Two boto3 handles onto S3: the higher-level resource object and the
# low-level client (the client is what the listing/download cells call).
s3_resource = boto3.resource('s3')
s3 = boto3.client("s3")

In [55]:
# List the objects stored in the bucket together with their keys.
# NOTE: a single list_objects_v2 call returns at most 1000 keys; use a
# paginator (s3.get_paginator("list_objects_v2")) for larger buckets.
response2 = s3.list_objects_v2(Bucket=bucket_name)
# S3 omits the 'Contents' key when no objects match, so fall back to an
# empty list instead of raising KeyError on an empty bucket.
response2.get('Contents', [])

In [4]:
# Download the ratings CSV from S3 by bucket name and key. The object body
# comes back as bytes, so it is wrapped in BytesIO before pandas parses it.
# The 'timestamp' column is discarded as part of the same expression.

obj = s3.get_object(Bucket=bucket_name, Key="Input/ratings_small.csv")
df = pd.read_csv(BytesIO(obj['Body'].read())).drop(columns=['timestamp'])

In [5]:
# Sanity check: number of non-null values in each remaining column.
df.count()

userId     100836
movieId    100836
rating     100836
dtype: int64

# Modelling

In [6]:
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *

# Start (or reuse) the Spark session named 'ALS'.
spark = SparkSession.builder.appName('ALS').getOrCreate()

# Explicit schema for the pandas -> Spark conversion: integer ids, a double
# rating, and every field nullable.
schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
])
ratings = spark.createDataFrame(df, schema=schema)
ratings.show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
+------+-------+------+
only showing top 10 rows



In [7]:
# ALS Modelling

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Fixed seed shared by the train/test split and the ALS factor
# initialisation, so that the reported RMSE and the recommendations are
# repeatable for the same input data instead of changing on every run.
SEED = 42

(training, test) = ratings.randomSplit([0.8, 0.2], seed=SEED)
# These hyper-parameters are hand-picked starting values, not the library
# defaults (Spark's ALS defaults are rank=10, maxIter=10, regParam=0.1);
# iterate over them to tune the model.
# coldStartStrategy="drop" removes test rows whose user or movie was not
# seen during training, so their NaN predictions do not turn the RMSE into NaN.
als = ALS(maxIter=5, regParam=0.01, rank=20, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed=SEED)
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 1.2040343707404213


# Recommendation

In [18]:
# Generate recommendations

# Top-10 lists for every user and for every movie known to the model.
userRecs = model.recommendForAllUsers(10)
movieRecs = model.recommendForAllItems(10)

# Three distinct user ids and three distinct movie ids taken from the ratings.
users = ratings.select(als.getUserCol()).distinct().limit(3)
movies = ratings.select(als.getItemCol()).distinct().limit(3)

# Top-10 lists restricted to those small subsets.
userSubsetRecs = model.recommendForUserSubset(users, 10)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [19]:
# Generate the top 10 movie recommendations for each user (first 10 users shown)
userRecs.show(10)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[2318, 8.494622]...|
|   463|[[27611, 6.997333...|
|   496|[[89904, 8.015404...|
|   148|[[7169, 6.281178]...|
|   540|[[2990, 7.455493]...|
|   392|[[4144, 8.627564]...|
|   243|[[4270, 8.880862]...|
|    31|[[7099, 9.043392]...|
|   516|[[3791, 7.0694942...|
|   580|[[55247, 6.338495...|
+------+--------------------+
only showing top 10 rows



In [21]:
# Generate the top 10 user recommendations for each movie (first 10 movies shown)
movieRecs.show(10)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[77, 7.0766826],...|
|   4900|[[264, 6.061315],...|
|   5300|[[164, 5.9314775]...|
|   6620|[[158, 8.308047],...|
|   7340|[[302, 5.0520425]...|
|  32460|[[536, 8.207065],...|
|  54190|[[35, 7.330521], ...|
|    471|[[55, 8.853011], ...|
|   1591|[[344, 7.281192],...|
| 140541|[[270, 6.0058136]...|
+-------+--------------------+
only showing top 10 rows



In [20]:
# Recommendations for a small subset of users

# Pick three distinct user ids. No ordering is applied before limit(3), so
# which three are selected is left to Spark.
users = (
    ratings
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(users, 10)
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[2318, 8.494622]...|
|   463|[[27611, 6.997333...|
|   148|[[7169, 6.281178]...|
+------+--------------------+



In [21]:
# Recommendations for a small subset of movies

# Pick three distinct movie ids. No ordering is applied before limit(3), so
# which three are selected is left to Spark.
movies = (
    ratings
    .select(als.getItemCol())
    .distinct()
    .limit(3)
)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
movieSubSetRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[77, 6.5441346],...|
|   3175|[[258, 7.625138],...|
|   2366|[[5, 7.774533], [...|
+-------+--------------------+



# IO 

In [57]:
# Convert the per-user top-10 recommendations to a pandas DataFrame.
# toPandas() collects the whole Spark result into this Python process.
all_recs = model.recommendForAllUsers(10)
userrecs = all_recs.toPandas()


In [58]:
# Preview the first rows of the converted recommendations.
userrecs.head()

Unnamed: 0,userId,recommendations
0,471,"[(5048, 7.3013014793396), (1262, 7.08622741699..."
1,463,"[(3676, 7.3158488273620605), (2944, 6.72965955..."
2,496,"[(12, 7.391838550567627), (412, 7.242005825042..."
3,148,"[(1256, 6.842296600341797), (3087, 5.726652145..."
4,540,"[(8917, 7.768012523651123), (26662, 6.61660957..."


In [61]:
# write a function to parse only movieId in each row
def id_taker(row):
    """Return the recommended movie ids of one row as a comma-separated string.

    Parameters
    ----------
    row : mapping-like
        Must support ``row['recommendations']``, yielding an iterable of
        records that each support ``record['movieId']``.

    Returns
    -------
    str
        The movie ids in their original order, joined by ``','`` with no
        trailing separator; an empty string when there are no recommendations.
    """
    # str.join places separators only between ids, so the result no longer
    # ends with a dangling ',' (which produced an empty last field when the
    # string was split), and avoids repeated string concatenation.
    return ','.join(str(rec['movieId']) for rec in row['recommendations'])

# Apply id_taker to every row (axis=1) and store the resulting movie-id
# string in a new column, then preview the result.
userrecs['new_recommendations'] = userrecs.apply(id_taker, axis=1)
userrecs.head()

Unnamed: 0,userId,recommendations,new_recommendations
0,471,"[(5048, 7.3013014793396), (1262, 7.08622741699...","5048,1262,1464,49932,3925,61240,103228,70994,2..."
1,463,"[(3676, 7.3158488273620605), (2944, 6.72965955...",36762944159492012601188306327289582076
2,496,"[(12, 7.391838550567627), (412, 7.242005825042...",1241210121336761590557653113207652281
3,148,"[(1256, 6.842296600341797), (3087, 5.726652145...","1256,3087,42002,80693,7367,142115,3030,1080,71..."
4,540,"[(8917, 7.768012523651123), (26662, 6.61660957...","8917,26662,3638,5944,7323,3160,134853,139385,2..."


In [62]:
# The recommendations are now plain movie-id strings, so keep only the user
# id and that string column, give them their export names, and write the
# frame to a CSV file for later use.
# Note: userrecs is rebound here, so this cell expects the pre-rename columns.

userrecs = userrecs[['userId', 'new_recommendations']].rename(
    columns={'userId': 'Userid', 'new_recommendations': 'Recommendations'}
)
userrecs.to_csv('ALS_recommendations.csv', index=False)