In [1]:
import os
import pyspark
from pyspark.sql import SQLContext, SparkSession

# change "pierre.cs.colsotate.edu" to the name of your spark master node
SPARK_NODE="pierre.cs.colostate.edu"


# 31820 corresponds to SPARK_MASTER_PORT in $SPARK_HOME/conf/spark-env.sh
SPARK_PORT=31820

spark = SparkSession.builder.master('spark://{}:{}'.format(SPARK_NODE,SPARK_PORT)).appName('test').getOrCreate()

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
print("Spark Version: " + spark.version)
print("PySpark Version: " + pyspark.__version__)

Spark Version: 3.0.3
PySpark Version: 3.0.3


In [2]:
# load data
normalized_ratings = spark.read.json("/FP/output/normalized_ratings.json")


In [52]:
print('Number of partitions for the ratings DataFrame: {}'.format(normalized_ratings.rdd.getNumPartitions()))

Number of partitions for the ratings DataFrame: 100


In [3]:
#load data

#split data

training, validation, test = normalized_ratings.randomSplit([6.0, 2.0, 2.0], 24)

In [57]:
from pyspark.ml.recommendation import ALS

#model = ALS.trainImplicit(validation,5,seed=0)
model = ALS(userCol='user_id', itemCol='item_id', ratingCol='rating').fit(training)


In [62]:
predictions = model.transform(test)
predictions.show()

+-------+--------------------+-------+-------------+
|item_id|              rating|user_id|   prediction|
+-------+--------------------+-------+-------------+
|    148| -0.3621621621621621| 282329| -0.038076196|
|    148|-0.16844898287368337| 168279| -0.018411031|
|    148|-0.14113308749580955| 602455|  -0.01775694|
|    148|  0.2718600953895072| 333305| -0.017734025|
|    148| -0.1428571428571428| 865238|-0.0054512485|
|    148|0.019867549668874274| 872835| -0.021995276|
|    148| 0.05319148936170204| 279216| -0.045541644|
|    148| 0.24878048780487805| 102933| -0.034185715|
|    148|-0.48370273794002605| 900574| -0.038468044|
|    148|-0.07707910750507097| 917175| -0.070053376|
|    148|-0.00928792569659...| 844689| -0.012771412|
|    148|-0.04083204930662554|  64181| -0.030010216|
|    148| -0.2143701252658946| 258674| -0.015988404|
|    148|-0.18652561247216037| 390047| -0.026524104|
|    148|-0.07692307692307698| 748306| -0.007702155|
|    148|-0.18540669856459324| 159827| -0.0133

In [61]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))

The root mean squared error for our model is: nan


In [14]:
predTest = model.predictAll(test["user_id","item_id"].rdd).toDF(["user_id","item_id","rating"])

In [32]:
testCompare = test.join(predTest.withColumnRenamed("rating","pred_rating")\
                        ,[test.user_id==predTest.user_id, test.item_id==predTest.item_id]\
                           ).select(test.user_id,test.item_id,"rating","pred_rating")
testCompare.show()

+-------+-------+--------------------+--------------------+
|user_id|item_id|              rating|         pred_rating|
+-------+-------+--------------------+--------------------+
|   1410|      1| 0.20388349514563098|  0.7860451756848548|
|  65518|      3|  0.2222222222222221| 0.03272848759226887|
| 105288|      1|-0.12082957619477008| 0.07986960389975017|
| 139207|      1| 0.22565006610841776| 0.23324659302959194|
| 147252|      1|-0.02304147465437789|  0.2550308501927132|
|  54736|      5|-0.41313868613138693|-0.00419573906341...|
|  79640|      1| 0.27450980392156854|  0.1827480756827744|
|  99517|      1| 0.10828025477707004| 0.07987368238186043|
|   7453|      3| 0.19909502262443435|  0.5181362009771763|
|  26495|      2|-0.08063175394846211|   0.544579038563672|
| 149120|      2|-0.01285347043701801| 0.09677562316412569|
| 181473|      1| -0.7095890410958904|  0.1530106947772265|
|   5719|      5| -0.4350282485875706|-0.01943513792413...|
| 111113|      5|-0.39924433249370284|-0

In [64]:
# replace na predictions with average rating

avgRatings = normalized_ratings.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))
# avg rating is close to zero because we normalized ratings

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('RMSE when replacing na: {}'.format(evaluator.evaluate(predictions.na.fill(avgRatings))))

# drop na predictions 
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('RMSE when dropping na: {}'.format(evaluator.evaluate(predictions.na.drop())))

The average rating in the dataset is: -1.4090871080574545e-19
RMSE when replacing na: 0.2767606916548808
RMSE when dropping na: 0.27671374834702206


In [75]:
movies=spark.read.json("/FP/metadata.json")
movies.show()

+---------+---------+--------------------+-------+-------+--------------------+--------------------+
|avgRating|dateAdded|          directedBy| imdbId|item_id|            starring|               title|
+---------+---------+--------------------+-------+-------+--------------------+--------------------+
|  3.89146|     null|       John Lasseter|0114709|      1|Tim Allen, Tom Ha...|    Toy Story (1995)|
|  3.26605|     null|        Joe Johnston|0113497|      2|Jonathan Hyde, Br...|      Jumanji (1995)|
|  3.17146|     null|       Howard Deutch|0113228|      3|Jack Lemmon, Walt...|Grumpier Old Men ...|
|  2.86824|     null|     Forest Whitaker|0114885|      4|Angela Bassett, L...|Waiting to Exhale...|
|   3.0762|     null|       Charles Shyer|0113041|      5|Steve Martin, Mar...|Father of the Bri...|
|  3.85549|     null|        Michael Mann|0113277|      6|Robert De Niro, A...|         Heat (1995)|
|  3.37244|     null|      Sydney Pollack|0114319|      7|Harrison Ford, Gr...|      Sabrin

In [92]:
from pyspark.sql.functions import lit

movies=spark.read.json("/FP/metadata.json")
def recommendMovies(model, user, nbRecommendations):
    ratings=normalized_ratings
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = ratings.select('item_id').distinct().withColumn('user_id', lit(user))
    #dataSet.show()
    
    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = ratings.filter(ratings.user_id == user).select('item_id', 'user_id')
    #moviesAlreadyRated.show()
    
    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy('prediction', ascending=False).limit(nbRecommendations).select('item_id', 'prediction')
    #predictions.show()
    
    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(movies, predictions.item_id == movies.item_id).select(predictions.item_id, movies.title, movies.starring, predictions.prediction)

#     recommendations.show(truncate=False)
    return recommendations



In [93]:
recommended = recommendMovies(model,1410,10)
recommended.show()

+-------+--------------------+--------------------+-----------+
|item_id|               title|            starring| prediction|
+-------+--------------------+--------------------+-----------+
| 106607|     Plankton (1994)|Clay Rogers, Mich...|0.088005714|
| 173945|The Wearing of th...|John T. Smith,Mel...|0.090307124|
| 182287|Porky in Egypt (1...|           Mel Blanc| 0.08560727|
| 185669|CM Punk: Best in ...|Phillip Jack Broo...| 0.10411912|
| 188111|Norman Lear: Just...|Norman Lear,John ...|0.085670136|
| 194434|   Adrenaline (1990)|Clémentine Célari...|0.088005714|
| 199524|Arise! SubGenius ...|Mark Mothersbaugh...|0.088005714|
| 202231|       Foster (2018)|                    | 0.10151178|
| 215251|   The Scheme (2020)|                    | 0.10421473|
+-------+--------------------+--------------------+-----------+



In [94]:
recommended = recommendMovies(model,7453,10)
recommended.show()

+-------+--------------------+--------------------+-----------+
|item_id|               title|            starring| prediction|
+-------+--------------------+--------------------+-----------+
| 106607|     Plankton (1994)|Clay Rogers, Mich...|0.010086744|
| 173945|The Wearing of th...|John T. Smith,Mel...|0.010346627|
| 182287|Porky in Egypt (1...|           Mel Blanc|0.009807436|
| 185669|CM Punk: Best in ...|Phillip Jack Broo...|0.011931514|
| 188111|Norman Lear: Just...|Norman Lear,John ...|0.009815398|
| 194434|   Adrenaline (1990)|Clémentine Célari...|0.010086744|
| 199524|Arise! SubGenius ...|Mark Mothersbaugh...|0.010086744|
| 202231|       Foster (2018)|                    | 0.01163055|
| 215251|   The Scheme (2020)|                    |0.011947064|
+-------+--------------------+--------------------+-----------+



### Confusion Matrix

negative = $[-100,-0.1)$

neutral = $[-0.1,0.1)$

positive = $[0.1,100)$

Vertical is true rating

Horizontal in predicted rating

In [131]:
# confusion matrix

predictions = model.transform(test)
joined = test.join(predictions.withColumnRenamed("rating","pred_ratings"),[predictions.item_id==test.item_id,predictions.user_id==test.user_id])
total = joined.count()

classes=[-100,-.1,.1,100]
labels=["negative","nuetral","positive"]
agg = [[labels[j]]+[None for i in range(len(classes)-1)] for j in range(len(classes)-1)]
for i in range(len(classes)-1):
    for j in range(len(classes)-1):
        matching=joined.rdd.filter(lambda x: classes[i]<=x.rating<classes[i+1] and classes[j]<=x.pred_ratings<classes[j+1]).count()
        print("actual in [{},{}), predicted in [{},{}): {}".format(classes[i],classes[i+1],classes[j],classes[j+1],matching))
        agg[i][j+1]=matching/total

actual in [-100,-0.1), predicted in [-100,-0.1): 1792097
actual in [-100,-0.1), predicted in [-0.1,0.1): 1227
actual in [-100,-0.1), predicted in [0.1,100): 2179
actual in [-0.1,0.1), predicted in [-100,-0.1): 1227
actual in [-0.1,0.1), predicted in [-0.1,0.1): 1771987
actual in [-0.1,0.1), predicted in [0.1,100): 2168
actual in [0.1,100), predicted in [-100,-0.1): 2179
actual in [0.1,100), predicted in [-0.1,0.1): 2168
actual in [0.1,100), predicted in [0.1,100): 2138395


In [134]:
import pandas

df = pandas.DataFrame(agg)
labels=["rating","negative","nuetral","positive"]
df.columns=labels
#df.set_index("rating")
display(df)

Unnamed: 0,rating,negative,nuetral,positive
0,negative,0.313653,0.000215,0.000381
1,nuetral,0.000215,0.310133,0.000379
2,positive,0.000381,0.000379,0.374262
