# Exploring the MovieLens Dataset with `pySpark`

*Apache Spark* is a popular framework for big data. It supports a wide variety of data analytics tasks including data cleaning, stream processing, and machine learning. It can be used to perform large and parallel computations, performing calculations on a single laptop or a cluster of computers. This makes it a useful tool when the data gets too large for `pandas` to handle. Spark can be accessed in Scala, its native API, as well as in Python, Java, R and SQL. In general the `DataFrame` API in Python should achieve the same performance as in Scala.

This Jupyter Notebook will demonstrate how to get started using `pySpark` and the `DataFrame` API to perform some basic data analysis, including:
- reading in data
- performing aggregations and joins using the Spark SQL module
- calculating summary statistics

We will use the [MovieLens 20M Dataset](https://grouplens.org/datasets/movielens/) on movie ratings to find out:
- What are the most popular movies?
- What are the top rated movies?
- Which movies are the most polarising?

**Note**: This Notebook assumes that you have pySpark installed and configured to work with the Jupyter Notebook. The purpose of this Notebook is to demonstrate some basic Spark techniques rather than to provide an installation guide. For information on how to get pySpark running on the Jupyter Notebook, please refer to [this blog post](https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f).


**Note**: Dataset values can change with time as updation is donein kaggle



In [12]:
import os
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Initialising Spark

To use Spark, we must first initialise a `SparkSession` or `SparkContext`. This is the entry point to using Spark in an application.


In [3]:
# Create Spark Context
spark = SparkSession.builder.appName("MovieLens").master("local[*]").getOrCreate()

## Reading in the data

The *MovieLens 20M* Dataset contains 20,000,263 ratings and 465,564 tag applications across 27,278 movies. The dataset was generated in 2016. All users in the dataset rated at least 20 movies.

The dataset contains six CSV files. We will be using the **`movies`** and **`ratings`** files. Let's see what these two files look like.

To read in a CSV file, we access the `DataFrameReader` class through `read` and then call the `csv()` method on it. We also specify `option("header", "true")` so that the first row of the file is used for the column headers.

In [4]:
movie_df = (spark.read
            .format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("path", "D:/Spark/RealTimeUseCases/ProjectData/ml-20m/ml-20m/movies.csv")
            .load())

rating_df = (spark.read
             .format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .option("path", "D:/Spark/RealTimeUseCases/ProjectData/ml-20m/ml-20m/ratings.csv")
             .load())

In [5]:
movie_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



Each row of the `ratings` DataFrame represents one rating for one movie (`movieId`) by one user (`userId`). The ratings use a 5-star scale with half-star increments from 0.5 stars up to 5.0 stars. We can print the DataFrame's column names and types using the `printSchema()` method.

In [6]:
rating_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [7]:
movie_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



Each row of the `movies` DataFrame represents one movie and its title and genre(s), indexed by the key `movieId`. We will use this DataFrame to get the movie titles out so we know which movie the ratings in the `ratings` DataFrame are actually referring to.

## Most popular movies

To get the most popular movies, we are looking for the movies with the highest number of ratings (we use the number of ratings as a proxy for the number of views). To do this, we will perform the following *transformations* on the `ratings` DataFrame:
- group by `movieId`
- count the number of users (`userId`) associated with each movie
- rename this column to `num_ratings`
- sort by `num_ratings` in descending order

In the next cell, we perform these transformations in `pySpark` and store the DataFrame as `most_popular`.

In [8]:
most_popular = rating_df \
    .groupBy("movieId") \
    .agg(count("userId")) \
    .withColumnRenamed("count(userId)", "num_ratings") \
    .sort(desc("num_ratings"))

The DataFrame methods we have used here are:
- `groupBy` - groups the DataFrame by the given column
- `agg` - allows us to perform an aggregate calculation on grouped data (this can be a built-in aggregation function such as *count* or a user defined function)
- `withColumnRenamed` - renames an existing column with a new column name
- `sort` - sorts by the specified column(s)

Because transformations are *lazy* in Spark, the transformations above aren't performed until we call an *action*, such as `show()`, `take()`, or `collect()`.

In [9]:
most_popular.show(5)

+-------+-----------+
|movieId|num_ratings|
+-------+-----------+
|    296|      67310|
|    356|      66172|
|    318|      63366|
|    593|      63299|
|    480|      59715|
+-------+-----------+
only showing top 5 rows



This DataFrame contains only the `movieId` and `num_ratings`. The actual title of the movie is stored in the `movies` DataFrame. To get the movie titles, we can join our `most_popular` DataFrame with the `movies` DataFrame on `movieId`. By default, `join` performs an inner join which is what we want in this case.

In [11]:
# Join most_popular with movies on movieId
most_popular_with_titles = most_popular.join(movie_df, on="movieId", how="inner")

# Select relevant columns and sort by number of ratings
most_popular_with_titles = most_popular_with_titles.select("movieId", "num_ratings", "title", "genres") \
    .orderBy(desc("num_ratings"))

# Show top results
most_popular_with_titles.show(5)

+-------+-----------+--------------------+--------------------+
|movieId|num_ratings|               title|              genres|
+-------+-----------+--------------------+--------------------+
|    296|      67310| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    356|      66172| Forrest Gump (1994)|Comedy|Drama|Roma...|
|    318|      63366|Shawshank Redempt...|         Crime|Drama|
|    593|      63299|Silence of the La...|Crime|Horror|Thri...|
|    480|      59715|Jurassic Park (1993)|Action|Adventure|...|
+-------+-----------+--------------------+--------------------+
only showing top 5 rows



We now have a list of the most popular (or most rated) movies on the *MovieLens* website. As expected, the titles listed here are indeed all well-known movies.

## Top rated movies

We've got the top 10 most popular movies, but now we want to see which movies are perceived to be the best. To get the top rated movies, we are looking for the movies with the highest average rating. To do this, we will use the `ratings` DataFrame and:

- group by `movieId`
- calculate the average rating for each movie
- rename this column to `avg_rating`
- sort by `avg_rating` in descending order

In [13]:
from pyspark.sql.functions import avg, desc

# Step 1: Group by movieId and calculate average rating
top_rated = rating_df.groupBy("movieId") \
    .agg(avg("rating").alias("avg_rating"))

# Step 2: Sort by average rating in descending order
top_rated = top_rated.orderBy(desc("avg_rating"))


We will again join this DataFrame with the `movies` DataFrame so we know which movie each `movieId` is referring to.

In [16]:
top_rated_with_titles = top_rated.join(movie_df, on="movieId", how="inner") \
    .select("movieId", "title", "avg_rating", "genres") \
    .orderBy(desc("avg_rating"))

top_rated_with_titles.show(5)


+-------+--------------------+----------+--------------------+
|movieId|               title|avg_rating|              genres|
+-------+--------------------+----------+--------------------+
| 129530|Slingshot Hip Hop...|       5.0|  (no genres listed)|
| 130996|The Beautiful Sto...|       5.0|Adventure|Drama|F...|
| 130644|The Garden of Sin...|       5.0|           Animation|
| 129295|A Gun for Jennife...|       5.0|Crime|Drama|Thriller|
| 112790|Going Down in LA-...|       5.0|Comedy|Drama|Romance|
+-------+--------------------+----------+--------------------+
only showing top 5 rows



The movies listed here appear to be quite niche. We want to focus on top rated movies that also have a decent number of ratings, so want to take into account both the average rating *and* the number of ratings. We can easily create a DataFrame which has both of these columns by specifying multiple expressions within one `agg()` call.

In [20]:
# Step 1: Aggregate average rating and rating count per movie
rating_stats = rating_df.groupBy("movieId").agg(
    avg("rating").alias("avg_rating"),
    count("rating").alias("num_ratings")
)

# Step 2: Format avg_rating to 2 decimal places
rating_stats = rating_stats.withColumn("avg_rating", format_number("avg_rating", 2))

# Step 3: Join with movie titles
rating_stats_with_titles = rating_stats.join(movie_df, on="movieId") \
    .select("movieId", "title", "avg_rating", "num_ratings", "genres")


rating_stats_with_titles.show(10)


+-------+--------------------+----------+-----------+--------------------+
|movieId|               title|avg_rating|num_ratings|              genres|
+-------+--------------------+----------+-----------+--------------------+
|   3997|Dungeons & Dragon...|      2.07|       2047|   Adventure|Fantasy|
|   1580|Men in Black (a.k...|      3.56|      35580|Action|Comedy|Sci-Fi|
|   3918|Hellbound: Hellra...|      2.92|       1246|              Horror|
|   2366|    King Kong (1933)|      3.55|       6627|Action|Adventure|...|
|   3175| Galaxy Quest (1999)|      3.60|      13945|Adventure|Comedy|...|
|   4519|Land Before Time,...|      3.25|       1936|Adventure|Animati...|
|   1591|        Spawn (1997)|      2.62|       5255|Action|Adventure|...|
|    471|Hudsucker Proxy, ...|      3.66|      11268|              Comedy|
|  36525|Just Like Heaven ...|      3.48|       1169|Comedy|Fantasy|Ro...|
|  44022|Ice Age 2: The Me...|      3.33|       2465|Adventure|Animati...|
+-------+----------------

We see that all of the movies with an average rating of exactly 5.0 have 2 or less ratings. We would like to only consider movies that have achieved some minimum number of ratings. To determine an appropriate threshold, we should investigate the distribution of `num_ratings`. We can do this by calculating some summary statistics within Spark.

In [23]:
# Calculate average, minimum, and maximum of num_ratings

rating_stats_summary = rating_stats.agg(
    avg("num_ratings").alias("avg_ratings"),
    min("num_ratings").alias("min_ratings"),
    max("num_ratings").alias("max_ratings")
)

rating_stats_summary.show(10)

+-----------------+-----------+-----------+
|      avg_ratings|min_ratings|max_ratings|
+-----------------+-----------+-----------+
|747.8411232425965|          1|      67310|
+-----------------+-----------+-----------+



To calculate quantiles we use the `approxQuantile` method. This method can calculate the quantiles of the specified column approximately or exactly, depending on the value of the relative error parameter. If the relative error parameter is set to 0 then the quantiles are calculated exactly, however this can be expensive.

In [24]:
# Calculate the median (50th percentile) of num_ratings
median_num_ratings = rating_stats.approxQuantile("num_ratings", [0.5], 0)

# Display the result
print(f"Median number of ratings: {median_num_ratings[0]}")


Median number of ratings: 18.0


In [25]:
#First Quantile: Calculate 25th (Q1)

first_quartile_ratings = rating_stats.approxQuantile("num_ratings", [0.25], 0)
print(f"First quartile (Q1 - 25th percentile): {first_quartile_ratings[0]}")


First quartile (Q1 - 25th percentile): 3.0


In [26]:
#Second Quantile: Calculate 75th (Q2)

second_quartile_ratings = rating_stats.approxQuantile("num_ratings", [0.75], 0)
print(f"Second quartile (Q2 - 75th percentile): {second_quartile_ratings[0]}")


Second quartile (Q2 - 75th percentile): 205.0


The mean is much greater than the median value, suggesting that this distribution is `skewed to the right`. We will choose a minimum threshold of 500 ratings, however there is no right or wrong answer here and the reader is encouraged to experiment with different values for this threshold.

In [27]:
# Filter movies with at least 500 ratings as threshold
popular_and_rated = rating_stats.filter("num_ratings >= 500")

# Optionally join with titles
popular_and_top_rated_with_titles = popular_and_rated.join(movie_df, on="movieId")\
                                                 .select("movieId", "title", "avg_rating", "num_ratings", "genres")

# Show top results
popular_and_top_rated_with_titles.orderBy("avg_rating", ascending=False).show(10)


+-------+--------------------+----------+-----------+--------------------+
|movieId|               title|avg_rating|num_ratings|              genres|
+-------+--------------------+----------+-----------+--------------------+
|    318|Shawshank Redempt...|      4.45|      63366|         Crime|Drama|
|    858|Godfather, The (1...|      4.36|      41355|         Crime|Drama|
|     50|Usual Suspects, T...|      4.33|      47006|Crime|Mystery|Thr...|
|    527|Schindler's List ...|      4.31|      50054|           Drama|War|
|   1221|Godfather: Part I...|      4.28|      27398|         Crime|Drama|
|    904|  Rear Window (1954)|      4.27|      17449|    Mystery|Thriller|
|   2019|Seven Samurai (Sh...|      4.27|      11611|Action|Adventure|...|
|    922|Sunset Blvd. (a.k...|      4.26|       6525|Drama|Film-Noir|R...|
|   7502|Band of Brothers ...|      4.26|       4305|    Action|Drama|War|
|    912|   Casablanca (1942)|      4.26|      24349|       Drama|Romance|
+-------+----------------

We've now gotten a list of the top rated movies on MovieLens, which includes the usual movies considered to be all time greats such as *The Shawshank Redemption* and *Casablanca*. Interestingly, nearly all of these movies appear in the [top 100 of the IMDb top rated movies list](https://www.imdb.com/chart/top) as well, with the exception of the *The Third Man* (listed as #135) and *Band of Brothers* which is technically a TV series rather than a movie.

What's also interesting is that this list of movies is not the same as the list of the most popular movies. *The Shawshank Redemption*, *Schindler's List*, and *The Usual Suspects* were all popular movies which also appear in this list. However, other movies such as *Pulp Fiction*, *Forrest Gump*, and *The Silence of the Lambs* made the top 10 most popular but not the top 10 (or even top 20) most rated. This suggests that some movies actually divide opinion.

## Most polarising movies (Marmite movies)

Next, we will try to answer the question, *What are the most polarising movies*? These are the movies that divide opinon, with people tending to rate them either really highly or really poorly. We will refer to these as *Marmite* movies. Again, we only want to consider movies that achieve some minimum number of ratings - we will stick with our previous threshold of 500 ratings.

To approach this, we will look for the movies with the highest standard deviation in rating. This is a measure of how much the data varies from the mean, so in this case, how much a movie's ratings vary around its mean rating. A high standard deviation would suggest that the movie's ratings are highly variable. There are other approaches to this as well, for instance, what proportion of the ratings are very positive or very negative.

In [33]:
# Step 1: Aggregate average rating, number of ratings, and standard deviation

rating_variability = rating_df.groupBy("movieId").agg(
    avg("rating").alias("avg_rating"),
    count("rating").alias("num_ratings"),
    stddev("rating").alias("stddev_ratings")
)

rating_variability =rating_variability.withColumn("stddev_ratings", format_number("stddev_ratings", 2))\
    .withColumn("avg_rating", format_number("avg_rating", 2))


In [34]:
# Step 2: Filter for movies with at least 500 ratings threshold

polarizing_candidates = rating_variability.filter("num_ratings >= 500")

In [36]:
# Step 3: Join with movie titles
polarizing_movies = polarizing_candidates.join(movie_df, on="movieId")\
    .select("movieId", "title", "avg_rating", "num_ratings", "stddev_ratings","genres")

polarizing_movies.orderBy("stddev_ratings", ascending=False)

polarizing_movies.show(15)

+-------+--------------------+----------+-----------+--------------+--------------------+
|movieId|               title|avg_rating|num_ratings|stddev_ratings|              genres|
+-------+--------------------+----------+-----------+--------------+--------------------+
|   3997|Dungeons & Dragon...|      2.07|       2047|          1.11|   Adventure|Fantasy|
|   1580|Men in Black (a.k...|      3.56|      35580|          0.90|Action|Comedy|Sci-Fi|
|   3918|Hellbound: Hellra...|      2.92|       1246|          1.13|              Horror|
|   2366|    King Kong (1933)|      3.55|       6627|          1.06|Action|Adventure|...|
|   3175| Galaxy Quest (1999)|      3.60|      13945|          0.96|Adventure|Comedy|...|
|   4519|Land Before Time,...|      3.25|       1936|          0.99|Adventure|Animati...|
|   1591|        Spawn (1997)|      2.62|       5255|          1.09|Action|Adventure|...|
|    471|Hudsucker Proxy, ...|      3.66|      11268|          0.93|              Comedy|
|  36525|J

## Conclusion

This tutorial has demonstrated how to use the `pySpark DataFrame` API to perform some simple data analysis tasks. In particular, we have seen how to perform aggregations, joins, and compute summary statistics on large datasets. There is a lot more that could be done with this dataset, including investigating other ways to identify polarising movies, looking at the effect of movie genres, and building a recommender system. Note that when working in `pySpark`, it may useful to refer back to the [official pySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html).