In [7]:
# Install PySpark
import sys
!{sys.executable} -m pip install pyspark



In [8]:
# Verify Installation
import pyspark
print("PySpark version:", pyspark.__version__)

PySpark version: 4.0.1


In [9]:
!java -version

openjdk version "17.0.16" 2025-07-15
OpenJDK Runtime Environment Homebrew (build 17.0.16+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.16+0, mixed mode, sharing)


In [1]:
# -----------------------------------
# 1. Import Spark and Start Session
# -----------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc

spark = SparkSession.builder \
    .appName("MovieLensEDA_Data") \
    .getOrCreate()

print("Spark session created!")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/14 13:50:00 WARN Utils: Your hostname, NM-MBA-M2.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.153 instead (on interface en0)
25/10/14 13:50:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/14 13:50:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session created!


In [16]:
# -----------------------------------
# 2. Load Ratings and Movies Data
# -----------------------------------
from pyspark.sql.functions import array, when, array_remove, filter, explode
"""
🧠 Understanding Your Columns

Each _c5, _c6, … _c23 corresponds to a genre flag (1 = belongs to that genre, 0 = not).
For example:

Column	Genre
_c5	Unknown
_c6	Action
_c7	Adventure
_c8	Animation
_c9	Children’s
_c10	Comedy
_c11	Crime
_c12	Documentary
_c13	Drama
_c14	Fantasy
_c15	Film-Noir
_c16	Horror
_c17	Musical
_c18	Mystery
_c19	Romance
_c20	Sci-Fi
_c21	Thriller
_c22	War
_c23	Western
"""
ratings = spark.read.csv("../data/u.data", sep="\t", header=False, inferSchema=True) \
    .toDF("userId", "movieId", "rating", "timestamp")

# Load dataset (no header)
movies_raw = spark.read.csv("../data/u.item", sep="|", header=False, inferSchema=True)
"""
Sample movies_raw data
+---+-----------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|              _c1|        _c2| _c3|                 _c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|
+---+-----------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  1| Toy Story (1995)|01-Jan-1995|NULL|http://us.imdb.co...|  0|  0|  0|  1|  1|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
|  2| GoldenEye (1995)|01-Jan-1995|NULL|http://us.imdb.co...|  0|  1|  1|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
|  3|Four Rooms (1995)|01-Jan-1995|NULL|http://us.imdb.co...|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
|  4|Get Shorty (1995)|01-Jan-1995|NULL|http://us.imdb.co...|  0|  1|  0|  0|  0|   1|   0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
|  5|   Copycat (1995)|01-Jan-1995|NULL|http://us.imdb.co...|  0|  0|  0|  0|  0|   0|   1|   0|   1|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
+---+-----------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
"""

# Define list of genres based on MovieLens 100K schema
genre_names = [
    "Unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
    "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

# Select and Rename c0 as movieId, c1 as title and c5 onwards as per available geners from genre_names

movies = movies_raw.select(
    col("_c0").alias("movieId"),
    col("_c1").alias("title"),
    *[col(f"_c{5+i}").cast("int").alias(genre) for i, genre in enumerate(genre_names)]
)
#movies.printSchema()
"""
Output

+-------+-----------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movieId|            title|Unknown|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+-----------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|      1| Toy Story (1995)|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|      2| GoldenEye (1995)|      0|     1|        1|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|
|      3|Four Rooms (1995)|      0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|
|      4|Get Shorty (1995)|      0|     1|        0|        0|       0|     1|    0|          0|    1|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|      5|   Copycat (1995)|      0|     0|        0|        0|       0|     0|    1|          0|    1|      0|        0|     0|      0|      0|      0|     0|       1|  0|      0|
+-------+-----------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
"""

# Create array of geners where flag = 1 from above output
movies_with_genres = movies.select("movieId", \
                                   "title", \
                                   filter( \
                                       array(*[when(col(g) == 1, g).otherwise(None) for g in genre_names]), \
                                       lambda x: x.isNotNull() \
                                   ).alias("genres") \
                                  )
movies_with_genres_exploded = movies_with_genres.withColumn("genre", explode(col("genres"))) \
                                                .select("movieId", "title", "genre")
                            
"""
Output
+-------+----------------+---------+
|movieId|           title|    genre|
+-------+----------------+---------+
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      2|GoldenEye (1995)|   Action|
|      2|GoldenEye (1995)|Adventure|
+-------+----------------+---------+
"""
print("movies_with_genres_exploded Sample:")
movies_with_genres_exploded.show(5)

print("Ratings Sample:")
ratings.show(5)




movies_with_genres_exploded Sample:
+-------+----------------+---------+
|movieId|           title|    genre|
+-------+----------------+---------+
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      2|GoldenEye (1995)|   Action|
|      2|GoldenEye (1995)|Adventure|
+-------+----------------+---------+
only showing top 5 rows
Ratings Sample:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
+------+-------+------+---------+
only showing top 5 rows


In [17]:
"""
1️⃣ Analyze Rating Distribution

Objective: Understand how users rate movies in general — are they generous or strict?

Key Steps:

- Compute a histogram of ratings (count by rating value).

- Visualize or summarize: mean, median, mode of ratings.
"""

rating_dist = ratings.groupBy("rating") \
                     .agg(count("*").alias("num_ratings")) \
                     .orderBy("rating")


rating_dist.show()

"""
If most ratings are 4 or 5, the dataset has a positive bias — users tend to rate generously.
"""

+------+-----------+
|rating|num_ratings|
+------+-----------+
|     1|       6110|
|     2|      11370|
|     3|      27145|
|     4|      34174|
|     5|      21201|
+------+-----------+



'\nIf most ratings are 4 or 5, the dataset has a positive bias — users tend to rate generously.\n'

In [18]:
"""
2️⃣ Popular Genres by Rating

Objective: Find which movie genres are most popular and best rated.

Key Steps:

- Explode the genres column into multiple rows (many movies have multiple genres).
- Group by genre → compute average rating + total number of ratings.
"""

popular_genres_by_rating = movies_with_genres_exploded \
                           .join(ratings, "movieId") \
                           .groupBy("genre") \
                           .agg(avg("rating").alias("avg_rating"), count("rating").alias("total_rating")) \
                           .orderBy(col("avg_rating").desc())

popular_genres_by_rating.show(10)                    


+-----------+------------------+------------+
|      genre|        avg_rating|total_rating|
+-----------+------------------+------------+
|  Film-Noir|3.9215233698788228|        1733|
|        War| 3.815811874866993|        9398|
|      Drama|3.6873793708484772|       39895|
|Documentary|3.6728232189973613|         758|
|    Mystery|  3.63813155386082|        5245|
|      Crime|3.6322780881440098|        8055|
|    Romance| 3.621704948358255|       19461|
|    Western|3.6132686084142396|        1854|
|  Animation|3.5766990291262135|        3605|
|     Sci-Fi|3.5607227022780834|       12730|
+-----------+------------------+------------+
only showing top 10 rows


In [22]:
"""
3️⃣ Ratings Trend Over Time

Objective: See how movie ratings evolve — e.g., do users rate older movies differently?

Key Steps:

Convert timestamp to date/year.

Group by year → compute average rating per year.
"""
from pyspark.sql.functions import year, from_unixtime

ratings_by_year = ratings \
                  .withColumn("year", year(from_unixtime(col("timestamp")))) \
                  .groupBy("year") \
                  .agg(avg("rating").alias("avg_rating"), count("rating").alias("total_rating")) \
                  .orderBy("year")

ratings_by_year.show(10)
                              

+----+------------------+------------+
|year|        avg_rating|total_rating|
+----+------------------+------------+
|1997|3.5678641583559987|       52944|
|1998|3.4871004760285618|       47056|
+----+------------------+------------+



In [23]:
"""
4️⃣ Top Users (Most Active Raters)

Objective: Identify power users who contribute the most ratings.
"""

top_users = ratings \
            .groupBy("userId") \
            .agg(count("rating").alias("total_rating"))\
            .orderBy(col("total_rating").desc())

top_users.show(10)

+------+------------+
|userId|total_rating|
+------+------------+
|   405|         737|
|   655|         685|
|    13|         636|
|   450|         540|
|   276|         518|
|   416|         493|
|   537|         490|
|   303|         484|
|   234|         480|
|   393|         448|
+------+------------+
only showing top 10 rows
