Problem statement:
Please calculate the average rating per genre and year.
By year we mean the year in which the movie was released.
Please consider only movies released after 1989.
Please consider only the ratings of users aged 18-49 years.

##### Provide the path where you saved your .dat files by replacing "your_path" with the actual path

In [0]:
# Directory holding the MovieLens input files (movies.dat, ratings.dat, users.dat).
# Replace "your_path" with the real directory before running the next cell.
path = "your_path"

In [0]:
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, regexp_extract, col, avg

def _read_dat(spark, file_path, columns):
    """Read a "::"-delimited .dat file into a DataFrame of named string columns.

    Args:
        spark: Active SparkSession used for reading.
        file_path: Location of the .dat file.
        columns: Mapping of output column name -> zero-based field index
            within each "::"-separated line, in the desired output order.

    Returns:
        DataFrame with one string column per entry in ``columns``.
    """
    # Split each line once and pick the requested fields from the result.
    fields = split(col("value"), "::")
    return spark.read.text(file_path).select(
        *[fields.getItem(index).alias(name) for name, index in columns.items()]
    )


try:
    # Initialize Spark session and build input/output locations.
    spark = (
        SparkSession.builder
        .appName("MovieLens Average Ratings")
        .getOrCreate()
    )

    # Plain string concatenation (not pathlib) so URI-style paths such as
    # "dbfs:/..." or "s3a://..." are left intact. Outputs use the same "/"
    # separator as inputs so they land under <path>/output/.
    movies_path = path + "/movies.dat"
    ratings_path = path + "/ratings.dat"
    users_path = path + "/users.dat"
    output_path = path + "/output"

    # Read users and keep only those with Age in 18..49 (inclusive).
    # Age is cast to int so the range check is numeric, not an implicit
    # string/int coercion. Non-numeric ages become null and are dropped.
    # NOTE(review): if Age holds bucket codes rather than exact ages, confirm
    # that 18..49 selects the intended buckets.
    users_df = (
        _read_dat(spark, users_path, {"UserID": 0, "Age": 2})
        .withColumn("Age", col("Age").cast("int"))
        .filter(col("Age").between(18, 49))
    )

    # Read movies data.
    movies_df = _read_dat(
        spark, movies_path, {"MovieID": 0, "Title": 1, "Genres": 2}
    )

    # Data-quality checks (the Readme notes this file may contain
    # inconsistent and duplicate data):
    #   1. keep only titles containing a "(YYYY)" year and a non-empty genre list;
    #   2. drop duplicate MovieIDs (which duplicate row survives is unspecified).
    cleaned_movies_df = movies_df.filter(
        col("Title").rlike(r"\(\d{4}\)")
        & col("Genres").isNotNull()
        & (col("Genres") != "")
    ).dropDuplicates(["MovieID"])

    # Extract the release year. The greedy ".*" makes the pattern capture the
    # LAST "(YYYY)" group in the title, so an earlier parenthesized 4-digit
    # number (e.g. inside an alternate title) is not taken as the year.
    year_movies_df = (
        cleaned_movies_df
        .withColumn(
            "Year",
            regexp_extract(col("Title"), r".*\((\d{4})\)", 1).cast("int"),
        )
        .filter(col("Year") > 1989)
    )

    # One row per (movie, genre): "Action|Comedy" -> "Action", "Comedy".
    # Empty fragments (e.g. from "Action||Comedy") are discarded.
    final_movies_df = (
        year_movies_df
        .withColumn("Genre", explode(split(col("Genres"), r"\|")))
        .drop("Genres")
        .filter(col("Genre") != "")
    )

    # Read ratings. Rating is cast to double so avg() works on numbers
    # explicitly rather than through implicit string coercion.
    ratings_df = (
        _read_dat(spark, ratings_path, {"UserID": 0, "MovieID": 1, "Rating": 2})
        .withColumn("Rating", col("Rating").cast("double"))
    )

    # Inner joins keep only ratings by users aged 18-49 for movies that
    # passed the cleaning and year filters above.
    joined_df = (
        ratings_df
        .join(users_df, "UserID", "inner")
        .join(final_movies_df, "MovieID", "inner")
    )

    # Average rating per (Year, Genre). Cached because it is written twice.
    result_df = (
        joined_df.groupBy("Year", "Genre")
        .agg(avg("Rating").alias("AvgRating"))
        .orderBy("Year", "Genre")
        .cache()
    )

    # Saving the result in two ways:
    # 1. A single CSV file, for human reading/inspection only.
    (
        result_df.coalesce(1).write
        .option("header", "true")
        .mode("overwrite")
        .csv(output_path + "/avg_ratings")
    )

    # 2. Partitioned by Year and Genre, intended to scale to larger inputs.
    (
        result_df.write
        .option("header", "true")
        .mode("overwrite")
        .partitionBy("Year", "Genre")
        .csv(output_path + "/avg_ratings_partitioned")
    )

    result_df.unpersist()

except Exception as e:
    # logging.exception records the message together with the full
    # traceback. Re-raise so the failure is visible to the notebook or job
    # runner instead of the run appearing to succeed.
    logging.exception("An error occurred during processing: %s", e)
    raise
