<a href="https://colab.research.google.com/github/hilmanrozaini/UKM_SDA/blob/main/assignment3_p121535_STQD6324_hilman.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive so the project data files under MyDrive/ are readable
# from /content/drive by the cells below.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# On a fresh Colab runtime PySpark may need installing first: %pip install pyspark
from pyspark.sql import SparkSession

# Local-mode SparkSession named "Colab"; its web UI is served on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import avg, min, count, col, from_unixtime
#from pyspark.sql.functions import avg, min, count, col


## Highest and lowest average movie ratings (movies with more than 100 ratings)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, avg, count
from pyspark.sql import Row

# Load up movieID -> movie name dictionary
def loadMovieNames(path: str = "/content/drive/MyDrive/STQD6324_Data_Management/project_3/u.item.txt") -> dict:
    """Build a movieID -> title dictionary from a pipe-delimited item file.

    Each non-blank line is expected to look like ``movieID|title|...``; only
    the first two fields are used.

    Args:
        path: Location of the item file. Defaults to the project's
            ``u.item.txt`` on the mounted Google Drive.

    Returns:
        dict mapping the integer movie ID to its title string.
    """
    movieNames = {}
    # The file is decoded as ISO-8859-1 (Latin-1); presumably it is not valid
    # UTF-8 -- confirm the encoding if the data source changes.
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Skip blank lines (e.g. a trailing empty line) instead of
            # crashing on int('').
            if not line.strip():
                continue
            # Drop the line terminator so it cannot leak into the title when
            # the title happens to be the last field on the line.
            fields = line.rstrip("\r\n").split("|")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Convert u.data lines into (UserID, movieID, rating, timestamp) rows
def parseInput(line):
    """Turn one u.data text record into a Row.

    ``line`` is a Row from ``spark.read.text`` whose ``value`` holds
    whitespace-separated ``userID movieID rating timestamp`` fields.

    Returns:
        Row(userID=int, movieID=int, rating=float, timestamp=int)
    """
    parts = line.value.split()
    return Row(
        userID=int(parts[0]),
        movieID=int(parts[1]),
        rating=float(parts[2]),
        timestamp=int(parts[3]),
    )

if __name__ == "__main__":
    # Only movies with more than MIN_RATINGS ratings are ranked, so a few
    # enthusiastic ratings cannot push an obscure title to either extreme.
    MIN_RATINGS = 100
    # Number of movies listed at each end of the ranking.
    TOP_N = 25

    # Create (or reuse) a SparkSession
    spark = SparkSession.builder.appName("MovieRatings").getOrCreate()

    # Load up our movieID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.read.text("/content/drive/MyDrive/STQD6324_Data_Management/project_3/u.data.txt").rdd

    # Convert it to an RDD of Row objects with (userID, movieID, rating, timestamp)
    ratingsRDD = lines.map(parseInput)

    # Convert to a DataFrame and cache it
    ratings = spark.createDataFrame(ratingsRDD).cache()

    # Average rating and number of ratings per movie in ONE aggregation.
    # rating_count already holds the count, so the popularity filter is applied
    # directly rather than joining a second groupBy().count() result.
    moviesWithRatings = (
        ratings.groupBy("movieID")
        .agg(avg("rating").alias("avg_rating"), count("rating").alias("rating_count"))
        .filter(col("rating_count") > MIN_RATINGS)
    )

    # movieID -> title lookup as a DataFrame, built once and shared by both rankings
    titles = spark.createDataFrame(list(movieNames.items()), ["movieID", "title"])

    # movieID breaks ties so limit() selects the same movies on every run.
    bestOrder = [col("avg_rating").desc(), col("movieID").asc()]
    worstOrder = [col("avg_rating").asc(), col("movieID").asc()]

    # A join does not preserve row order, so the ranking is re-applied AFTER
    # the titles are attached; otherwise the rows are shown in arbitrary order.
    bestMovies = (
        moviesWithRatings.orderBy(*bestOrder).limit(TOP_N)
        .join(titles, "movieID")
        .orderBy(*bestOrder)
    )
    worstMovies = (
        moviesWithRatings.orderBy(*worstOrder).limit(TOP_N)
        .join(titles, "movieID")
        .orderBy(*worstOrder)
    )

    columns = ["movieID", "title", "rating_count", "avg_rating"]

    # show() prints only 20 rows unless told otherwise, so pass TOP_N explicitly.
    print(f"Top {TOP_N} Movies with the best average rating:")
    bestMovies.select(*columns).show(TOP_N, truncate=False)

    print(f"Top {TOP_N} Movies with the worst average rating:")
    worstMovies.select(*columns).show(TOP_N, truncate=False)

    # Stop the SparkSession
    spark.stop()


Top 25 Movies with the best average rating:
+-------+---------------------------------------------------------------------------+------------+------------------+
|movieID|title                                                                      |rating_count|avg_rating        |
+-------+---------------------------------------------------------------------------+------------+------------------+
|12     |Usual Suspects, The (1995)                                                 |267         |4.385767790262173 |
|50     |Star Wars (1977)                                                           |583         |4.3584905660377355|
|64     |Shawshank Redemption, The (1994)                                           |283         |4.445229681978798 |
|98     |Silence of the Lambs, The (1991)                                           |390         |4.28974358974359  |
|127    |Godfather, The (1972)                                                      |413         |4.283292978208232 |
|134    |Cit

## Highest and lowest average movie ratings, with each movie's earliest rating timestamp

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, avg, count
from pyspark.sql import Row

# Load up movieID -> movie name dictionary
def loadMovieNames(path: str = "/content/drive/MyDrive/STQD6324_Data_Management/project_3/u.item.txt") -> dict:
    """Build a movieID -> title dictionary from a pipe-delimited item file.

    Each non-blank line is expected to look like ``movieID|title|...``; only
    the first two fields are used.

    NOTE(review): this re-defines the identically named function from an
    earlier cell; the later definition is the one in effect afterwards.

    Args:
        path: Location of the item file. Defaults to the project's
            ``u.item.txt`` on the mounted Google Drive.

    Returns:
        dict mapping the integer movie ID to its title string.
    """
    movieNames = {}
    # The file is decoded as ISO-8859-1 (Latin-1); presumably it is not valid
    # UTF-8 -- confirm the encoding if the data source changes.
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Skip blank lines (e.g. a trailing empty line) instead of
            # crashing on int('').
            if not line.strip():
                continue
            # Drop the line terminator so it cannot leak into the title when
            # the title happens to be the last field on the line.
            fields = line.rstrip("\r\n").split("|")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Convert u.data lines into (UserID, movieID, rating, timestamp) rows
def parseInput(line):
    """Turn one u.data text record into a Row.

    ``line`` is a Row from ``spark.read.text`` whose ``value`` holds
    whitespace-separated ``userID movieID rating timestamp`` fields.

    NOTE(review): this re-defines the identically named function from an
    earlier cell.

    Returns:
        Row(userID=int, movieID=int, rating=float, timestamp=int)
    """
    parts = line.value.split()
    return Row(
        userID=int(parts[0]),
        movieID=int(parts[1]),
        rating=float(parts[2]),
        timestamp=int(parts[3]),
    )

if __name__ == "__main__":
    # Spark's aggregate min() via a module alias: the bare name `min` in this
    # cell only worked because another cell imported it (shadowing the builtin).
    from pyspark.sql import functions as F

    # Only movies with more than MIN_RATINGS ratings are ranked, so a few
    # enthusiastic ratings cannot push an obscure title to either extreme.
    MIN_RATINGS = 100
    # Number of movies listed at each end of the ranking.
    TOP_N = 25

    # Create (or reuse) a SparkSession
    spark = SparkSession.builder.appName("MovieRatings").getOrCreate()

    # Load up our movieID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.read.text("/content/drive/MyDrive/STQD6324_Data_Management/project_3/u.data.txt").rdd

    # Convert it to an RDD of Row objects with (userID, movieID, rating, timestamp)
    ratingsRDD = lines.map(parseInput)

    # Convert to a DataFrame and cache it
    ratings = spark.createDataFrame(ratingsRDD).cache()

    # Per movie: average rating, number of ratings, and the smallest timestamp
    # among its ratings -- i.e. when the movie was FIRST RATED, not when it was
    # released. rating_count already holds the count, so the popularity filter
    # is applied directly rather than joining a second groupBy().count().
    moviesWithRatings = (
        ratings.groupBy("movieID")
        .agg(
            avg("rating").alias("avg_rating"),
            F.min("timestamp").alias("timestamp"),
            count("rating").alias("rating_count"),
        )
        .filter(col("rating_count") > MIN_RATINGS)
    )

    # movieID -> title lookup as a DataFrame, built once and shared by both rankings
    titles = spark.createDataFrame(list(movieNames.items()), ["movieID", "title"])

    # Equal averages are broken by earliest rating time (oldest first), then by
    # movieID so limit() selects the same movies on every run.
    bestOrder = [col("avg_rating").desc(), col("timestamp").asc(), col("movieID").asc()]
    worstOrder = [col("avg_rating").asc(), col("timestamp").asc(), col("movieID").asc()]

    # A join does not preserve row order, so the ranking is re-applied AFTER
    # the titles are attached; otherwise the rows are shown in arbitrary order.
    bestMovies = (
        moviesWithRatings.orderBy(*bestOrder).limit(TOP_N)
        .join(titles, "movieID")
        .orderBy(*bestOrder)
    )
    worstMovies = (
        moviesWithRatings.orderBy(*worstOrder).limit(TOP_N)
        .join(titles, "movieID")
        .orderBy(*worstOrder)
    )

    columns = ["movieID", "title", "rating_count", "avg_rating", "timestamp"]

    # show() prints only 20 rows unless told otherwise, so pass TOP_N explicitly.
    print(f"Top {TOP_N} Movies with the best average rating:")
    bestMovies.select(*columns).show(TOP_N, truncate=False)

    print(f"Top {TOP_N} Movies with the worst average rating:")
    worstMovies.select(*columns).show(TOP_N, truncate=False)

    # Stop the SparkSession
    spark.stop()



Top 25 Movies with the best average rating:
+-------+---------------------------------------------------------------------------+------------+------------------+---------+
|movieID|title                                                                      |rating_count|avg_rating        |timestamp|
+-------+---------------------------------------------------------------------------+------------+------------------+---------+
|12     |Usual Suspects, The (1995)                                                 |267         |4.385767790262173 |874777491|
|50     |Star Wars (1977)                                                           |583         |4.3584905660377355|874729750|
|64     |Shawshank Redemption, The (1994)                                           |283         |4.445229681978798 |874777701|
|98     |Silence of the Lambs, The (1991)                                           |390         |4.28974358974359  |874786016|
|127    |Godfather, The (1972)                              