In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [4]:
#create spark Session

spark = SparkSession.builder \
    .appName("movie rating") \
    .getOrCreate()


23/11/06 06:54:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
#hdfs path

hdfs_path = "/tmp/movie_rate_project_input/"


In [5]:
# read movie.csv file
movies_df = spark.read.format("csv").option("inferSchema","True").option("header","True").load(hdfs_path+'movies.csv')
movies_df.printSchema()
movies_df.show(5)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [6]:
# read tags.csv file

tags_df =spark.read.format("csv").option("inferSchema","True").option("header","True").load(hdfs_path+"tags.csv")

tags_df.printSchema()
tags_df.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [7]:
# read ratings.csv file

ratings_df = spark.read\
                .format("csv")\
                .option("inferSchema","True")\
                .option("header","True")\
                .load(hdfs_path+"ratings.csv")

ratings_df.printSchema()
ratings_df.show(5)

                                                                                

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [14]:
#a. Show the aggregated number of ratings per year

In [8]:
#convert unix time to Coordinated Universal Time (UTC)
ratings_df1=ratings_df.withColumn("timestamp", from_unixtime(col("timestamp")))

ratings_df1.show(10)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
|     1|     70|   3.0|2000-07-30 18:40:00|
|     1|    101|   5.0|2000-07-30 18:14:28|
|     1|    110|   4.0|2000-07-30 18:36:16|
|     1|    151|   5.0|2000-07-30 19:07:21|
|     1|    157|   5.0|2000-07-30 19:08:20|
+------+-------+------+-------------------+
only showing top 10 rows



In [6]:
#extract year from timestamp

ratings_per_year = ratings_df1.withColumn("year", year(col("timestamp")))
ratings_per_year.show(5)


+------+-------+------+-------------------+----+
|userId|movieId|rating|          timestamp|year|
+------+-------+------+-------------------+----+
|     1|      1|   4.0|2000-07-30 18:45:03|2000|
|     1|      3|   4.0|2000-07-30 18:20:47|2000|
|     1|      6|   4.0|2000-07-30 18:37:04|2000|
|     1|     47|   5.0|2000-07-30 19:03:35|2000|
|     1|     50|   5.0|2000-07-30 18:48:51|2000|
+------+-------+------+-------------------+----+
only showing top 5 rows



In [11]:
aggregate_rate_per_year = ratings_per_year.groupBy("year").agg(count("rating").alias("count_rating")).orderBy(col("year").asc())
aggregate_rate_per_year.show(25)                                                           

+----+------------+
|year|count_rating|
+----+------------+
|1996|        6040|
|1997|        1916|
|1998|         507|
|1999|        2439|
|2000|       10061|
|2001|        3922|
|2002|        3478|
|2003|        4014|
|2004|        3279|
|2005|        5813|
|2006|        4059|
|2007|        7114|
|2008|        4351|
|2009|        4158|
|2010|        2301|
|2011|        1690|
|2012|        4656|
|2013|        1664|
|2014|        1439|
|2015|        6616|
|2016|        6703|
|2017|        8198|
|2018|        6418|
+----+------------+



In [12]:
## a. second method 
ratings_per_year=ratings_df1.groupBy(substring("timestamp", 0, 4).alias("year"))\
                    .agg(count("*").alias("count"))
ratings_per_year.show()

[Stage 20:>                                                         (0 + 1) / 1]

+----+-----+
|year|count|
+----+-----+
|2016| 6703|
|2012| 4656|
|2017| 8198|
|2014| 1439|
|2013| 1664|
|2005| 5813|
|2000|10061|
|2002| 3478|
|2009| 4158|
|2018| 6418|
|2006| 4059|
|2004| 3279|
|2011| 1690|
|2008| 4351|
|1999| 2439|
|1997| 1916|
|2007| 7114|
|1996| 6040|
|2015| 6616|
|1998|  507|
+----+-----+
only showing top 20 rows



                                                                                

In [8]:
# b. Show the average monthly number of ratings

ratings_per_month = ratings_df1.withColumn("Month",month(col("timestamp")))
ratings_per_month.show(5)

+------+-------+------+-------------------+-----+
|userId|movieId|rating|          timestamp|Month|
+------+-------+------+-------------------+-----+
|     1|      1|   4.0|2000-07-30 18:45:03|    7|
|     1|      3|   4.0|2000-07-30 18:20:47|    7|
|     1|      6|   4.0|2000-07-30 18:37:04|    7|
|     1|     47|   5.0|2000-07-30 19:03:35|    7|
|     1|     50|   5.0|2000-07-30 18:48:51|    7|
+------+-------+------+-------------------+-----+
only showing top 5 rows



In [11]:
count_ratings_per_month = ratings_per_month.groupBy("Month").agg(count("rating").alias("avg_rating_month")).orderBy(col("Month").asc())

count_ratings_per_month.show(5)

[Stage 11:>                                                         (0 + 1) / 1]

+-----+----------------+
|Month|avg_rating_month|
+-----+----------------+
|    1|            8684|
|    2|            7635|
|    3|            8880|
|    4|            7727|
|    5|           10883|
+-----+----------------+
only showing top 5 rows



                                                                                

In [14]:
#c. Show the rating levels distribution
ratings_level = ratings_df1.groupBy(col("rating").alias("rating_distribution")).agg(count("rating").alias("rating_count")).orderBy(col("rating").asc())

ratings_level.show(5)

[Stage 17:>                                                         (0 + 1) / 1]

+-------------------+------------+
|rating_distribution|rating_count|
+-------------------+------------+
|                0.5|        1370|
|                1.0|        2811|
|                1.5|        1791|
|                2.0|        7551|
|                2.5|        5550|
+-------------------+------------+
only showing top 5 rows



                                                                                

In [11]:
#d. Show the 18 movies that are tagged but not rated

movies_df.show(5)
tags_df.show(5)
ratings_df1.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp

In [16]:
not_tagged_rated_movie = movies_df.alias("m").join(tags_df.alias("t"), col("m.movieId") == col("t.movieId"), "inner")\
                        .join(ratings_df1.alias("r"), col("t.movieId") == col("r.movieId"), "left")\
                        .filter(col("rating").isNull())\
                        .select("m.title").distinct()

count=not_tagged_rated_movie.count()
print(count)

not_tagged_rated_movie.show()


                                                                                

18


                                                                                

+--------------------+
|               title|
+--------------------+
|Mutiny on the Bou...|
|Call Northside 77...|
|Color of Paradise...|
|For All Mankind (...|
|Browning Version,...|
|I Know Where I'm ...|
|        Proof (1991)|
|Twentieth Century...|
|Innocents, The (1...|
|In the Realms of ...|
|Parallax View, Th...|
|Road Home, The (W...|
|Roaring Twenties,...|
|  Chalet Girl (2011)|
|      Scrooge (1970)|
|      Niagara (1953)|
|  Chosen, The (1981)|
|This Gun for Hire...|
+--------------------+



In [17]:

movies_df.createOrReplaceTempView("movies")
tags_df.createOrReplaceTempView("tags")
ratings_df1.createOrReplaceTempView("ratings")

In [18]:
#d. Show the 18 movies that are tagged but not rated

In [32]:

not_tagged_rate_movie_sql= spark.sql( """
    SELECT  distinct m.title
    FROM movies m
    JOIN tags t ON m.movieId = t.movieId
    LEFT JOIN ratings r ON m.movieId = r.movieId
    WHERE r.rating IS NULL
   
""")

not_tagged_rate_movie_sql.show()

print(not_tagged_rate_movie_sql.count())


+--------------------+
|               title|
+--------------------+
|Mutiny on the Bou...|
|Call Northside 77...|
|Color of Paradise...|
|For All Mankind (...|
|Browning Version,...|
|I Know Where I'm ...|
|        Proof (1991)|
|Twentieth Century...|
|Innocents, The (1...|
|In the Realms of ...|
|Parallax View, Th...|
|Road Home, The (W...|
|Roaring Twenties,...|
|  Chalet Girl (2011)|
|      Scrooge (1970)|
|      Niagara (1953)|
|  Chosen, The (1981)|
|This Gun for Hire...|
+--------------------+

18


In [62]:
#e. Show the movies that have rating but no tag



In [27]:
# e.Show the movies that have rating but no tag

rated_not_tagged = movies_df.alias('m').join(tags_df.alias('t'), col('m.movieId') == col('t.movieId'), "left")\
                      .join(ratings_df1.alias('r'), col('m.movieId') == col('r.movieId'), "inner")\
                      .filter(col("tag").isNull())\
                      .select("title").distinct()
                 

                                                                      
rated_not_tagged.show(5)

print(rated_not_tagged.count())


+--------------------+
|               title|
+--------------------+
|    Fair Game (1995)|
| If Lucy Fell (1996)|
| Three Wishes (1995)|
|Odd Couple II, Th...|
|First Blood (Ramb...|
+--------------------+
only showing top 5 rows

8167


In [46]:
# f. Focusing on the rated untagged movies with more than 30 user ratings,
# show the top 10 movies in terms of average rating and number of
# ratings

top_10_rated_untagged=rated_not_tagged.groupBy(movies_df.movieId, "title")\
                                        .agg(avg(ratings_df1.rating).alias("avg_rating"),\
                                             count(ratings_df1.rating).alias("num_ratings"))\
                                        .orderBy(col("avg_rating").desc(), col("num_ratings").desc())
top_10_rated_untagged.show(10)

+-------+--------------------+----------+-----------+
|movieId|               title|avg_rating|num_ratings|
+-------+--------------------+----------+-----------+
|  78836|Enter the Void (2...|       5.0|          2|
|     53|     Lamerica (1994)|       5.0|          2|
|   6442| Belle époque (1992)|       5.0|          2|
|   3473|Jonah Who Will Be...|       5.0|          2|
|     99|Heidi Fleiss: Hol...|       5.0|          2|
|   1151| Lesson Faust (1994)|       5.0|          2|
|  44851|Go for Zucker! (A...|       5.0|          1|
|   5468|20 Million Miles ...|       5.0|          1|
|  72692|Mickey's Once Upo...|       5.0|          1|
|   2824| On the Ropes (1999)|       5.0|          1|
+-------+--------------------+----------+-----------+
only showing top 10 rows



In [None]:
# g. What is the count number of tags per movie in tagsDF? And the
# count number of tags per user? How does it compare with the
# average number of tags a user assigns to a movie?

In [59]:

total_tags=tags_df.agg(count("tag").alias("count_tag")).collect()[0][0]
print(total_tags)




#tags_df.collect() retrieves all elements in a DataFrame as an Array of Row type to the driver node. printing a resultant array yields the below output.
#read all about .collect()
# [0] means 0 index and again [0] index of that row

3683


In [60]:
no_of_movies=tags_df.select("movieId").distinct().count()
print(no_of_movies)




1572


In [61]:
avg_tags_per_movie= total_tags/no_of_movies
print(int(avg_tags_per_movie))

2


In [None]:
#h. Identify the users that tagged movies without rating them


In [13]:

users_tagged_not_rated=movies_df.join(tags_df, tags_df.movieId == movies_df.movieId, 'inner')\
                          .join(ratings_df, ratings_df.movieId == movies_df.movieId, 'left')\
                          .filter(col("rating").isNull())\
                          .select(tags_df.userId).distinct()

users_tagged_not_rated.show()

count= users_tagged_not_rated.count()
print(count)


+------+
|userId|
+------+
|   474|
|   318|
|   543|
|   288|
+------+

4


In [None]:
#i)What is the average number of ratings per user in ratings DF? And the
#average number of ratings per movie?

In [23]:
count_ratings=ratings_df.agg(count("rating").alias("count_rating")).collect()[0][0]
print(count_ratings)

total_users=ratings_df.select("userId").distinct().count()
print(total_users)

avg_ratings_per_user=count_ratings/total_users
print(avg_ratings_per_user)

100836
610
165.30491803278687


In [30]:
#j. What is the predominant (frequency based) genre per rating level?



from pyspark.sql.window import Window

joined_df=ratings_df.join(movies_df, ratings_df.movieId == movies_df.movieId, 'inner')

exploded_df = joined_df.withColumn("genre", explode(split("genres", "\|")))
grouped_df = exploded_df.groupBy("rating", "genre").count()

window=Window.partitionBy("rating").orderBy(col("count").desc())
ranked_df=grouped_df.withColumn("rank", rank().over(window)).filter(col("rank") == 1).orderBy(col("rating").desc())

ranked_df.select("rating", "genre").show()





[Stage 85:>                                                         (0 + 1) / 1]

+------+------+
|rating| genre|
+------+------+
|   5.0| Drama|
|   4.5| Drama|
|   4.0| Drama|
|   3.5| Drama|
|   3.0|Comedy|
|   2.5|Comedy|
|   2.0|Comedy|
|   1.5|Comedy|
|   1.0|Comedy|
|   0.5|Comedy|
+------+------+



                                                                                

In [33]:
#k. What is the predominant tag per genre and the most tagged genres?
from pyspark.sql.window import Window

joined_df=tags_df.join(movies_df, tags_df.movieId == movies_df.movieId, 'inner')
exploded_df=joined_df.withColumn("genre", explode(split("genres", "\|")))
grouped_df=exploded_df.groupBy("genre", "tag").count()

window=Window.partitionBy("genre").orderBy(desc("count"))
ranked_df=grouped_df.withColumn("rank", rank().over(window)).filter(col("rank") == 1)

ranked_df.select("genre", "tag").groupBy("genre").agg(collect_list("tag")).show(10)

+------------------+--------------------+
|             genre|   collect_list(tag)|
+------------------+--------------------+
|(no genres listed)|[quirky, understa...|
|            Action|         [superhero]|
|         Adventure|         [superhero]|
|         Animation|            [Disney]|
|          Children|            [Disney]|
|            Comedy|  [In Netflix queue]|
|             Crime|  [In Netflix queue]|
|       Documentary|  [In Netflix queue]|
|             Drama|  [In Netflix queue]|
|           Fantasy|            [Disney]|
+------------------+--------------------+
only showing top 10 rows



In [34]:
#l. What are the most predominant (popularity based) movies?


predominant_df=movies_df.join(ratings_df, ratings_df.movieId == movies_df.movieId, 'inner')\
                        .groupBy("title").count()\
                        .orderBy(desc("count"))

predominant_df.show(10)

[Stage 113:>                                                        (0 + 1) / 1]

+--------------------+-----+
|               title|count|
+--------------------+-----+
| Forrest Gump (1994)|  329|
|Shawshank Redempt...|  317|
| Pulp Fiction (1994)|  307|
|Silence of the La...|  279|
|  Matrix, The (1999)|  278|
|Star Wars: Episod...|  251|
|Jurassic Park (1993)|  238|
|   Braveheart (1995)|  237|
|Terminator 2: Jud...|  224|
|Schindler's List ...|  220|
+--------------------+-----+
only showing top 10 rows



                                                                                

In [39]:
#m. Top 10 movies in terms of average rating (provided more than 30 users
#reviewed them)


joined_df = movies_df.join(ratings_df, ratings_df.movieId == movies_df.movieId, 'inner')
grouped_df = joined_df.groupBy("title").agg(avg(col("rating")).alias("avg_rating"), count(col("rating")).alias("count_ratings"))
filtered_df = grouped_df.filter(col("count_ratings") > 30).orderBy(desc("avg_rating"))

filtered_df.show(5)


[Stage 120:>                                                        (0 + 1) / 1]

+--------------------+-----------------+-------------+
|               title|       avg_rating|count_ratings|
+--------------------+-----------------+-------------+
|Shawshank Redempt...|4.429022082018927|          317|
|Lawrence of Arabi...|              4.3|           45|
|Godfather, The (1...|        4.2890625|          192|
|   Fight Club (1999)|4.272935779816514|          218|
|Cool Hand Luke (1...|4.271929824561403|           57|
+--------------------+-----------------+-------------+
only showing top 5 rows



                                                                                

In [42]:
# Assuming 'avg_rating_per_user' contains the DataFrame with average ratings per user
avg_rating_per_user.write.csv("/tmp/movie_rate_project_output/avg_rating_per_user.csv", header=True, mode="overwrite")
print("sussess")

sussess


In [41]:
# Assuming 'filtered_df' contains the filtered DataFrame
filtered_df.write.csv("/tmp/movie_rate_project_output/top_movies_avg_ratings.csv", header=True, mode="overwrite")

print("sussess")


sussess


In [43]:
# Assuming 'movies_rated_not_tagged' contains the DataFrame
movies_rated_not_tagged.write.csv("/tmp/movie_rate_project_output/movies_rated_not_tagged.csv", header=True, mode="overwrite")
print("sussess")


NameError: name 'movies_rated_not_tagged' is not defined