<a href="https://colab.research.google.com/github/Rakeshb-dev/Movie_Rating_Analysis/blob/main/Movie_Rating_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestSpark").getOrCreate()
print("Spark is ready ✅ Version:", spark.version)

Spark is ready ✅ Version: 3.5.1


In [None]:
!wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip -o ml-latest-small.zip


--2025-09-15 15:04:04--  https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.96.204
Connecting to files.grouplens.org (files.grouplens.org)|128.101.96.204|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2025-09-15 15:04:04 (6.03 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  


In [None]:
ratings = spark.read.csv("ml-latest-small/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("ml-latest-small/movies.csv", header=True, inferSchema=True)

ratings.printSchema()
movies.printSchema()

ratings.show(5)
movies.show(5)


root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of 

In [None]:
# Convert ratings DataFrame to RDD (movieId → (rating, 1))
ratings_rdd = ratings.rdd.map(lambda row: (row['movieId'], (row['rating'], 1)))

# ReduceByKey to sum ratings and counts
movie_rating_totals = ratings_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Calculate average
movie_avg_ratings = movie_rating_totals.mapValues(lambda x: round(x[0] / x[1], 2))

# Convert movies DataFrame to RDD (movieId → title)
movies_rdd = movies.rdd.map(lambda row: (row['movieId'], row['title']))

# Join average ratings with movie titles
movie_avg_with_names = movie_avg_ratings.join(movies_rdd)

# Sort by rating (descending)
top_movies = movie_avg_with_names.sortBy(lambda x: x[1][0], ascending=False)

# Show top 10
for movie in top_movies.take(10):
    print(movie)


(131724, (5.0, 'The Jinx: The Life and Deaths of Robert Durst (2015)'))
(5746, (5.0, 'Galaxy of Terror (Quest) (1981)'))
(92494, (5.0, 'Dylan Moran: Monster (2004)'))
(67618, (5.0, 'Strictly Sexual (2008)'))
(8804, (5.0, 'Story of Women (Affaire de femmes, Une) (1988)'))
(26350, (5.0, 'Passenger, The (Professione: reporter) (1975)'))
(31522, (5.0, 'Marriage of Maria Braun, The (Ehe der Maria Braun, Die) (1979)'))
(1140, (5.0, 'Entertaining Angels: The Dorothy Day Story (1996)'))
(6402, (5.0, 'Siam Sunset (1999)'))
(8238, (5.0, 'Little Murders (1971)'))


In [None]:
# Create temporary views
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")

# Calculate average ratings using SQL
avg_ratings_sql = spark.sql("""
    SELECT m.title, ROUND(AVG(r.rating), 2) AS avg_rating, COUNT(r.rating) AS num_ratings
    FROM ratings r
    JOIN movies m ON r.movieId = m.movieId
    GROUP BY m.title
    ORDER BY avg_rating DESC, num_ratings DESC
    LIMIT 10
""")

avg_ratings_sql.show()


+--------------------+----------+-----------+
|               title|avg_rating|num_ratings|
+--------------------+----------+-----------+
|Heidi Fleiss: Hol...|       5.0|          2|
|     Lamerica (1994)|       5.0|          2|
| Lesson Faust (1994)|       5.0|          2|
| Belle époque (1992)|       5.0|          2|
|Come and See (Idi...|       5.0|          2|
|Enter the Void (2...|       5.0|          2|
|Jonah Who Will Be...|       5.0|          2|
|Martin Lawrence L...|       5.0|          1|
|Tickling Giants (...|       5.0|          1|
|Bill Hicks: Revel...|       5.0|          1|
+--------------------+----------+-----------+

