In [None]:
import sys
import os
# When launched from a directory whose path ends in 'notebooks', step up one level
# before any relative paths are used, then report the directory in effect.
current_dir = os.getcwd()
if current_dir.endswith('notebooks'):
    os.chdir(os.pardir)
print(os.getcwd())

## Imports 

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, countDistinct
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, round, rank
from pyspark.sql.window import Window
import warnings
warnings.filterwarnings("ignore")

# Initialize pyspark

In [None]:
# Create the Spark session, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("SQL in PySpark")
    .getOrCreate()
)

# Import and view the data 

In [4]:
# Read the general title information from its tab-separated file (first row is the header)
info_path = os.path.join('data', 'movie_general.tsv')
df_info = spark.read.csv(info_path, sep='\t', header=True)

# Report the shape of the data, then the column schema
n_rows = df_info.count()
n_cols = len(df_info.columns)
print(f"Dataframe has {n_rows} rows ---> {n_cols} columns\n\n")
df_info.printSchema()

[Stage 1:>                                                          (0 + 4) / 4]

Dataframe has 1034420 rows ---> 7 columns


root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



                                                                                

### Number of unique values per column

In [5]:
# Build one distinct-count aggregate per column, aliased back to the column's own name
expression = []
for column_name in df_info.columns:
    expression.append(countDistinct(column_name).alias(column_name))
df_info.select(expression).show()

[Stage 6:>                                                          (0 + 4) / 4]

+-------+---------+------------+-------+---------+--------------+------+
| tconst|titleType|primaryTitle|isAdult|startYear|runtimeMinutes|genres|
+-------+---------+------------+-------+---------+--------------+------+
|1034420|        9|      921111|      2|       46|           813|  2119|
+-------+---------+------------+-------+---------+--------------+------+





# Import Ratings data

In [6]:
# Read the ratings from their tab-separated file (first row is the header)
ratings_path = os.path.join('data', 'movie_ratings.tsv')
df_ratings = spark.read.csv(ratings_path, sep='\t', header=True)

# Report the shape of the data, then the column schema
n_rows = df_ratings.count()
n_cols = len(df_ratings.columns)
print(f"Dataframe has {n_rows} rows ---> {n_cols} columns\n\n")
df_ratings.printSchema()

Dataframe has 1275973 rows ---> 3 columns


root
 |-- tconst: string (nullable = true)
 |-- averageRating: string (nullable = true)
 |-- numVotes: string (nullable = true)



## Number of unique values per column

In [7]:
# Build one distinct-count aggregate per column, aliased back to the column's own name
expression = []
for column_name in df_ratings.columns:
    expression.append(countDistinct(column_name).alias(column_name))
df_ratings.select(expression).show()



+-------+-------------+--------+
| tconst|averageRating|numVotes|
+-------+-------------+--------+
|1275973|           91|   20963|
+-------+-------------+--------+



                                                                                

# Join the two DataFrames on the 'tconst' column

In [8]:
# Join with classic SQL: register both DataFrames as temp views so they can be
# referenced by name inside the query.
df_info.createOrReplaceTempView("INFO")
df_ratings.createOrReplaceTempView("RATINGS")
keep_cols = [
    "INFO.tconst", "INFO.titleType", "INFO.primaryTitle", "INFO.isAdult", "INFO.startYear",
    "INFO.runtimeMinutes", "INFO.genres", "RATINGS.averageRating", "RATINGS.numVotes",
]
# LEFT JOIN keeps every INFO row; titles with no match in RATINGS get null rating columns
df = spark.sql(f"select {', '.join(keep_cols)} from INFO LEFT JOIN RATINGS ON INFO.tconst = RATINGS.tconst")
print(f"Dataframe has {df.count()} rows ---> {len(df.columns)} columns\n\n")
# printSchema() prints the schema itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
df.printSchema()
df.show(5)

                                                                                

Dataframe has 1034420 rows ---> 9 columns


root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- averageRating: string (nullable = true)
 |-- numVotes: string (nullable = true)

None


                                                                                

+----------+---------+--------------------+-------+---------+--------------+--------------------+-------------+--------+
|    tconst|titleType|        primaryTitle|isAdult|startYear|runtimeMinutes|              genres|averageRating|numVotes|
+----------+---------+--------------------+-------+---------+--------------+--------------------+-------------+--------+
|tt18302012|tvSpecial|          NHK Trophy|      0|     2019|            \N|               Sport|         null|    null|
|tt18302040|tvSpecial|      Mission Health|      0|     2016|            \N|           Talk-Show|         null|    null|
| tt5117572|    video|Hiru wa shika jos...|      1|     2013|           120|               Adult|         null|    null|
| tt5117670|    movie|        Peter Rabbit|      0|     2018|            95|Adventure,Comedy,...|          6.6|   44223|
| tt5117706|    video|Hiru wa shika jos...|      1|     2013|           150|               Adult|         null|    null|
+----------+---------+----------

### Convert number columns to type Double

In [9]:
# Cast the rating columns from string to double, rounded to 2 decimal places.
# The loop variable is deliberately NOT called `col`: that name is the function
# imported from pyspark.sql.functions, and rebinding it to a string here would
# break any later cell that calls col(...).
cols_to_double = ['averageRating', 'numVotes']
for column_name in cols_to_double:
    df = df.withColumn(column_name, round(df[column_name].cast(DoubleType()), 2))
df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: double (nullable = true)



### View unique values in column

In [10]:
# Collect the distinct title types as a list of Row objects
df.select(df.titleType).distinct().collect()

                                                                                

[Row(titleType='tvSeries'),
 Row(titleType='tvMiniSeries'),
 Row(titleType='tvMovie'),
 Row(titleType='movie'),
 Row(titleType='tvSpecial'),
 Row(titleType='video'),
 Row(titleType='videoGame'),
 Row(titleType='tvShort'),
 Row(titleType='tvPilot')]

# Filtering the data

### Keep only columns from list

In [11]:
# Columns carried forward into the filtering steps
keep_cols = [
    "tconst",
    "titleType",
    "primaryTitle",
    "isAdult",
    "startYear",
    "runtimeMinutes",
    "genres",
    "averageRating",
    "numVotes",
]
df = df.select(keep_cols)

### Keep only the titles released in 1990 or later

In [12]:
# Keep titles whose startYear is 1990 or later.
# NOTE(review): startYear has not been cast and is still a string column here, so
# this relies on Spark coercing it for the numeric comparison — confirm intended.
df = df.filter(df["startYear"] >= 1990)

### Filter 'titleType'

In [13]:
# Title types to drop from the DataFrame
titles_to_exclude = ['video', 'videoGame', 'tvShort']
is_excluded = df["titleType"].isin(titles_to_exclude)
df = df.where(~is_excluded)

### Remove the titles without ratings

In [14]:
# Re-register the filtered DataFrame as the DF view, then keep only rows with a rating
df.createOrReplaceTempView("DF")
rated_query = "select * FROM DF WHERE DF.averageRating IS NOT NULL"
df = spark.sql(rated_query)
df.show(3)



+---------+---------+------------+-------+---------+--------------+--------------------+-------------+--------+
|   tconst|titleType|primaryTitle|isAdult|startYear|runtimeMinutes|              genres|averageRating|numVotes|
+---------+---------+------------+-------+---------+--------------+--------------------+-------------+--------+
|tt0059325|    movie| Born in '45|      0|     1990|           100|       Drama,Romance|          6.4|   250.0|
|tt0075259|    movie|   Spy Story|      0|     1990|           103|Action,Drama,Mystery|          6.0|    53.0|
|tt0077432|    movie| Bloody Hero|      0|     1991|            85|        Action,Drama|          5.4|    13.0|
+---------+---------+------------+-------+---------+--------------+--------------------+-------------+--------+
only showing top 3 rows



                                                                                

# Table Operations

### Avg, max, min of columns

In [15]:
# Average, maximum and minimum of the averageRating and numVotes columns.
# show() prints the table itself and returns None, so the calls are not wrapped
# in print() — doing so only appends "None None None" to the output.
for stat in ('avg', 'max', 'min'):
    df.agg({'averageRating': stat, 'numVotes': stat}).show()

                                                                                

+------------------+------------------+
|avg(averageRating)|     avg(numVotes)|
+------------------+------------------+
| 6.456809838814827|3151.2246902077704|
+------------------+------------------+



                                                                                

+------------------+-------------+
|max(averageRating)|max(numVotes)|
+------------------+-------------+
|              10.0|    2696853.0|
+------------------+-------------+



[Stage 62:>                                                         (0 + 4) / 4]

+------------------+-------------+
|min(averageRating)|min(numVotes)|
+------------------+-------------+
|               1.0|          5.0|
+------------------+-------------+

None None None


                                                                                

### Sum of numVotes column

In [16]:
# Grand total of votes over the whole table, computed through Spark SQL
df.createOrReplaceTempView("DF")
total_votes = spark.sql("SELECT SUM(numVotes) FROM DF")
total_votes.show()

[Stage 71:>                                                         (0 + 4) / 4]

+-------------+
|sum(numVotes)|
+-------------+
|1.014662838E9|
+-------------+



                                                                                

In [17]:
# Votes summed per distinct value of the genres column
votes_by_genre = df.groupBy('genres').sum('numVotes')
votes_by_genre.show()

[Stage 80:>                                                         (0 + 4) / 4]

+--------------------+-------------+
|              genres|sum(numVotes)|
+--------------------+-------------+
|        Comedy,Sport|    2133109.0|
|Action,Adventure,...|   2.526786E7|
|Documentary,News,...|        296.0|
|Documentary,Drama...|       3181.0|
|Animation,Fantasy...|         32.0|
|Biography,Thrille...|        349.0|
|Animation,Sci-Fi,War|       1066.0|
|Adventure,Family,...|    8603716.0|
| Documentary,Western|       5049.0|
|Comedy,Drama,Western|     111691.0|
|Game-Show,Reality-TV|     288114.0|
|Action,Fantasy,Hi...|       5200.0|
|  Fantasy,Horror,War|        957.0|
|Documentary,Myste...|       1290.0|
|Fantasy,Mystery,T...|      27430.0|
|Comedy,Family,His...|      13451.0|
|   Documentary,Sport|     535769.0|
|Action,Animation,...|     194074.0|
|Action,Game-Show,...|       7488.0|
|Fantasy,Sci-Fi,Th...|       7391.0|
+--------------------+-------------+
only showing top 20 rows





### Rank the titles by rating and number of votes

In [18]:
# Best first: order by rating descending, ties broken by vote count descending.
# Only titles with at least 4000 votes are considered.
cols_to_view = ['titleType', 'primaryTitle', 'averageRating', 'numVotes']
ranked = (
    df.select(cols_to_view)
    .where(df.numVotes >= 4000)
    .orderBy(df.averageRating.desc(), df.numVotes.desc())
)
ranked.show()

                                                                                

+------------+--------------------+-------------+---------+
|   titleType|        primaryTitle|averageRating| numVotes|
+------------+--------------------+-------------+---------+
|    tvSeries|   Friday Five Sharp|          9.8|   4075.0|
|       movie|The Silence of Sw...|          9.6|  10330.0|
|     tvMovie|Threat Level Midn...|          9.6|   8808.0|
|    tvSeries|        Breaking Bad|          9.5|1915171.0|
|tvMiniSeries|     Planet Earth II|          9.5| 147166.0|
|    tvSeries|               Bluey|          9.5|  12611.0|
|tvMiniSeries|           Chernobyl|          9.4| 768150.0|
|tvMiniSeries|    Band of Brothers|          9.4| 474086.0|
|tvMiniSeries|        Planet Earth|          9.4| 211349.0|
|    tvSeries|         BB Ki Vines|          9.4|   9925.0|
|       movie|The Shawshank Red...|          9.3|2696853.0|
|    tvSeries|            The Wire|          9.3| 344326.0|
|    tvSeries|Avatar: The Last ...|          9.3| 321148.0|
|    tvSeries|Scam 1992: The Ha...|     