In [1]:
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder \
    .appName("Spark with Hive") \
    .enableHiveSupport() \
    .getOrCreate()

23/12/30 09:53:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
movies_hdfs_path="/ass2/movies"
ratings_hdfs_path="/ass2/ratings"
tags_hdfs_path="/ass2/tags"

In [4]:
df1 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(movies_hdfs_path)

                                                                                

In [5]:
df2 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(ratings_hdfs_path)

                                                                                

In [6]:
df3 = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(tags_hdfs_path)

                                                                                

In [7]:
df1.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [8]:
df2.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [9]:
df3.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [10]:
gbdf=df2.groupBy('UserId').agg(count('*').alias('movieId'))
gbdf.show(5)

[Stage 9:>                                                          (0 + 1) / 1]

+------+-------+
|UserId|movieId|
+------+-------+
|   148|     48|
|   463|     33|
|   471|     28|
|   496|     29|
|   243|     36|
+------+-------+
only showing top 5 rows



                                                                                

In [11]:
df2=df2.withColumn("timestamp", from_unixtime(col("timestamp")))
df2.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows



In [12]:
df3=df3.withColumn("timestamp", from_unixtime(col("timestamp")))
df3.show(5)

+------+-------+---------------+-------------------+
|userId|movieId|            tag|          timestamp|
+------+-------+---------------+-------------------+
|     2|  60756|          funny|2015-10-24 19:29:54|
|     2|  60756|Highly quotable|2015-10-24 19:29:56|
|     2|  60756|   will ferrell|2015-10-24 19:29:52|
|     2|  89774|   Boxing story|2015-10-24 19:33:27|
|     2|  89774|            MMA|2015-10-24 19:33:20|
+------+-------+---------------+-------------------+
only showing top 5 rows



1. Show the aggregated number of ratings per year

In [13]:
ratings_per_year=df2.groupBy(substring("timestamp", 0, 4).alias("year")).agg(count("*").alias("count"))
ratings_per_year.show(5)

[Stage 14:>                                                         (0 + 1) / 1]

+----+-----+
|year|count|
+----+-----+
|2016| 6703|
|2012| 4656|
|2017| 8198|
|2014| 1439|
|2013| 1664|
+----+-----+
only showing top 5 rows



                                                                                

2. Avg monthly ratings

In [14]:
ratings_per_month=df2.groupBy(substring("timestamp", 6, 2).alias("month")).agg(count("*").alias("count")).orderBy("month")
ratings_per_month.show()

[Stage 17:>                                                         (0 + 1) / 1]

+-----+-----+
|month|count|
+-----+-----+
|   01| 8684|
|   02| 7635|
|   03| 8880|
|   04| 7727|
|   05|10883|
|   06| 8825|
|   07| 6950|
|   08| 9074|
|   09| 8510|
|   10| 7148|
|   11| 9676|
|   12| 6844|
+-----+-----+



                                                                                

3. Ratings Distribution

In [15]:
ratings_distribution=df2.groupBy("rating").agg(count("*").alias("count")).orderBy("rating")
ratings_distribution.show()

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1370|
|   1.0| 2811|
|   1.5| 1791|
|   2.0| 7551|
|   2.5| 5550|
|   3.0|20047|
|   3.5|13136|
|   4.0|26818|
|   4.5| 8551|
|   5.0|13211|
+------+-----+



4. Tags but no ratings

In [16]:
df5=df1.join(df3,df3.movieId == df1.movieId, 'inner').drop(df3.movieId)
df6=df5.join(df2, df5.movieId == df2.movieId, 'left').drop(df5.movieId)
df7=df6.filter(col("rating").isNull())
df7.select("title").distinct().show()

+--------------------+
|               title|
+--------------------+
|Mutiny on the Bou...|
|Call Northside 77...|
|Color of Paradise...|
|For All Mankind (...|
|Browning Version,...|
|I Know Where I'm ...|
|        Proof (1991)|
|Twentieth Century...|
|Innocents, The (1...|
|In the Realms of ...|
|Parallax View, Th...|
|Road Home, The (W...|
|Roaring Twenties,...|
|  Chalet Girl (2011)|
|      Scrooge (1970)|
|      Niagara (1953)|
|  Chosen, The (1981)|
|This Gun for Hire...|
+--------------------+



5. Ratings but no tag

In [17]:
df5=df1.join(df2,df1.movieId == df2.movieId, 'inner').drop(df2.movieId)
df6=df5.join(df3, df5.movieId == df3.movieId, 'left').drop(df5.movieId)
df7=df6.filter(col("tag").isNull())
print(df7.select("title").count())
df7.select("title").distinct().show()

52549


[Stage 35:>                                                         (0 + 1) / 1]

+--------------------+
|               title|
+--------------------+
|Gulliver's Travel...|
|Before Night Fall...|
| Three Wishes (1995)|
| If Lucy Fell (1996)|
|First Blood (Ramb...|
|Don't Tell Mom th...|
| Nut Job, The (2014)|
|22 Jump Street (2...|
|Starship Troopers...|
|Voices from the L...|
|My Father the Her...|
|    Dead Meat (2004)|
|National Lampoon'...|
|7th Voyage of Sin...|
|     Ip Man 3 (2015)|
| Just Friends (2005)|
|I Love You Philli...|
|Tom Segura: Disgr...|
|    Fair Game (1995)|
|Problem Child (1990)|
+--------------------+
only showing top 20 rows



                                                                                

6. Top 10 untagged movies

In [20]:
innerj=df1.join(df2,df1.movieId==df2.movieId).drop(df2.movieId)
leftj=innerj.join(df3,innerj.movieId==df3.movieId,"left").drop(df3.movieId)
filterdf=leftj.filter(col("tag").isNull())
gb=filterdf.groupBy("movieId").agg(first("title").alias("Title"),count("*").alias("Count"),avg("rating").alias("Average")).orderBy("Average",ascending=False)
gb=gb.filter(col("Count")>30)
gb.show(10)

[Stage 40:>                                                         (0 + 1) / 1]

+-------+--------------------+-----+-----------------+
|movieId|               Title|Count|          Average|
+-------+--------------------+-----+-----------------+
|   3275|Boondock Saints, ...|   43| 4.22093023255814|
|   1199|       Brazil (1985)|   59|4.177966101694915|
|   1172|Cinema Paradiso (...|   34|4.161764705882353|
|   4011|       Snatch (2000)|   93|4.155913978494624|
|   3681|For a Few Dollars...|   33|4.151515151515151|
|  44555|Lives of Others, ...|   34|4.117647058823529|
|  78499|  Toy Story 3 (2010)|   55|4.109090909090909|
|   1673|Boogie Nights (1997)|   39|4.076923076923077|
|   2858|American Beauty (...|  204|4.056372549019608|
|   2542|Lock, Stock & Two...|   67|4.052238805970149|
+-------+--------------------+-----+-----------------+
only showing top 10 rows



                                                                                

7. Tags per movies and tags per users

In [21]:
tagspermovie=df3.groupBy("movieId").agg(count("tag").alias("Tagspermovie"))
tagsperuser=df3.groupBy("userId").agg(count("tag").alias("Tagsperuser"))
tagspermovie.select(avg("Tagspermovie")).show()
tagsperuser.select(avg("Tagsperuser")).show()

+------------------+
| avg(Tagspermovie)|
+------------------+
|2.3428753180661577|
+------------------+

+----------------+
|avg(Tagsperuser)|
+----------------+
|            63.5|
+----------------+



8. Users that tagged but not rated

In [22]:
qwerty=df2.join(df3,(df2.userId==df3.userId) & (df2.movieId==df3.movieId),"right")
qwerty2=qwerty.filter(df2.rating.isNull()).select(df3.userId).distinct()
print(qwerty2.count())
qwerty2.show()

17
+------+
|userId|
+------+
|   193|
|   606|
|   336|
|   435|
|   474|
|   318|
|   537|
|   543|
|   288|
|   289|
|   477|
|    21|
|   341|
|   600|
|   424|
|   119|
|   573|
+------+



9. Ratings per user, Ratings per movie

In [23]:
ratings_per_user=df2.select('rating').count()/df2.select('userId').distinct().count()
ratings_per_movie=df2.select('rating').count()/df2.select('movieId').distinct().count()
print(ratings_per_user)
print(ratings_per_movie)

165.30491803278687
10.369806663924312


10. Pre dominant genre per rating level

In [58]:
from pyspark.sql.window import Window

joined_df=df2.join(df1, df2.movieId == df1.movieId, 'inner')
exploded_df = joined_df.withColumn("genre", explode(split("genres", "\|")))
grouped_df = exploded_df.groupBy("rating", "genre").agg(count('*').alias('count'))

window=Window.partitionBy("rating").orderBy(col("count").desc())
ranked_df=grouped_df.withColumn("rank", rank().over(window)).filter(col("rank") == 1).orderBy(col("rating").desc())

ranked_df.select("rating", "genre").show()

+------+------+
|rating| genre|
+------+------+
|   5.0| Drama|
|   4.5| Drama|
|   4.0| Drama|
|   3.5| Drama|
|   3.0|Comedy|
|   2.5|Comedy|
|   2.0|Comedy|
|   1.5|Comedy|
|   1.0|Comedy|
|   0.5|Comedy|
+------+------+



11. Pre dominant tag per genre, Most tagged genres

In [74]:
from pyspark.sql.window import Window

joinedd_df=df3.join(df1, df3.movieId == df1.movieId, 'inner')
exploded_df = joinedd_df.withColumn("genre", explode(split("genres", "\|")))
grouped_df = exploded_df.groupBy("genre",'tag').agg(count('*').alias('count'))

window=Window.partitionBy("genre").orderBy(col("count").desc())
ranked_df=grouped_df.withColumn("rank", rank().over(window)).filter(col("rank") == 1)
#exploded_df.show()
ranked_df.select("genre",'tag').show()

+------------------+------------------+
|             genre|               tag|
+------------------+------------------+
|(no genres listed)|            quirky|
|(no genres listed)|       understated|
|(no genres listed)|             sweet|
|            Action|         superhero|
|         Adventure|         superhero|
|         Animation|            Disney|
|          Children|            Disney|
|            Comedy|  In Netflix queue|
|             Crime|  In Netflix queue|
|       Documentary|  In Netflix queue|
|             Drama|  In Netflix queue|
|           Fantasy|            Disney|
|         Film-Noir|  In Netflix queue|
|            Horror|      Stephen King|
|            Horror|            ghosts|
|              IMAX|visually appealing|
|              IMAX|            sci-fi|
|              IMAX| thought-provoking|
|           Musical|            Disney|
|           Mystery|      twist ending|
+------------------+------------------+
only showing top 20 rows



12. Top 10 movies

In [25]:
innerj=df1.join(df2,df1.movieId==df2.movieId).drop(df2.movieId)
gb=innerj.groupBy("movieId").agg(first("title").alias("Title"),count("*").alias("Count"),avg("rating").alias("Average")).orderBy("Average",ascending=False)
gb=gb.filter(col("Count")>30)
gb.show(10)

[Stage 85:>                                                         (0 + 1) / 1]

+-------+--------------------+-----+-----------------+
|movieId|               Title|Count|          Average|
+-------+--------------------+-----+-----------------+
|    318|Shawshank Redempt...|  317|4.429022082018927|
|   1204|Lawrence of Arabi...|   45|              4.3|
|    858|Godfather, The (1...|  192|        4.2890625|
|   2959|   Fight Club (1999)|  218|4.272935779816514|
|   1276|Cool Hand Luke (1...|   57|4.271929824561403|
|    750|Dr. Strangelove o...|   97|4.268041237113402|
|    904|  Rear Window (1954)|   84|4.261904761904762|
|   1221|Godfather: Part I...|  129| 4.25968992248062|
|  48516|Departed, The (2006)|  107|4.252336448598131|
|   1213|   Goodfellas (1990)|  126|             4.25|
+-------+--------------------+-----+-----------------+
only showing top 10 rows



                                                                                