# Importing required libraries and Creating Spark Session

In [129]:
# Importing Required libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType

# Creating Spark Session
Spark = SparkSession.builder.appName("Assignment2").enableHiveSupport().getOrCreate()

In [130]:
Spark

# Setting data location

In [131]:
# Setting data location
movies = "/Spark/Assignment2/movies.csv"
ratings = "/Spark/Assignment2/ratings.csv"
tags = "/Spark/Assignment2/tags.csv"

# Reading Movie data,Infering schema and Creating Dataframe

In [132]:
# Reading movies data and printing sample data
movies_df = Spark.read.format("csv").options(header = True,inferschema = True).load(movies)
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



# Reading Ratings data,Creating schema,changing timestamp column type from Integer to Timestamp and Creating Dataframe

In [133]:
# Setting ratings data schema
ratings_schema = StructType([StructField('userId',IntegerType(),True),
                             StructField('movieId',IntegerType(),True),
                             StructField('rating',FloatType(),True),
                             StructField('timestamp',IntegerType(),True)
])

# Reading ratings data and converting timestamp from int type to timestamp type and printing data
ratings_df = Spark.read.format("csv").option('header','true').schema(ratings_schema).load(ratings)
ratings_df = ratings_df.withColumn("timestamp",F.from_unixtime("timestamp").cast(TimestampType()))
ratings_df.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows



# Reading Tags data,Creating schema,changing timestamp column type from Integer to Timestamp and Creating Dataframe

In [134]:
# Setting tags data schema
tags_schema = StructType([StructField("userId",IntegerType(),True),
                          StructField("movieId",IntegerType(),True),
                          StructField("tag",StringType(),True),
                          StructField("timestamp",IntegerType(),True)
])

# Reading tags data and converting timestamp from int type to timestamp type and printing data
tags_df = Spark.read.csv(tags,header=True,schema=tags_schema)
tags_df = tags_df.withColumn("timestamp",F.from_unixtime("timestamp").cast(TimestampType()))
tags_df.show(5)

+------+-------+---------------+-------------------+
|userId|movieId|            tag|          timestamp|
+------+-------+---------------+-------------------+
|     2|  60756|          funny|2015-10-24 19:29:54|
|     2|  60756|Highly quotable|2015-10-24 19:29:56|
|     2|  60756|   will ferrell|2015-10-24 19:29:52|
|     2|  89774|   Boxing story|2015-10-24 19:33:27|
|     2|  89774|            MMA|2015-10-24 19:33:20|
+------+-------+---------------+-------------------+
only showing top 5 rows



# Work with Spark SQL

In [135]:
movies_df.createOrReplaceTempView("Movies")
ratings_df.createOrReplaceTempView("Ratings")
tags_df.createOrReplaceTempView("Tags")

# 1. Show the aggregated number of ratings per year and save the data in a single CSV file in HDFS 

In [136]:
# Aggregated number of ratings per year
Query = """ SELECT Year(timestamp) as Year, Count(rating) AS Ratings
            FROM Ratings
            GROUP BY Year
            ORDER BY Year DESC; """
Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save("/Spark/Assignment2/Solution/agg_ratings_yearly.csv")
print("Saved Successfully")

+----+-------+
|Year|Ratings|
+----+-------+
|2018|   6418|
|2017|   8198|
|2016|   6703|
|2015|   6616|
|2014|   1439|
|2013|   1664|
|2012|   4656|
|2011|   1690|
|2010|   2301|
|2009|   4158|
|2008|   4351|
|2007|   7114|
|2006|   4059|
|2005|   5813|
|2004|   3279|
|2003|   4014|
|2002|   3478|
|2001|   3922|
|2000|  10061|
|1999|   2439|
+----+-------+
only showing top 20 rows

Saved Successfully


# 2. Show the average monthly number of ratings and save the data in a single CSV file in HDFS

In [137]:
# Average monthly number of ratings
Query =  """ SELECT left(timestamp,7) as Month, Count(rating) AS Ratings 
            FROM Ratings
            GROUP BY Month 
            ORDER BY Month DESC; """

Output = Spark.sql(Query)
Output.show()
# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format('csv').options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/agg_ratings_monthly.csv")
print("Saved Successfully")

+-------+-------+
|  Month|Ratings|
+-------+-------+
|2018-09|    604|
|2018-08|    831|
|2018-07|    293|
|2018-06|    419|
|2018-05|    951|
|2018-04|    230|
|2018-03|    971|
|2018-02|   1169|
|2018-01|    950|
|2017-12|    536|
|2017-11|    230|
|2017-10|    225|
|2017-09|    424|
|2017-08|    221|
|2017-07|    170|
|2017-06|   1910|
|2017-05|   2397|
|2017-04|    923|
|2017-03|    549|
|2017-02|    420|
+-------+-------+
only showing top 20 rows

Saved Successfully


# 3. Show the ratings count and save the data in a single CSV file in HDFS

In [138]:
# rating counts

Query = """ SELECT rating, COUNT(rating) as Counts
            FROM Ratings
            Group by rating
            ORDER BY rating; """
Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/ratings_count.csv")
print("Saved Successfully")

+------+------+
|rating|Counts|
+------+------+
|   0.5|  1370|
|   1.0|  2811|
|   1.5|  1791|
|   2.0|  7551|
|   2.5|  5550|
|   3.0| 20047|
|   3.5| 13136|
|   4.0| 26818|
|   4.5|  8551|
|   5.0| 13211|
+------+------+

Saved Successfully


# 4. Show the rating levels distribution and save the data in a single CSV file in HDFS

In [139]:
# rating levels distribution

Query = """ WITH T1 AS (
            SELECT rating,
            CASE WHEN rating >= 0 AND rating < 2.5 THEN '0.0 - 2.0'
            WHEN rating >= 2.5 AND rating < 4.0 THEN '2.5 - 3.5'
            ELSE '4.0 - 5.0' END AS Rating_Bucket 
            FROM Ratings),
            
            T2 AS (SELECT Rating_Bucket, Count(rating) as Counts
            FROM T1 
            GROUP BY Rating_Bucket
            ORDER BY Rating_Bucket)
            
            SELECT Rating_Bucket, Counts, Counts*100/SUM(Counts)OVER() AS Percentage
            FROM T2 """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/rating_level_distribution.csv")
print("Saved Successfully")

24/02/17 20:39:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 2

+-------------+------+------------------+
|Rating_Bucket|Counts|        Percentage|
+-------------+------+------------------+
|    0.0 - 2.0| 13523|13.410885001388394|
|    2.5 - 3.5| 38733| 38.41187671069856|
|    4.0 - 5.0| 48580| 48.17723828791305|
+-------------+------+------------------+



24/02/17 20:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 2

Saved Successfully


# 5. Show the movies that have tag but no rating and save the data in a single CSV file in HDFS

In [140]:
# movies that have tag but no rating

Query = """ WITH T1 AS(SELECT DISTINCT(T.movieId) FROM Tags T 
            LEFT JOIN Ratings R ON 
            T.movieId = R.movieId
            WHERE R.movieID is NULL)
            
            SELECT M.title FROM Movies M
            INNER JOIN T1 ON T1.movieId = M.movieId 
            ORDER BY M.title"""
Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True, delimiter = ',').save("/Spark/Assignment2/Solution/movies_without_ratings.csv")
print("saved Successfully")

+--------------------+
|               title|
+--------------------+
|Browning Version,...|
|Call Northside 77...|
|  Chalet Girl (2011)|
|  Chosen, The (1981)|
|Color of Paradise...|
|For All Mankind (...|
|I Know Where I'm ...|
|In the Realms of ...|
|Innocents, The (1...|
|Mutiny on the Bou...|
|      Niagara (1953)|
|Parallax View, Th...|
|        Proof (1991)|
|Road Home, The (W...|
|Roaring Twenties,...|
|      Scrooge (1970)|
|This Gun for Hire...|
|Twentieth Century...|
+--------------------+

saved Successfully


# 6. Show the movies that have rating but no tag and save the data in a single CSV file in HDFS

In [141]:
# movies that have rating but no tag

Query = """ WITH T1 AS(SELECT DISTINCT(R.movieId) FROM Tags T 
            RIGHT JOIN Ratings R ON 
            T.movieId = R.movieId
            WHERE T.movieID is NULL)
            
            SELECT M.title FROM Movies M
            INNER JOIN T1 ON T1.movieId = M.movieId 
            ORDER BY M.title"""
Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True, delimiter = ',').save("/Spark/Assignment2/Solution/movies_without_tags.csv")
print("saved Successfully")

+--------------------+
|               title|
+--------------------+
|          '71 (2014)|
|'Hellboy': The Se...|
|'Round Midnight (...|
| 'Salem's Lot (2004)|
|'Til There Was Yo...|
|'Tis the Season f...|
|  'burbs, The (1989)|
|'night Mother (1986)|
|*batteries not in...|
|...All the Marble...|
|00 Schneider - Ja...|
|   1-900 (06) (1994)|
|           10 (1979)|
|10 Cent Pistol (2...|
|10 Items or Less ...|
|     10 Years (2011)|
|    10,000 BC (2008)|
|    100 Girls (2000)|
|  100 Streets (2016)|
|101 Dalmatians II...|
+--------------------+
only showing top 20 rows

saved Successfully


# 7. Focusing on the rated untagged movies with more than 30 user ratings, show the top 10 movies in terms of average rating and number of ratings and save the data in a single CSV file in HDFS

In [142]:
#Focusing on the rated untagged movies with more than 30 user ratings, show the top 10 movies in terms of average rating and number of ratings

Query = """ WITH T1 AS (
                SELECT movieId FROM Ratings
                GROUP BY movieId
                HAVING COUNT(DISTINCT userId) > 30),
            T2 AS ( SELECT T1.movieId FROM T1 
                    LEFT JOIN Tags T
                    ON T1.movieId = T.movieId
                    WHERE  T.movieId is NULL),
            T3 AS ( SELECT M.Title, M.movieId FROM Movies M
                    INNER JOIN T2 ON 
                    M.movieId = T2.movieId
                    ORDER BY 1),
            T4 AS ( SELECT T3.title, AVG(R.rating) as avg_rating,
                    DENSE_RANK() OVER(ORDER BY AVG(R.rating) DESC) AS avg_rank
                    FROM T3 LEFT JOIN 
                    Ratings R ON T3.movieId = R.movieId
                    GROUP BY 1),
            T5 AS ( SELECT T3.title, COUNT(R.rating) AS rating_count,
                    DENSE_RANK() OVER(ORDER BY COUNT(R.rating) DESC) AS count_rank
                    FROM T3 LEFT JOIN 
                    Ratings R ON T3.movieId = R.movieId
                    GROUP BY 1)
            SELECT T4.title as MovieTitle1, T4.avg_rating AS AverageRating, T4.avg_rank as AverageRank,
            T5.title AS MovieTitle2, T5.rating_count as CountRating , T5.count_rank AS CountRank
            FROM T4 INNER JOIN T5 ON 
            T4.avg_rank = T5.count_rank
            WHERE T4.avg_rank <= 10 AND T5.count_rank <= 10 """

Output = Spark.sql(Query)
Output.show()
# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True, delimiter = ',').save("/Spark/Assignment2/Solution/Top_10_Average_Ratings&Count_Ratings.csv")
print("Saved Successfully")

24/02/17 20:39:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 2

+--------------------+-----------------+-----------+--------------------+-----------+---------+
|         MovieTitle1|    AverageRating|AverageRank|         MovieTitle2|CountRating|CountRank|
+--------------------+-----------------+-----------+--------------------+-----------+---------+
|Boondock Saints, ...| 4.22093023255814|          1|American Beauty (...|        204|        1|
|       Brazil (1985)|4.177966101694915|          2|Ace Ventura: Pet ...|        161|        2|
|Cinema Paradiso (...|4.161764705882353|          3|    Mask, The (1994)|        157|        3|
|       Snatch (2000)|4.155913978494624|          4|     Die Hard (1988)|        145|        4|
|For a Few Dollars...|4.151515151515151|          5|Die Hard: With a ...|        144|        5|
|Lives of Others, ...|4.117647058823529|          6|Groundhog Day (1993)|        143|        6|
|  Toy Story 3 (2010)|4.109090909090909|          7|Dumb & Dumber (Du...|        133|        7|
|Boogie Nights (1997)|4.076923076923077|

24/02/17 20:39:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 20:39:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/02/17 2

Saved Successfully


# 8. What is the average number of tags per movie in tagsDF? And the average number of tags per user? How does it compare with the average number of tags a user assigns to a movie? 
# save the data in a single CSV file in HDFS

In [143]:
# What is the average number of tags per movie in tagsDF? And the average number of tags per user? How does it compare with the average number of tags a user assigns to a movie?

Query = """ WITH T1 AS
                ( SELECT movieId, COUNT(tag) AS MovieTags
                FROM Tags
                GROUP BY movieId
                ORDER BY movieId ),
            T2 AS 
                ( SELECT userId, COUNT(tag) AS UserTags
                FROM Tags
                GROUP BY userId
                ORDER BY userId )
            
            SELECT AVG(MovieTags) AS Average_Tags_Per_Movie,AVG(UserTags) AS Average_Tags_Per_User FROM T1,T2 """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/Average_Tags_Per_Movie&User.csv")
print("Saved Successfully")

+----------------------+---------------------+
|Average_Tags_Per_Movie|Average_Tags_Per_User|
+----------------------+---------------------+
|    2.3428753180661577|                 63.5|
+----------------------+---------------------+

Saved Successfully


# 9. Identify the users that tagged movies without rating them and save the data in a single CSV file in HDFS

In [144]:
# Identify the users that tagged movies without rating them

Query = """ SELECT DISTINCT T.userId FROM Tags T 
                LEFT JOIN Ratings R 
                ON T.movieId = R.movieId
                WHERE R.userId IS NULL """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True, delimiter = ',').save("/Spark/Assignment2/Solution/User_tags_Without_Ratings.csv")
print("Saved Successfully")

+------+
|userId|
+------+
|   474|
|   318|
|   543|
|   288|
+------+

Saved Successfully


# 10.What is the average number of ratings per user in ratings DF? And the average number of ratings per movie? save the data in a single CSV file in HDFS

In [145]:
# What is the average number of ratings per user in ratings DF? And the average number of ratings per movie?
Query = """ WITH T1 AS 
                ( SELECT COUNT(*)/COUNT(DISTINCT userId) AS Average_Ratings_Per_User 
                FROM Ratings ),
            T2 AS
                ( SELECT COUNT(*)/COUNT(DISTINCT movieId) AS Average_Ratings_Per_Movie
                FROM Ratings )
            
            SELECT * FROM T1,T2 """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True, delimiter = ',').save("/Spark/Assignment2/Solution/Average_Ratings_Per_User&Movie.csv")
print("Saved Successfully")

+------------------------+-------------------------+
|Average_Ratings_Per_User|Average_Ratings_Per_Movie|
+------------------------+-------------------------+
|      165.30491803278687|       10.369806663924312|
+------------------------+-------------------------+

Saved Successfully


# 11. What is the predominant (frequency based) genre per rating level? save the data in a single CSV file in HDFS

In [146]:
# What is the predominant (frequency based) genre per rating level?
Query = """ WITH T1 AS 
                ( SELECT R.rating AS Rating, M.genres AS Genre, COUNT(R.rating) AS Counts,
                DENSE_RANK() OVER(Partition BY R.rating ORDER BY COUNT(R.rating) DESC) AS Rank
                FROM Ratings R 
                INNER JOIN Movies M 
                ON M.movieId = R.movieId 
                GROUP BY Rating,Genre
                ORDER BY Counts DESC,Rating )
                
                SELECT T1.Rating, T1.Genre, T1.Counts FROM T1 WHERE Rank = 1
                ORDER BY T1.Rating DESC """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/Frequency_Based_Genre_Per_Rating.csv")

print("Saved Successfully")



+------+------+------+
|Rating| Genre|Counts|
+------+------+------+
|   5.0| Drama|   895|
|   4.5| Drama|   593|
|   4.0| Drama|  2055|
|   3.5|Comedy|   854|
|   3.0|Comedy|  1614|
|   2.5|Comedy|   515|
|   2.0|Comedy|   828|
|   1.5|Comedy|   256|
|   1.0|Comedy|   348|
|   0.5|Comedy|   136|
+------+------+------+

Saved Successfully


# 12. What is the predominant tag per genre and the most tagged genres? save the data in a single CSV file in HDFS

In [147]:
# What is the predominant tag per genre and the most tagged genres?
Query = """ WITH T1 AS 
                ( SELECT T.tag AS Tag, M.genres AS Genre, COUNT(*) AS Counts,
                DENSE_RANK() OVER(Partition BY M.genres ORDER BY COUNT(*) DESC) AS Rank
                FROM Tags T 
                RIGHT JOIN Movies M 
                ON M.movieId = T.movieId 
                GROUP BY Genre,Tag )
                
                SELECT T1.Tag, T1.Genre FROM T1 WHERE Rank = 1
                ORDER BY T1.Genre DESC """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/Frequency_Based_Genre_Per_Tag.csv")

print("Saved Successfully")


+--------------+--------------------+
|           Tag|               Genre|
+--------------+--------------------+
|          null|             Western|
|          null|                 War|
|          null|            Thriller|
|          null|Sci-Fi|Thriller|IMAX|
|          null|     Sci-Fi|Thriller|
|   time-travel|         Sci-Fi|IMAX|
|        sci-fi|         Sci-Fi|IMAX|
|          null|              Sci-Fi|
|          null|     Romance|Western|
|     Hemingway|         Romance|War|
|          null|    Romance|Thriller|
|      artistic|Romance|Sci-Fi|Th...|
|         artsy|Romance|Sci-Fi|Th...|
|          null|Romance|Sci-Fi|Th...|
|     dreamlike|Romance|Sci-Fi|Th...|
|   atmospheric|Romance|Sci-Fi|Th...|
|existentialism|Romance|Sci-Fi|Th...|
|     Beautiful|Romance|Sci-Fi|Th...|
|          null|      Romance|Sci-Fi|
|          null|             Romance|
+--------------+--------------------+
only showing top 20 rows

Saved Successfully


# 13. What are the most predominant (popularity based) movies? save the data in a single CSV file in HDFS

In [148]:
# What are the most predominant (popularity based) movies?

Query = """ SELECT M.title AS Title, COUNT(R.rating) as Total_Ratings
            FROM Movies M 
            INNER JOIN Ratings R 
            ON M.movieId = R.movieId 
            GROUP BY Title
            ORDER BY Total_Ratings DESC """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/Most_Popular_Movies.csv")

print("Saved Successfully")


+--------------------+-------------+
|               Title|Total_Ratings|
+--------------------+-------------+
| Forrest Gump (1994)|          329|
|Shawshank Redempt...|          317|
| Pulp Fiction (1994)|          307|
|Silence of the La...|          279|
|  Matrix, The (1999)|          278|
|Star Wars: Episod...|          251|
|Jurassic Park (1993)|          238|
|   Braveheart (1995)|          237|
|Terminator 2: Jud...|          224|
|Schindler's List ...|          220|
|   Fight Club (1999)|          218|
|    Toy Story (1995)|          215|
|Star Wars: Episod...|          211|
|American Beauty (...|          204|
|Usual Suspects, T...|          204|
|Seven (a.k.a. Se7...|          203|
|Independence Day ...|          202|
|    Apollo 13 (1995)|          201|
|Raiders of the Lo...|          200|
|Lord of the Rings...|          198|
+--------------------+-------------+
only showing top 20 rows

Saved Successfully


# 14. Top 10 movies in terms of average rating (provided more than 30 users reviewed them) save the data in a single CSV file in HDFS

In [149]:
# Top 10 movies in terms of average rating (provided more than 30 users reviewed them)

Query = """ WITH T1 AS 
                ( SELECT movieId, COUNT(DISTINCT userId) AS Rating_Counts
                FROM Ratings
                GROUP BY movieId
                HAVING Rating_Counts >= 30
                ORDER BY Rating_Counts DESC ),
            T2 AS 
                ( SELECT T1.movieId as MovieId, AVG(R.rating) AS Average_Rating FROM
                T1 INNER JOIN Ratings R ON 
                T1.movieId = R.movieId
                GROUP BY T1.movieId
                ORDER BY AVG(R.rating) DESC)
            SELECT M.title,M.MovieId,T2.Average_Rating,M.genres FROM T2
            INNER JOIN Movies M 
            ON M.movieId =  T2.movieId
            ORDER BY Average_Rating DESC """

Output = Spark.sql(Query)
Output.show()

# Write data in HDFS into single file
Output.coalesce(1).write.mode("overwrite").format("csv").options(header = True,delimiter = ',').save("/Spark/Assignment2/Solution/Top_10_Average_Ratings_Movies.csv")

print("Saved Successfully")

                                                                                

+--------------------+-------+-----------------+--------------------+
|               title|MovieId|   Average_Rating|              genres|
+--------------------+-------+-----------------+--------------------+
|Shawshank Redempt...|    318|4.429022082018927|         Crime|Drama|
|Lawrence of Arabi...|   1204|              4.3| Adventure|Drama|War|
|Godfather, The (1...|    858|        4.2890625|         Crime|Drama|
|   Fight Club (1999)|   2959|4.272935779816514|Action|Crime|Dram...|
|Cool Hand Luke (1...|   1276|4.271929824561403|               Drama|
|Dr. Strangelove o...|    750|4.268041237113402|          Comedy|War|
|  Rear Window (1954)|    904|4.261904761904762|    Mystery|Thriller|
|Godfather: Part I...|   1221| 4.25968992248062|         Crime|Drama|
|Departed, The (2006)|  48516|4.252336448598131|Crime|Drama|Thriller|
|Manchurian Candid...|   1267|             4.25|  Crime|Thriller|War|
|   Goodfellas (1990)|   1213|             4.25|         Crime|Drama|
|   Casablanca (1942