In [1]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from pyspark import SparkConf

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[12]') \
    .config("spark.driver.memory", "10g") \
    .appName('Recommendations') \
    .getOrCreate()
sc = SparkContext
# sc.setCheckpointDir('checkpoint')

In [3]:
# Create new config
conf = (SparkConf()
    .set("spark.driver.maxResultSize",  0))

# Create new context
sc = SparkContext.getOrCreate();

In [4]:
movies = spark.read.csv("C:/Users/bayra/Desktop/tez proje/Recommender_Deneme_withbigdata/Data/movies.csv",header=True)
ratings = spark.read.csv("C:/Users/bayra/Desktop/tez proje/Recommender_Deneme_withbigdata/Data/ratings.csv",header=True)

In [5]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



In [6]:
movies.count()

58098

In [7]:
ratings.count()

27753444

In [8]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [9]:
ratings = ratings.    withColumn('userId', col('userId').cast('integer')).    withColumn('movieId', col('movieId').cast('integer')).    withColumn('rating', col('rating').cast('float')).    drop('timestamp')
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
|     1|   1257|   4.5|
|     1|   1449|   4.5|
|     1|   1590|   2.5|
|     1|   1591|   1.5|
|     1|   2134|   4.5|
|     1|   2478|   4.0|
|     1|   2840|   3.0|
|     1|   2986|   2.5|
|     1|   3020|   4.0|
|     1|   3424|   4.5|
|     1|   3698|   3.5|
|     1|   3826|   2.0|
|     1|   3893|   3.5|
|     2|    170|   3.5|
|     2|    849|   3.5|
|     2|   1186|   3.5|
|     2|   1235|   3.0|
+------+-------+------+
only showing top 20 rows



In [10]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  99.82% empty.


In [11]:
# Group data by userId, count ratings
userId_ratings = ratings.groupBy("userId").count().orderBy('count', ascending=False)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|123100|23715|
|117490| 9279|
|134596| 8381|
|212343| 7884|
|242683| 7515|
|111908| 6645|
| 77609| 6398|
| 63783| 6346|
|172357| 5868|
|141955| 5810|
|158002| 5701|
|253511| 5356|
| 48470| 5257|
|183233| 5169|
| 94843| 5130|
| 73145| 5042|
| 37046| 5041|
|187986| 4951|
|  4796| 4874|
|236981| 4854|
+------+-----+
only showing top 20 rows



In [12]:
# Group data by userId, count ratings
movieId_ratings = ratings.groupBy("movieId").count().orderBy('count', ascending=False)
movieId_ratings.show()

+-------+-----+
|movieId|count|
+-------+-----+
|    318|97999|
|    356|97040|
|    296|92406|
|    593|87899|
|   2571|84545|
|    260|81815|
|    480|76451|
|    527|71516|
|    110|68803|
|      1|68469|
|   1210|66023|
|   1196|65822|
|   2959|65678|
|    589|64258|
|   1198|63505|
|     50|62180|
|   4993|61883|
|    858|60904|
|   2858|60820|
|    780|58949|
+-------+-----+
only showing top 20 rows



In [13]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [14]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")

# Confirm that a model called "als" was created
type(als)


pyspark.ml.recommendation.ALS

In [15]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 15, 20, 25]) \
            .addGrid(als.regParam, [.01, .015, .02, .025]) \
            .build()
            #             .addGrid(als.maxIter, [10, 15, 20, 25]) \

           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [16]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_a4a2215dad56


In [17]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

In [18]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 15
  MaxIter: 10
  RegParam: 0.025


In [19]:
# View the rmse
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8168789588679283


In [22]:
# View the predictions
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|245316|    148|   1.0| 2.0224955|
|211963|    148|   3.0| 1.9834845|
|167692|    148|   3.0| 3.5114198|
| 28455|    148|   2.0| 1.8322432|
|117490|    148|   3.5| 2.7790442|
|224445|    148|   3.0| 3.1968586|
| 80394|    148|   2.0|   2.83979|
|255236|    148|   2.0| 2.2906802|
|  8264|    148|   1.0| 2.8061597|
|111694|    148|   5.0| 3.6297963|
|127048|    148|   3.0| 2.7176917|
| 42263|    148|   2.0| 2.7713304|
|  6328|    148|   5.0| 2.4363732|
| 32658|    148|   3.0|  3.035843|
| 64180|    148|   3.0| 3.0644758|
|169517|    148|   3.0|  3.304547|
|259344|    148|   3.0| 3.2826507|
|189688|    148|   3.0|  3.045232|
| 37381|    148|   2.0| 3.1068919|
|265103|    148|   1.0| 2.8254876|
+------+-------+------+----------+
only showing top 20 rows



In [23]:
# Generate n Recommendations for all users
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[[188925, 12.3547...|
|   463|[[188925, 8.19713...|
|   471|[[188925, 13.2045...|
|   496|[[188925, 9.84247...|
|   833|[[188925, 10.9463...|
|  1088|[[188925, 8.82124...|
|  1238|[[188925, 3.96162...|
|  1342|[[152711, 5.37587...|
|  1580|[[188925, 12.1764...|
|  1591|[[188925, 15.1259...|
+------+--------------------+



In [24]:
nrecommendations = nrecommendations    .withColumn("rec_exp", explode("recommendations"))    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))

nrecommendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|   148| 188925|12.354702|
|   148| 188923|11.119231|
|   148| 152711| 8.766706|
|   148| 183185|6.7137213|
|   148|  79236| 6.552726|
|   148| 182521| 6.439548|
|   148| 119661|6.2773037|
|   148| 154860| 6.228688|
|   148| 188935| 6.177351|
|   148| 185035|6.1501584|
+------+-------+---------+

