# Step 1: Create the SparkSession Object

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lin_reg').getOrCreate()

# Step 2: Read the Dataset

In [2]:
df = spark.read.csv('movie_ratings_df.csv', inferSchema=True, header=True)

# Step 3: Exploratory Data Analysis

In [4]:
def shape(df):
    return (df.count(), len(df.columns))

In [5]:
shape(df)

(100000, 3)

In [8]:
df.head(3)

[Row(userId=196, title='Kolya (1996)', rating=3),
 Row(userId=63, title='Kolya (1996)', rating=3),
 Row(userId=226, title='Kolya (1996)', rating=5)]

In [7]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [10]:
from pyspark.sql.functions import rand

df.orderBy(rand()).show(5)

+------+--------------------+------+
|userId|               title|rating|
+------+--------------------+------+
|   345|    Quiz Show (1994)|     4|
|   466|Conspiracy Theory...|     4|
|   363|Singin' in the Ra...|     2|
|   734|      Rebecca (1940)|     5|
|   716|Air Force One (1997)|     5|
+------+--------------------+------+
only showing top 5 rows



In [13]:
df.groupBy('userId').count().orderBy('count', ascending=False).show(5)

+------+-----+
|userId|count|
+------+-----+
|   405|  737|
|   655|  685|
|    13|  636|
|   450|  540|
|   276|  518|
+------+-----+
only showing top 5 rows



In [14]:
df.groupBy('userId').count().orderBy('count', ascending=True).show(5)

+------+-----+
|userId|count|
+------+-----+
|   685|   20|
|   596|   20|
|   926|   20|
|    34|   20|
|   300|   20|
+------+-----+
only showing top 5 rows



In [15]:
df.groupBy('title').count().orderBy('count', ascending=False).show(5)

+--------------------+-----+
|               title|count|
+--------------------+-----+
|    Star Wars (1977)|  583|
|      Contact (1997)|  509|
|        Fargo (1996)|  508|
|Return of the Jed...|  507|
|    Liar Liar (1997)|  485|
+--------------------+-----+
only showing top 5 rows



# Step 4: Feature Engineering

In [16]:
from pyspark.ml.feature import StringIndexer, IndexToString

In [17]:
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show(5)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
+------+------------+------+---------+
only showing top 5 rows



In [18]:
indexed.groupBy('title_new').count().orderBy('count', ascending=False).show(5)

+---------+-----+
|title_new|count|
+---------+-----+
|      0.0|  583|
|      1.0|  509|
|      2.0|  508|
|      3.0|  507|
|      4.0|  485|
+---------+-----+
only showing top 5 rows



# Step 5: Splitting the Dataset

In [19]:
train, test = indexed.randomSplit([.75, .25], seed=42)

In [20]:
shape(train), shape(test)

((75031, 4), (24969, 4))

# Step 6: Build and Train Recommender Model

In [22]:
from pyspark.ml.recommendation import ALS

rec = ALS(maxIter=10,
          regParam=0.01,
          userCol='userId',
          itemCol='title_new',
          ratingCol='rating',
          nonnegative=True, # Ensure that no negative ratings are created.
          coldStartStrategy='drop') # Prevent any NaN ratings predictions.

In [23]:
rec_model = rec.fit(train)

# Step 7: Predictions and Evaluation on Test Data

In [25]:
predicted_ratings = rec_model.transform(test)
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [26]:
predicted_ratings.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   422|Devil's Advocate,...|     4|    130.0|  3.219592|
|   314|        Dumbo (1941)|     4|    271.0| 3.6539402|
|    95|       Grease (1978)|     3|    166.0| 3.5170355|
|   407|    Star Wars (1977)|     4|      0.0|  4.291096|
|   758|    True Lies (1994)|     4|    109.0| 3.7785692|
|   660|When Harry Met Sa...|     2|     45.0| 2.7547045|
|   465|E.T. the Extra-Te...|     3|     33.0| 3.4026253|
|   561|  Kansas City (1996)|     4|    880.0| 2.7149253|
|   901|Somewhere in Time...|     4|    407.0|  4.249585|
|   531|     Scream 2 (1997)|     2|    314.0|  3.499322|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [28]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse',
                                predictionCol='prediction',
                                labelCol='rating')
rmse = evaluator.evaluate(predicted_ratings)
rmse

1.0236514924750828

# Step 8: Recommend Top Movies That Active User Might Like

In [30]:
unique_movies = indexed.select('title_new').distinct()
unique_movies.count()

1664

In [31]:
a = unique_movies.alias('a')

In [34]:
user_id = 85

watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()
watched_movies.count()

287

In [35]:
b = watched_movies.alias('b')

In [36]:
total_movies = a.join(b, a.title_new == b.title_new, how='left')
total_movies.show(10)

+---------+---------+
|title_new|title_new|
+---------+---------+
|    558.0|     null|
|    305.0|    305.0|
|    299.0|     null|
|    596.0|     null|
|    769.0|     null|
|    934.0|     null|
|    496.0|    496.0|
|   1051.0|     null|
|    692.0|     null|
|    810.0|     null|
+---------+---------+
only showing top 10 rows



In [40]:
from pyspark.sql.functions import col, lit

remaining_movies = total_movies.where(col('b.title_new').isNull()).select(a.title_new).distinct()
remaining_movies.count()

1377

In [41]:
remaining_movies = remaining_movies.withColumn('userId', lit(int(user_id)))
remaining_movies.show(10)

+---------+------+
|title_new|userId|
+---------+------+
|    558.0|    85|
|    299.0|    85|
|    596.0|    85|
|    769.0|    85|
|    934.0|    85|
|   1051.0|    85|
|    692.0|    85|
|    810.0|    85|
|    720.0|    85|
|    782.0|    85|
+---------+------+
only showing top 10 rows



In [44]:
recommendations = rec_model.transform(remaining_movies).orderBy('prediction', ascending=False)
recommendations.show(5)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|   1358.0|    85| 5.7467284|
|   1322.0|    85| 5.2020054|
|    924.0|    85| 5.1696897|
|   1067.0|    85|  4.771118|
|   1148.0|    85|  4.739236|
+---------+------+----------+
only showing top 5 rows



In [45]:
movie_title = IndexToString(inputCol='title_new',
                            outputCol='title',
                            labels=model.labels)
final_recommendations = movie_title.transform(recommendations)
final_recommendations.show(10, False)

+---------+------+----------+-----------------------------------------------------------+
|title_new|userId|prediction|title                                                      |
+---------+------+----------+-----------------------------------------------------------+
|1358.0   |85    |5.7467284 |Angel Baby (1995)                                          |
|1322.0   |85    |5.2020054 |Faust (1994)                                               |
|924.0    |85    |5.1696897 |Paradise Lost: The Child Murders at Robin Hood Hills (1996)|
|1067.0   |85    |4.771118  |Man in the Iron Mask, The (1998)                           |
|1148.0   |85    |4.739236  |Wild America (1997)                                        |
|1434.0   |85    |4.7329054 |Joy Luck Club, The (1993)                                  |
|1071.0   |85    |4.6990867 |Stalingrad (1993)                                          |
|482.0    |85    |4.672848  |Wallace & Gromit: The Best of Aardman Animation (1996)     |
|888.0    