In [1]:
#import the packages 
import boto3 
import numpy as np
import pandas as pd
from io import StringIO, BytesIO

In [2]:
# boto3 handles for S3: the resource-style API and the low-level client
# (the client is what get_object is called on when loading the CSVs).
s3_resource = boto3.resource('s3')
s3 = boto3.client('s3')

In [3]:
# S3 bucket that holds the ratings and movie-metadata CSVs
bucket_name = "andybigdata"

In [4]:
# Download the ratings CSV from S3 and parse it; the timestamp column is not
# needed for ALS, so drop it (reassigning rather than mutating in place).
obj = s3.get_object(Bucket=bucket_name, Key="ratings_small(1).csv")
df = pd.read_csv(BytesIO(obj['Body'].read())).drop(columns='timestamp')

In [5]:
# Peek at the first five ratings (userId, movieId, rating)
df.head(5)

Unnamed: 0,userId,movieId,rating
0,1,1,4.0
1,1,3,4.0
2,1,6,4.0
3,1,47,5.0
4,1,50,5.0


In [6]:
# Non-null count per column
df.count(axis="index")

userId     100836
movieId    100836
rating     100836
dtype: int64

In [7]:
# Download the movie-metadata CSV and keep only the id and popularity columns
obj = s3.get_object(Bucket=bucket_name, Key="movies_metadata_small.csv")
df1 = pd.read_csv(BytesIO(obj['Body'].read()))[['id', 'popularity']]

In [8]:
# Inspect the metadata subset (id, popularity) loaded above
df1

Unnamed: 0,id,popularity
0,8844,17.015539
1,949,17.924927
2,710,14.686036
3,1408,7.284477
4,524,10.137389
...,...,...
2787,3104,2.302582
2788,64197,0.528657
2789,98604,0.803588
2790,5589,0.375001


In [9]:
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *

In [10]:
# Start (or reuse) a Spark session for the ALS model
spark = (
    SparkSession.builder
    .appName('ALS')
    .getOrCreate()
)

# Explicit schema for converting the pandas ratings frame to a Spark DataFrame
schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
])
ratings = spark.createDataFrame(df, schema=schema)

In [11]:
# First 20 rows of the Spark ratings DataFrame
ratings.show(n=20)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [12]:
# ALS Modelling
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [13]:
#ALS Modeling
# Hold out 20% of the ratings for evaluation and fit ALS on the rest.
# Both the random split and ALS's factor initialisation are random; a fixed
# seed makes the split, the model and the RMSE reproducible on Restart & Run All.
SEED = 42
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SEED)
als = ALS(
    maxIter=5,
    regParam=0.01,
    rank=20,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # drop test rows whose user/item was unseen in training (NaN predictions),
    # otherwise the RMSE would be NaN
    coldStartStrategy="drop",
    seed=SEED,
)
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
rmse  # display the held-out RMSE

In [14]:
# Recommendation: top-10 movies for every user, shown as a Spark DataFrame
userRecs = model.recommendForAllUsers(numItems=10)
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[306, 8.133569],...|
|   463|[[1947, 7.5194993...|
|   496|[[56145, 7.10683]...|
|   148|[[4308, 7.7132783...|
|   540|[[1947, 6.905006]...|
|   392|[[2384, 9.437627]...|
|   243|[[4102, 8.204245]...|
|    31|[[2764, 7.9858494...|
|   516|[[7700, 7.8068233...|
|   580|[[27611, 6.62772]...|
|   251|[[86320, 9.603908...|
|   451|[[89904, 7.045653...|
|    85|[[2150, 6.8190093...|
|   137|[[306, 6.1775312]...|
|    65|[[4678, 6.588134]...|
|   458|[[3594, 9.718121]...|
|   481|[[2124, 7.624796]...|
|    53|[[56782, 7.029699...|
|   255|[[3272, 6.4001036...|
|   588|[[158872, 7.35896...|
+------+--------------------+
only showing top 20 rows



In [15]:
# Collect the top-10 recommendations per user to the driver as a pandas frame
all_recs = model.recommendForAllUsers(numItems=10)
userRecs = all_recs.toPandas()

In [16]:
# pandas frame of (userId, recommendations) produced by toPandas()
userRecs

Unnamed: 0,userId,recommendations
0,471,"[(306, 8.13356876373291), (5066, 6.81079387664..."
1,463,"[(1947, 7.5194993019104), (4351, 7.34560489654..."
2,496,"[(56145, 7.10683012008667), (3435, 7.028228282..."
3,148,"[(4308, 7.713278293609619), (5881, 7.342840671..."
4,540,"[(1947, 6.905005931854248), (969, 6.8095397949..."
...,...,...
605,208,"[(106489, 8.950284957885742), (6773, 7.8482837..."
606,401,"[(179819, 5.592379570007324), (4351, 5.4436211..."
607,422,"[(2423, 5.863225936889648), (3836, 5.730740547..."
608,517,"[(102903, 6.318270683288574), (1639, 6.0730390..."


In [17]:
#pass only the movie id to each row
def id_taker(row):
    """Extract the recommended movie ids for one user row.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas row Series)
        Must provide ``row['recommendations']``: an iterable of records that
        each support ``record['movieId']`` (presumably Spark ``Row`` objects
        after ``toPandas()`` — confirm if Arrow conversion is enabled).

    Returns
    -------
    list of str
        The movie ids in recommendation order, converted to strings. The list
        is built directly, so it contains exactly one entry per recommendation
        (no empty trailing element from splitting a comma-terminated string).
    """
    recommendations = row['recommendations']
    if recommendations is None:  # no recommendations for this user
        return []
    return [str(rec['movieId']) for rec in recommendations]

# One list of movie-id strings per user row
userRecs['new_recommendations'] = userRecs.apply(func=id_taker, axis='columns')
userRecs.head(5)

Unnamed: 0,userId,recommendations,new_recommendations
0,471,"[(306, 8.13356876373291), (5066, 6.81079387664...","[306, 5066, 1243, 2867, 2788, 81845, 2202, 561..."
1,463,"[(1947, 7.5194993019104), (4351, 7.34560489654...","[1947, 4351, 3477, 3421, 1372, 955, 1278, 7099..."
2,496,"[(56145, 7.10683012008667), (3435, 7.028228282...","[56145, 3435, 2395, 1275, 1673, 94677, 969, 69..."
3,148,"[(4308, 7.713278293609619), (5881, 7.342840671...","[4308, 5881, 8910, 6283, 1734, 55247, 3476, 17..."
4,540,"[(1947, 6.905005931854248), (969, 6.8095397949...","[1947, 969, 2723, 7022, 3083, 1537, 1256, 4419..."


In [18]:
# Keep only the user id and the extracted movie-id list. `.copy()` makes this an
# independent frame, so adding the '#overlap' column later does not raise
# pandas' SettingWithCopyWarning.
userRecs = userRecs[['userId', 'new_recommendations']].copy()
userRecs.head()

Unnamed: 0,userId,new_recommendations
0,471,"[306, 5066, 1243, 2867, 2788, 81845, 2202, 561..."
1,463,"[1947, 4351, 3477, 3421, 1372, 955, 1278, 7099..."
2,496,"[56145, 3435, 2395, 1275, 1673, 94677, 969, 69..."
3,148,"[4308, 5881, 8910, 6283, 1734, 55247, 3476, 17..."
4,540,"[1947, 969, 2723, 7022, 3083, 1537, 1256, 4419..."


In [19]:
# Persist the per-user recommendation lists (without the pandas index)
userRecs.to_csv(path_or_buf='ALS_recommendations.csv', index=False)

In [20]:
# Metadata subset (id, popularity) before selecting the most popular titles
df1

Unnamed: 0,id,popularity
0,8844,17.015539
1,949,17.924927
2,710,14.686036
3,1408,7.284477
4,524,10.137389
...,...,...
2787,3104,2.302582
2788,64197,0.528657
2789,98604,0.803588
2790,5589,0.375001


In [21]:
# Keep the 10 most popular titles (highest popularity first)
df1 = df1.sort_values(by='popularity', ascending=False).iloc[:10]

In [22]:
# The 10 most popular titles, sorted by popularity descending
df1

Unnamed: 0,id,popularity
53,680,140.950236
1811,155,123.167259
101,78,96.272374
521,550,63.869599
58,278,51.645403
65,13,48.307194
990,22,47.326665
44,11,42.149697
97,424,41.725123
129,238,41.109264


In [23]:
# userId plus the list of recommended movie ids (as strings) per user
userRecs

Unnamed: 0,userId,new_recommendations
0,471,"[306, 5066, 1243, 2867, 2788, 81845, 2202, 561..."
1,463,"[1947, 4351, 3477, 3421, 1372, 955, 1278, 7099..."
2,496,"[56145, 3435, 2395, 1275, 1673, 94677, 969, 69..."
3,148,"[4308, 5881, 8910, 6283, 1734, 55247, 3476, 17..."
4,540,"[1947, 969, 2723, 7022, 3083, 1537, 1256, 4419..."
...,...,...
605,208,"[106489, 6773, 8957, 6952, 1219, 213, 6993, 49..."
606,401,"[179819, 4351, 2278, 89492, 1845, 143355, 3421..."
607,422,"[2423, 3836, 5103, 1204, 1287, 3176, 3868, 475..."
608,517,"[102903, 1639, 2690, 1088, 3676, 277, 2065, 80..."


In [50]:
# Popular-title ids as strings, to compare against the string ids in new_recommendations
top10_id = [str(movie_id) for movie_id in df1['id']]

In [52]:
# For each user, count how many of their recommended movie ids are among the
# top-10 most popular ids. The count starts at zero once per user, not once
# per recommended movie, so every recommendation contributes to the total.
# NOTE(review): new_recommendations holds `movieId` values from the ratings
# file, while top10_id comes from the `id` column of movies_metadata_small.csv.
# If these are different id schemes (possibly MovieLens vs TMDB ids), they need
# a mapping before this comparison is meaningful. Confirm.
top10_set = set(top10_id)  # constant-time membership checks
overlap = [
    sum(1 for movie_id in recs if movie_id in top10_set)
    for recs in userRecs['new_recommendations']
]

In [53]:
# Sanity check: exactly one overlap count per user row before attaching it as a column
assert len(overlap) == len(userRecs), "overlap must have one entry per user row"
len(overlap)

610

In [54]:
# Attach the per-user overlap counts (same row order as userRecs)
userRecs["#overlap"] = overlap

In [55]:
# userId, recommended movie ids and the '#overlap' count per user
userRecs

Unnamed: 0,userId,new_recommendations,#overlap
0,471,"[306, 5066, 1243, 2867, 2788, 81845, 2202, 561...",0
1,463,"[1947, 4351, 3477, 3421, 1372, 955, 1278, 7099...",0
2,496,"[56145, 3435, 2395, 1275, 1673, 94677, 969, 69...",0
3,148,"[4308, 5881, 8910, 6283, 1734, 55247, 3476, 17...",0
4,540,"[1947, 969, 2723, 7022, 3083, 1537, 1256, 4419...",0
...,...,...,...
605,208,"[106489, 6773, 8957, 6952, 1219, 213, 6993, 49...",0
606,401,"[179819, 4351, 2278, 89492, 1845, 143355, 3421...",0
607,422,"[2423, 3836, 5103, 1204, 1287, 3176, 3868, 475...",0
608,517,"[102903, 1639, 2690, 1088, 3676, 277, 2065, 80...",0


In [56]:
# Total overlap across all users
userRecs['#overlap'].sum(axis=0)

0

In [57]:
# Spot-check: the first popular id is a str (map(str) conversion above)
top10_id[0]

'680'

In [65]:
# Spot-check: the second recommended id for the row labelled 0 is also a str
userRecs.loc[0, 'new_recommendations'][1]

'5066'

In [67]:
# Overwrite the CSV, now including the '#overlap' column
userRecs.to_csv(path_or_buf='ALS_recommendations.csv', index=False)