In [35]:
from google.cloud import storage
import os
from io import BytesIO
import pandas as pd

# Open a handle to the GCS bucket that holds the input file.
bucket_name = "jupyter-pyspark"  # name of the bucket where the file is stored
storage_client = storage.Client()  # client created with no explicit arguments

bucket = storage_client.get_bucket(bucket_name)  # look the bucket up by name

In [36]:
# Handle to the ratings CSV inside the bucket, built through the bucket's own
# blob factory. Ending the cell with the bare name echoes the blob's repr.
blob = bucket.blob("ratings_small.csv")
blob

<Blob: jupyter-pyspark, ratings_small.csv, None>

In [37]:
# Download the blob's raw bytes and parse them as CSV into a pandas DataFrame.
# `download_as_string` is a deprecated alias of `download_as_bytes` in
# google-cloud-storage, so prefer the new name; fall back to the old one only
# when the installed client predates `download_as_bytes`.
_download = getattr(blob, "download_as_bytes", None) or blob.download_as_string
content = _download()
train = pd.read_csv(BytesIO(content))

In [38]:
# Echo the loaded ratings frame so the notebook renders it for inspection.
train

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931
...,...,...,...,...
100831,610,166534,4.0,1493848402
100832,610,168248,5.0,1493850091
100833,610,168250,5.0,1494273047
100834,610,168252,5.0,1493846352


In [39]:
# import everything we need for the recommender system
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession

In [40]:
# Obtain a Spark session for this notebook: getOrCreate() reuses an active
# session if there is one, otherwise it starts a new one under this app name.
spark = (
    SparkSession.builder
    .appName('Recommendation_system')
    .getOrCreate()
)

In [41]:
# Convert the pandas ratings frame into a Spark DataFrame and preview it.
df = spark.createDataFrame(train)
df.show(n=20)  # 20 rows is show()'s default preview size

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [42]:
# Split the ratings 80/20 into a training set and a held-out test set.
# The seed is fixed so the same rows land in each split on every run;
# without it randomSplit draws a fresh random split each time.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [43]:
# Configure the ALS (alternating least squares) collaborative-filtering
# estimator over the userId / movieId / rating columns.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # "drop" removes prediction rows that come out as NaN, so they cannot
    # poison the evaluation metric.
    coldStartStrategy="drop",
    # Fixed seed so the random factor initialisation is repeatable.
    seed=42,
)

In [44]:
# Fit the configured ALS estimator on the training split; `model` is the
# fitted result that the later cells use for predictions and recommendations.
model = als.fit(training)

In [45]:
# Generate predictions for the held-out test split with the fitted model
# (the evaluation itself happens in the following cells).
predictions = model.transform(test)

In [46]:
# Evaluator that scores the "prediction" column against the "rating" column
# using root-mean-square error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

In [47]:
# Compute the RMSE over the test-set predictions and report it.
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 1.096489302072982


In [48]:
# Recommendations from the fitted model (10 per row throughout).

# Across the whole model: top 10 movies for every user, top 10 users for every movie.
userRecs = model.recommendForAllUsers(10)
movieRecs = model.recommendForAllItems(10)

# For a subset: take three distinct user ids and get their top 10 movies.
users = (
    df.select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(users, 10)

# Likewise, take three distinct movie ids and get their top 10 users.
movies = (
    df.select(als.getItemCol())
    .distinct()
    .limit(3)
)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [49]:
# Preview the per-user recommendations. Truncation (show()'s default) is left
# on, so the long recommendation arrays are shortened in the printed table.
userRecs.show(truncate=True)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[4144, 9.155789]...|
|   463|[[932, 7.311982],...|
|   496|[[1237, 6.7022276...|
|   148|[[2135, 6.6665936...|
|   540|[[48322, 7.490763...|
|   392|[[70946, 9.99129]...|
|   243|[[6550, 9.606707]...|
|    31|[[129354, 9.65955...|
|   516|[[213, 7.5691013]...|
|   580|[[54256, 6.564732...|
|   251|[[2843, 8.490271]...|
|   451|[[42723, 9.619178...|
|    85|[[55363, 7.814979...|
|   137|[[42723, 6.789931...|
|    65|[[183897, 6.19192...|
|   458|[[5135, 10.103954...|
|   481|[[86320, 7.282119...|
|    53|[[2492, 8.814103]...|
|   255|[[5419, 11.82389]...|
|   588|[[215, 8.803522],...|
+------+--------------------+
only showing top 20 rows

