# GCS File Path is written to hide the actual file paths that are in Google Cloud Storage

In [2]:
spark

In [3]:
# Import the storage module
import pandas as pd

import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.sql.functions import col, count



In [4]:
critic_reviews_file_path = #GCS file path

In [5]:
critic_reviews_sdf = spark.read.csv(critic_reviews_file_path, header = True, sep = ",", inferSchema = True)

                                                                                

In [6]:
critic_reviews_sdf.count()

In [7]:
critic_reviews_sdf.printSchema()

root
 |-- reviewId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- criticName: string (nullable = true)
 |-- criticPageUrl: string (nullable = true)
 |-- reviewState: string (nullable = true)
 |-- isFresh: boolean (nullable = true)
 |-- isRotten: boolean (nullable = true)
 |-- isRtUrl: boolean (nullable = true)
 |-- isTopCritic: boolean (nullable = true)
 |-- publicationUrl: string (nullable = true)
 |-- publicationName: string (nullable = true)
 |-- reviewUrl: string (nullable = true)
 |-- quote: string (nullable = true)
 |-- scoreSentiment: string (nullable = true)
 |-- originalScore: string (nullable = true)



In [8]:
critic_reviews_sdf = critic_reviews_sdf.drop('reviewId', 'criticName', 'criticPageUrl', 'reviewState', 'isRtUrl', 'isTopCritic', 'publicationUrl', 'publicationName', 'reviewUrl', 'originalScore')

In [9]:
critic_reviews_sdf = critic_reviews_sdf.withColumnRenamed("quote", "critic_quote") \
                    .withColumnRenamed("scoreSentiment", "critic_scoreSentiment") \
                    .withColumnRenamed("creationDate", "critic_creationDate")


critic_reviews_sdf.printSchema()


root
 |-- movieId: string (nullable = true)
 |-- critic_creationDate: string (nullable = true)
 |-- isFresh: boolean (nullable = true)
 |-- isRotten: boolean (nullable = true)
 |-- critic_quote: string (nullable = true)
 |-- critic_scoreSentiment: string (nullable = true)



In [10]:
#for col_name in critic_reviews_sdf.columns:
null_count = critic_reviews_sdf.filter(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [11]:
na_critic_reviews_sdf = critic_reviews_sdf.dropna(how = "any")

In [12]:
#for col_name in na_critic_reviews_sdf.columns:
null_count = na_critic_reviews_sdf.filter(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [13]:
na_critic_reviews_sdf.show()

+--------------------+-------------------+-------+--------+--------------------+---------------------+
|             movieId|critic_creationDate|isFresh|isRotten|        critic_quote|critic_scoreSentiment|
+--------------------+-------------------+-------+--------+--------------------+---------------------+
|16db6b7e-176a-3a4...|         1800-01-01|   true|   false|Terry Gilliam's t...|             POSITIVE|
|34bf7e79-d110-3b3...|         1800-01-01|   true|   false|Weir, working fro...|             POSITIVE|
|3aeb7064-f73d-32c...|         1800-01-01|   true|   false|Coming out from b...|             POSITIVE|
|58ec5bd7-733b-304...|         1800-01-01|   true|   false|Still, despite it...|             POSITIVE|
|649005b8-02b6-325...|         1800-01-01|  false|    true|It's the sort of ...|             NEGATIVE|
|6a0dd327-6254-3f8...|         1800-01-01|   true|   false|This enjoyable mi...|             POSITIVE|
|b22d3561-38d6-38a...|         1800-01-01|   true|   false|A grimly seduc

# critic_scoreSentiment will be used to compare my results with the results the Kaggle user got.

In [14]:
na_critic_reviews_sdf.select("critic_creationDate").describe().show()

In [15]:
na_critic_reviews_sdf.select("isFresh").where((na_critic_reviews_sdf.isFresh == 'true')).count()

In [16]:
na_critic_reviews_sdf.select("isFresh").where((na_critic_reviews_sdf.isFresh == 'false')).count()

In [17]:
na_critic_reviews_sdf.select("isRotten").where((na_critic_reviews_sdf.isRotten == 'true')).count()

In [18]:
na_critic_reviews_sdf.select("isRotten").where((na_critic_reviews_sdf.isRotten == 'false')).count()

In [19]:
na_critic_reviews_sdf.count()

In [20]:
#lost around this many data points
968361 - 933860

In [21]:
movies_file_path = #GCS file path

In [22]:
movies_sdf = spark.read.csv(movies_file_path, header = True, sep = ",", inferSchema = True)

In [23]:
movies_sdf.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- movieYear: integer (nullable = true)
 |-- movieURL: string (nullable = true)
 |-- movieTitle: string (nullable = true)
 |-- critic_score: double (nullable = true)
 |-- critic_sentiment: string (nullable = true)
 |-- audience_score: double (nullable = true)
 |-- audience_sentiment: string (nullable = true)
 |-- release_date_theaters: string (nullable = true)
 |-- release_date_streaming: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- runtime: string (nullable = true)



In [24]:
movies_sdf.count()

In [25]:
movies_sdf = movies_sdf.drop('movieURL', 'critic_score', 'critic_sentiment', 'release_date_theaters', 'release_date_streaming', 'rating')

In [26]:
movies_sdf = movies_sdf.withColumnRenamed("original_language", "movie_originalLanguage") \
                    .withColumnRenamed("runtime", "movie_runtime")

In [27]:
movies_sdf.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- movieYear: integer (nullable = true)
 |-- movieTitle: string (nullable = true)
 |-- audience_score: double (nullable = true)
 |-- audience_sentiment: string (nullable = true)
 |-- movie_originalLanguage: string (nullable = true)
 |-- movie_runtime: string (nullable = true)



In [28]:
movies_sdf.show()

+--------------------+---------+--------------------+--------------+------------------+----------------------+-------------+
|             movieId|movieYear|          movieTitle|audience_score|audience_sentiment|movie_originalLanguage|movie_runtime|
+--------------------+---------+--------------------+--------------+------------------+----------------------+-------------+
|281004c8-bbc3-352...|     1902|  A Trip to the Moon|          90.0|          positive|       French (France)|          14m|
|ac173b27-b71a-34b...|     1915|The Birth of a Na...|          47.0|          negative|                  NULL|       3h 10m|
|96f91c04-5e32-39b...|     1921|The Cabinet of Dr...|          89.0|          positive|                German|        1h 9m|
|b70c2dc6-41e7-324...|     1921|             The Kid|          95.0|          positive|               English|        1h 0m|
|13101368-55d8-30a...|     1922|           Nosferatu|          87.0|          positive|                German|        1h 5m|


In [29]:
#for col_name in movies_sdf.columns:
null_count = movies_sdf.filter(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [30]:
na_movies_sdf = movies_sdf.dropna(how = "any")

In [31]:
#for col_name in na_movies_sdf.columns:
null_count = na_movies_sdf.filter(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [32]:
na_movies_sdf.count()

In [33]:
na_movies_sdf.show()

+--------------------+---------+--------------------+--------------+------------------+----------------------+-------------+
|             movieId|movieYear|          movieTitle|audience_score|audience_sentiment|movie_originalLanguage|movie_runtime|
+--------------------+---------+--------------------+--------------+------------------+----------------------+-------------+
|281004c8-bbc3-352...|     1902|  A Trip to the Moon|          90.0|          positive|       French (France)|          14m|
|96f91c04-5e32-39b...|     1921|The Cabinet of Dr...|          89.0|          positive|                German|        1h 9m|
|b70c2dc6-41e7-324...|     1921|             The Kid|          95.0|          positive|               English|        1h 0m|
|13101368-55d8-30a...|     1922|           Nosferatu|          87.0|          positive|                German|        1h 5m|
|0b3942e5-015f-32c...|     1923|         Safety Last|          93.0|          positive|               English|       1h 14m|


In [34]:
#lost around this many data points
10233 - 10110

In [35]:
na_movies_sdf.select("movieYear").describe().show()

In [36]:
na_movies_sdf.select("audience_score").describe().show()

# audience_sentiment will be used to compare my results with the results the Kaggle user got.

In [37]:
na_movies_sdf.select("movie_originalLanguage").count()

In [38]:
na_movies_sdf.select("movie_runtime").describe().show()

In [39]:
user_reviews_file_path = #GCS filepath

In [40]:
user_reviews_sdf = spark.read.csv(user_reviews_file_path, header = True, sep = ",", inferSchema = True)

                                                                                

In [41]:
user_reviews_sdf.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- quote: string (nullable = true)
 |-- reviewId: string (nullable = true)
 |-- isVerified: string (nullable = true)
 |-- isSuperReviewer: string (nullable = true)
 |-- hasSpoilers: string (nullable = true)
 |-- hasProfanity: string (nullable = true)
 |-- score: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userRealm: string (nullable = true)
 |-- userId: string (nullable = true)



In [42]:
user_reviews_sdf.count()

In [43]:
user_reviews_sdf = user_reviews_sdf.drop('reviewId', 'isVerified', 'isSuperReviewer', 'hasSpoilers', 'hasProfanity', 'score', 'userDisplayName', 'userRealm', 'userId')

In [44]:
user_reviews_sdf.show()

+--------------------+--------------------+--------------------+--------------------+
|             movieId|              rating|               quote|        creationDate|
+--------------------+--------------------+--------------------+--------------------+
|220bcfa0-72b6-329...|                 3.5|:fresh: pretty go...|                NULL|
| [2004: that's ri...|                NULL|               False|                  RT|
|66fb1afc-13fd-336...|                 4.0|":fresh: pretty g...|                 4.0|
|8f036839-39c8-3a5...|                 2.5|:rotten: I watche...|          1996-01-23|
|4b0c6583-ef43-3ad...|                 3.5|:fresh: parts wer...|                NULL|
| [2004: I think t...|                NULL|               False|                  RT|
|a56feddb-2758-3ce...|                 2.5|:fresh: slightly ...|          1996-01-30|
|bcf01f9b-9931-3ff...|                 2.5|"[b][img]http://i...|               False|
|fc2954ca-8af3-3ec...|                 3.5|"[b]:fresh:

In [45]:
user_reviews_sdf = user_reviews_sdf.withColumnRenamed("rating", "audience_rating") \
                    .withColumnRenamed("quote", "audience_quote") \
                    .withColumnRenamed("creationDate", "audience_creationDate")


user_reviews_sdf.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- audience_rating: string (nullable = true)
 |-- audience_quote: string (nullable = true)
 |-- audience_creationDate: string (nullable = true)



In [46]:
for col_name in user_reviews_sdf.columns:
null_count = user_reviews_sdf.filter(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [47]:
na_user_reviews_sdf = user_reviews_sdf.dropna(how = "any")

In [48]:
#for col_name in na_user_reviews_sdf.columns:
null_count = na_user_reviews_sdf.filter(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [49]:
na_user_reviews_sdf.count()

In [50]:
#lost around this many data points
61794610 - 53983718

In [51]:
na_user_reviews_sdf.show()

+--------------------+--------------------+--------------------+---------------------+
|             movieId|     audience_rating|      audience_quote|audience_creationDate|
+--------------------+--------------------+--------------------+---------------------+
|66fb1afc-13fd-336...|                 4.0|":fresh: pretty g...|                  4.0|
|8f036839-39c8-3a5...|                 2.5|:rotten: I watche...|           1996-01-23|
|a56feddb-2758-3ce...|                 2.5|:fresh: slightly ...|           1996-01-30|
|bcf01f9b-9931-3ff...|                 2.5|"[b][img]http://i...|                False|
|f67c28af-8622-3dd...|                 2.5|":fresh: funnier ...|           1996-02-19|
| It's hard to jud...| which nonetheles...| and the film rem...|  especially the s...|
|4d2ce818-75d1-332...|                 4.5|A scaringly real ...|           1997-12-08|
|f67c28af-8622-3dd...|                 3.5|"[color=black]""H...|                  3.5|
|737027d8-d872-384...|                 4.5|

# -----------------------------------------------------------------------

In [52]:
na_user_reviews_sdf.select("audience_creationDate").describe().show()

In [53]:
all_sdf = na_movies_sdf.join(na_user_reviews_sdf, on = 'movieId', how = 'inner')

In [54]:
all_sdf.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- movieYear: integer (nullable = true)
 |-- movieTitle: string (nullable = true)
 |-- audience_score: double (nullable = true)
 |-- audience_sentiment: string (nullable = true)
 |-- movie_originalLanguage: string (nullable = true)
 |-- movie_runtime: string (nullable = true)
 |-- audience_rating: string (nullable = true)
 |-- audience_quote: string (nullable = true)
 |-- audience_creationDate: string (nullable = true)



In [55]:
#for col_name in all_sdf.columns:
null_count = all_sdf.select(col(col_name).isNull()).count()
print(f"Number of null values in column '{col_name}': '{null_count}'")

In [56]:
all_sdf.show()

+--------------------+---------+--------------------+--------------+------------------+----------------------+-------------+---------------+--------------------+---------------------+
|             movieId|movieYear|          movieTitle|audience_score|audience_sentiment|movie_originalLanguage|movie_runtime|audience_rating|      audience_quote|audience_creationDate|
+--------------------+---------+--------------------+--------------+------------------+----------------------+-------------+---------------+--------------------+---------------------+
|66fb1afc-13fd-336...|     1995|              Casino|          93.0|          positive|               English|       2h 57m|            4.0|":fresh: pretty g...|                  4.0|
|8f036839-39c8-3a5...|     1988|The Last Temptati...|          84.0|          positive|               English|       2h 44m|            2.5|:rotten: I watche...|           1996-01-23|
|a56feddb-2758-3ce...|     1995|       Billy Madison|          79.0|          po

In [57]:
grouped_df = all_sdf.groupBy("movieId").agg(count("*").alias("review_count"))

In [58]:
sorted_df = grouped_df.sort(col("review_count").desc())

In [59]:
top_2000_movies = sorted_df.limit(2000)

In [60]:
top_2000_movies_info = top_2000_movies.join(na_movies_sdf, on="movieId", how = 'inner').join(na_user_reviews_sdf, on = 'movieId', how = 'inner')

In [61]:
top_2000_movies_info.show()

                                                                                

+--------------------+------------+---------+--------------------+--------------+------------------+----------------------+-------------+---------------+--------------------+---------------------+
|             movieId|review_count|movieYear|          movieTitle|audience_score|audience_sentiment|movie_originalLanguage|movie_runtime|audience_rating|      audience_quote|audience_creationDate|
+--------------------+------------+---------+--------------------+--------------+------------------+----------------------+-------------+---------------+--------------------+---------------------+
|66fb1afc-13fd-336...|       11662|     1995|              Casino|          93.0|          positive|               English|       2h 57m|            4.0|":fresh: pretty g...|                  4.0|
|a56feddb-2758-3ce...|       20076|     1995|       Billy Madison|          79.0|          positive|               English|       1h 29m|            2.5|:fresh: slightly ...|           1996-01-30|
|f67c28af-8622-

In [62]:
clean_reviews_sdf = top_2000_movies_info.drop('movieId', 'movie_originalLanguage')

In [63]:
clean_reviews_sdf.show(5)

                                                                                

+------------+---------+-------------+--------------+------------------+-------------+---------------+--------------------+---------------------+
|review_count|movieYear|   movieTitle|audience_score|audience_sentiment|movie_runtime|audience_rating|      audience_quote|audience_creationDate|
+------------+---------+-------------+--------------+------------------+-------------+---------------+--------------------+---------------------+
|       11662|     1995|       Casino|          93.0|          positive|       2h 57m|            4.0|":fresh: pretty g...|                  4.0|
|       20076|     1995|Billy Madison|          79.0|          positive|       1h 29m|            2.5|:fresh: slightly ...|           1996-01-30|
|       31441|     1996|Happy Gilmore|          85.0|          positive|       1h 32m|            2.5|":fresh: funnier ...|           1996-02-19|
|       31441|     1996|Happy Gilmore|          85.0|          positive|       1h 32m|            3.5|"[color=black]""H...| 

In [64]:
pd = clean_reviews_sdf.select('review_count').sample(False, 0.9).toPandas()

In [65]:
pd.review_count.count()

In [66]:
all_sdf.printSchema()

In [67]:
movieYear_audience_quote_df = all_sdf.select("movieYear", "audience_quote").sample(False, 0.5).toPandas()

In [68]:
critic_creationDate_isFresh_df = na_critic_reviews_sdf.select("critic_creationDate", "isFresh", "isRotten").sample(False, 0.5).toPandas()

In [69]:
year_score_df = na_movies_sdf.select("movieYear", "audience_score").sample(False, 0.5).toPandas()

In [70]:
fig, ax = plt.subplots()
ax.plot(year_score_df.movieYear, year_score_df.audience_score)
ax.set_xlabel("Year")
ax.set_ylabel("Audience Rating")
plt.show()

In [71]:
plt.hist(year_score_df.audience_score, bins = 10)

In [73]:
output_file_path = #GCS file path
clean_reviews_sdf.write.parquet(output_file_path)

                                                                                

In [None]:
cleaned = #GCS file path

In [None]:
reviews_sdf = spark.read.csv(cleaned, header = True, sep = ",", inferSchema = True)

In [None]:
reviews_sdf.printSchema()

In [None]:
reviews_sdf.show()

In [None]:
reviews_sdf.count()