In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as sf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
# Start (or reuse) the SparkSession that backs every DataFrame in this notebook
spark = (
    SparkSession.builder
    .appName("Movie Recommendation App")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/17 11:13:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:

# Load the MovieLens "small" ratings CSV. The first row is treated as the
# header and column types are inferred by Spark rather than declared.
df_ratings_small = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/movie-lens-small-latest-dataset/ratings.csv")
)

# Preview the first rows (show() prints 20 by default)
df_ratings_small.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows


In [4]:
# Dataset size: total rating rows, then the number of distinct users and movies
ratings_counts = df_ratings_small.select("rating").count()
users_count, movies_count = (
    df_ratings_small.select(id_column).distinct().count()
    for id_column in ("userId", "movieId")
)

print(f"ratings counts: {ratings_counts}\n user counts: {users_count}\n movies count: {movies_count}")

ratings counts: 100836
 user counts: 610
 movies count: 9724


In [5]:
# How many ratings each user submitted (prints the first 20 groups)
(
    df_ratings_small
    .groupBy("userId")
    .count()
    .show()
)

+------+-----+
|userId|count|
+------+-----+
|   148|   48|
|   463|   33|
|   471|   28|
|   496|   29|
|   243|   36|
|   392|   25|
|   540|   42|
|    31|   50|
|   516|   26|
|    85|   34|
|   137|  141|
|   251|   23|
|   451|   34|
|   580|  436|
|    65|   34|
|   458|   59|
|    53|   20|
|   255|   44|
|   481|   31|
|   588|   56|
+------+-----+
only showing top 20 rows


In [6]:
# 80/20 train/test split; a fixed seed is passed so the split can be reproduced
train, test = df_ratings_small.randomSplit(weights=[0.8, 0.2], seed=42)

In [7]:
# Explicit-feedback ALS matrix factorisation over (userId, movieId, rating).
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=15,  # number of latent factors; the saved model's path also encodes 15
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Drop rows whose prediction is NaN (users/movies not seen during training)
    # so they cannot turn the RMSE computed on the test split into NaN.
    coldStartStrategy="drop",
    implicitPrefs=False,
    # Pin the factor-initialisation seed, matching the seed used for the
    # train/test split, so the RMSE and the recommendations are reproducible
    # across kernel restarts instead of depending on the per-process default.
    seed=42,
)


In [8]:
# Fit the ALS estimator on the training split to obtain the factor model
model = als.fit(dataset=train)

In [9]:
# Score the held-out split: RMSE between the actual and the predicted rating
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)

print(f"Root-mean-square error = {rmse:.4f}")

Root-mean-square error = 0.8806


In [10]:
# Persist the fitted model to disk, replacing any copy left by an earlier run
(
    model.write()
    .overwrite()
    .save("models/ratings_small_model-latent-features-15")
)

25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/09/17 11:14:44 WARN MemoryManager: Total allocation exceeds 95.00%

In [11]:
# Top-5 movie recommendations for every user the model has factors for
userRecs = model.recommendForAllUsers(numItems=5)

print("User Recommendations:")
userRecs.show(n=5, truncate=False)

User Recommendations:




+------+---------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                    |
+------+---------------------------------------------------------------------------------------------------+
|1     |[{3925, 5.7489285}, {177593, 5.5231633}, {78836, 5.4830256}, {55276, 5.4576864}, {3494, 5.4218707}]|
|2     |[{131724, 4.9030952}, {1274, 4.717609}, {53123, 4.5893817}, {1732, 4.5761437}, {2843, 4.561259}]   |
|3     |[{6835, 4.905354}, {5746, 4.905354}, {5181, 4.848927}, {74754, 4.795737}, {4518, 4.7803164}]       |
|4     |[{1611, 5.0496364}, {1212, 4.964766}, {3851, 4.935221}, {1733, 4.887687}, {1046, 4.8553886}]       |
|5     |[{1203, 4.8973293}, {55276, 4.723255}, {3494, 4.6775494}, {2804, 4.650452}, {5915, 4.6366687}]     |
+------+---------------------------------------------------------------------------------------------------+
only showing top 5 

                                                                                

In [12]:
from pyspark.sql.functions import explode, col

# Turn each user's array of recommendation structs into one row per
# (user, movie) pair, pulling the struct fields out as plain columns.
flatRecs = (
    userRecs
    .withColumn("rec", explode(col("recommendations")))
    .select(
        "userId",
        col("rec.movieId").alias("movieId"),
        col("rec.rating").alias("predicted_rating"),
    )
)

flatRecs.show(10, truncate=False)


+------+-------+----------------+
|userId|movieId|predicted_rating|
+------+-------+----------------+
|1     |3925   |5.7489285       |
|1     |177593 |5.5231633       |
|1     |78836  |5.4830256       |
|1     |55276  |5.4576864       |
|1     |3494   |5.4218707       |
|2     |131724 |4.9030952       |
|2     |1274   |4.717609        |
|2     |53123  |4.5893817       |
|2     |1732   |4.5761437       |
|2     |2843   |4.561259        |
+------+-------+----------------+
only showing top 10 rows


In [13]:
# Row count of the flattened recommendations: one row per (user, movie) pair,
# i.e. 5 recommendations for each user returned by recommendForAllUsers(5).
flatRecs.count()

                                                                                

3050

In [14]:
# Movie metadata (movieId, title, genres) used to label the recommendations
df_movies_metadata = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/movie-lens-small-latest-dataset/movies.csv")
)
df_movies_metadata.show()


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [15]:

# Attach title/genres to each recommendation. A left join keeps every
# recommendation even when its movieId has no match in the metadata.
recommendations = flatRecs.join(df_movies_metadata, on="movieId", how="left")

# sort() returns a new DataFrame rather than reordering in place, so its result
# must be assigned; otherwise the ordering is silently thrown away.
# Order by user, with each user's highest predicted rating first.
# NOTE(review): the discarded call used ascending=[False, True] (highest userId
# first, lowest rating first); [True, False] matches the order seen in the
# recorded output -- confirm this is the intended ordering.
recommendations = recommendations.sort(
    "userId", "predicted_rating", ascending=[True, False]
)
recommendations.show(10)

+-------+------+----------------+--------------------+--------------------+
|movieId|userId|predicted_rating|               title|              genres|
+-------+------+----------------+--------------------+--------------------+
|   3925|     1|       5.7489285|Stranger Than Par...|        Comedy|Drama|
| 177593|     1|       5.5231633|Three Billboards ...|         Crime|Drama|
|  78836|     1|       5.4830256|Enter the Void (2...|               Drama|
|  55276|     1|       5.4576864|Michael Clayton (...|      Drama|Thriller|
|   3494|     1|       5.4218707|    True Grit (1969)|Adventure|Drama|W...|
| 131724|     2|       4.9030952|The Jinx: The Lif...|         Documentary|
|   1274|     2|        4.717609|        Akira (1988)|Action|Adventure|...|
|  53123|     2|       4.5893817|         Once (2006)|Drama|Musical|Rom...|
|   1732|     2|       4.5761437|Big Lebowski, The...|        Comedy|Crime|
|   2843|     2|        4.561259|Black Cat, White ...|      Comedy|Romance|
+-------+---

                                                                                

In [16]:
# Sanity check: the left join should leave the row count equal to
# flatRecs.count(), assuming movieId is unique in the movies metadata
# (TODO confirm) -- a larger number would indicate duplicated movieIds.
recommendations.count()

3050