In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=7f5a3a1571d8377140a18ac7579fd89f25dd44db04c10e730f98dc15c0e79101
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from pyspark.sql import SparkSession


In [5]:
# Obtain the Spark session for this notebook, reusing an already running one
# if present (getOrCreate); it is the entry point used to read the CSV files.
spark = (
    SparkSession.builder
    .appName("MovieRatingsAnalysis")
    .getOrCreate()
)

In [6]:
# Load both CSV files as DataFrames. The first line of each file is treated as
# the header, and column types are inferred from the data rather than declared.
movies_df = spark.read.options(header=True, inferSchema=True).csv("/content/movies.csv")
ratings_df = spark.read.options(header=True, inferSchema=True).csv("/content/ratings.csv")

In [21]:
# Expose the RDD-of-Row view of each DataFrame for the RDD-based questions.
movies_rdd, ratings_rdd = movies_df.rdd, ratings_df.rdd

# a) Find the Movie with the Lowest Average Rating Using RDD.

In [8]:
# Mean rating per movie:
#   1. emit (movieId, (rating, 1)) for every rating row,
#   2. add the pairs element-wise per movieId -> (rating_sum, rating_count),
#   3. divide the sum by the count.
avg_ratings_rdd = (
    ratings_rdd
    .map(lambda row: (row['movieId'], (row['rating'], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda sum_and_count: sum_and_count[0] / sum_and_count[1])
)

In [9]:
# Order the (movieId, average) pairs by ascending average and keep the first.
# The sort key is the average alone, so when several movies share the minimum
# the one reported is whichever the sort happens to place first.
lowest_avg_rating = (
    avg_ratings_rdd
    .sortBy(lambda movie_avg: movie_avg[1])
    .first()
)
print(f"Movie with the lowest average rating: {lowest_avg_rating}")


Movie with the lowest average rating: (97024, 0.5)


# b) Identify Users Who Have Rated the Most Movies.

In [10]:
# Number of ratings per user, ordered from the most to the least active user:
# emit (userId, 1) per rating row, sum the ones per userId, sort by that total.
user_ratings_count = (
    ratings_rdd
    .map(lambda row: (row['userId'], 1))
    .reduceByKey(lambda running, increment: running + increment)
    .sortBy(lambda user_total: user_total[1], ascending=False)
)

In [11]:
# user_ratings_count is already sorted by descending count, so the first ten
# elements are the ten users with the most ratings, as (userId, count) pairs.
top_users = user_ratings_count.take(num=10)
print(f"Top users by number of ratings: {top_users}")

Top users by number of ratings: [(414, 2698), (599, 2478), (474, 2108), (448, 1864), (274, 1346), (610, 1302), (68, 1260), (380, 1218), (606, 1115), (288, 1055)]


# c) Explore the Distribution of Ratings Over Time.

In [12]:
from pyspark.sql.functions import from_unixtime, year, month

In [13]:
# Add calendar `year` and `month` columns derived from the Unix `timestamp`
# column. The converted-timestamp expression is built once and reused.
# NOTE(review): from_unixtime renders the time in the Spark session time zone,
# so month/year boundaries follow that zone - confirm this is what is wanted.
rating_time = from_unixtime(ratings_df['timestamp'])
ratings_df = (
    ratings_df
    .withColumn("year", year(rating_time))
    .withColumn("month", month(rating_time))
)

In [14]:
# Count the ratings submitted in each (year, month) and list the periods in
# chronological order.
ratings_over_time = (
    ratings_df
    .groupBy("year", "month")
    .count()
    .orderBy("year", "month")
)

In [15]:
# Print the first 20 (year, month, count) rows as a text table; these are the
# defaults of show(), spelled out here for the reader.
ratings_over_time.show(n=20, truncate=True)

+----+-----+-----+
|year|month|count|
+----+-----+-----+
|1996|    3|   58|
|1996|    4|  165|
|1996|    5|  832|
|1996|    6|  883|
|1996|    7|  489|
|1996|    8| 1010|
|1996|    9|  384|
|1996|   10|  935|
|1996|   11|  978|
|1996|   12|  306|
|1997|    1|  250|
|1997|    2|  323|
|1997|    3|  398|
|1997|    4|  219|
|1997|    5|  303|
|1997|    6|   84|
|1997|    7|   70|
|1997|    9|  236|
|1997|   10|    1|
|1997|   11|    4|
+----+-----+-----+
only showing top 20 rows



# d) Find the Highest-Rated Movies with a Minimum Number of Ratings.

In [16]:
# Per-movie statistics as (movieId, (avg_rating, count)):
#   1. emit (movieId, (rating, 1)) for every rating row,
#   2. add the pairs element-wise per movieId -> (rating_sum, rating_count),
#   3. turn the sum into a mean while keeping the count next to it.
movie_ratings_stats = (
    ratings_rdd
    .map(lambda row: (row['movieId'], (row['rating'], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda sum_and_count: (sum_and_count[0] / sum_and_count[1], sum_and_count[1]))
)

In [17]:
# Minimum number of ratings a movie needs to be considered.
min_ratings = 100
# Each element is (movieId, (avg_rating, count)); keep those whose count
# reaches the threshold.
qualified_movies = movie_ratings_stats.filter(
    lambda movie_stats: movie_stats[1][1] >= min_ratings
)

In [18]:
# Rank the qualifying movies by average rating, best first, and keep the top
# ten as (movieId, (avg_rating, count)) tuples.
highest_rated_movies = (
    qualified_movies
    .sortBy(lambda movie_stats: movie_stats[1][0], ascending=False)
    .take(10)
)
print(f"Highest-rated movies with at least {min_ratings} ratings: {highest_rated_movies}")

Highest-rated movies with at least 100 ratings: [(318, (4.429022082018927, 317)), (858, (4.2890625, 192)), (2959, (4.272935779816514, 218)), (1221, (4.25968992248062, 129)), (48516, (4.252336448598131, 107)), (1213, (4.25, 126)), (912, (4.24, 100)), (58559, (4.238255033557047, 149)), (50, (4.237745098039215, 204)), (1197, (4.232394366197183, 142))]
