In [None]:
import pyspark

# Record which PySpark version this notebook is running against.
pyspark_version = pyspark.__version__
pyspark_version

In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Movie Rating")
    .getOrCreate()
)

#### Load and Explore the Data:
- Load the movie ratings data into a Spark DataFrame.
- Print the schema and the first few rows of the DataFrame.

In [None]:
# Read the ratings CSV: the first row supplies column names and Spark infers
# each column's type from the data.
rating = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("../data/movie_ratings.csv")
rating.show(10)
rating.printSchema()

#### Data Cleaning:
- Handle any missing or inconsistent data in the dataset.
- Convert the timestamp column to a TimestampType.

In [None]:
# Drop every row that has a null in any column.
rating = rating.dropna(how="any")

# Convert the raw `timestamp` value into a real TimestampType column.
# Assumes `timestamp` holds Unix epoch seconds, which is how the from_unixtime()
# calls elsewhere in this notebook interpret it. A new column is added instead of
# overwriting `timestamp`, because those from_unixtime() calls still need the
# raw numeric value.
rating = rating.withColumn("rated_at", from_unixtime(col("timestamp")).cast(TimestampType()))

#### Exploratory Data Analysis (EDA):
- Calculate and print the total number of ratings in the dataset.
- Find and print the average rating given by users.
- Identify and print the top 5 movies with the highest average ratings.

In [None]:
# Total number of ratings: every remaining row is one rating, so a plain row
# count is enough.
total_ratings = rating.count()
total_ratings

In [None]:
# Average rating given by users: the overall mean across every rating...
rating.agg(round(avg(col("rating")), 2).alias("Average Rating")).show()

# ...and, for more detail, the average rating given by each individual user.
rating.groupBy("userId").agg(round(avg(col("rating")), 2).alias("Average User Rating")).show()

In [None]:
# Top 5 movies with the highest average rating, among movies with more than 10 ratings.
# The rating count must be computed inside agg(): once the DataFrame is aggregated,
# the original `rating` column no longer exists, so an aggregate in a later
# where() cannot be resolved.
rating.groupBy("movieId") \
    .agg(round(avg(col("rating")), 2).alias("average_movie_rating"),
         count(col("rating")).alias("rating_count")) \
    .where(col("rating_count") > 10) \
    .orderBy(col("average_movie_rating").desc(), col("rating_count").desc()) \
    .limit(5) \
    .show()

#### SQL Queries:
- Register the DataFrame as a temporary SQL view.
- Write and execute a SQL query to find the number of ratings given by each user.
- Write and execute a SQL query to find the average rating for each movie.

In [None]:
# Expose the cleaned ratings to spark.sql() under the name `movie_rating`.
rating.createOrReplaceTempView(name="movie_rating")

In [None]:
# Number of ratings given by each user.
ratings_per_user_query = """
    select
        userId,
        count(rating) as numberRating
    from movie_rating
    group by userId
"""
spark.sql(ratings_per_user_query).show()

In [None]:
# Average rating for each movie, rounded to two decimals.
avg_rating_per_movie_query = """
    select
        movieId,
        round(avg(rating), 2) as avgRating
    from movie_rating
    group by movieId
"""
spark.sql(avg_rating_per_movie_query).show()

#### Time-based Analysis:
- Extract the year and month from the timestamp column.
- Calculate and print the total number of ratings for each month.
- Identify and print the month with the highest number of ratings.

In [None]:
# Derive calendar month and year from the epoch-seconds `timestamp` column.
# Build the converted expression once and reuse it for both columns.
rated_at = from_unixtime(col("timestamp"))
rating = rating \
    .withColumn("month", month(rated_at)) \
    .withColumn("year", year(rated_at))

In [None]:
# Total number of ratings for each (year, month). Sorting keeps the output in
# chronological order; without it, show() prints groups in arbitrary order.
rating.groupBy("year", "month") \
    .agg(count(col("rating")).alias("Number Of Rating")) \
    .orderBy("year", "month") \
    .show()

In [None]:
# The (year, month) with the most ratings: count per month, then keep the
# single largest group.
monthly_counts = rating.groupBy("year", "month") \
    .agg(count(col("rating")).alias("Highest Num Of Ratings"))
monthly_counts.orderBy(desc("Highest Num Of Ratings")).limit(1).show()

#### Join and Transformation:
- Load a separate dataset containing movie information (movieId, title, genres).
- Join the movie ratings DataFrame with the movie information DataFrame.
- Calculate and print the average rating for each genre.

In [None]:
# Read the movie metadata CSV with a header row and inferred column types.
movies = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("../data/movies.csv")
movies.show(5)

In [None]:
# Keep only the columns used later: the id (join key), genres and title.
movies = movies.select(col("id"), col("genres"), col("title"))

In [None]:
# Convert the `genres` column from a JSON list of objects into a plain list of genre names.
def extract_names_from_genres(genres_str):
    """Return the ``name`` of every genre object in a JSON-encoded list.

    e.g. '[{"id": 1, "name": "Drama"}]' -> ["Drama"]

    Returns None (a SQL null once wrapped in a UDF) when ``genres_str`` is None,
    empty, not valid JSON, or does not decode to a list. This avoids raising
    inside the UDF, which would fail the whole Spark job. Entries that are not
    JSON objects or have no ``name`` are skipped.
    """
    if not genres_str:
        return None
    try:
        genres_list = json.loads(genres_str)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        return None
    if not isinstance(genres_list, list):
        return None
    return [
        genre["name"]
        for genre in genres_list
        if isinstance(genre, dict) and genre.get("name") is not None
    ]

# Wrap the parser as a Spark UDF that yields array<string>.
extract_names_udf = udf(extract_names_from_genres, ArrayType(StringType()))

# Swap single quotes for double quotes so each value can be parsed as JSON.
# NOTE(review): this also rewrites apostrophes inside names, which would make
# such values unparseable - confirm against the data.
json_genres = regexp_replace(col("genres"), "'", '"')
movies = movies.withColumn("genres", extract_names_udf(json_genres))

In [None]:
# Inspect the parsed genre lists (first 20 rows, long values truncated).
movies.show(20, truncate=True)

In [None]:
# Attach movie information to every rating (inner join: ratings without a
# matching movie are dropped).
movie_rating = rating.join(movies, rating.movieId == movies.id, "inner")

# Average rating for each genre. A movie can list several genres, so explode
# the genre array into one row per (rating, genre) before aggregating; rows
# whose genre list is null or empty produce no output rows.
movie_rating.withColumn("genre", explode(col("genres"))) \
    .groupBy("genre") \
    .agg(round(avg(col("rating")), 2).alias("average_genre_rating"),
         count(col("rating")).alias("rating_count")) \
    .orderBy(col("average_genre_rating").desc()) \
    .show()

#### Advanced Analysis:
- Implement a custom Spark UDAF (User-Defined Aggregate Function) to calculate the standard deviation of ratings for each movie.

In [None]:
class RatingStdDevUDAF:
    """Mergeable per-movie accumulator for the population standard deviation of ratings.

    Keeps a running sum, sum of squares and count per movie id. Partial
    accumulators built on different chunks of data can be combined with
    ``merge`` before calling ``evaluate``.

    ``UserDefinedAggregateFunction`` is a Scala/Java API with no PySpark
    counterpart, so this is a plain Python class. Inheriting from that name
    raised NameError at definition time.
    """

    def __init__(self):
        self.sums = {}            # movie_id -> running sum of ratings
        self.sum_of_squares = {}  # movie_id -> running sum of squared ratings
        self.counts = {}          # movie_id -> number of ratings seen

    def input(self, movie_id, rating):
        """Fold one (movie_id, rating) pair into the totals; pairs containing None are ignored."""
        if movie_id is None or rating is None:
            return
        self.sums[movie_id] = self.sums.get(movie_id, 0) + rating
        self.sum_of_squares[movie_id] = self.sum_of_squares.get(movie_id, 0) + rating ** 2
        self.counts[movie_id] = self.counts.get(movie_id, 0) + 1

    def merge(self, other):
        """Add another accumulator's per-movie totals into this one (mutates self)."""
        for movie_id, partial_sum in other.sums.items():
            self.sums[movie_id] = self.sums.get(movie_id, 0) + partial_sum
            self.sum_of_squares[movie_id] = (
                self.sum_of_squares.get(movie_id, 0) + other.sum_of_squares[movie_id]
            )
            self.counts[movie_id] = self.counts.get(movie_id, 0) + other.counts[movie_id]

    def evaluate(self, movie_id):
        """Return the population std dev of ``movie_id``'s ratings, or None if it has none."""
        n = self.counts.get(movie_id)
        if n is None:
            return None
        mean = self.sums[movie_id] / n
        variance = self.sum_of_squares[movie_id] / n - mean ** 2
        # E[x^2] - E[x]^2 can come out slightly negative through floating-point
        # cancellation when all ratings are (nearly) equal. The true variance is
        # never negative, so clamp to 0 instead of reporting "no value".
        # (Avoids builtin max(): the notebook's pyspark.sql.functions star
        # import shadows it.)
        if variance < 0:
            variance = 0.0
        return variance ** 0.5

# Custom Spark UDAF: in PySpark, a user-defined aggregate is written as a
# grouped-aggregate pandas UDF (pd.Series -> scalar; Spark 3.0+ type-hint form,
# requires pyarrow on the cluster). Spark hands it all ratings of one group.
from pyspark.sql.functions import pandas_udf
import pandas as pd


@pandas_udf("double")
def rating_std_dev_udaf(ratings: pd.Series) -> float:
    """Return the population standard deviation (ddof=0) of one group's ratings."""
    # ddof=0 divides by n, matching RatingStdDevUDAF.evaluate; the built-in
    # stddev_pop() computes the same value without a Python round-trip.
    return float(ratings.std(ddof=0))


rating.groupBy("movieId") \
    .agg(rating_std_dev_udaf(col("rating")).alias("rating_std_dev")) \
    .show()