In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DecimalType

# Create (or reuse, if one already exists) the SparkSession for this notebook
spark = (
    SparkSession.builder
    .appName("Tomatometer")
    .getOrCreate()
)

In [2]:
# Load the raw movies CSV: the first row supplies column names and Spark
# infers column types from the data.
df = spark.read.csv(
    "rotten_tomatoes_movies.csv",
    header=True,
    inferSchema=True,
)

In [3]:
df.show(n=20, truncate=True)  # preview the first 20 raw rows

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-----------------+--------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|originalLanguage|            director|              writer|boxOffice|      distributor|soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-----------------+--------+
|  space-zombie-bingo| Space Zombie Bingo!|           50|       NULL|  NULL|                NULL|               NULL|          2018-08-25|            75|Comedy, Horror, S...|         Engl

In [4]:
# Table schema: print each column name with the type Spark inferred from the CSV
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- audienceScore: integer (nullable = true)
 |-- tomatoMeter: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- ratingContents: string (nullable = true)
 |-- releaseDateTheaters: date (nullable = true)
 |-- releaseDateStreaming: string (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- originalLanguage: string (nullable = true)
 |-- director: string (nullable = true)
 |-- writer: string (nullable = true)
 |-- boxOffice: string (nullable = true)
 |-- distributor: string (nullable = true)
 |-- soundMix: string (nullable = true)



#### Cleaning:
Missing Values: Check for missing values in the audienceScore, tomatoMeter, and releaseDateTheaters columns. Handle them by removing every row that has a null value in any of these three columns.


In [5]:
# Method 1: count the rows whose audienceScore is null
df.where(F.col('audienceScore').isNull()).count()

70010

In [6]:
# Method 2: eyeball the three key columns side by side (same column order as df)
df.select('audienceScore', 'tomatoMeter', 'releaseDateTheaters').show()

+-------------+-----------+-------------------+
|audienceScore|tomatoMeter|releaseDateTheaters|
+-------------+-----------+-------------------+
|           50|       NULL|               NULL|
|         NULL|       NULL|               NULL|
|           43|       NULL|               NULL|
|           60|       NULL|               NULL|
|           70|       NULL|               NULL|
|           65|         69|         2018-06-01|
|           55|       NULL|               NULL|
|           88|       NULL|               NULL|
|         NULL|       NULL|               NULL|
|           74|         83|         1947-04-30|
|           19|       NULL|               NULL|
|         NULL|       NULL|               NULL|
|         NULL|         76|         2002-03-22|
|           86|         93|               NULL|
|           89|       NULL|               NULL|
|           82|       NULL|               NULL|
|           60|       NULL|               NULL|
|           67|         50|         2009

In [7]:
# Keep the null count in a variable, then print it
audience_score_nulls = df.where(df.audienceScore.isNull()).count()
print(audience_score_nulls)

70010


In [8]:
df.where(F.isnull('tomatoMeter')).count()  # rows with no Tomatometer score

109381

In [9]:
df.filter('releaseDateTheaters IS NULL').count()  # rows with no theatrical release date

112485

In [10]:
# Preview the rows that survive when any key column is null -> row dropped
df.dropna(
    how='any',
    subset=['audienceScore', 'tomatoMeter', 'releaseDateTheaters'],
).show()

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|            soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-01|          201

In [11]:
# Method 1: keep only rows where all three key columns are populated
cleaned_df = df.dropna(how='any', subset=['audienceScore', 'tomatoMeter', 'releaseDateTheaters'])

In [12]:
cleaned_df.show(n=20, truncate=True)  # preview the cleaned rows

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|            soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-01|          201

In [13]:
# Method 2 (equivalent alternative): drop any row with a null in a key column
clean_df = df.na.drop(how='any', subset=['audienceScore', 'tomatoMeter', 'releaseDateTheaters'])

In [14]:
clean_df.show(n=20, truncate=True)  # preview the cleaned rows

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|            soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-01|          201

### Processing:
Year: Create a year column by parsing the year from the releaseDateTheaters column.


In [15]:
clean_df.select('releaseDateTheaters').show()  # inspect the date values before parsing the year

+-------------------+
|releaseDateTheaters|
+-------------------+
|         2018-06-01|
|         1947-04-30|
|         2009-12-04|
|         2011-09-30|
|         2005-08-10|
|         2017-09-08|
|         1963-10-02|
|         1996-03-22|
|         2009-01-16|
|         1991-01-11|
|         2002-02-22|
|         2017-05-12|
|         2022-06-03|
|         2015-05-01|
|         1993-07-07|
|         2012-03-09|
|         2019-04-10|
|         2017-07-14|
|         2014-12-05|
|         2004-06-18|
+-------------------+
only showing top 20 rows



In [16]:
# Derive the release year from the theatrical release date (a date column).
# Reference the column by name so it resolves against clean_df itself; the
# previous df['releaseDateTheaters'] borrowed a column from the raw DataFrame
# and only worked because clean_df happens to descend from df.
df_new = clean_df.withColumn('year', F.year(F.col('releaseDateTheaters')))

In [17]:
df_new.show(n=20, truncate=True)  # confirm the new trailing 'year' column

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+----+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|            soundMix|year|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+----+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-0

Top Movies: Find the top 5000 movies by Tomatometer score using functions like `orderBy` and `limit`.


In [18]:
# Keep the TOP_N movies with the highest Tomatometer score.
# tomatoMeter is an integer score, so ties are expected. Without a total order,
# limit() may keep a different subset of tied rows on each run. Break ties by
# audienceScore and then id (presumably unique per movie - TODO confirm) so the
# saved outputs are reproducible.
TOP_N = 5000
df_top5000 = (
    df_new
    .orderBy(F.desc('tomatoMeter'), F.desc('audienceScore'), F.asc('id'))
    .limit(TOP_N)
)

In [19]:
df_top5000.show(n=20, truncate=True)  # preview the highest-ranked movies

+--------------------+--------------------+-------------+-----------+------+--------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+----+
|                  id|               title|audienceScore|tomatoMeter|rating|ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|   soundMix|year|
+--------------------+--------------------+-------------+-----------+------+--------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+----+
|     the_human_trial|     The Human Trial|           99|        100|  NULL|          NULL|         2022-06-24|          2022-06-24|            91|         Docu

### Save as Parquet
Save the cleaned and processed data as a Parquet file for efficient loading into pandas or Power BI; PySpark supports reading and writing Parquet natively.


In [20]:
df_top5000.write.mode('overwrite').parquet('top5000_movies.parquet')  # replace any previous run's output

In [21]:
# Write the top movies as CSV. Spark's CSV writer omits the header row by
# default, so request it explicitly to keep the column names in the files.
# Spark writes a directory of part files at this path, not a single .csv file.
df_top5000.write.csv('top5000_movies.csv', mode='overwrite', header=True)

### Converting to a pandas DataFrame

In [22]:
# Collect the (already limited) top-movies DataFrame to the driver as pandas
pandas_df = df_top5000.toPandas()

In [23]:
pandas_df.to_csv(path_or_buf='top5000_movies_pandas.csv', index=False)  # single file, no pandas index column

### Extended Challenges

In [24]:
# Extended challenge (currently disabled): strip the currency symbol from boxOffice.
# NOTE(review): F.translate maps characters one-for-one; it is NOT a regex, so
# '[$]' deletes every '[', '$' and ']' character. Passing '$' alone would suffice.
# NOTE(review): this select keeps only the cleaned column, and the result is still
# a string. DecimalType is imported in the first cell, presumably for a later
# cast - TODO confirm the intended numeric conversion.
# df_bo= df_top5000.select(F.translate(F.col('boxOffice'), '[$]', '').alias('BoxOffice'))
# df_bo.show()