In [0]:
Movies_df = spark.read.format("csv").option("header", "False").option("sep","::").load("/FileStore/tables/movies.dat").toDF("movie_id","title","genre")
Movies_df.show(Movies_df.count(),False)

+--------+----------------------------------------------------------------------------------+-----------------------------------------------+
|movie_id|title                                                                             |genre                                          |
+--------+----------------------------------------------------------------------------------+-----------------------------------------------+
|1       |Toy Story (1995)                                                                  |Animation|Children's|Comedy                    |
|2       |Jumanji (1995)                                                                    |Adventure|Children's|Fantasy                   |
|3       |Grumpier Old Men (1995)                                                           |Comedy|Romance                                 |
|4       |Waiting to Exhale (1995)                                                          |Comedy|Drama                                   |
|5    

In [0]:
ratings_df = spark.read.format("csv").option("header","False").option("sep","::").load("/FileStore/tables/ratings.dat").toDF("user_id","movie_id","rating","timestamp")
ratings_df.show(ratings_df.count(),False)

+-------+--------+------+----------+
|user_id|movie_id|rating|timestamp |
+-------+--------+------+----------+
|1      |1193    |5     |978300760 |
|1      |661     |3     |978302109 |
|1      |914     |3     |978301968 |
|1      |3408    |4     |978300275 |
|1      |2355    |5     |978824291 |
|1      |1197    |3     |978302268 |
|1      |1287    |5     |978302039 |
|1      |2804    |5     |978300719 |
|1      |594     |4     |978302268 |
|1      |919     |4     |978301368 |
|1      |595     |5     |978824268 |
|1      |938     |4     |978301752 |
|1      |2398    |4     |978302281 |
|1      |2918    |4     |978302124 |
|1      |1035    |5     |978301753 |
|1      |2791    |4     |978302188 |
|1      |2687    |3     |978824268 |
|1      |2018    |4     |978301777 |
|1      |3105    |5     |978301713 |
|1      |2797    |4     |978302039 |
|1      |2321    |3     |978302205 |
|1      |720     |3     |978300760 |
|1      |1270    |5     |978300055 |
|1      |527     |5     |978824195 |
|

In [0]:
#1.) Find the list of the oldest released movies.
from pyspark.sql import functions as f
def get_year(str):
    word=str.split(' ')[-1] 
    return word
year_udf=udf(get_year)
    
Movies_df=Movies_df.withColumn("Year",f.regexp_replace(year_udf(f.col('title')), '[\\(\\)]', '').cast("int"))
Movies_df.show()

+--------+--------------------+--------------------+----+
|movie_id|               title|               genre|Year|
+--------+--------------------+--------------------+----+
|       1|    Toy Story (1995)|Animation|Childre...|1995|
|       2|      Jumanji (1995)|Adventure|Childre...|1995|
|       3|Grumpier Old Men ...|      Comedy|Romance|1995|
|       4|Waiting to Exhale...|        Comedy|Drama|1995|
|       5|Father of the Bri...|              Comedy|1995|
|       6|         Heat (1995)|Action|Crime|Thri...|1995|
|       7|      Sabrina (1995)|      Comedy|Romance|1995|
|       8| Tom and Huck (1995)|Adventure|Children's|1995|
|       9| Sudden Death (1995)|              Action|1995|
|      10|    GoldenEye (1995)|Action|Adventure|...|1995|
|      11|American Presiden...|Comedy|Drama|Romance|1995|
|      12|Dracula: Dead and...|       Comedy|Horror|1995|
|      13|        Balto (1995)|Animation|Children's|1995|
|      14|        Nixon (1995)|               Drama|1995|
|      15|Cutt

In [0]:
#2.) Create datalake for the tables in movielens.
Movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
 
min_year=spark.sql("select min(year) as year from movies")
min_year.show()
min_year_movies=min_year.join(Movies_df,"year","inner")
min_year_movies.show(min_year_movies.count(),False) 

+----+
|year|
+----+
|1919|
+----+

+----+--------+-----------------------------------------------------------+---------------+
|year|movie_id|title                                                      |genre          |
+----+--------+-----------------------------------------------------------+---------------+
|1919|2821    |Male and Female (1919)                                     |Adventure|Drama|
|1919|2823    |Spiders, The (Die Spinnen, 1. Teil: Der Goldene See) (1919)|Action|Drama   |
|1919|3132    |Daddy Long Legs (1919)                                     |Comedy         |
+----+--------+-----------------------------------------------------------+---------------+



In [0]:

#3.) How many movies are released each year?
from pyspark.sql.functions import *
df=Movies_df.groupBy('year').count().select('year',f.col('count').alias("movie_Count")).orderBy(f.asc('year')).show()

+----+-----------+
|year|movie_Count|
+----+-----------+
|null|          1|
|1919|          3|
|1920|          2|
|1921|          1|
|1922|          2|
|1923|          3|
|1925|          6|
|1926|          8|
|1927|          6|
|1928|          3|
|1929|          3|
|1930|          7|
|1931|          7|
|1932|          7|
|1933|          7|
|1934|          7|
|1935|          6|
|1936|          8|
|1937|         11|
|1938|          6|
+----+-----------+
only showing top 20 rows



In [0]:

#4) How many number of movies are there for each rating?
df=Movies_df.join(ratings_df,'movie_id','inner')
#df.show()
df.groupBy('rating').agg(f.count('movie_id')).sort(f.asc('rating')).show()

+------+---------------+
|rating|count(movie_id)|
+------+---------------+
|     1|          56174|
|     2|         107557|
|     3|         261197|
|     4|         348971|
|     5|         226310|
+------+---------------+



In [0]:
# 5.) For each movie,
#a.) How many users rated? please refer the file (movies.dat and ratings.dat)
   
ratings_df.distinct().count()	

Out[28]: 1000209

In [0]:
#b.) What is the total rating?   for given each movie add all of the rating
df=ratings_df.groupBy('movie_id').agg(f.sum('rating')).sort(f.asc('movie_id'))
df=df.join(Movies_df,'movie_id','inner')
df.show()

+--------+-----------+--------------------+--------------------+----+
|movie_id|sum(rating)|               title|               genre|Year|
+--------+-----------+--------------------+--------------------+----+
|    2294|     2247.0|         Antz (1998)|Animation|Children's|1998|
|    1090|     4675.0|      Platoon (1986)|           Drama|War|1986|
|     296|     9288.0| Pulp Fiction (1994)|         Crime|Drama|1994|
|    2136|      699.0|Nutty Professor, ...|              Comedy|1963|
|    3210|     3400.0|Fast Times at Rid...|              Comedy|1982|
|     467|      173.0|Live Nude Girls (...|              Comedy|1995|
|    2088|     1222.0|       Popeye (1980)|Adventure|Comedy|...|1980|
|     691|      375.0|Mrs. Winterbourne...|      Comedy|Romance|1996|
|     829|      300.0|Joe's Apartment (...|      Comedy|Musical|1996|
|    2162|      427.0|NeverEnding Story...|Adventure|Childre...|1990|
|    3414|      163.0|Love Is a Many-Sp...|             Romance|1955|
|    2069|      530.

In [0]:
#c)What is the average rating? (for particular add all of the rating / total number of user rated)
#ratings_df.show()
ratings_df.groupBy('movie_id').agg(f.avg('rating')).select('movie_id',f.round('avg(rating)',2).alias('avg_rating')).show()

+--------+----------+
|movie_id|avg_rating|
+--------+----------+
|    2294|      3.48|
|    1090|      4.09|
|     296|      4.28|
|    2136|      3.15|
|    3210|      3.84|
|     467|       3.2|
|    2088|      2.59|
|     691|       3.1|
|     829|      2.29|
|    2162|      2.41|
|    3414|      3.26|
|    2069|      3.79|
|    3606|      3.94|
|    2904|      3.76|
|    1572|      3.73|
|    1372|      3.41|
|    1394|      4.02|
|     800|      4.08|
|    3826|      2.54|
|    1669|      3.46|
+--------+----------+
only showing top 20 rows



In [0]:
# OR
# We can do the same using : Avg_rating ----> (for particular add all of the rating / total number of user rated)
df=ratings_df.groupBy('movie_id').agg(f.sum('rating')).sort(f.asc('movie_id'))
df2=ratings_df.groupBy('movie_id').agg(f.count('user_id')).sort(f.asc('movie_id'))
df3=df.join(df2,'movie_id','inner')
df3=df3.withColumn("avg_rating",f.round(f.col('sum(rating)')/f.col('count(user_id)'),2))
df3.select('movie_id','avg_rating').show()

+--------+----------+
|movie_id|avg_rating|
+--------+----------+
|    2294|      3.48|
|    1090|      4.09|
|     296|      4.28|
|    2136|      3.15|
|    3210|      3.84|
|     467|       3.2|
|    2088|      2.59|
|     691|       3.1|
|     829|      2.29|
|    2162|      2.41|
|    3414|      3.26|
|    2069|      3.79|
|    3606|      3.94|
|    2904|      3.76|
|    1572|      3.73|
|    1372|      3.41|
|    1394|      4.02|
|     800|      4.08|
|    3826|      2.54|
|    1669|      3.46|
+--------+----------+
only showing top 20 rows

