# Start Spark Session & Import data form Google Cloud Bucket

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("GCSMarvel")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

#  Google Storage File Path
#  use your gcp bucket name. Also upload marvel.csv first
gsc_file_path_mc = 'gs://marvel_data_onno/marvel_clean.csv'
gsc_file_path_rc = 'gs://marvel_data_onno/marvel_reviews_clean.csv'
# Create data frames
marvelcleanDF = spark.read.format("csv").option("header", "true") \
       .load(gsc_file_path_mc)
marvelcleanDF.printSchema()

reviewscleanDF = spark.read.format("csv").option("header", "true") \
       .load(gsc_file_path_rc)

newDf = marvelcleanDF.select('Title')  # select one column

newDf.show()

marvelcleanDF.show()

root
 |-- Title: string (nullable = true)
 |-- Distributor: string (nullable = true)
 |-- ReleaseDateUS: string (nullable = true)
 |-- Budget: string (nullable = true)
 |-- OpeningWeekendNorthAmerica: string (nullable = true)
 |-- NorthAmerica: string (nullable = true)
 |-- OtherTerritories: string (nullable = true)
 |-- Worldwide: string (nullable = true)

+--------------------+
|               Title|
+--------------------+
|     Howard the Duck|
|               Blade|
|               X-Men|
|            Blade II|
|          Spider-Man|
|           Daredevil|
|                  X2|
|                Hulk|
|        The Punisher|
|        Spider-Man 2|
|      Blade: Trinity|
|             Elektra|
|      Fantastic Four|
|X-Men: The Last S...|
|         Ghost Rider|
|        Spider-Man 3|
|Fantastic Four: R...|
|            Iron Man|
| The Incredible Hulk|
|  Punisher: War Zone|
+--------------------+
only showing top 20 rows

+--------------------+------------------+-------------------+-

# Data Preprocessing 

Pipeline 1: The first pipeline aims to help us understand which distributor 
releases the 'best' Marvel movies, in order for Marvel Studios to find out 
which distributor to partner with in the future. This insight will be gained 
by analyzing the average critic grades and average revenue earned per 
distributor. First, the grades will be normalized. This is done because all 
three critic websites have different ways of scoring. Where Rotten Tomatoes 
and Metacritic use a scoring system ranging from 0 to 100, 0 being the lowest 
score and 100 the highest, CinemaScore uses a system similar to the United 
States grading system. This system ranges from F to A+, with A+ being the
highest. Letters can also have signs (+, -). A C+ would thus be a better 
score than just C. Then, an intermediate score to assess financial success 
will be determined. This will be the worldwide revenue generated by the movie
divided by the budget of the movie. Financial success will thus be determined
by the ratio of revenue to budget. The average ratio and average grade will be
calculated for every individual distributor, and lastly, the top distributors
for both metrics will be identified.

We will first join the two Marvel datasets we have available. 
- `marvelcleanDF`: contains information about marvel movies, such as their distributors, budgets and revenues
- `reviewscleanDF`: contains information about marvel movie reviews by 3 different review websites
- They will be joined on the `Title` and `Film` key resepctively, as this is they primary key they share.

We will then divide these into `reviewDF` and `budgetDF` which will be used in their relevant tasks.

In [2]:
#First, join dataframes on movie name
joinExpression = marvelcleanDF["Title"] == reviewscleanDF["Film"]
mergedDF = marvelcleanDF.join(reviewscleanDF, joinExpression, "inner")
mergedDF.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Distributor: string (nullable = true)
 |-- ReleaseDateUS: string (nullable = true)
 |-- Budget: string (nullable = true)
 |-- OpeningWeekendNorthAmerica: string (nullable = true)
 |-- NorthAmerica: string (nullable = true)
 |-- OtherTerritories: string (nullable = true)
 |-- Worldwide: string (nullable = true)
 |-- Film: string (nullable = true)
 |-- Rotten Tomatoesin%: string (nullable = true)
 |-- Metacritic: string (nullable = true)
 |-- CinemaScore3: string (nullable = true)
 |-- CinemaScore4: string (nullable = true)



In [3]:
#Second, select relevant columns
reviewDF = mergedDF.select("Title", "Distributor", "Rotten Tomatoesin%", "Metacritic", "CinemaScore4")
budgetDF = mergedDF.select("Title", "Distributor", "Budget", "Worldwide")
totalDF = mergedDF.select("Title", "Distributor", "Budget", "Worldwide", "Rotten Tomatoesin%", "Metacritic", "CinemaScore4")
#We take Cinemascore reviews WITH the sign, for more accurate insights.
totalDF.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Distributor: string (nullable = true)
 |-- Budget: string (nullable = true)
 |-- Worldwide: string (nullable = true)
 |-- Rotten Tomatoesin%: string (nullable = true)
 |-- Metacritic: string (nullable = true)
 |-- CinemaScore4: string (nullable = true)



### Ranking Distributors based on Average ratings

We map of american grades to percentages (based on the wikipedia page for the american grading system) in order to be able to process CinemaScore ratings. \
Next, we calculate the average rating for every movie by adding all ratings and dividing by 3. \
Then, we calculate the average rating per distributor by summing all average movie ratings and dividing by the count of movies. \
Lastly, we sort the distributors in descending order to easily find the top performers. \

We also show the amount of movies they made, to gain insight into how representative their performance is.

We solve for: Give and average rating of the distributor that makes the best (highest-rated on average) Marvel movies

In [4]:
#Preprocess Cinemascore ratings to 0-100 scale.
#https://en.wikipedia.org/wiki/Academic_grading_in_the_United_States
from pyspark.sql.functions import when, col

# Define the mapping logic
grade_to_percentage = {
    "A+": 97,
    "A": 93,
    "A-": 90,
    "B+": 87,
    "B": 83,
    "B-": 80,
    "C+": 77,
    "C": 73,
    "C-": 70,
    "D+": 67,
    "D": 63,
    "D-": 60,
    "F": 50
}

# Map grades to percentages
percentageDF = totalDF.withColumn(
    "CinemaScorePercentage",
    when(col("CinemaScore4") == "A+", grade_to_percentage["A+"])
    .when(col("CinemaScore4") == "A", grade_to_percentage["A"])
    .when(col("CinemaScore4") == "A-", grade_to_percentage["A-"])
    .when(col("CinemaScore4") == "B+", grade_to_percentage["B+"])
    .when(col("CinemaScore4") == "B", grade_to_percentage["B"])
    .when(col("CinemaScore4") == "B-", grade_to_percentage["B-"])
    .when(col("CinemaScore4") == "C+", grade_to_percentage["C+"])
    .when(col("CinemaScore4") == "C", grade_to_percentage["C"])
    .when(col("CinemaScore4") == "C-", grade_to_percentage["C-"])
    .when(col("CinemaScore4") == "D+", grade_to_percentage["D+"])
    .when(col("CinemaScore4") == "D", grade_to_percentage["D"])
    .when(col("CinemaScore4") == "D-", grade_to_percentage["D-"])
    .when(col("CinemaScore4") == "F", grade_to_percentage["F"])
)

#Drop old column with American grades
finalDF = percentageDF.drop("CinemaScore4")

# Show the results
finalDF.show()

+--------------------+------------------+---------+---------+------------------+----------+---------------------+
|               Title|       Distributor|   Budget|Worldwide|Rotten Tomatoesin%|Metacritic|CinemaScorePercentage|
+--------------------+------------------+---------+---------+------------------+----------+---------------------+
|     Howard the Duck|Universal Pictures| 37000000| 37962774|                14|        28|                   80|
|               Blade|   New Line Cinema| 45000000|131183530|                57|        47|                   90|
|               X-Men|  20th Century Fox| 75000000|296339527|                82|        64|                   90|
|            Blade II|   New Line Cinema| 54000000|155010032|                57|        52|                   87|
|          Spider-Man|     Sony Pictures|139000000|821708551|                90|        73|                   90|
|           Daredevil|  20th Century Fox| 78000000|179179718|                44|        

In [5]:
#Get top-rated movies per distributor
from pyspark.sql import functions as F
#First get average ratings
avgRatingsDF = finalDF.withColumn("avgRating", 
                                  (finalDF["Rotten Tomatoesin%"] +
                                  finalDF["Metacritic"] +
                                  finalDF["CinemaScorePercentage"]) / 3
                                 )

avgRatingsDistributorDF = avgRatingsDF.groupby("Distributor") \
    .agg(
        F.avg("avgRating").alias("avgRatingDistributor"),
        F.count("Title").alias("moviesCount")
        )

bestDistributorsRatingDF = avgRatingsDistributorDF.orderBy(F.desc("avgRatingDistributor"))

bestDistributorsRatingDF.show()

+--------------------+--------------------+-----------+
|         Distributor|avgRatingDistributor|moviesCount|
+--------------------+--------------------+-----------+
|Walt Disney Studi...|   81.93333333333334|         20|
|  Paramount Pictures|   78.83333333333334|          4|
|       Sony Pictures|    72.3076923076923|         13|
|    20th Century Fox|   69.15555555555555|         15|
|  Universal Pictures|  59.555555555555564|          3|
|     New Line Cinema|   59.22222222222222|          3|
|     Lionsgate Films|  46.333333333333336|          1|
|20th Century Studios|                NULL|          1|
+--------------------+--------------------+-----------+



`20th Century Studios` having `NULL` as their rating can be explained by the fact that their only movie, `'The New Mutants'` does not exist in the reviews dataset.

### Ranking distributors based on their revenue-budget ratios

We calculate the ratio of revenue to budget per movie. \
We then group on distributors, while calculating their average ratios and counting the amount of movies they made. \
Lastly, we sort the distributors in descending order to easily gain insight into top performers.

We solve for: Give the name and average profitability ratio of the distributor that makes the most profitable Marvel movies

In [6]:
ratioDF = totalDF.withColumn("ratio", totalDF["Worldwide"] / totalDF["Budget"])

avgRatioDF = ratioDF.groupBy("Distributor") \
    .agg(
        F.avg("ratio").alias("avgRatio"),
        F.count("Title").alias("movieCount")
    )

bestDistributorsRatioDF = avgRatioDF.orderBy(F.desc("avgRatio"))

bestDistributorsRatioDF.show()

+--------------------+------------------+----------+
|         Distributor|          avgRatio|movieCount|
+--------------------+------------------+----------+
|       Sony Pictures|4.8033323893827164|        13|
|Walt Disney Studi...|  4.80195452026283|        20|
|    20th Century Fox| 3.932785455220428|        15|
|  Paramount Pictures| 3.235480067559524|         4|
|     New Line Cinema|2.5896350602089266|         3|
|  Universal Pictures| 1.524385667003792|         3|
|20th Century Studios|0.7264935223880598|         1|
|     Lionsgate Films|0.2885724571428571|         1|
+--------------------+------------------+----------+



`20th Century Studios` and `Lionsgate` only made 1 marvel movie, which have both not been very profitable. Perhaps this metric is not a good representation of their skills. \
`Sony Pictures` and `Walt Disney Studios` show relatively very high profitability, also with a larger sample size.
It is seen that distributors with more movies have higher profitability. This could be attributed to either:
- Them having made 1 or 2 extremely well-performing blockbusters, driving up their ratio.
- Marvel studios already realizing they make profitable movies in the past, thus giving them more opportunities to make movies.

# Save Processed data in BigQuery

In [8]:
# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "marvel_temp_onno"  # use your bucket 
spark.conf.set('temporaryGcsBucket', bucket)
# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
# Saving the data to BigQuery
bestDistributorsRatingDF.write.format('bigquery').option('table', 'gothic-talent-435511-s2.marvel.processed_data').mode("append").save() # Change the project name before marvel to yours! You can find this when you click on DE2024
bestDistributorsRatioDF.write.format('bigquery').option('table', 'gothic-talent-435511-s2.marvel.processed_data').mode("append").save() # Change the project name before marvel to yours! You can find this when you click on DE2024

Py4JJavaError: An error occurred while calling o179.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:111)
	at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table gothic-talent-435511-s2:marvel.processed_data. Field moviesCount is missing in new schema
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.reload(Job.java:424)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:252)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:385)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:375)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:646)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:139)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:108)
	... 44 more


In [9]:
spark.stop()