# Tomatometer PySpark

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DecimalType

# Initialize a SparkSession
spark = SparkSession.builder.appName('Avocado').getOrCreate()

# Ingest & Access

In [4]:
# Ingest the dataset
df = spark.read.csv('./data/rotten_tomatoes_movies.csv', header=True, inferSchema=True)
df.limit(10).show()

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-----------+--------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|originalLanguage|            director|              writer|boxOffice|distributor|soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-----------+--------+
|  space-zombie-bingo| Space Zombie Bingo!|           50|       NULL|  NULL|                NULL|               NULL|          2018-08-25|            75|Comedy, Horror, S...|         English|       George 

In [8]:
num_rows = df.count()
num_rows

143258

# Cleaning

Missing Values: Check for missing values in the audienceScore, tomatoMeter, and releaseDateTheaters columns. Handle them by removing rows with a null value in either of these columns.


In [11]:
df.filter(F.col('audienceScore').isNull()).count()

70010

In [12]:
dropped_rows = df.na.drop(subset=['audienceScore', 'tomatoMeter','releaseDateTheaters']).count()
dropped_rows

18807

In [13]:
percent_dropped = 1 - (dropped_rows/num_rows)
percent_dropped

0.8687193734381327

**Dropping all the nulls from the columns audienceScore, tomatoMeter, and releaseDateTheaters drops 86.9% of the data. This is not good practice, but we will proceed following the instructions.**

In [14]:
df_clean = df.na.drop(subset=['audienceScore', 'tomatoMeter','releaseDateTheaters'])

In [15]:
df_clean.limit(5).show()

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-------------------+--------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|originalLanguage|            director|              writer|boxOffice|        distributor|soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-------------------+--------+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-01|          2018-08-21|           120|Adventure, Drama,...|       

# Processing

**Year**: Create a year column by parsing the year from the releaseDateTheaters column.

**Top Movies**: Find the top 500 movies based on Tomatometer score using functions like orderBy and limit.

In [16]:
df_clean.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- audienceScore: integer (nullable = true)
 |-- tomatoMeter: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- ratingContents: string (nullable = true)
 |-- releaseDateTheaters: date (nullable = true)
 |-- releaseDateStreaming: string (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- originalLanguage: string (nullable = true)
 |-- director: string (nullable = true)
 |-- writer: string (nullable = true)
 |-- boxOffice: string (nullable = true)
 |-- distributor: string (nullable = true)
 |-- soundMix: string (nullable = true)



In [22]:
df_clean = df_clean.withColumn('Year', F.year(F.col('releaseDateTheaters')))

In [28]:
df_clean.select('releaseDateTheaters', 'Year').limit(5).show()

+-------------------+----+
|releaseDateTheaters|Year|
+-------------------+----+
|         2018-06-01|2018|
|         1947-04-30|1947|
|         2009-12-04|2009|
|         2011-09-30|2011|
|         2005-08-10|2005|
+-------------------+----+



In [54]:
df_sorted = df_clean

In [55]:
df_sorted = df_sorted.orderBy('tomatoMeter', ascending=False)

In [56]:
df_sorted.show(5)

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------+--------------------+---------+--------------------+--------+----+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|originalLanguage|      director|              writer|boxOffice|         distributor|soundMix|Year|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------+--------------------+---------+--------------------+--------+----+
|     night_owls_2015|          Night Owls|           56|        100|     R|['Brief Nudity', ...|         2015-12-04|          2016-12-07|            91|     Romance, Comedy|       

In [57]:
df_sorted = df_sorted.limit(5000)

In [58]:
df_sorted.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- audienceScore: integer (nullable = true)
 |-- tomatoMeter: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- ratingContents: string (nullable = true)
 |-- releaseDateTheaters: date (nullable = true)
 |-- releaseDateStreaming: string (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- originalLanguage: string (nullable = true)
 |-- director: string (nullable = true)
 |-- writer: string (nullable = true)
 |-- boxOffice: string (nullable = true)
 |-- distributor: string (nullable = true)
 |-- soundMix: string (nullable = true)
 |-- Year: integer (nullable = true)



In [59]:
df_sorted.count()

5000

In [60]:
df_sorted.write.parquet('fixed_final_df.parquet')