In [1]:
# Install java 8
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Apache Spark binary: This link can change based on the version. Update this link with the latest version before using
!wget -q https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Unzip file
!tar -xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q pyspark==3.4.1

# Add environmental variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [10% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [C                                                                               Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:7 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [41.5 kB]
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [416 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [108 kB]
Hit

In [2]:
from pyspark.sql import SparkSession

# Create the Spark session for this ETL, or reuse one that is already running.
# Extra settings can be added with .config(key, value) before .getOrCreate().
spark = (
    SparkSession.builder
    .appName("Mario Galaxy 2 Project ETL")
    .getOrCreate()
)

In [3]:
sc = spark.sparkContext  # underlying SparkContext of the session created above
sc  # bare last expression: rendered as the cell's output

In [4]:
# Load every source table with the first row as the header and no schema inference
# (so all columns come in as strings). The reader is configured once and reused per file.
csv_reader = spark.read.options(header=True, inferSchema=False)

stars_df = csv_reader.csv("stars_table.csv")
traps_df = csv_reader.csv("traps_table.csv")
trap_dets = csv_reader.csv("trap_details_table.csv")
bosses_df = csv_reader.csv("bosses_table.csv")
enemies_df = csv_reader.csv("enemies_table.csv")
enemy_dets = csv_reader.csv("enemy_details_table.csv")
power_df = csv_reader.csv("powerups_table.csv")
power_dets = csv_reader.csv("powerup_details_table.csv")

In [5]:
stars_df.show(n=10)  # preview the first 10 rows of the raw stars table

+----+--------------------+----------+---------+------+---------+-----+----+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+
|  id|         galaxy_name|star_label|prankster|deaths|star_bits|coins|hits|hits2|boss|         new_enemies|comet_medal|completion_timestamp|major_sections|lives_found| difficulty|
+----+--------------------+----------+---------+------+---------+-----+----+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+
|1000|  Sky Station Galaxy|         1|        N|     0|      245|   63|   1|    0|   Y|Small Goomba, Goo...|          Y| 2022-08-22 10:40:33|             5|          5|       Easy|
|1001|   Yoshi Star Galaxy|         1|        N|     0|      224|   30|   0|    0|   N|Piranha Plant, Ma...|          Y| 2022-08-28 21:03:10|             4|          1|       Easy|
|1002|   Yoshi Star Galaxy|         2|        N|     0|      146|   51|   0|    1|   Y|Spiny, L

In [7]:
from pyspark.sql import functions as f

# Replace the two hit-count columns with their average.
# Star 1123 has no usable second hit count (hits2), so it gets +2 on top of its average.
# A missing hits2 for that star is treated as 0: in Spark any arithmetic with a null
# operand yields null, which would otherwise wipe out the adjustment entirely.
MISSING_HITS2_STAR_ID = 1123
MISSING_HITS2_ADJUSTMENT = 2

# The CSVs are read without schema inference (all strings); cast explicitly instead of
# relying on Spark's implicit string -> double coercion in arithmetic.
hits = f.col('hits').cast('double')
hits2 = f.col('hits2').cast('double')

avg_hits = (hits + hits2) / 2
adjusted_avg_hits = (hits + f.coalesce(hits2, f.lit(0.0))) / 2 + MISSING_HITS2_ADJUSTMENT

final_df = (
    stars_df
    .withColumn(
        'avg_hits',
        f.when(f.col('id') == MISSING_HITS2_STAR_ID, adjusted_avg_hits).otherwise(avg_hits),
    )
    .drop('hits', 'hits2')  # only the average is needed downstream
)

In [8]:
# Convert the boss and prankster flag columns to numeric indices using a Pipeline of StringIndexers.
# stringOrderType='alphabetAsc' fixes the mapping by label (for Y/N flags: N -> 0.0, Y -> 1.0).
# The default ('frequencyDesc') gives 0.0 to whichever label is most frequent, so the meaning
# of the index would silently flip if the Y/N balance in the data changed.
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

ind = [
    StringIndexer(inputCol='boss', outputCol='bossIndex', stringOrderType='alphabetAsc'),
    StringIndexer(inputCol='prankster', outputCol='prankIndex', stringOrderType='alphabetAsc'),
]
pipeline = Pipeline(stages=ind)
final_df = pipeline.fit(final_df).transform(final_df)
final_df.show()

+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+
|  id|         galaxy_name|star_label|prankster|deaths|star_bits|coins|boss|         new_enemies|comet_medal|completion_timestamp|major_sections|lives_found| difficulty|avg_hits|bossIndex|prankIndex|
+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+
|1000|  Sky Station Galaxy|         1|        N|     0|      245|   63|   Y|Small Goomba, Goo...|          Y| 2022-08-22 10:40:33|             5|          5|       Easy|     0.5|      1.0|       0.0|
|1001|   Yoshi Star Galaxy|         1|        N|     0|      224|   30|   N|Piranha Plant, Ma...|          Y| 2022-08-28 21:03:10|             4|          1|       Easy|     0.0|      0.0|       0.0|


In [9]:
# Deal with the duplicate entries for the final Bowser star: replace them with a single row
# built from the earliest (non-prankster) completion, with corrected values.
from pyspark.sql import functions as f

# Rows recorded for the final Bowser star. The same condition is used both to pick the
# duplicates and to remove them, so no other rows of the galaxy are dropped by accident.
is_final_bowser = (f.col('galaxy_name') == "Bowser's Galaxy Generator") & (f.col('prankster') == 'N')

# Values for the merged row.
# The first iteration of the final Bowser star was Harder, the second Expert.
final_bowser_values = {
    'id': 1070,
    'star_label': 'GS6',
    'star_bits': 72,
    'coins': 22,
    'new_enemies': 'Bowser III',
    'comet_medal': 'Y',
    'difficulty': 'Harder',
    'avg_hits': 2,
}

# Cast every override to the column's existing type. Building the row through
# pandas .astype(str) made unionByName widen numeric columns (avg_hits, bossIndex, ...)
# to strings for the whole DataFrame.
column_types = {field.name: field.dataType for field in final_df.schema.fields}
final_bowser = (
    final_df.filter(is_final_bowser)
    # earliest completion = first iteration; keep exactly one row however many duplicates exist
    .orderBy(f.col('completion_timestamp').asc_nulls_last())
    .limit(1)
    .withColumns({
        name: f.lit(value).cast(column_types[name])
        for name, value in final_bowser_values.items()
    })
)

final_df = final_df.filter(~is_final_bowser).unionByName(final_bowser)

# Show that there is only one row now for the final Bowser star
final_bowser_row = final_df.filter(is_final_bowser)
final_bowser_row.show()

+----+--------------------+----------+---------+------+---------+-----+----+-----------+-----------+--------------------+--------------+-----------+----------+--------+---------+----------+
|  id|         galaxy_name|star_label|prankster|deaths|star_bits|coins|boss|new_enemies|comet_medal|completion_timestamp|major_sections|lives_found|difficulty|avg_hits|bossIndex|prankIndex|
+----+--------------------+----------+---------+------+---------+-----+----+-----------+-----------+--------------------+--------------+-----------+----------+--------+---------+----------+
|1070|Bowser's Galaxy G...|       GS6|        N|     0|       72|   22|   Y| Bowser III|          Y| 2022-10-23 09:08:00|             6|          2|    Harder|       2|      1.0|       0.0|
+----+--------------------+----------+---------+------+---------+-----+----+-----------+-----------+--------------------+--------------+-----------+----------+--------+---------+----------+



In [22]:
# Using the enemies, bosses, and enemy details dataframes, turn the enemy data into a numeric
# difficulty predictor (average enemy rank per star) and add it to final_df.

# Register temporary SQL views so the tables can be joined with spark.sql
# (SparkSession replaces the deprecated HiveContext.registerDataFrameAsTable API).
final_df.createOrReplaceTempView("stars")
enemies_df.createOrReplaceTempView("enemies")
enemy_dets.createOrReplaceTempView("enemy_details")
bosses_df.createOrReplaceTempView("bosses")

# Bosses, their rank, and the star they appear in; star_id is renamed to id to join on the star key
basic_boss_df = bosses_df.select(['star_id', 'name', 'b_rank']).withColumnRenamed('star_id', 'id')

# One row per (star, enemy); stars with no enemies survive the LEFT JOINs with null enemy columns
star_enemy_df = spark.sql(
    "SELECT s.id, s.boss, e.eid, e.is_boss, e.name, e.e_rank "
    "FROM stars s "
    "LEFT JOIN enemy_details ed ON s.id = ed.star_id "
    "LEFT JOIN enemies e ON ed.e_id = e.eid"
)
star_enemy_df = star_enemy_df.withColumn('e_rank', f.col('e_rank').cast('integer'))

# Per star: how many enemies it has (count(eid) skips the nulls) and their average rank
star_avg_rank = star_enemy_df.groupby('id').agg({'eid': 'count', 'e_rank': 'avg'})
# f.round rounds half-up to the nearest integer (it is not a ceiling)
star_avg_rank = star_avg_rank.withColumn('e_rank_avg', f.round(f.col('avg(e_rank)')))

# Attach the boss(es) of each star; stars without a boss get null name/b_rank
plus_bosses = star_avg_rank.join(basic_boss_df, on='id', how='left_outer')

# Fold the boss into the average: the boss counts as one extra enemy whose rank is weighted by 0.5
plus_bosses = plus_bosses.withColumn('total_rank', f.col('avg(e_rank)') * f.col('count(eid)'))
plus_bosses = plus_bosses.withColumn(
    'rank_with_boss',
    f.round((f.col('total_rank') + (f.col('b_rank') * 0.5)) / (f.col('count(eid)') + 1)),
)
# Use the boss-adjusted rank when it exists, otherwise the plain enemy average
plus_bosses = plus_bosses.withColumn('final_e_rank', f.coalesce(f.col('rank_with_boss'), f.col('e_rank_avg')))
plus_bosses = plus_bosses.select(['id', 'final_e_rank'])

# One value per star (averaging when a star has several bosses)
star_enemy_ranks = plus_bosses.groupby('id').avg('final_e_rank')
star_enemy_ranks = star_enemy_ranks.na.fill(100.0)  # stars with no enemies get rank 100

final_df = final_df.join(star_enemy_ranks, ['id'])
final_df.show()

+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+-----------------+
|  id|         galaxy_name|star_label|prankster|deaths|star_bits|coins|boss|         new_enemies|comet_medal|completion_timestamp|major_sections|lives_found| difficulty|avg_hits|bossIndex|prankIndex|avg(final_e_rank)|
+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+-----------------+
|1090|Stone Cyclone Galaxy|         1|        N|     0|       82|    9|   N|             Tox Box|          Y| 2022-11-09 22:18:42|             2|          2|     Harder|     0.0|      0.0|       0.0|             29.0|
|1043|   Shiverburn Galaxy|         1|        N|     0|      240|   11|   Y|Ring Beam, Ota Ro...|          Y| 2022-10-15 22:43:0

In [23]:
# Repeat the enemy procedure for traps: average trap rank per star, added to final_df.
traps_df.createOrReplaceTempView("traps")
trap_dets.createOrReplaceTempView("trap_details")

# One row per (star, trap); stars without traps keep null trap columns from the LEFT JOINs.
# The rank is cast to INT inside the query.
star_trap_df = spark.sql(
    """
    SELECT s.id, t.trap_id, t.name, CAST(t.t_rank AS INT) AS t_rank
    FROM stars s
    LEFT JOIN trap_details td ON s.id = td.star_id
    LEFT JOIN traps t ON td.trap_id = t.trap_id
    """
)

# Average trap rank per star, then rounded half-up to the nearest integer
star_tavg_rank = star_trap_df.groupby('id').agg(f.avg('t_rank'))
star_tavg_rank = star_tavg_rank.withColumn('t_rank_avg', f.round('avg(t_rank)'))

# Keep star id + rounded trap rank; stars with no traps get 50
star_trap_ranks = star_tavg_rank.select('id', 't_rank_avg').na.fill(50.0)

final_df = final_df.join(star_trap_ranks, on=['id'])
final_df.show()

+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+-----------------+----------+
|  id|         galaxy_name|star_label|prankster|deaths|star_bits|coins|boss|         new_enemies|comet_medal|completion_timestamp|major_sections|lives_found| difficulty|avg_hits|bossIndex|prankIndex|avg(final_e_rank)|t_rank_avg|
+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+-----------------+----------+
|1090|Stone Cyclone Galaxy|         1|        N|     0|       82|    9|   N|             Tox Box|          Y| 2022-11-09 22:18:42|             2|          2|     Harder|     0.0|      0.0|       0.0|             29.0|       9.0|
|1043|   Shiverburn Galaxy|         1|        N|     0|      240|   11|   Y|Ring Bea

In [24]:
# Repeat the enemy/trap procedure for power-ups: average power-up rank per star, added to final_df.
power_df.createOrReplaceTempView("powerups")
power_dets.createOrReplaceTempView("powerup_details")

# One row per (star, power-up); stars without power-ups keep null columns from the LEFT JOINs.
# The rank is cast to INT inside the query.
star_power_df = spark.sql(
    """
    SELECT s.id, p.power_id, p.name, CAST(p.p_rank AS INT) AS p_rank
    FROM stars s
    LEFT JOIN powerup_details pd ON s.id = pd.star_id
    LEFT JOIN powerups p ON pd.p_id = p.power_id
    """
)

# Average power-up rank per star, then rounded half-up to the nearest integer
star_pavg_rank = star_power_df.groupby('id').agg(f.avg('p_rank'))
star_pavg_rank = star_pavg_rank.withColumn('p_rank_avg', f.round('avg(p_rank)'))

# Keep star id + rounded power-up rank; na.fill replaces the nulls (stars with no power-ups) with 20
star_powers_ranks = star_pavg_rank.select('id', 'p_rank_avg').na.fill(20.0)

final_df = final_df.join(star_powers_ranks, on=['id'])
final_df.show()

+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+-----------------+----------+----------+
|  id|         galaxy_name|star_label|prankster|deaths|star_bits|coins|boss|         new_enemies|comet_medal|completion_timestamp|major_sections|lives_found| difficulty|avg_hits|bossIndex|prankIndex|avg(final_e_rank)|t_rank_avg|p_rank_avg|
+----+--------------------+----------+---------+------+---------+-----+----+--------------------+-----------+--------------------+--------------+-----------+-----------+--------+---------+----------+-----------------+----------+----------+
|1090|Stone Cyclone Galaxy|         1|        N|     0|       82|    9|   N|             Tox Box|          Y| 2022-11-09 22:18:42|             2|          2|     Harder|     0.0|      0.0|       0.0|             29.0|       9.0|      20.0|
|1043|   Shiverburn Galaxy|         1|  

In [32]:
final_df = final_df.withColumnRenamed('avg(final_e_rank)', 'avg_enemy_rank')

# Mean enemy rank per difficulty label, lowest first; the harder labels show lower average ranks
(
    final_df
    .groupby('difficulty')
    .agg({'avg_enemy_rank': 'avg'})
    .sort('avg(avg_enemy_rank)')
    .show()
)

+-----------+-------------------+
| difficulty|avg(avg_enemy_rank)|
+-----------+-------------------+
|     Harder|              27.55|
|     Insane|               30.0|
|     Expert|             44.375|
|       Hard|  45.80952380952381|
|     Medium| 51.111111111111114|
|Medium-Hard|           51.65625|
|       Easy| 55.166666666666664|
|Easy-Medium| 59.416666666666664|
+-----------+-------------------+



In [33]:
list(final_df.columns)  # column names of final_df after all the joins

['id',
 'galaxy_name',
 'star_label',
 'prankster',
 'deaths',
 'star_bits',
 'coins',
 'boss',
 'new_enemies',
 'comet_medal',
 'completion_timestamp',
 'major_sections',
 'lives_found',
 'difficulty',
 'avg_hits',
 'bossIndex',
 'prankIndex',
 'avg_enemy_rank',
 't_rank_avg',
 'p_rank_avg']

In [35]:
# Columns kept for the model: the star id plus the predictor columns built above
dw_columns = [
    'id', 'star_bits', 'coins', 'avg_hits', 'deaths', 'major_sections', 'lives_found',
    'bossIndex', 'prankIndex', 'avg_enemy_rank', 't_rank_avg', 'p_rank_avg',
]
dw_stars_df = final_df.select(dw_columns)

n_rows = dw_stars_df.count()                 # how many records are in the final df for the model
n_complete = dw_stars_df.na.drop().count()   # records with no null in any selected column
print(n_rows)
print(n_complete)
# The two counts must match; enforce it instead of relying on reading the printed numbers
assert n_complete == n_rows, f"{n_rows - n_complete} of {n_rows} rows have nulls in the model columns"

121
121
