In [1]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [2]:
movies=pd.read_csv('../data/movies/movies.csv')
pd_ratings= pd.read_csv('../data/movies/ratings.csv')
pd_ratings=pd_ratings.drop('timestamp', axis=1)
tags=pd.read_csv('../data/movies/tags.csv')
links=pd.read_csv('../data/movies/movies.csv')

In [3]:
pd_ratings.isna().sum()

userId     0
movieId    0
rating     0
dtype: int64

 Setup a SparkSession
spark = SparkSession.builder.getOrCreate()
...

# Convert a Pandas DF to a Spark DF
spark_df = spark.createDataFrame(pandas_df) 

# Convert a Spark DF to a Pandas DF
pandas_df = spark_df.toPandas()

In [4]:
spark = SparkSession.builder.getOrCreate()
spark_ratings= spark.createDataFrame(pd_ratings) 
train, test = spark_ratings.randomSplit([0.8, 0.2], seed=42)

In [5]:
factor_model = ALS(
    itemCol='movieId',
    userCol='userId',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    coldStartStrategy='drop',
    rank=20) 

In [6]:
ratings=factor_model.fit(train)

In [7]:
test.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   2105|   4.0|
|     1|   2294|   2.0|
|     2|    186|   3.0|
|     2|    300|   3.0|
|     2|    314|   4.0|
|     2|    319|   1.0|
|     2|    364|   3.0|
|     2|    372|   3.0|
|     2|    508|   4.0|
|     2|    550|   3.0|
|     2|    552|   3.0|
|     3|    267|   3.0|
|     3|   2318|   4.0|
|     3|   5349|   3.0|
|     3|   7153|   2.5|
|     3|   7361|   3.0|
|     3|  27369|   3.5|
+------+-------+------+
only showing top 20 rows



In [8]:
predict=ratings.transform(test)

predictions_df = predict.toPandas()
train= train.toPandas()
predictions_df = predict.toPandas().fillna(train['rating'].mean())

test_pd=test.toPandas()
predictions_df['squared_error'] = (predictions_df['rating'] - predictions_df['prediction'])**2
#print (predictions_df)


In [9]:
np.sqrt(sum(predictions_df['squared_error']) / len(predictions_df))

0.9059407769452981

In [10]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predict)

In [11]:
print (rmse)

0.9059407769453109


In [23]:
def top_x(n):
    counter=np.linspace(1,n, num=n)
    top_choices=[]
    for place in range(1,n+1):
        if place == 1:
            choice = '1st'
        if place == 2: 
            choice = '2nd'
        if place == 3:  
            choice = '3rd'
        else:
            choice = str(place) + 'th'
        top_choices.append(choice)
    return top_choices


In [24]:
n=10
col_names=top_x(n)
userRecs = ratings.recommendForAllUsers(n)
# Generate top 10 user recommendations for each movie
movieRecs = ratings.recommendForAllItems(n)
col_names

['1th', '2th', '3rd', '4th', '5th', '6th', '7th', '8th', '9th', '10th']

In [14]:
best_movies=userRecs.toPandas()


In [15]:
best_movies.recommendations[0]

[Row(movieId=83411, rating=4.898439407348633),
 Row(movieId=67504, rating=4.898439407348633),
 Row(movieId=83359, rating=4.898439407348633),
 Row(movieId=108583, rating=4.693892478942871),
 Row(movieId=3030, rating=4.692594051361084),
 Row(movieId=59684, rating=4.631417274475098),
 Row(movieId=31435, rating=4.594638347625732),
 Row(movieId=54328, rating=4.59224796295166),
 Row(movieId=3414, rating=4.576173305511475),
 Row(movieId=52767, rating=4.524228096008301)]

In [16]:
#df[['b1', 'b2']] = pd.DataFrame(df['b'].tolist(), index=df.index)

ranked=pd.DataFrame(best_movies['recommendations'].tolist(), index=best_movies.index)

#best_movies[['Best','2nd,','3rd']]=pd.DataFrame(best_movies['recommendations'], index=best_movies.index)