In [0]:
# Read the movies, ratings, and tags CSV files. Each file has a header row,
# and column types are inferred from the data rather than declared up front.
csv_options = {"header": True, "inferSchema": True}

movies_df = spark.read.csv("/Volumes/workspace/default/data/movies.csv", **csv_options)
ratings_df = spark.read.csv("/Volumes/workspace/default/data/ratings.csv", **csv_options)
tags_df = spark.read.csv("/Volumes/workspace/default/data/tags.csv", **csv_options)

# Print the inferred schema of each frame, in load order.
for frame in (movies_df, ratings_df, tags_df):
    frame.printSchema()

In [0]:
# Top highly rated movies with at least MIN_RATINGS ratings

from pyspark.sql.functions import count, avg, desc

# Minimum number of ratings a movie must have before its average is ranked;
# keeps movies with only a handful of ratings from dominating the list.
MIN_RATINGS = 50

movie_ratings = ratings_df.join(movies_df, on='movieId')

# Group by movieId as well as title. The join key is movieId, so grouping on
# title alone would pool the ratings of distinct movies that share a title
# into a single row with a combined count and average.
movie_stats = movie_ratings.groupBy("movieId", "title").agg(
    count("rating").alias("num_ratings"),
    avg("rating").alias("avg_rating")
)

# Rank by average rating; ties are broken by the number of ratings so the
# displayed order does not depend on partition ordering.
top_movies = (
    movie_stats
    .filter(movie_stats["num_ratings"] >= MIN_RATINGS)
    .orderBy(desc("avg_rating"), desc("num_ratings"))
)

top_movies.show(10, truncate=False)


In [0]:
# Movies ranked by how many ratings they received, most-rated first

by_rating_count = movie_stats["num_ratings"].desc()
most_rated = movie_stats.sort(by_rating_count)
most_rated.show(10, truncate=False)

In [0]:
# Ratings distribution

import pandas as pd
import matplotlib.pyplot as plt

# Count rows per distinct rating value in Spark, then bring only that small
# aggregated result to the driver as a pandas DataFrame for plotting.
rating_counts = (
    ratings_df
    .select("rating")
    .groupBy("rating")
    .count()
    .orderBy("rating")
    .toPandas()
)

# Bar chart: one bar per rating value, bar height is the number of ratings.
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(rating_counts["rating"], rating_counts["count"], width=0.3)
ax.set_xlabel("Rating")
ax.set_ylabel("Count")
ax.set_title("Ratings Distribution")
ax.grid(True)
plt.show()

In [0]:
# Average rating per genre

from pyspark.sql.functions import split, explode, trim

# The `genres` column is split on "|" and exploded, giving one row per
# (movie, genre) pair; a movie with several genres appears once per genre.
genre_list = split("genres", "\\|")
movies_with_genres = movies_df.withColumn("genre", explode(genre_list))

# After the join, each rating is repeated for every genre of its movie, so it
# contributes to the average of each of those genres.
genre_ratings = ratings_df.join(movies_with_genres, on="movieId")

mean_rating = avg("rating").alias("avg_rating")
avg_genre_ratings = (
    genre_ratings
    .groupBy("genre")
    .agg(mean_rating)
    .orderBy(desc("avg_rating"))
)

avg_genre_ratings.show()

In [0]:
# Persist each DataFrame as a Delta table, overwriting the table if it
# already exists. Tables are written in the order listed.
tables_to_save = {
    "movielens.movies": movies_df,
    "movielens.ratings": ratings_df,
    "movielens.tags": tags_df,
}
for table_name, frame in tables_to_save.items():
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)