### Importing Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import format_number

### Starting Spark Session and Load Data

In [2]:
spark = SparkSession.builder.appName("Top movies").master("local[*]").getOrCreate()

In [58]:
movies = spark.read.csv("./data/movies.csv", inferSchema=True, header=True)
ratings = spark.read.csv("./data/ratings.csv", inferSchema=True, header=True)

In [59]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [60]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [61]:
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [62]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [63]:
ratings.count()

100004

# Filter Movies That Received at least 10 user reviews

In [64]:
reviewCounts = ratings.groupBy(ratings.movieId).count().withColumnRenamed("count","reviewCounts")

In [65]:
reviewCounts.show()

+-------+------------+
|movieId|reviewCounts|
+-------+------------+
|   1580|         190|
|   2659|           3|
|   3794|           5|
|   3175|          65|
|    471|          49|
|   1088|          53|
|   1342|          17|
|   1645|          60|
|   2366|          23|
|   6620|          17|
|   8638|          17|
|  96488|           4|
| 160563|           2|
|   7982|           3|
|   1238|          17|
|   1959|          30|
|    463|           7|
|   2122|          11|
|   1591|          15|
|   5518|           1|
+-------+------------+
only showing top 20 rows



In [66]:
reviewCountsGRE10 = reviewCounts.filter(reviewCounts["reviewCounts"] >= 10)

In [67]:
reviewCountsGRE10.show()

+-------+------------+
|movieId|reviewCounts|
+-------+------------+
|   1580|         190|
|   3175|          65|
|    471|          49|
|   1088|          53|
|   1342|          17|
|   1645|          60|
|   2366|          23|
|   6620|          17|
|   8638|          17|
|   1238|          17|
|   1959|          30|
|   2122|          11|
|   1591|          15|
|  44022|          20|
|   2142|          12|
|   2866|          10|
|  68135|          11|
|   3997|          10|
|   1721|         164|
|    858|         200|
+-------+------------+
only showing top 20 rows



In [68]:
reviewCountsGRE10.count()

2245

### Join reviewCountsGRE10 with ratings data

In [69]:
df = reviewCountsGRE10.join(ratings, ratings["movieId"] == reviewCountsGRE10["movieId"])

In [70]:

df.show()

+-------+------------+------+-------+------+----------+
|movieId|reviewCounts|userId|movieId|rating| timestamp|
+-------+------------+------+-------+------+----------+
|     31|          42|     1|     31|   2.5|1260759144|
|   1029|          42|     1|   1029|   3.0|1260759179|
|   1061|          33|     1|   1061|   3.0|1260759182|
|   1129|          48|     1|   1129|   2.0|1260759185|
|   1172|          46|     1|   1172|   4.0|1260759205|
|   1263|          48|     1|   1263|   2.0|1260759151|
|   1287|          46|     1|   1287|   2.0|1260759187|
|   1293|          46|     1|   1293|   2.0|1260759148|
|   1339|          52|     1|   1339|   3.5|1260759125|
|   1343|          39|     1|   1343|   2.0|1260759131|
|   1371|          47|     1|   1371|   2.5|1260759135|
|   1405|          46|     1|   1405|   1.0|1260759203|
|   1953|          46|     1|   1953|   4.0|1260759191|
|   2105|          47|     1|   2105|   4.0|1260759139|
|   2150|          36|     1|   2150|   3.0|1260

In [71]:
repeated_columns = [c for c in reviewCountsGRE10.columns if c in ratings.columns]

for col in repeated_columns:
    df = df.drop(reviewCountsGRE10[col])

In [72]:
df.show()

+------------+------+-------+------+----------+
|reviewCounts|userId|movieId|rating| timestamp|
+------------+------+-------+------+----------+
|          42|     1|     31|   2.5|1260759144|
|          42|     1|   1029|   3.0|1260759179|
|          33|     1|   1061|   3.0|1260759182|
|          48|     1|   1129|   2.0|1260759185|
|          46|     1|   1172|   4.0|1260759205|
|          48|     1|   1263|   2.0|1260759151|
|          46|     1|   1287|   2.0|1260759187|
|          46|     1|   1293|   2.0|1260759148|
|          52|     1|   1339|   3.5|1260759125|
|          39|     1|   1343|   2.0|1260759131|
|          47|     1|   1371|   2.5|1260759135|
|          46|     1|   1405|   1.0|1260759203|
|          46|     1|   1953|   4.0|1260759191|
|          47|     1|   2105|   4.0|1260759139|
|          36|     1|   2150|   3.0|1260759194|
|          42|     1|   2193|   2.0|1260759198|
|          53|     1|   2294|   2.0|1260759108|
|          47|     1|   2455|   2.5|1260

In [73]:
df.count()

81915

## TOP 20 movies with highest average ratings

In [74]:
avgRatings = df.groupBy(df.movieId).avg('rating').withColumnRenamed("avg(rating)", "AvgRating")

In [75]:
avgRatings_sorted = avgRatings.sort(avgRatings["AvgRating"].desc())

In [76]:
avgRatings_sorted.show()

+-------+-----------------+
|movieId|        AvgRating|
+-------+-----------------+
|   1939|4.636363636363637|
|   3469|4.541666666666667|
|    858|           4.4875|
|    318|4.487138263665595|
|   1948|4.458333333333333|
|   8132|4.454545454545454|
|   1945|4.448275862068965|
|   1147|           4.4375|
|    926|4.434210526315789|
|   1217|4.423076923076923|
|    969|             4.42|
|   3035|4.411764705882353|
|   1066|4.409090909090909|
|   2203|              4.4|
|   2064|4.392857142857143|
|    913|4.387096774193548|
|   7502|4.386363636363637|
|   1221|4.385185185185185|
|    905|             4.38|
|     50|4.370646766169155|
+-------+-----------------+
only showing top 20 rows



In [77]:
top_20 = avgRatings_sorted.take(20)

In [78]:
top_20

[Row(movieId=1939, AvgRating=4.636363636363637),
 Row(movieId=3469, AvgRating=4.541666666666667),
 Row(movieId=858, AvgRating=4.4875),
 Row(movieId=318, AvgRating=4.487138263665595),
 Row(movieId=1948, AvgRating=4.458333333333333),
 Row(movieId=8132, AvgRating=4.454545454545454),
 Row(movieId=1945, AvgRating=4.448275862068965),
 Row(movieId=1147, AvgRating=4.4375),
 Row(movieId=926, AvgRating=4.434210526315789),
 Row(movieId=1217, AvgRating=4.423076923076923),
 Row(movieId=969, AvgRating=4.42),
 Row(movieId=3035, AvgRating=4.411764705882353),
 Row(movieId=1066, AvgRating=4.409090909090909),
 Row(movieId=2203, AvgRating=4.4),
 Row(movieId=2064, AvgRating=4.392857142857143),
 Row(movieId=913, AvgRating=4.387096774193548),
 Row(movieId=7502, AvgRating=4.386363636363637),
 Row(movieId=1221, AvgRating=4.385185185185185),
 Row(movieId=905, AvgRating=4.38),
 Row(movieId=50, AvgRating=4.370646766169155)]

In [79]:
top_20_df = spark.createDataFrame(top_20)

In [80]:
top_20_df.show()

+-------+-----------------+
|movieId|        AvgRating|
+-------+-----------------+
|   1939|4.636363636363637|
|   3469|4.541666666666667|
|    858|           4.4875|
|    318|4.487138263665595|
|   1948|4.458333333333333|
|   8132|4.454545454545454|
|   1945|4.448275862068965|
|   1147|           4.4375|
|    926|4.434210526315789|
|   1217|4.423076923076923|
|    969|             4.42|
|   3035|4.411764705882353|
|   1066|4.409090909090909|
|   2203|              4.4|
|   2064|4.392857142857143|
|    913|4.387096774193548|
|   7502|4.386363636363637|
|   1221|4.385185185185185|
|    905|             4.38|
|     50|4.370646766169155|
+-------+-----------------+



## Finding top20 movies

In [81]:
result = top_20_df.join(movies,"movieId")

In [82]:
result.show()

+-------+-----------------+--------------------+--------------------+
|movieId|        AvgRating|               title|              genres|
+-------+-----------------+--------------------+--------------------+
|   1939|4.636363636363637|Best Years of Our...|           Drama|War|
|   3469|4.541666666666667|Inherit the Wind ...|               Drama|
|    858|           4.4875|Godfather, The (1...|         Crime|Drama|
|    318|4.487138263665595|Shawshank Redempt...|         Crime|Drama|
|   1948|4.458333333333333|    Tom Jones (1963)|Adventure|Comedy|...|
|   8132|4.454545454545454|    Gladiator (1992)|        Action|Drama|
|   1945|4.448275862068965|On the Waterfront...|         Crime|Drama|
|   1147|           4.4375|When We Were King...|         Documentary|
|    926|4.434210526315789|All About Eve (1950)|               Drama|
|   1217|4.423076923076923|          Ran (1985)|           Drama|War|
|    969|             4.42|African Queen, Th...|Adventure|Comedy|...|
|   3035|4.411764705

In [83]:
final_result = result.join(reviewCountsGRE10, "movieId")
final_result = final_result.orderBy(final_result.AvgRating, final_result.reviewCounts)

### Format according to requirements

In [84]:
final_result = final_result.select(
    final_result["movieId"],
    final_result["title"].alias("Movie_Name"),
    format_number(final_result["AvgRating"].cast("float"), 2).alias("Average_Rating"),
    final_result["reviewCounts"].alias("Total_number_of_ratings")
)

In [85]:
final_result.show(truncate=False)

+-------+-----------------------------------+--------------+-----------------------+
|movieId|Movie_Name                         |Average_Rating|Total_number_of_ratings|
+-------+-----------------------------------+--------------+-----------------------+
|50     |Usual Suspects, The (1995)         |4.37          |201                    |
|905    |It Happened One Night (1934)       |4.38          |25                     |
|1221   |Godfather: Part II, The (1974)     |4.39          |135                    |
|7502   |Band of Brothers (2001)            |4.39          |22                     |
|913    |Maltese Falcon, The (1941)         |4.39          |62                     |
|2064   |Roger & Me (1989)                  |4.39          |42                     |
|2203   |Shadow of a Doubt (1943)           |4.40          |10                     |
|1066   |Shall We Dance (1937)              |4.41          |11                     |
|3035   |Mister Roberts (1955)              |4.41          |17   

#### Storing Data into a parquet file

In [86]:
final_result.toPandas().to_parquet("./data/top20_movies.parquet")

#### Create dataframe from a parquet file

#### Ending Session

In [87]:
popular_20 = spark.read.parquet("./data/top20_movies.parquet")

In [88]:
popular_20.show(truncate=False)

+-------+-----------------------------------+--------------+-----------------------+
|movieId|Movie_Name                         |Average_Rating|Total_number_of_ratings|
+-------+-----------------------------------+--------------+-----------------------+
|50     |Usual Suspects, The (1995)         |4.37          |201                    |
|905    |It Happened One Night (1934)       |4.38          |25                     |
|1221   |Godfather: Part II, The (1974)     |4.39          |135                    |
|7502   |Band of Brothers (2001)            |4.39          |22                     |
|913    |Maltese Falcon, The (1941)         |4.39          |62                     |
|2064   |Roger & Me (1989)                  |4.39          |42                     |
|2203   |Shadow of a Doubt (1943)           |4.40          |10                     |
|1066   |Shall We Dance (1937)              |4.41          |11                     |
|3035   |Mister Roberts (1955)              |4.41          |17   

#### Store popular_20 into JSON

In [89]:
popular_20.toJSON().collect()

['{"movieId":50,"Movie_Name":"Usual Suspects, The (1995)","Average_Rating":"4.37","Total_number_of_ratings":201}',
 '{"movieId":905,"Movie_Name":"It Happened One Night (1934)","Average_Rating":"4.38","Total_number_of_ratings":25}',
 '{"movieId":1221,"Movie_Name":"Godfather: Part II, The (1974)","Average_Rating":"4.39","Total_number_of_ratings":135}',
 '{"movieId":7502,"Movie_Name":"Band of Brothers (2001)","Average_Rating":"4.39","Total_number_of_ratings":22}',
 '{"movieId":913,"Movie_Name":"Maltese Falcon, The (1941)","Average_Rating":"4.39","Total_number_of_ratings":62}',
 '{"movieId":2064,"Movie_Name":"Roger & Me (1989)","Average_Rating":"4.39","Total_number_of_ratings":42}',
 '{"movieId":2203,"Movie_Name":"Shadow of a Doubt (1943)","Average_Rating":"4.40","Total_number_of_ratings":10}',
 '{"movieId":1066,"Movie_Name":"Shall We Dance (1937)","Average_Rating":"4.41","Total_number_of_ratings":11}',
 '{"movieId":3035,"Movie_Name":"Mister Roberts (1955)","Average_Rating":"4.41","Total_n

In [90]:
# popular_20.coalesce(1).write.format('json').save('./data/top20movies')
popular_20.toPandas().to_json('./data/top20_movies.json',orient='records', force_ascii=False, lines=True)

#### Ending Session

In [5]:
spark.stop()