In [51]:
from google.cloud import storage
import os

In [52]:
from io import BytesIO
import pandas as pd
#Connect to GCP bucket and assign the bucket_name and specify the file name
bucket_name = "spy999" #Assign the bucket name where your file is stored
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

In [53]:
blob = storage.blob.Blob("movies_metadata_small(1).csv",bucket)
blob1 = storage.blob.Blob("ratings_small(2).csv", bucket)
# Convert to a pandas dataframe
content = blob.download_as_string()
train = pd.read_csv(BytesIO(content))

content1 = blob1.download_as_string()
train1 = pd.read_csv(BytesIO(content1))

In [57]:
train = train[["id", "popularity"]]
type(train)

pandas.core.frame.DataFrame

In [55]:
train1 = train1[["userId", "movieId", "rating"]]
train1

Unnamed: 0,userId,movieId,rating
0,1,1,4.0
1,1,3,4.0
2,1,6,4.0
3,1,47,5.0
4,1,50,5.0
...,...,...,...
100831,610,166534,4.0
100832,610,168248,5.0
100833,610,168250,5.0
100834,610,168252,5.0


In [58]:
df2 = pd.merge(train1, train, left_on="movieId", right_on="id")
df2

Unnamed: 0,userId,movieId,rating,id,popularity
0,1,3,4.0,3,2.292110
1,6,3,5.0,3,2.292110
2,19,3,3.0,3,2.292110
3,32,3,3.0,3,2.292110
4,42,3,4.0,3,2.292110
...,...,...,...,...,...
42175,610,80094,4.0,80094,0.000220
42176,610,89072,4.0,89072,0.488615
42177,610,93193,3.0,93193,0.369703
42178,610,114044,3.5,114044,0.074151


In [59]:
# impport everything we need for the rec sys
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [60]:
# initialize spark session
spark = SparkSession.builder.appName('ALS').getOrCreate()
schema = StructType([StructField("userId", IntegerType(), True), StructField("movieId", IntegerType(), True), StructField("rating", DoubleType(), True), StructField("id", IntegerType(), True), StructField("popularity", DoubleType(), True)])
ratings = spark.createDataFrame(df2, schema=schema)
ratings.show(10)

+------+-------+------+---+------------------+
|userId|movieId|rating| id|        popularity|
+------+-------+------+---+------------------+
|     1|      3|   4.0|  3|2.2921099999999996|
|     6|      3|   5.0|  3|2.2921099999999996|
|    19|      3|   3.0|  3|2.2921099999999996|
|    32|      3|   3.0|  3|2.2921099999999996|
|    42|      3|   4.0|  3|2.2921099999999996|
|    43|      3|   5.0|  3|2.2921099999999996|
|    44|      3|   3.0|  3|2.2921099999999996|
|    51|      3|   4.0|  3|2.2921099999999996|
|    58|      3|   3.0|  3|2.2921099999999996|
|    64|      3|   3.5|  3|2.2921099999999996|
+------+-------+------+---+------------------+
only showing top 10 rows



In [61]:
# create testing and training data
(training, test) = ratings.randomSplit([0.8, 0.2])

In [62]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="popularity",coldStartStrategy="drop")

In [63]:
model = als.fit(training)


In [68]:
#recommendations
userRecs = model.recommendForAllUsers(10) # top 10 movie recommendations for each user


In [70]:
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{680, 127.10423}...|
|   463|[{680, 131.68222}...|
|   496|[{680, 121.807175...|
|   148|[{680, 126.51054}...|
|   540|[{680, 131.91869}...|
|   392|[{680, 127.33894}...|
|   243|[{680, 132.43771}...|
|    31|[{680, 131.31763}...|
|   516|[{680, 132.8027},...|
|   580|[{680, 140.93689}...|
|   251|[{680, 134.86925}...|
|   451|[{680, 131.2594},...|
|    85|[{680, 116.25065}...|
|   137|[{680, 130.56137}...|
|    65|[{680, 128.13367}...|
|   458|[{680, 134.06406}...|
|   481|[{680, 66.080635}...|
|    53|[{680, 121.494415...|
|   255|[{680, 124.89766}...|
|   588|[{680, 128.27061}...|
+------+--------------------+
only showing top 20 rows



In [71]:
all_recs = userRecs.toPandas()
all_recs.head()

Unnamed: 0,userId,recommendations
0,471,"[(680, 127.10423278808594), (155, 125.48609924..."
1,463,"[(680, 131.68222045898438), (155, 126.08360290..."
2,496,"[(680, 121.80717468261719), (155, 121.06945037..."
3,148,"[(680, 126.51053619384766), (155, 122.79273986..."
4,540,"[(680, 131.91868591308594), (155, 124.39183044..."


In [72]:
def id_taker(row):
    movieId = ''
    for i in row['recommendations']:
        movieId += str(i['movieId']) + ','
    return movieId

In [73]:
all_recs["new_recs"] = all_recs.apply(id_taker, axis = 1)
all_recs.head()

Unnamed: 0,userId,recommendations,new_recs
0,471,"[(680, 127.10423278808594), (155, 125.48609924...",68015578550132211424637671
1,463,"[(680, 131.68222045898438), (155, 126.08360290...",68015578550132211424637671
2,496,"[(680, 121.80717468261719), (155, 121.06945037...",68015578550132211424637671
3,148,"[(680, 126.51053619384766), (155, 122.79273986...",68015578550132211424637671
4,540,"[(680, 131.91868591308594), (155, 124.39183044...",68015578550132211424637671
