In [None]:
# Mount Google Drive at /content/drive; the data files read later in this
# notebook are addressed by paths under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("local")\
    .appName("Colab")\
    .config("spark.ui.port", "4040")\
    .getOrCreate()

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=4b5d70216ce547861b0c25e728be759b715c5ba31d5a4f34e17df61af3be3b7a
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

from pyspark.sql.functions import avg, count,col,min,first
from datetime import datetime

In [None]:
def loadMovieNames(path="/content/drive/MyDrive/ASIGNMENT3_DATAMANAGEMENT_P125754/ml-100k/u.item"):
  """Build a {movie id: title} dictionary from a pipe-delimited item file.

  Each usable line must have at least two '|'-separated fields: an integer
  movie id followed by the title. Any further fields are ignored.

  Args:
    path: Location of the item file. Defaults to the notebook's Drive copy,
      so existing zero-argument calls keep working.

  Returns:
    dict mapping int movie id -> str title. The dict is empty when the file
    cannot be opened, and omits any line that could not be parsed.
  """
  movieNames = {}
  try:
    with open(path, encoding='ISO-8859-1') as f:
      for line in f:
        fields = line.rstrip("\n").split('|')
        # Blank or truncated lines carry no (id, title) pair; skip them
        # instead of letting one bad line abort the whole load.
        if len(fields) < 2:
          continue
        try:
          movieNames[int(fields[0])] = fields[1]
        except ValueError as error:
          # Non-numeric id: report it and keep reading the remaining lines.
          print("Problem!", error)
  except FileNotFoundError as error:
    print("FileNotFoundError! ", error)
  except Exception as error:
    print("Problem!", error)
  return movieNames

def parseInput(line):
  """Turn one whitespace-separated ratings line into a Row.

  The second, third and fourth fields are read as the movie id (int), the
  rating (float) and a Unix timestamp in seconds. The first field is not
  used. The timestamp is rendered in the process's local time zone as a
  "dd-mm-YYYY HH:MM:SS" string.

  Returns:
    Row(MovieID, rating, time), or None when the line cannot be parsed
    (the error is printed).
  """
  try:
    parts = line.split()
    # Convert the epoch seconds first so a bad timestamp is reported before
    # the id/rating conversions are attempted.
    stamp = datetime.fromtimestamp(int(parts[3])).strftime("%d-%m-%Y %H:%M:%S")
    movie_id = int(parts[1])
    score = float(parts[2])
    return Row(MovieID=movie_id, rating=score, time=stamp)
  except Exception as error:
    print("Error! ",error)
    return None

In [None]:
if __name__ == "__main__":
  # Create (or reuse) the SparkSession
  spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

  # Spark pattern matching the "dd-mm-YYYY HH:MM:SS" strings produced by
  # parseInput. Strings in this layout do not sort chronologically, so every
  # comparison below parses them into real timestamps first.
  TIME_PATTERN = "dd-MM-yyyy HH:mm:ss"

  # Load the movie ID -> title lookup
  movieNames = loadMovieNames()

  # Get the raw data
  lines = spark.sparkContext.textFile("/content/drive/MyDrive/ASIGNMENT3_DATAMANAGEMENT_P125754/ml-100k/u.data")

  # Convert it to an RDD of Row objects with (MovieID, rating, time).
  # parseInput returns None for unparseable lines; drop those so they do not
  # break DataFrame creation.
  movies = lines.map(parseInput).filter(lambda row: row is not None)

  # Convert that to DataFrames: (MovieID, Rating, Timestamp) and (MovieID, Title)
  movieDataset = spark.createDataFrame(movies,["MovieID","Rating","Timestamp"])
  movieNames   = spark.createDataFrame(list(movieNames.items()),["MovieID","Title"])

  # Join the ratings with the titles on MovieID
  moviesDataset_new = movieDataset.join(movieNames, "MovieID")

  # Per movie: average rating, number of ratings and the earliest rating time.
  # Only movies with more than 100 ratings are kept. The earliest time is
  # found on parsed timestamps and then formatted back, so the "Time" column
  # keeps its original string layout.
  ratedAt = functions.to_timestamp(col("Timestamp"), TIME_PATTERN)
  averageRatings = (
      moviesDataset_new.groupBy("MovieID","Title")
      .agg(
          avg("Rating").alias("AvgRatings"),
          count("Rating").alias("RatingCount"),
          functions.date_format(functions.min(ratedAt), TIME_PATTERN).alias("Time"),
      )
      .filter("RatingCount > 100")
  )

  #save output(already save)
  #averageRatings.write.options(header='True', delimiter='|').csv("/content/drive/MyDrive/ASIGNMENT3_DATAMANAGEMENT_P125754/AverageRatings")

  # Sort key that orders the formatted "Time" strings chronologically, oldest first
  oldestFirst = functions.to_timestamp(col("Time"), TIME_PATTERN).asc()

  # Take the 25 highest-rated movies, then list them oldest first
  #Best movies
  print("\nTop 25 Best Movies average rating ordered by oldest timestamp")
  Best_Movies=averageRatings.sort(col("AvgRatings").desc()).limit(25)
  Best_Movies_bytime=Best_Movies.orderBy(oldestFirst)
  Best_Movies_bytime.show(truncate=False,n=25)
  # Take the 25 lowest-rated movies, then list them oldest first
  #Worst Average movies
  print("\nTop 25 worst average rating Movies ordered by oldest timestamp")
  Worst_Movies=averageRatings.sort(col("AvgRatings").asc()).limit(25)
  Worst_Movies_bytime=Worst_Movies.orderBy(oldestFirst)
  Worst_Movies_bytime.show(truncate=False,n=25)


  #saved the output(already save)
  #Best_Movies_bytime.write.options(header='True', delimiter='|').csv("/content/drive/MyDrive/ASIGNMENT3_DATAMANAGEMENT_P125754/Best_Movies_bytime")
  #Worst_Movies_bytime.write.options(header='True', delimiter='|').csv("/content/drive/MyDrive/ASIGNMENT3_DATAMANAGEMENT_P125754/Worst_Movies_bytime")

  #stop spark
  spark.stop()


Top 25 Best Movies average rating ordered by oldest timestamp
+-------+---------------------------------------------------------------------------+------------------+-----------+-------------------+
|MovieID|Title                                                                      |AvgRatings        |RatingCount|Time               |
+-------+---------------------------------------------------------------------------+------------------+-----------+-------------------+
|127    |Godfather, The (1972)                                                      |4.283292978208232 |413        |01-01-1998 00:55:33|
|50     |Star Wars (1977)                                                           |4.3584905660377355|583        |01-01-1998 00:57:34|
|98     |Silence of the Lambs, The (1991)                                           |4.28974358974359  |390        |01-01-1998 15:21:37|
|285    |Secrets & Lies (1996)                                                      |4.265432098765432 |162        