In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
        .master("local")\
        .appName("SparkApp2233")\
        .config("some.spark.option.config", "somevalue")\
        .getOrCreate()

In [3]:
from pyspark.sql.functions import *

# Load Files

In [4]:
movies = spark.read.format("csv").option("header","true").load("movies.csv")
ratings = spark.read.format("csv").option("header","true").load("ratings.csv")
users = spark.read.format("csv").option("header","true").load("users.csv")

# Schema

In [5]:
movies.printSchema()
ratings.printSchema()
users.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- country: string (nullable = true)
 |-- release_year: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- movie_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_date: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: string (nullable = true)



# Row Count

In [6]:
print(movies.count())
print(ratings.count())
print(users.count())

50
1000
100


# show

In [7]:
movies.show(2)
ratings.show(2)
users.show(2)

+--------+-------+-------+-------+------------+
|movie_id|  title|  genre|country|release_year|
+--------+-------+-------+-------+------------+
|       1|Movie_1|Romance|  Japan|        2004|
|       2|Movie_2| Action|  India|        2010|
+--------+-------+-------+-------+------------+
only showing top 2 rows
+-------+--------+------+-----------+
|user_id|movie_id|rating|rating_date|
+-------+--------+------+-----------+
|   1097|       5|   1.0| 2023-10-19|
|   1013|       5|   3.2| 2023-09-09|
+-------+--------+------+-----------+
only showing top 2 rows
+-------+-------+---+
|user_id|country|age|
+-------+-------+---+
|   1001|     UK| 47|
|   1002|  Japan| 21|
+-------+-------+---+
only showing top 2 rows


In [8]:
#Rename Columns
users = users.withColumnRenamed("user_id" , "UID")\
                .withColumnRenamed("country", "user_country")\
                .withColumnRenamed("age","user_age")
movies = movies.withColumnRenamed("movie_id","MID")

# Joining

In [9]:
ratings_users = ratings.join(users, users["UID"] == ratings["user_id"], how="inner")

In [10]:
ratings_users.show(5)

+-------+--------+------+-----------+----+------------+--------+
|user_id|movie_id|rating|rating_date| UID|user_country|user_age|
+-------+--------+------+-----------+----+------------+--------+
|   1097|       5|   1.0| 2023-10-19|1097|      France|      59|
|   1013|       5|   3.2| 2023-09-09|1013|       Japan|      52|
|   1017|      23|   4.5| 2023-04-12|1017|       India|      29|
|   1037|      11|   2.8| 2023-09-23|1037|       Japan|      21|
|   1039|      40|   4.9| 2023-11-17|1039|       India|      42|
+-------+--------+------+-----------+----+------------+--------+
only showing top 5 rows


In [11]:
ratings_users_movies = ratings_users.join(movies, movies["MID"] == ratings_users["movie_id"], how="inner")

In [12]:
ratings_users_movies = ratings_users_movies.drop("MID","UID")

In [13]:
ratings_users_movies.count()

1000

In [14]:
ratings_users_movies.show(5)

+-------+--------+------+-----------+------------+--------+--------+-------+-------+------------+
|user_id|movie_id|rating|rating_date|user_country|user_age|   title|  genre|country|release_year|
+-------+--------+------+-----------+------------+--------+--------+-------+-------+------------+
|   1097|       5|   1.0| 2023-10-19|      France|      59| Movie_5|  Drama| France|        1992|
|   1013|       5|   3.2| 2023-09-09|       Japan|      52| Movie_5|  Drama| France|        1992|
|   1017|      23|   4.5| 2023-04-12|       India|      29|Movie_23| Sci-Fi|    USA|        2013|
|   1037|      11|   2.8| 2023-09-23|       Japan|      21|Movie_11|Romance|  India|        2010|
|   1039|      40|   4.9| 2023-11-17|       India|      42|Movie_40|  Drama|    USA|        1997|
+-------+--------+------+-----------+------------+--------+--------+-------+-------+------------+
only showing top 5 rows


In [15]:
ratings_users_movies = ratings_users_movies.withColumn("movie_id", col("movie_id").cast("int"))
ratings_users_movies = ratings_users_movies.withColumn("rating", col("rating").cast("float"))

# 1. Calculate average rating per movie and identify top 5 movies in each genre.

In [16]:
avgRating = ratings_users_movies.groupBy("movie_id","title","genre").agg(round(avg("rating"),1).alias("Rating Avg"))
avgRating = avgRating.orderBy(col("movie_id").asc())
avgRating.show()

+--------+--------+-------+----------+
|movie_id|   title|  genre|Rating Avg|
+--------+--------+-------+----------+
|       1| Movie_1|Romance|       3.5|
|       2| Movie_2| Action|       3.0|
|       3| Movie_3| Action|       3.2|
|       4| Movie_4|Romance|       2.9|
|       5| Movie_5|  Drama|       2.7|
|       6| Movie_6| Comedy|       2.6|
|       7| Movie_7| Comedy|       2.7|
|       8| Movie_8| Comedy|       2.8|
|       9| Movie_9|Romance|       3.0|
|      10|Movie_10| Action|       3.3|
|      11|Movie_11|Romance|       2.9|
|      12|Movie_12|Romance|       3.2|
|      13|Movie_13| Sci-Fi|       3.1|
|      14|Movie_14| Action|       3.4|
|      15|Movie_15| Sci-Fi|       3.0|
|      16|Movie_16| Horror|       2.8|
|      17|Movie_17| Action|       3.2|
|      18|Movie_18| Action|       2.9|
|      19|Movie_19| Action|       3.1|
|      20|Movie_20| Comedy|       3.3|
+--------+--------+-------+----------+
only showing top 20 rows


# Window functions

In [17]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, round, row_number
window = Window.partitionBy("genre").orderBy(col("Rating Avg").desc())

In [18]:
topGenre = avgRating.withColumn("rank", row_number().over(window))

In [19]:
topGenre = topGenre.filter(col("rank")<=5)
topGenre.show(100)

+--------+--------+-------+----------+----+
|movie_id|   title|  genre|Rating Avg|rank|
+--------+--------+-------+----------+----+
|      14|Movie_14| Action|       3.4|   1|
|      10|Movie_10| Action|       3.3|   2|
|       3| Movie_3| Action|       3.2|   3|
|      17|Movie_17| Action|       3.2|   4|
|      46|Movie_46| Action|       3.2|   5|
|      20|Movie_20| Comedy|       3.3|   1|
|      21|Movie_21| Comedy|       3.3|   2|
|      43|Movie_43| Comedy|       3.3|   3|
|      26|Movie_26| Comedy|       3.0|   4|
|      32|Movie_32| Comedy|       3.0|   5|
|      50|Movie_50|  Drama|       3.3|   1|
|      41|Movie_41|  Drama|       3.2|   2|
|      40|Movie_40|  Drama|       3.1|   3|
|      44|Movie_44|  Drama|       3.0|   4|
|      49|Movie_49|  Drama|       3.0|   5|
|      39|Movie_39| Horror|       3.3|   1|
|      47|Movie_47| Horror|       3.1|   2|
|      33|Movie_33| Horror|       2.9|   3|
|      16|Movie_16| Horror|       2.8|   4|
|      31|Movie_31| Horror|     

# 2. Calculate average rating per country (based on user country).

In [20]:
AvgRatingCountry = ratings_users_movies.groupBy("user_country").agg(round(avg("rating"),1).alias("Avg Rating"))
AvgRatingCountry.show()

+------------+----------+
|user_country|Avg Rating|
+------------+----------+
|     Germany|       3.1|
|      France|       3.0|
|       India|       3.0|
|         USA|       3.0|
|          UK|       3.0|
|       Japan|       3.0|
+------------+----------+



# 3. Identify rating anomalies where rating > 5 or rating < 1 (simulate a few anomalies).

In [21]:
RatingAnomalies = ratings_users_movies.filter((col("rating")>5) | (col("rating")<1))

In [22]:
RatingAnomalies.show(10)

+-------+--------+------+-----------+------------+--------+-----+-----+-------+------------+
|user_id|movie_id|rating|rating_date|user_country|user_age|title|genre|country|release_year|
+-------+--------+------+-----------+------------+--------+-----+-----+-------+------------+
+-------+--------+------+-----------+------------+--------+-----+-----+-------+------------+



# 4. Segment users into High (avg rating > 4), Medium (avg rating 2-4), Low (avg rating < 2).

In [23]:
ratings_users_movies= ratings_users_movies.withColumn(
    "segment",
    when(col("rating") > 4, "High")
    .when((col("rating") >= 2) & (col("rating") <= 4), "Medium")
    .otherwise("Low")
)

In [24]:
ratings_users_movies.show(10)

+-------+--------+------+-----------+------------+--------+--------+-------+-------+------------+-------+
|user_id|movie_id|rating|rating_date|user_country|user_age|   title|  genre|country|release_year|segment|
+-------+--------+------+-----------+------------+--------+--------+-------+-------+------------+-------+
|   1097|       5|   1.0| 2023-10-19|      France|      59| Movie_5|  Drama| France|        1992|    Low|
|   1013|       5|   3.2| 2023-09-09|       Japan|      52| Movie_5|  Drama| France|        1992| Medium|
|   1017|      23|   4.5| 2023-04-12|       India|      29|Movie_23| Sci-Fi|    USA|        2013|   High|
|   1037|      11|   2.8| 2023-09-23|       Japan|      21|Movie_11|Romance|  India|        2010| Medium|
|   1039|      40|   4.9| 2023-11-17|       India|      42|Movie_40|  Drama|    USA|        1997|   High|
|   1002|      43|   4.3| 2023-05-22|       Japan|      21|Movie_43| Comedy|     UK|        1997|   High|
|   1014|       9|   2.1| 2023-02-24|       Ja

# 5. Generate a summary report with:- Total number of ratings - Number of unique users and movies - Average rating overall - Most active user (by number of ratings)

In [25]:
TotalRatings = ratings_users_movies.distinct().count()

In [26]:
TotalUsers = ratings_users_movies.select("user_id").distinct().count()

In [27]:
TotalMovies = ratings_users_movies.select("movie_id").distinct().count()

In [28]:
AvgRating = ratings_users_movies.select(round(avg("rating"),2)).first()[0]

In [29]:
MostActiveUser = ratings_users_movies.groupby("user_id").agg(count("rating").alias("rating count")).orderBy(desc("rating count")).first()

In [30]:
# Create summary dictionary
summary = {
    "Total Ratings": [TotalRatings],
    "Unique Users": [TotalUsers],
    "Unique Movies": [TotalMovies],
    "Average Rating": [AvgRating],
    "Most Active User": [MostActiveUser["user_id"]],
}

In [31]:
print(summary)

{'Total Ratings': [1000], 'Unique Users': [100], 'Unique Movies': [50], 'Average Rating': [3.0], 'Most Active User': ['1082']}


In [32]:
# Convert to pandas DataFrame and export to CSV
import pandas as pd
summary_df = pd.DataFrame(summary)
summary_df.to_csv("finalsummary_report.csv", index=False)
print("Summary report has been exported to 'summary_report.csv'.")

Summary report has been exported to 'summary_report.csv'.
