# Question 1

## Initiate Spark

In [1]:
from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics
from pyspark.sql.functions import lower

# Create a new Spark Session
spark = SparkSession \
    .builder \
    .appName("Movies") \
    .config("spark.jars", "spark-measure_2.11-0.17.jar") \
    .getOrCreate()

# Create spark metrics object
stagemetrics = StageMetrics(spark)

In [2]:
spark

## Load Datasets To Dataframes

### Movie Dataframe

In [47]:
movie_df = (spark.read
            .format("csv")
            .option("header", "true")
            .option("delimiter", ",")
            .option("inferSchema", "true")
            .load("datasets/movie.csv")
           )

In [4]:
movie_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



### Rating Dataframe

In [50]:
rating_df = (spark.read
            .format("csv")
            .option("header", "true")
            .option("delimiter", ",")
            .option("inferSchema", "true")
            .load("datasets/rating.csv")
           )

In [51]:
rating_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [6]:
rating_df.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



### Tag Dataframe

In [48]:
tag_df = (spark.read
            .format("csv")
            .option("header", "true")
            .option("delimiter", ",")
            .option("inferSchema", "true") 
            .load("datasets/tag.csv")
           )

In [8]:
tag_df.show(5)

+------+-------+-------------+-------------------+
|userId|movieId|          tag|          timestamp|
+------+-------+-------------+-------------------+
|    18|   4141|  Mark Waters|2009-04-24 18:19:40|
|    65|    208|    dark hero|2013-05-10 01:41:18|
|    65|    353|    dark hero|2013-05-10 01:41:19|
|    65|    521|noir thriller|2013-05-10 01:39:43|
|    65|    592|    dark hero|2013-05-10 01:41:18|
+------+-------+-------------+-------------------+
only showing top 5 rows



### Genome Tags Dataframe

In [49]:
genome_tags_df = (spark.read
            .format("csv")
            .option("header", "true")
            .option("delimiter", ",")
            .option("inferSchema", "true") 
            .load("datasets/genome_tags.csv")
           )

In [10]:
genome_tags_df.show(5)

+-----+------------+
|tagId|         tag|
+-----+------------+
|    1|         007|
|    2|007 (series)|
|    3|18th century|
|    4|       1920s|
|    5|       1930s|
+-----+------------+
only showing top 5 rows



## Queries

### Query 1

In [11]:
# Start measuring performance
stagemetrics.begin()

# Get the id of the movie "Jumanji"
jumanji_id = movie_df.filter(movie_df.title.contains("Jumanji")) \
            .select("movieId") \
            .collect()[0]["movieId"]

# Get the number of users that watched "Jumanji"
watched_jumanji = rating_df.filter(rating_df["movieId"] == jumanji_id) \
                 .count()

# Stop measuring performance
stagemetrics.end()

# Print performance metrics
print(stagemetrics.report().split('\n')[6])

elapsedTime => 36913 (37 s)


In [12]:
watched_jumanji

22243

### Query 2

In [52]:
# Start measuring performance
stagemetrics.begin()

# Get the movieIds with tags containing the word "boring"
unique_boring_movieIds = tag_df.filter(lower(tag_df["tag"]).contains("boring")) \
                        .select("movieId") \
                        .dropDuplicates()

# Get the corresponding movie titles from movieIds in alphabetical order
unique_boring_movie_titles = unique_boring_movieIds \
                            .join(movie_df, "movieId", "inner") \
                            .select(movie_df.title) \
                            .sort(movie_df.title)

# Stop measuring performance
stagemetrics.end()

# Print performance metrics
stagemetrics.print_report()
#print(stagemetrics.report().split('\n')[6])


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
 no data to report 


In [53]:
unique_boring_movie_titles.show(5)

+--------------------+
|               title|
+--------------------+
|(500) Days of Sum...|
|101 Reykjavik (10...|
|12 Years a Slave ...|
|         1408 (2007)|
|1492: Conquest of...|
+--------------------+
only showing top 5 rows



### Query 3

In [57]:
# Start measuring performance
stagemetrics.begin()

# Get the userIds and movieIds with tags containing the word "bollywood"
bollywood_userIds_movieIds = tag_df.filter(lower(tag_df["tag"]).contains("bollywood")) \
                                   .select(["userId", "movieId", "tag"])

# Get all userIds and movieIds with rating above 3
above_3_rating = rating_df.filter(rating_df.rating > 3) \
                          .select(["userId", "movieId", "rating"])

# Inner join based on unique combination of userId and movieId
query_3_result = bollywood_userIds_movieIds \
                .join(above_3_rating, ["userId", "movieId"], "inner") \
                .select("userId") \
                .dropDuplicates() \
                .sort(above_3_rating.userId)

# Stop measuring performance
stagemetrics.end()

# Print performance metrics
stagemetrics.print_report()
#print(stagemetrics.report().split('\n')[6])


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
 no data to report 


In [58]:
query_3_result.show(5)

+------+
|userId|
+------+
| 10573|
| 19837|
| 23333|
| 25004|
| 31338|
+------+
only showing top 5 rows



 ### Query 4

In [153]:
from pyspark.sql.functions import lower, year
from pyspark.sql.functions import rank, col, row_number
from pyspark.sql.window import Window

# Start measuring performance
stagemetrics.begin()

# Group by year and movieId and find the mean value of rating
grouped_data = rating_df.withColumn("year", year(rating_df["timestamp"])) \
              .groupBy(["year", "movieId"]) \
              .agg({"rating": "mean"})
              
# Create a window to limit ratings
window = Window.partitionBy(grouped_data["year"]) \
        .orderBy(grouped_data["avg(rating)"] \
        .desc())

# Limit the ratings in each group
grouped_data_limited = grouped_data.select('*', row_number().over(window).alias('row_number')) \
                      .filter(col('row_number') <= 10) \
                      .sort(["year", "avg(rating)"], ascending = [True, False])

# Get the movie titles
query_4_result = grouped_data_limited.join(movie_df, "movieId", "inner") \
                .select(["year", "title", "avg(rating)"])

# Show results
query_4_result.show()

# Stop measuring performance
stagemetrics.end()

# Print performance metrics
print(stagemetrics.report().split('\n')[6])

+----+--------------------+-----------+
|year|               title|avg(rating)|
+----+--------------------+-----------+
|2005|Not Love, Just Fr...|        5.0|
|2005|Life Is Rosy (a.k...|        5.0|
|2005|Paris Was a Woman...|        5.0|
|2005|Married to It (1991)|        5.0|
|2005|Too Much Sleep (1...|        5.0|
|2005|Fear Strikes Out ...|        5.0|
|2005|Before the Fall (...|        5.0|
|2005|Gate of Heavenly ...|        5.0|
|2005|Take Care of My C...|        5.0|
|2005|   Dancemaker (1998)|        5.0|
+----+--------------------+-----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 7
numTasks => 609
elapsedTime => 123180 (2.1 min)
stageDuration => 122907 (2.0 min)
executorRunTime => 353706 (5.9 min)
executorCpuTime => 328093 (5.5 min)
executorDeserializeTime => 2009 (2 s)
executorDeserializeCpuTime => 1683 (2 s)
resultSerializationTime => 20 (20 ms)
jvmGCTime => 5703 (6 s)
shuffleFetchWaitTime => 0

In [154]:
query_4_result.filter(query_4_result.year == 2005).show(truncate=50)

+----+--------------------------------------------------+-----------+
|year|                                             title|avg(rating)|
+----+--------------------------------------------------+-----------+
|2005|Not Love, Just Frenzy (Más que amor, frenesí) (...|        5.0|
|2005|Life Is Rosy (a.k.a. Life Is Beautiful) (Vie es...|        5.0|
|2005|                          Paris Was a Woman (1995)|        5.0|
|2005|                              Married to It (1991)|        5.0|
|2005|                             Too Much Sleep (1997)|        5.0|
|2005|                           Fear Strikes Out (1957)|        5.0|
|2005|Before the Fall (NaPolA - Elite für den Führer)...|        5.0|
|2005|                Gate of Heavenly Peace, The (1995)|        5.0|
|2005| Take Care of My Cat (Goyangileul butaghae) (2001)|        5.0|
|2005|                                 Dancemaker (1998)|        5.0|
+----+--------------------------------------------------+-----------+



### Query 5

In [172]:
from pyspark.sql.functions import *

# Start measuring performance
stagemetrics.begin()

# Get the id and title of 2015 movies
movies_2015 = movie_df.filter(movie_df.title.contains("(2015)")) \
             .select(["movieId", "title"])

# Get the tags of 2015 movies
joined = movies_2015.join(tag_df, "movieId", "inner") \
                    .select(["title", "tag"]) \
                    .sort("title")

# Group tags by movie title and concatenate them
query_5_result = joined.groupby("title") \
                .agg(concat_ws(", ", collect_list(joined.tag)).alias("tags"))

# Show results
query_5_result.show(truncate=50)

# Stop measuring performance
stagemetrics.end()

# Print performance metrics
print(stagemetrics.report().split('\n')[6])

+--------------------------------------------------+--------------------------------------------------+
|                                             title|                                              tags|
+--------------------------------------------------+--------------------------------------------------+
|                           A Grain of Truth (2015)|Borys Lankosz, Abel Korzeniowski, Borys Lankosz...|
|                        A Walk in the Woods (2015)|                                        Ken Kwapis|
|                               Advantageous (2015)|                                    Jennifer Phang|
|                        As We Were Dreaming (2015)|                                   based on a book|
|                            Average Italian (2015)|                           Marcello Macchia, drugs|
|                     Beaver Trilogy Part IV (2015)|                       Brad Besser, movie business|
|                                   Blackhat (2015)|            

In [174]:
query_5_result.show(5, truncate=50)

+--------------------------+--------------------------------------------------+
|                     title|                                              tags|
+--------------------------+--------------------------------------------------+
|   A Grain of Truth (2015)|Borys Lankosz, Abel Korzeniowski, Borys Lankosz...|
|A Walk in the Woods (2015)|                                        Ken Kwapis|
|       Advantageous (2015)|                                    Jennifer Phang|
|As We Were Dreaming (2015)|                                   based on a book|
|    Average Italian (2015)|                           Marcello Macchia, drugs|
+--------------------------+--------------------------------------------------+
only showing top 5 rows



### Query 6