In [None]:
# Spark Joins
# Build a Spark job that
# - Disabled automataic broadcast join
# - Explicit broadcast join
# - Bucket join

In [None]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark import SparkConf


In [None]:
# Create Spark Session
spark = SparkSession.builder.appName('sparkjoins').getOrCreate()


In [5]:
# Disable broadcast Join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [6]:
# Read csv files into pyspark dataframes

# Read matches.csv
# a row for every match

matches = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/matches.csv")
# Read match_details.csv
# a row for every players' performance in a match

matchDetails =  spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/match_details.csv")
# Read medals_matches_players
# a row for every medal type a player gets in a match

medalsMatchesPlayers =  spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/medals_matches_players.csv")
# Read medals
# a row for every medal type

medals =  spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/medals.csv")
# Read maps
# a row for every map type

maps =  spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/maps.csv")

                                                                                

In [12]:
# Rename columns

medalsMatchesPlayers = medalsMatchesPlayers.withColumnRenamed("count","medal_count")

maps = maps.withColumnRenamed("name","map_name")

medals = medals.withColumnRenamed("classification", "medal_class") \
        .withColumnRenamed("description","medal_desc") \
        .withColumnRenamed("difficulty","medal_difficulty") \
        .withColumnRenamed("name", "medal_name")

In [77]:
maps.show(5)

+--------------------+-------------------+--------------------+
|               mapid|           map_name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
+--------------------+-------------------+--------------------+
only showing top 5 rows



In [13]:
# import broadcast
from pyspark.sql.functions import expr, col, broadcast, split



In [21]:
# Explicitly broadcast join maps to matches
# Select columns from matches
# Select columns from maps
# Join on mapid

matchesMaps = (
    matches
    .select('match_id', 'is_team_game', 'playlist_id', 'completion_date', 'mapid')  \
    .join(broadcast(maps).select('mapid', 'map_name'), on='mapid', how='left_outer')  \
    .select('match_id', 'is_team_game', 'playlist_id', 'completion_date', 'mapid', 'map_name')  \
)

In [24]:
# Explicitly broadcast joins medals to medalMatchesPlayers
# Select columns from medalsMatchesPlayers
# Select columns from medals
# Join on medal_id

mmpMedals = (
    medalsMatchesPlayers
    .select('match_id', 'player_gamertag','medal_id','medal_count')    
    .join(broadcast(medals).select('medal_id','medal_name'), \
          on='medal_id' , how='left_outer') \
    .select('match_id', 'player_gamertag','medal_id','medal_name', 'medal_count') 
    )

In [25]:
#  Create a version of matchDetails with fewer columns.  

matchDetailsMini = matchDetails.select("match_id","player_gamertag", "player_total_kills", "player_total_deaths")                

In [None]:
# Bucket join matches, match_details, and medals_matches_players on match_id with 16 buckets

In [50]:
# Create table for matches

matchesMaps.write.format("iceberg").mode("overwrite").bucketBy(16,"match_id").saveAsTable("sparkjoins.matchesMaps_bucketed")

In [51]:
spark.sql(""" select count(*) from sparkjoins.matchesMaps_bucketed """).show()

+--------+
|count(1)|
+--------+
|   24025|
+--------+



In [52]:
# Create table with 16 buckets in iceberg format for medals_matches_players

mmpMedals.write.format("iceberg").mode("overwrite").bucketBy(16,"match_id").saveAsTable("sparkjoins.medals_bucketed")

                                                                                

In [54]:
spark.sql(""" select count(*) from sparkjoins.medals_bucketed """).show()

+--------+
|count(1)|
+--------+
|  755229|
+--------+



In [56]:
# Create table with 16 bucketes in iceberg format for match_details

matchDetailsMini.write.format("iceberg").mode("overwrite").bucketBy(16,"match_id").saveAsTable("sparkjoins.matchDetails_bucketed")

In [57]:
spark.sql(""" select count(*) from sparkjoins.matchDetails_bucketed """).show()

+--------+
|count(1)|
+--------+
|  151761|
+--------+



In [58]:
# Set the following to ensure no shuffling
spark.conf.set('spark.sql.sources.v2.bucketing.enabled','true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled','true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping','true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition','false')

In [59]:
# Bucket join bootcamp.matchesMaps_bucketed with bootcamp.matchDetails_bucketed
# Select all column from bootcamp.matchesMaps_bucketed
# Select three columns from bootcamp.matchDetails_bucketed
# Join on match_id

matchesJoinDetails = spark.sql("""  
select
mm.*, 
md.player_gamertag, 
md.player_total_kills, 
md.player_total_deaths 
from sparkjoins.matchesMaps_bucketed mm
join sparkjoins.matchDetails_bucketed md 
on mm.match_id = md.match_id 
 """)

In [60]:
# Create table with 16 buckets in iceberg format for matchesJoinDetails

matchesJoinDetails.write.format("iceberg").mode("overwrite").bucketBy(16,"match_id").saveAsTable("sparkjoins.matches_n_Details_bucketed")

                                                                                

In [61]:
spark.sql(""" select count(*) from sparkjoins.matches_n_Details_bucketed """).show()

+--------+
|count(1)|
+--------+
|  151761|
+--------+



In [62]:
# Bucket Join bootcamp.matches_n_Details_bucketed with bootcamp.medals_bucketed
# Select all columns from bootcamp.matches_n_Details_bucketed
# Select three columns from bootcamp.medals_bucketed
# Join on match_id and player_gamertag

gamingDetails1 = spark.sql(""" 
select 
md.*, 
m.medal_id, 
m.medal_name,
m.medal_count
from sparkjoins.matches_n_Details_bucketed md
left join sparkjoins.medals_bucketed m 
on md.match_id = m.match_id
and md.player_gamertag = m.player_gamertag 

 """)

In [63]:
# Create table with 16  buckets in iceberg format for gamingDetails1

gamingDetails1.write.format("iceberg").mode("overwrite").bucketBy(16,"match_id").saveAsTable("sparkjoins.gamingInfo")

                                                                                

In [64]:
spark.sql(""" select count(*) from sparkjoins.gamingInfo """).show()

+--------+
|count(1)|
+--------+
|  757943|
+--------+



In [66]:
# Joined gaming datasets table: bootcamp.gamingInfo
# Summarize game details
# Display one summary record for each game played by a gamer with total_kills, total_deaths, total_medals
# Eliminate repeating game details

spark.sql("""  
create table sparkjoins.agg_gamer_games using iceberg partitioned by (bucket(16, match_id)) as

select
match_id,
playlist_id,
map_name,
player_gamertag,
player_total_kills,
player_total_deaths,
coalesce(sum(medal_count),0) as total_medals
from sparkjoins.gamingInfo
group by match_id, playlist_id, map_name,player_gamertag, player_total_kills, player_total_deaths
order by player_gamertag, match_id
""")

                                                                                

DataFrame[]

In [67]:
spark.sql(""" select count(*) from sparkjoins.agg_gamer_games""").show()

+--------+
|count(1)|
+--------+
|  151761|
+--------+



In [69]:
spark.sql(""" select * from sparkjoins.agg_gamer_games limit 5 """).show()

+--------------------+--------------------+--------------+---------------+------------------+-------------------+------------+
|            match_id|         playlist_id|      map_name|player_gamertag|player_total_kills|player_total_deaths|total_medals|
+--------------------+--------------------+--------------+---------------+------------------+-------------------+------------+
|7f43c723-9971-4d2...|f72e0ef0-7c4a-430...|Breakout Arena|   A 2tha nimal|                 4|                  6|           3|
|17b357b4-3e1b-4e3...|f72e0ef0-7c4a-430...|Breakout Arena|       A BIG mC|                 5|                  6|           6|
|d97b0507-1d96-4bd...|f72e0ef0-7c4a-430...|Breakout Arena|A BRIGHT SHADOW|                 5|                  7|           7|
|fee605e5-4aa2-412...|2323b76a-db98-4e0...|      Riptide |A BacKWaRdsManN|                 6|                 13|           4|
|34ad3973-769e-433...|f72e0ef0-7c4a-430...|Breakout Arena| A CAT WHO DABS|                 4|                  

In [70]:
# Q1. Which player averages the most kills per game?

spark.sql("""
select player_gamertag, avg(player_total_kills) as avg_kills_per_game
from sparkjoins.agg_gamer_games
group by player_gamertag
order by avg_kills_per_game desc
limit 1
""").show()

+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+



In [71]:
# Q2. Which playlist gets played the most?

spark.sql("""
select playlist_id, count(*) as amt_played
from sparkjoins.agg_gamer_games
group by playlist_id
order by amt_played desc
limit 1
""").show(truncate=False)

+------------------------------------+----------+
|playlist_id                         |amt_played|
+------------------------------------+----------+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|58868     |
+------------------------------------+----------+



In [72]:
# Q3. Which map get played the most?

spark.sql("""
select map_name, count(*) as amt_played
from sparkjoins.agg_gamer_games
group by map_name
order by amt_played desc
limit 1
""").show(truncate=False)

+--------------+----------+
|map_name      |amt_played|
+--------------+----------+
|Breakout Arena|54001     |
+--------------+----------+



In [74]:
# For each game played by a gamer list the medals earned along with match_id, playlist_id, and map_name
# sum medal count
# Remove players who have not earned medals

spark.sql("""
create table sparkjoins.agg_gamer_medals using iceberg partitioned by (bucket(16, match_id)) as
 
select
match_id,
playlist_id,
map_name,
player_gamertag,
medal_name,
coalesce(sum(medal_count),0) as medal_count
from sparkjoins.gamingInfo
where medal_id is not null
group by match_id, playlist_id, map_name,player_gamertag, medal_name
order by player_gamertag, medal_name

""")

                                                                                

DataFrame[]

In [75]:
# Q4. Which map do players get the most Killing Spree medals on ?

spark.sql("""
select map_name, medal_name, count(*) as num_medals
from sparkjoins.agg_gamer_medals
where medal_name = 'Killing Spree'
group by map_name, medal_name
order by num_medals desc
limit 1
""").show()

+--------------+-------------+----------+
|      map_name|   medal_name|num_medals|
+--------------+-------------+----------+
|Breakout Arena|Killing Spree|      6553|
+--------------+-------------+----------+

