In [1]:
from pyspark import SparkContext, SQLContext

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190815131323-0000
KERNEL_ID = 49ed001b-5c9d-409c-8f75-e7ec0e974b14


In [2]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
#import module
from pyspark.ml.recommendation import ALS

#create session
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("Recommender system in Spark")\
  .config("spark.some.config.option", "some-value") \
  .getOrCreate()

In [35]:
# The code was removed by Watson Studio for sharing.

[Row(movieId='1', title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId='2', title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId='3', title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId='4', title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId='5', title='Father of the Bride Part II (1995)', genres='Comedy')]

In [36]:
ratings = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('ratings.csv', 'recommendationengine-donotdelete-pr-1cnfjaobjuzd3x'))
ratings.take(5)

[Row(userId='1', movieId='2', rating='3.5', timestamp='1112486027'),
 Row(userId='1', movieId='29', rating='3.5', timestamp='1112484676'),
 Row(userId='1', movieId='32', rating='3.5', timestamp='1112484819'),
 Row(userId='1', movieId='47', rating='3.5', timestamp='1112484727'),
 Row(userId='1', movieId='50', rating='3.5', timestamp='1112484580')]

In [37]:
#merge "movies" and "ratings" dataFrame based on "movieId"
ratings.join(movies, "movieId").show(3)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|      2|     1|   3.5|1112486027|      Jumanji (1995)|Adventure|Childre...|
|     29|     1|   3.5|1112484676|City of Lost Chil...|Adventure|Drama|F...|
|     32|     1|   3.5|1112484819|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
+-------+------+------+----------+--------------------+--------------------+
only showing top 3 rows



In [38]:
ratings.dtypes

[('userId', 'string'),
 ('movieId', 'string'),
 ('rating', 'string'),
 ('timestamp', 'string')]

In [40]:
#use only column data of "userId", "movieId", dan "rating"
data = ratings.select("userId", "movieId", "rating")

In [41]:
from pyspark.sql.types import IntegerType
data = data.withColumn("userId", data["userId"].cast(IntegerType()))
data = data.withColumn("movieId", data["movieId"].cast(IntegerType()))
data = data.withColumn("rating", data["rating"].cast(IntegerType()))

In [42]:
#data['userId'] = data['userId'].astype(int)
#data['movieId'] = data['movieId'].astype(int)
#data['rating'] = data['rating'].astype(int)

In [43]:
data.dtypes

[('userId', 'int'), ('movieId', 'int'), ('rating', 'int')]

In [44]:
#divide data, 70% for training and 30% for testing
splits = data.randomSplit([0.7, 0.3])
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")
#calculate number of rows
train_rows = train.count()
test_rows = test.count()
print ("number of training data rows:", train_rows, 
       ", number of testing data rows:", test_rows)

number of training data rows: 734252 , number of testing data rows: 314323


In [45]:
#define ALS (Alternating Least Square) as our recommender system
als = ALS(maxIter=19, regParam=0.01, userCol="userId", 
          itemCol="movieId", ratingCol="label")
#train our ALS model
model = als.fit(train)
print("Training is done!")

Training is done!


In [46]:
prediction = model.transform(test)
print("testing is done!")

testing is done!


In [47]:
prediction.join(movies, "movieId").select(
    "userId", "title", "prediction", "trueLabel").show(n=10, truncate=False)

+------+--------------------------------+----------+---------+
|userId|title                           |prediction|trueLabel|
+------+--------------------------------+----------+---------+
|6225  |Awfully Big Adventure, An (1995)|1.0461599 |2        |
|1259  |Awfully Big Adventure, An (1995)|4.4552374 |5        |
|5814  |Awfully Big Adventure, An (1995)|2.1236084 |3        |
|4162  |Guilty as Sin (1993)            |2.3960946 |3        |
|4324  |Guilty as Sin (1993)            |3.885243  |4        |
|4986  |Guilty as Sin (1993)            |3.5348058 |4        |
|2242  |Guilty as Sin (1993)            |2.2652295 |3        |
|156   |Guilty as Sin (1993)            |3.590667  |4        |
|5518  |Hudsucker Proxy, The (1994)     |3.5980818 |4        |
|3352  |Hudsucker Proxy, The (1994)     |3.2893536 |3        |
+------+--------------------------------+----------+---------+
only showing top 10 rows



In [48]:
#import RegressionEvaluator since we also want to calculate RMSE (Root Mean Square Error)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): nan


In [49]:
prediction.count()
a = prediction.count()
print("number of original data rows: ", a)
#drop rows with any missing data
cleanPred = prediction.dropna(how="any", subset=["prediction"])
b = cleanPred.count()
print("number of rows after dropping data with missing value: ", b)
print("number of missing data: ", a-b)

number of original data rows:  314323
number of rows after dropping data with missing value:  313259
number of missing data:  1064


In [50]:
rmse = evaluator.evaluate(cleanPred)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 0.9464697990256984


In [65]:
#Generate recommendations for all the users
userRecs = model.recommendForAllUsers(10).show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[1477, 15.164588...|
|  4900|[[94985, 10.14870...|
|  5300|[[59549, 10.24481...|
|  6620|[[757, 8.254283],...|
|   463|[[59549, 9.028948...|
+------+--------------------+
only showing top 5 rows



In [58]:
def get_recs_for_users(recs):
    #Recs should be for a specific user
    recs = recs.select('recommendations.movieId','recommendations.rating')
    movies = recs.select('movieId').toPandas().iloc[0,0]
    ratings = recs.select('rating').toPandas().iloc[0,0]
    ratings_matrix = pd.DataFrame(movies,columns = ['movieId'])
    ratings_matrix['ratings'] = ratings
    ratings_matrix_ps = sqlContext.createDataFrame(ratings_matrix)
    return ratings_matrix_ps

In [59]:
#Recommendations for user 41

from pyspark.sql.functions import col 
user_41_recs = user_recs.filter(col("userId") == 41) 
get_recs_for_users(user_41_recs).head(16)

[Row(movieId=6234, ratings=9.639665603637695),
 Row(movieId=56167, ratings=9.2036714553833),
 Row(movieId=32456, ratings=8.928771018981934),
 Row(movieId=79318, ratings=8.317727088928223),
 Row(movieId=7319, ratings=7.845831394195557),
 Row(movieId=7222, ratings=7.8397932052612305),
 Row(movieId=7243, ratings=7.720480918884277),
 Row(movieId=8235, ratings=7.519848823547363),
 Row(movieId=2079, ratings=7.4928741455078125),
 Row(movieId=3289, ratings=7.429114818572998)]