In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T 
import pyspark.sql.functions as F


spark = SparkSession \
    .builder \
    .appName("Exercise Solutions") \
    .getOrCreate()

# SQL Movie-Rating Query Exercises

You've started a new movie-rating website, and you've been collecting data on reviewers' ratings of various movies. 

There's not much data yet, but you can still try out some interesting queries. Here's the schema:

Movie ( mID, title, year, director )
English: There is a movie with ID number *mID*, a *title*, a release *year*, and a *director*.

Reviewer ( rID, name )
English: The reviewer with ID number *rID* has a certain *name*.

Rating ( rID, mID, stars, ratingDate )
English: The reviewer *rID* gave the movie *mID* a number of *stars* rating (1-5) on a certain *ratingDate.m*

Your queries will run over a small data set conforming to the schema.

### Movie 

In [2]:
movie_df = spark.read.csv('./data/rating/movie.csv', header=True)
movie_df.show(truncate=False)

+---+-----------------------+----+----------------+
|mid|title                  |year|director        |
+---+-----------------------+----+----------------+
|101|Gone with the Wind     |1939|Victor Fleming  |
|102|Star Wars              |1977|George Lucas    |
|103|The Sound of Music     |1965|Robert Wise     |
|104|E.T.                   |1982|Steven Spielberg|
|105|Titanic                |1997|James Cameron   |
|106|Snow White             |1937|null            |
|107|Avatar                 |2009|James Cameron   |
|108|Raiders of the Lost Ark|1981|Steven Spielberg|
+---+-----------------------+----+----------------+



### Reviewer

In [3]:
reviewer_df = spark.read.csv('./data/rating/reviewer.csv', header=True)
reviewer_df.show(truncate=False)

+---+----------------+
|rid|name            |
+---+----------------+
|201|Sarah Martinez  |
|202|Daniel Lewis    |
|203|Brittany Harris |
|204|Mike Anderson   |
|205|Chris Jackson   |
|206|Elizabeth Thomas|
|207|James Cameron   |
|208|Ashley White    |
+---+----------------+



### Rating

In [4]:
rating_df = spark.read.csv('./data/rating/rating.csv', header=True)
rating_df.show(truncate=False)

+---+---+-----+----------+
|rid|mid|stars|ratingdate|
+---+---+-----+----------+
|201|101|2    |2011-01-22|
|201|101|4    |2011-01-27|
|202|106|4    |null      |
|203|103|2    |2011-01-20|
|203|108|4    |2011-01-12|
|203|108|2    |2011-01-30|
|204|101|3    |2011-01-09|
|205|103|3    |2011-01-27|
|205|104|2    |2011-01-22|
|205|108|4    |null      |
|206|107|3    |2011-01-15|
|206|106|5    |2011-01-19|
|207|107|5    |2011-01-20|
|208|104|3    |2011-01-02|
+---+---+-----+----------+



## Q1
Find the titles of all movies directed by Steven Spielberg.

In [5]:
movie_df \
    .select('title') \
    .where(F.col('director') == 'Steven Spielberg') \
    .show(truncate=False)

+-----------------------+
|title                  |
+-----------------------+
|E.T.                   |
|Raiders of the Lost Ark|
+-----------------------+



## Q2
Find all years that have a movie that received a rating of 4 or 5, and sort them in increasing order.

In [6]:
movie_df \
    .join(rating_df, on='mid') \
    .select('year') \
    .where(F.col('stars').isin(4, 5)) \
    .distinct() \
    .orderBy(F.col('year')) \
    .show(truncate=False)

+----+
|year|
+----+
|1937|
|1939|
|1981|
|2009|
+----+



## Q3
Find the titles of all movies that have no ratings.

In [7]:
movie_df \
        .join(rating_df, on='mid') \
        .select('title') \
        .where(F.col('stars').isNull()) \
        .show(truncate=False)

+-----+
|title|
+-----+
+-----+



## Q4
Some reviewers didn't provide a date with their rating. Find the names of all reviewers who have ratings with a NULL value for the date.

In [8]:
rating_df \
    .join(reviewer_df, on='rid') \
    .select('name') \
    .where(F.col('ratingdate').isNull()) \
    .distinct() \
    .show(truncate=False)

+-------------+
|name         |
+-------------+
|Chris Jackson|
|Daniel Lewis |
+-------------+



## Q5
Write a query to return the ratings data in a more readable format: reviewer name, movie title, stars, and ratingDate. Also, sort the data, first by reviewer name, then by movie title, and lastly by number of stars.

In [9]:
movie_df \
    .join(rating_df, on='mid') \
    .join(reviewer_df, on='rid') \
    .select('name', 'title', 'stars', 'ratingdate') \
    .orderBy(['name', 'title', 'stars']) \
    .show(truncate=False)

+----------------+-----------------------+-----+----------+
|name            |title                  |stars|ratingdate|
+----------------+-----------------------+-----+----------+
|Ashley White    |E.T.                   |3    |2011-01-02|
|Brittany Harris |Raiders of the Lost Ark|2    |2011-01-30|
|Brittany Harris |Raiders of the Lost Ark|4    |2011-01-12|
|Brittany Harris |The Sound of Music     |2    |2011-01-20|
|Chris Jackson   |E.T.                   |2    |2011-01-22|
|Chris Jackson   |Raiders of the Lost Ark|4    |null      |
|Chris Jackson   |The Sound of Music     |3    |2011-01-27|
|Daniel Lewis    |Snow White             |4    |null      |
|Elizabeth Thomas|Avatar                 |3    |2011-01-15|
|Elizabeth Thomas|Snow White             |5    |2011-01-19|
|James Cameron   |Avatar                 |5    |2011-01-20|
|Mike Anderson   |Gone with the Wind     |3    |2011-01-09|
|Sarah Martinez  |Gone with the Wind     |2    |2011-01-22|
|Sarah Martinez  |Gone with the Wind    

## Q6
For all cases where the same reviewer rated the same movie twice and gave it a higher rating the second time, return the reviewer's name and the title of the movie.

In [10]:
r = rating_df.alias('r')
r1 = rating_df.alias('r1')
r \
    .join(r1, on=['rid', 'mid']) \
    .join(reviewer_df, on='rid') \
    .join(movie_df, on='mid') \
    .where(
        (F.col('r.ratingdate') < F.col('r1.ratingdate')) &
        (F.col('r.stars') < F.col('r1.stars'))
        ) \
    .select('name', 'title') \
    .show(truncate=False)

+--------------+------------------+
|name          |title             |
+--------------+------------------+
|Sarah Martinez|Gone with the Wind|
+--------------+------------------+



## Q7
For each movie that has at least one rating, find the highest number of stars that movie received. Return the movie title and number of stars. Sort by movie title.

In [11]:
movie_df \
    .join(rating_df, on='mid') \
    .groupBy('title') \
    .agg(F.max('stars')) \
    .show(truncate=False)

+-----------------------+----------+
|title                  |max(stars)|
+-----------------------+----------+
|Avatar                 |5         |
|E.T.                   |3         |
|Gone with the Wind     |4         |
|Raiders of the Lost Ark|4         |
|Snow White             |5         |
|The Sound of Music     |3         |
+-----------------------+----------+



  ## Q8
For each movie, return the title and the 'rating spread', that is, the difference between highest and lowest ratings given to that movie. Sort by rating spread from highest to lowest, then by movie title.



In [12]:
movie_df \
    .join(rating_df, on='mid') \
    .groupBy('title') \
    .agg(F.max('stars').alias('max_star'), F.min('stars').alias('min_star')) \
    .select('title', F.col('max_star') - F.col('min_star')) \
    .show(truncate=False)

+-----------------------+---------------------+
|title                  |(max_star - min_star)|
+-----------------------+---------------------+
|Avatar                 |2.0                  |
|E.T.                   |1.0                  |
|Gone with the Wind     |2.0                  |
|Raiders of the Lost Ark|2.0                  |
|Snow White             |1.0                  |
|The Sound of Music     |1.0                  |
+-----------------------+---------------------+



## Q9
Find the difference between the average rating of movies released before 1980 and the average rating of movies released after 1980. (Make sure to calculate the average rating for each movie, then the average of those averages for movies before 1980 and movies after. Don't just calculate the overall average rating before and after 1980.)

In [13]:
movie_df \
    .join(rating_df, on='mid') \
    .groupBy('title', 'year') \
    .agg(F.avg('stars').alias('av')) \
    .withColumn('more_1980', F.when(F.col('year') > 1980, F.col('av')).otherwise(None)) \
    .withColumn('less_1980', F.when(F.col('year') < 1980, F.col('av')).otherwise(None)) \
    .drop('av') \
    .select(F.avg(F.col('less_1980')) - F.avg(F.col('more_1980'))) \
    .show(truncate=False)

+---------------------------------+
|(avg(less_1980) - avg(more_1980))|
+---------------------------------+
|0.05555555555555536              |
+---------------------------------+



# SQL Movie-Rating Query Exercises Extras

## Q1
Find the names of all reviewers who rated Gone with the Wind.

In [14]:
movie_df \
    .join(rating_df, on='mid') \
    .join(reviewer_df, on='rid') \
    .where(F.col('title') == 'Gone with the Wind') \
    .select('name') \
    .distinct() \
    .show(truncate=False)

+--------------+
|name          |
+--------------+
|Mike Anderson |
|Sarah Martinez|
+--------------+



## Q2
For any rating where the reviewer is the same as the director of the movie, return the reviewer name, movie title, and number of stars.

In [15]:
movie_df \
    .join(rating_df, on='mid') \
    .join(reviewer_df, on='rid') \
    .where(F.col('director') == F.col('name')) \
    .select('name', 'title', 'stars') \
    .distinct() \
    .show(truncate=False)

+-------------+------+-----+
|name         |title |stars|
+-------------+------+-----+
|James Cameron|Avatar|5    |
+-------------+------+-----+



## Q3
Return all reviewer names and movie names together in a single list, alphabetized. (Sorting by the first name of the reviewer and first word in the title is fine; no need for special processing on last names or removing "The".)

In [16]:
movie_titles = movie_df.select('title')
reviewer_df \
    .select('name') \
    .union(movie_titles) \
    .orderBy('name') \
    .show(truncate=False)

+-----------------------+
|name                   |
+-----------------------+
|Ashley White           |
|Avatar                 |
|Brittany Harris        |
|Chris Jackson          |
|Daniel Lewis           |
|E.T.                   |
|Elizabeth Thomas       |
|Gone with the Wind     |
|James Cameron          |
|Mike Anderson          |
|Raiders of the Lost Ark|
|Sarah Martinez         |
|Snow White             |
|Star Wars              |
|The Sound of Music     |
|Titanic                |
+-----------------------+



## Q4
Find the titles of all movies not reviewed by Chris Jackson.

In [17]:
movie_df \
    .join(rating_df, on='mid') \
    .join(reviewer_df, on='rid') \
    .where(F.col('name') != 'Chris Jackson') \
    .select('title') \
    .distinct() \
    .show(truncate=False)

+-----------------------+
|title                  |
+-----------------------+
|E.T.                   |
|Avatar                 |
|Gone with the Wind     |
|Raiders of the Lost Ark|
|The Sound of Music     |
|Snow White             |
+-----------------------+



## Q5
For all pairs of reviewers such that both reviewers gave a rating to the same movie, return the names of both reviewers. Eliminate duplicates, don't pair reviewers with themselves, and include each pair only once. For each pair, return the names in the pair in alphabetical order.

In [18]:
table_1 = reviewer_df \
                .join(rating_df, on='rid') \
                .alias('table_1')

reviewer_df.alias('table_2') \
    .join(rating_df, on='rid') \
    .join(table_1, on='mid') \
    .where(F.col('table_2.name') < F.col('table_1.name')) \
    .select('table_2.name', 'table_1.name') \
    .distinct() \
    .show(truncate=False)

+----------------+----------------+
|name            |name            |
+----------------+----------------+
|Ashley White    |Chris Jackson   |
|Mike Anderson   |Sarah Martinez  |
|Brittany Harris |Chris Jackson   |
|Daniel Lewis    |Elizabeth Thomas|
|Elizabeth Thomas|James Cameron   |
+----------------+----------------+



## Q6
For each rating that is the lowest (fewest stars) currently in the database, return the reviewer name, movie title, and number of stars.

In [19]:
min_star = rating_df.select(F.min('stars').alias('min_star'))
star = min_star.first()['min_star']

movie_df \
    .join(rating_df, on='mid') \
    .join(reviewer_df, on='rid') \
    .where(F.col('stars') == star) \
    .select('name', 'title', 'stars') \
    .show(truncate=False)

+---------------+-----------------------+-----+
|name           |title                  |stars|
+---------------+-----------------------+-----+
|Sarah Martinez |Gone with the Wind     |2    |
|Brittany Harris|The Sound of Music     |2    |
|Brittany Harris|Raiders of the Lost Ark|2    |
|Chris Jackson  |E.T.                   |2    |
+---------------+-----------------------+-----+



## Q7
List movie titles and average ratings, from highest-rated to lowest-rated. If two or more movies have the same average rating, list them in alphabetical order.

In [20]:
movie_df \
    .join(rating_df, on='mid') \
    .groupBy('mid', 'title').agg(F.avg(F.col('stars'))) \
    .select('title', 'avg(stars)') \
    .orderBy(F.col('avg(stars)').desc(), 'title') \
    .show(truncate=False)

+-----------------------+------------------+
|title                  |avg(stars)        |
+-----------------------+------------------+
|Snow White             |4.5               |
|Avatar                 |4.0               |
|Raiders of the Lost Ark|3.3333333333333335|
|Gone with the Wind     |3.0               |
|E.T.                   |2.5               |
|The Sound of Music     |2.5               |
+-----------------------+------------------+



## Q8
Find the names of all reviewers who have contributed three or more ratings. (As an extra challenge, try writing the query without HAVING or without COUNT.)

In [21]:
rating_df \
    .join(reviewer_df, on='rid') \
    .groupBy('name') \
    .agg(F.count('*').alias('count_row')) \
    .where(F.col('count_row') >= 3) \
    .select('name') \
    .show(truncate=False)

+---------------+
|name           |
+---------------+
|Brittany Harris|
|Chris Jackson  |
+---------------+



## Q9
Some directors directed more than one movie. For all such directors, return the titles of all movies directed by them, along with the director name. Sort by director name, then movie title. (As an extra challenge, try writing the query both with and without COUNT.)

In [22]:
directors = movie_df \
                .groupBy('director') \
                .agg(F.count('*').alias('count_movie')) \
                .where(F.col('count_movie') > 1) \
                .join(movie_df, on='director') \
                .select('title', 'director') \
                .orderBy('director', 'title') \
                .show()


+--------------------+----------------+
|               title|        director|
+--------------------+----------------+
|              Avatar|   James Cameron|
|             Titanic|   James Cameron|
|                E.T.|Steven Spielberg|
|Raiders of the Lo...|Steven Spielberg|
+--------------------+----------------+



## Q10
Find the movie(s) with the highest average rating. Return the movie title(s) and average rating. (Hint: This query is more difficult to write in SQLite than other systems; you might think of it as finding the highest average rating and then choosing the movie(s) with that average rating.)

In [23]:
movie_df \
    .join(rating_df, on=['mid']) \
    .groupby('mid', 'title') \
    .agg(F.avg('stars')) \
    .orderBy(F.col('avg(stars)').desc()) \
    .limit(1) \
    .join(movie_df, on=['mid', 'title']) \
    .select('title', 'avg(stars)') \
    .show()

+----------+----------+
|     title|avg(stars)|
+----------+----------+
|Snow White|       4.5|
+----------+----------+



## Q11
Find the movie(s) with the lowest average rating. Return the movie title(s) and average rating. (Hint: This query may be more difficult to write in SQLite than other systems; you might think of it as finding the lowest average rating and then choosing the movie(s) with that average rating.)

In [24]:
movie_df \
    .join(rating_df, on=['mid']) \
    .groupby('mid', 'title') \
    .agg(F.avg('stars')) \
    .orderBy(F.col('avg(stars)')) \
    .limit(1) \
    .join(movie_df, on=['mid', 'title']) \
    .select('title', 'avg(stars)') \
    .show()

+-----+----------+
|title|avg(stars)|
+-----+----------+
| E.T.|       2.5|
+-----+----------+



## Q12
For each director, return the director's name together with the title(s) of the movie(s) they directed that received the highest rating among all of their movies, and the value of that rating. Ignore movies whose director is NULL.

In [25]:
movie_df \
    .join(rating_df, on='mid') \
    .groupBy('director') \
    .agg(F.max('stars')) \
    .where(F.col('director').isNotNull()) \
    .join(movie_df, on='director') \
    .join(rating_df, on='mid') \
    .where(F.col('max(stars)') == F.col('stars')) \
    .select('director', 'title', 'stars') \
    .show()

+----------------+--------------------+-----+
|        director|               title|stars|
+----------------+--------------------+-----+
|   James Cameron|              Avatar|    5|
|     Robert Wise|  The Sound of Music|    3|
|Steven Spielberg|Raiders of the Lo...|    4|
|Steven Spielberg|Raiders of the Lo...|    4|
|  Victor Fleming|  Gone with the Wind|    4|
+----------------+--------------------+-----+

