# **Assignment 3: STQD6324 Data Management**

---


 


> BY: NUR MARDHIAH BT. ZULKHAIRI (P119717)



In [None]:
# Colab-only helper: expose Google Drive under /content/drive so the
# MovieLens files stored in MyDrive can be opened by later cells.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
#installing the pyspark

!pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession. builder\
              .master("local")\
              .appName("Colab")\
              .config(' spark.ui.port',  '4050')\
              .getOrCreate()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=4b1b39e082342dafafe58efb33240fc9c43c7f3b464f2248f729d1ba6d035237
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


### **QUESTION**
Using Spark2 and the movielens dataset, find the movies with the

> **best average ratings**, and

> **worst average ratings**


* Filter both lists of movies to only include those with more than 100 ratings.
* Save the output of the results.
* Display the top 25 results for each list on the console, ordered by oldest timestamp.


Here are the points to check whether you have successfully executed the PySpark script:
* Successfully loaded the movielens dataset into Spark (2 points)
* Filtered out the movies with less than 100 ratings (2 points)
* Calculated the average rating for each movie (4 points)
* Sorted the movies based on the average rating (2 points)
* Saved the output of the results (2 points)
* Selected the top 25 movies with the highest average rating (2 points)
* Selected the top 25 movies with the lowest average rating (2 points)
* Displayed the results on the console (2 points)
* Ordered the output results on the console by oldest timestamp (2 points)
* Used efficient and optimal Spark transformations and actions (3 points)
* Used appropriate error handling techniques (2 points)


In [None]:
#importing the library 

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit
from pyspark.sql import functions 

# Order by ratings

In [None]:
from pyspark.sql.functions import col, avg, count
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

# Load up movieID -> movie name dictionary
def loadMovieNames(path="/content/drive/MyDrive/SEMESTER 2/DATA_MANAGEMENT/Assignment3/u.item"):
    """Build a ``{movieID: title}`` dictionary from the MovieLens ``u.item`` file.

    Each line is pipe-delimited; field 0 is the integer movie ID and field 1
    is the title. The file is read as ISO-8859-1 (Latin-1).

    Args:
        path: Location of ``u.item``. Defaults to the Drive path used by this
            notebook, so existing ``loadMovieNames()`` calls are unchanged.

    Returns:
        dict mapping int movie ID -> title string.

    Raises:
        FileNotFoundError: if ``path`` does not exist (e.g. Drive not mounted).
        ValueError: if a non-blank line has no ``|`` separator or a
            non-integer movie ID.
    """
    movieNames = {}
    with open(path, encoding="ISO-8859-1") as f:
        for lineNo, line in enumerate(f, start=1):
            # Skip blank/whitespace-only lines (e.g. a trailing empty line)
            # instead of crashing on int('').
            if not line.strip():
                continue
            # Drop the line terminator so it can never leak into a title.
            fields = line.rstrip("\r\n").split('|')
            if len(fields) < 2:
                raise ValueError(
                    f"{path}:{lineNo}: expected 'movieID|title|...', got {line!r}")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Convert u.data lines into (UserID, movieID, rating, timestamp) rows
def parseInput(line):
    """Turn one raw ``u.data`` text record into a typed ``Row``.

    ``line`` is an element of ``spark.read.text(...).rdd``; its ``value``
    attribute holds one whitespace-separated record laid out as
    ``userID movieID rating timestamp``. Any extra fields are ignored.
    """
    parts = line.value.split()
    return Row(
        userID=int(parts[0]),
        movieID=int(parts[1]),
        rating=float(parts[2]),
        timestamp=int(parts[3]),
    )

if __name__ == "__main__":
    # broadcast() marks the small titles table for a broadcast join (no shuffle
    # of the aggregated ratings).
    from pyspark.sql.functions import broadcast

    # Create (or reuse the already-running) SparkSession.
    spark = SparkSession.builder.appName("MovieRatings").getOrCreate()

    # try/finally releases the session even if loading, aggregation or
    # display fails part-way through.
    try:
        # movieID -> title dictionary built from u.item
        movieNames = loadMovieNames()

        # Read u.data as text lines and parse each into a typed Row
        # (userID, movieID, rating, timestamp). The ALS recommender that used
        # to be fitted here was never used, so it is no longer trained.
        lines = spark.read.text("/content/drive/MyDrive/SEMESTER 2/DATA_MANAGEMENT/Assignment3/u.data").rdd
        ratings = spark.createDataFrame(lines.map(parseInput))

        # A single groupBy computes both the number of ratings and the average
        # rating per movie (previously two aggregations plus a join). Only
        # movies rated more than 100 times are kept.
        movieStats = (
            ratings.groupBy("movieID")
            .agg(avg("rating").alias("avg_rating"),
                 count("rating").alias("rating_count"))
            .filter(col("rating_count") > 100)
        )

        # Attach titles BEFORE sorting. A join does not preserve the order of
        # its inputs, so sorting first and joining afterwards lost the ranking.
        titles = spark.createDataFrame(list(movieNames.items()), ["movieID", "title"])
        # Cached because both the best and the worst list are derived from it.
        moviesWithRatings = movieStats.join(broadcast(titles), "movieID").cache()

        # Rank by average rating: highest first for best, lowest first for worst.
        bestMovies = moviesWithRatings.orderBy(col("avg_rating").desc()).limit(25)
        worstMovies = moviesWithRatings.orderBy(col("avg_rating").asc()).limit(25)

        # Display the top 25 of each list on the console.
        print("Top 25 Movies with the best average ratings:")
        bestMovies.select("movieID", "title", "avg_rating", "rating_count").show(n=25,
                                                                    truncate=False)

        print("Top 25 Movies with the worst average ratings:")
        worstMovies.select("movieID", "title", "avg_rating", "rating_count").show(n=25,
                                                                    truncate=False)
    finally:
        spark.stop()


Top 25 Movies with the best average ratings:
+-------+---------------------------------------------------------------------------+------------------+------------+
|movieID|title                                                                      |avg_rating        |rating_count|
+-------+---------------------------------------------------------------------------+------------------+------------+
|12     |Usual Suspects, The (1995)                                                 |4.385767790262173 |267         |
|50     |Star Wars (1977)                                                           |4.3584905660377355|583         |
|64     |Shawshank Redemption, The (1994)                                           |4.445229681978798 |283         |
|98     |Silence of the Lambs, The (1991)                                           |4.28974358974359  |390         |
|127    |Godfather, The (1972)                                                      |4.283292978208232 |413         |
|134    |Ci

# Ordered by oldest timestamp

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import col, avg, min

# Load up movieID -> movie name dictionary
def loadMovieNames(path="/content/drive/MyDrive/SEMESTER 2/DATA_MANAGEMENT/Assignment3/u.item"):
    """Build a ``{movieID: title}`` dictionary from the MovieLens ``u.item`` file.

    Each line is pipe-delimited; field 0 is the integer movie ID and field 1
    is the title. The file is read as ISO-8859-1 (Latin-1).

    Args:
        path: Location of ``u.item``. Defaults to the Drive path used by this
            notebook, so existing ``loadMovieNames()`` calls are unchanged.

    Returns:
        dict mapping int movie ID -> title string.

    Raises:
        FileNotFoundError: if ``path`` does not exist (e.g. Drive not mounted).
        ValueError: if a non-blank line has no ``|`` separator or a
            non-integer movie ID.
    """
    movieNames = {}
    with open(path, encoding="ISO-8859-1") as f:
        for lineNo, line in enumerate(f, start=1):
            # Skip blank/whitespace-only lines (e.g. a trailing empty line)
            # instead of crashing on int('').
            if not line.strip():
                continue
            # Drop the line terminator so it can never leak into a title.
            fields = line.rstrip("\r\n").split('|')
            if len(fields) < 2:
                raise ValueError(
                    f"{path}:{lineNo}: expected 'movieID|title|...', got {line!r}")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Convert u.data lines into (UserID, movieID, rating, timestamp) rows
def parseInput(line):
    """Turn one raw ``u.data`` text record into a typed ``Row``.

    ``line`` is an element of ``spark.read.text(...).rdd``; its ``value``
    attribute holds one whitespace-separated record laid out as
    ``userID movieID rating timestamp``. Any extra fields are ignored.
    """
    parts = line.value.split()
    return Row(
        userID=int(parts[0]),
        movieID=int(parts[1]),
        rating=float(parts[2]),
        timestamp=int(parts[3]),
    )

if __name__ == "__main__":
    # Local imports make this cell self-contained: its import line above does
    # not bring in count(), which previously worked only because an earlier
    # cell had imported it. The aliased min avoids relying on the name that
    # shadows Python's built-in min().
    from pyspark.sql.functions import broadcast, count, min as sql_min

    DATA_DIR = "/content/drive/MyDrive/SEMESTER 2/DATA_MANAGEMENT/Assignment3"
    MIN_RATINGS = 100   # keep movies rated strictly more than this many times
    TOP_N = 25          # size of each result list

    # Create (or reuse the already-running) SparkSession.
    spark = SparkSession.builder.appName("MovieRatings").getOrCreate()

    # try/finally releases the session even if loading, aggregation, saving
    # or display fails part-way through.
    try:
        # movieID -> title dictionary built from u.item
        movieNames = loadMovieNames()

        # Read u.data as text lines and parse each into a typed Row
        # (userID, movieID, rating, timestamp). The ALS recommender that used
        # to be fitted here was never used, so it is no longer trained.
        lines = spark.read.text(f"{DATA_DIR}/u.data").rdd
        ratings = spark.createDataFrame(lines.map(parseInput))

        # One aggregation per movie: average rating, oldest (minimum) rating
        # timestamp and number of ratings. This replaces the separate
        # count-groupBy plus join, which recomputed rating_count a second time.
        movieStats = (
            ratings.groupBy("movieID")
            .agg(avg("rating").alias("avg_rating"),
                 sql_min("timestamp").alias("timestamp"),
                 count("rating").alias("rating_count"))
            .filter(col("rating_count") > MIN_RATINGS)
        )

        # Attach titles BEFORE sorting. A join does not preserve the order of
        # its inputs, so sorting first and joining afterwards lost the ordering.
        titles = spark.createDataFrame(list(movieNames.items()), ["movieID", "title"])
        # Cached because both the best and the worst list are derived from it.
        moviesWithRatings = movieStats.join(broadcast(titles), "movieID").cache()

        # 1) Select the TOP_N best / worst movies by average rating (the oldest
        #    timestamp breaks ties), then
        # 2) order those selected movies by oldest timestamp for output.
        bestMovies = (
            moviesWithRatings
            .orderBy(col("avg_rating").desc(), col("timestamp").asc())
            .limit(TOP_N)
            .orderBy(col("timestamp").asc())
        )
        worstMovies = (
            moviesWithRatings
            .orderBy(col("avg_rating").asc(), col("timestamp").asc())
            .limit(TOP_N)
            .orderBy(col("timestamp").asc())
        )

        outputColumns = ["movieID", "title", "avg_rating", "rating_count", "timestamp"]

        # Save both result lists as one CSV file each (with a header row).
        # Results are overwritten on re-run.
        (bestMovies.select(*outputColumns).coalesce(1)
         .write.mode("overwrite").option("header", True)
         .csv(f"{DATA_DIR}/best_movies"))
        (worstMovies.select(*outputColumns).coalesce(1)
         .write.mode("overwrite").option("header", True)
         .csv(f"{DATA_DIR}/worst_movies"))

        # Display the results on the console.
        print("Top 25 Movies with the best average ratings (ordered by oldest timestamp):")
        bestMovies.select(*outputColumns).show(n=TOP_N, truncate=False)

        print("Top 25 Movies with the worst average ratings (ordered by oldest timestamp):")
        worstMovies.select(*outputColumns).show(n=TOP_N, truncate=False)
    finally:
        # Stop the SparkSession
        spark.stop()


Top 25 Movies with the best average ratings (ordered by oldest timestamp):
+-------+---------------------------------------------------------------------------+------------------+------------+---------+
|movieID|title                                                                      |avg_rating        |rating_count|timestamp|
+-------+---------------------------------------------------------------------------+------------------+------------+---------+
|12     |Usual Suspects, The (1995)                                                 |4.385767790262173 |267         |874777491|
|50     |Star Wars (1977)                                                           |4.3584905660377355|583         |874729750|
|64     |Shawshank Redemption, The (1994)                                           |4.445229681978798 |283         |874777701|
|98     |Silence of the Lambs, The (1991)                                           |4.28974358974359  |390         |874786016|
|127    |Godfather, The (1972

In [None]:
#SAVE THE OUTPUT
#SKIP THIS PART


   # Save the outputs to separate files
    #bestMovies.coalesce(1).write.mode("overwrite").csv("/content/drive/MyDrive/SEMESTER 2/DATA_MANAGEMENT/Assignment3/best_movies")
    #worstMovies.coalesce(1).write.mode("overwrite").csv("/content/drive/MyDrive/SEMESTER 2/DATA_MANAGEMENT/Assignment3/worst_movies")