In [1]:
// Session entry point plus the column helpers the later cells rely on.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, split, lit}

// getOrCreate() returns the already-running session when there is one (the kernel
// exposes its own session as `spark`), so this does not start a second session.
val sparkSession = SparkSession.builder.appName("Juptyer").getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://6f8bb373fc89:4042
SparkContext available as 'sc' (version = 3.5.5, master = local[*], app id = local-1752420759823)
SparkSession available as 'spark'


       spark
       ^
import org.apache.spark.sql.SparkSession
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@24cf1a91
import org.apache.spark.sql.functions.{broadcast, split, lit}


In [3]:
// Disable automatic broadcast joins (threshold -1); the join cell below requests
// broadcasts explicitly with broadcast(...) instead.
val autoBroadcastThresholdKey = "spark.sql.autoBroadcastJoinThreshold"
spark.conf.set(autoBroadcastThresholdKey, "-1")

In [5]:
// Load the five source CSV files into DataFrames.

/** Reads one CSV file, using its first row as the header and inferring column types. */
def readCsvWithInferredSchema(path: String): org.apache.spark.sql.DataFrame = {
  val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
  spark.read.options(csvOptions).csv(path)
}

val match_details = readCsvWithInferredSchema("/home/iceberg/data/match_details.csv")

val matches = readCsvWithInferredSchema("/home/iceberg/data/matches.csv")

val medals_matches_players = readCsvWithInferredSchema("/home/iceberg/data/medals_matches_players.csv")

val medals = readCsvWithInferredSchema("/home/iceberg/data/medals.csv")

val maps = readCsvWithInferredSchema("/home/iceberg/data/maps.csv")

match_details: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]
matches: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]
medals_matches_players: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 2 more fields]
medals: org.apache.spark.sql.DataFrame = [medal_id: bigint, sprite_uri: string ... 10 more fields]
maps: org.apache.spark.sql.DataFrame = [mapid: string, name: string ... 1 more field]


In [6]:
// Join medals and maps onto their referencing tables, marking the medals/maps
// side of each join for broadcast explicitly.

// medals: keep the three medals_matches_players columns and append every medals column.
val broadcastMedals = broadcast(medals).as("me")
val medalIdsMatch = $"mmp.medal_id" === $"me.medal_id"
val medals_matches_players_joined_medals = medals_matches_players.as("mmp")
    .join(broadcastMedals, medalIdsMatch)
    .select($"mmp.match_id", $"mmp.player_gamertag", $"mmp.count", $"me.*")

medals_matches_players_joined_medals.show(1)

// maps: keep every matches column and add the map's name and description under
// map_-prefixed names.
val broadcastMaps = broadcast(maps).as("mp")
val mapIdsMatch = $"m.mapid" === $"mp.mapid"
val matches_joined_maps = matches.as("m")
    .join(broadcastMaps, mapIdsMatch)
    .select($"m.*", $"mp.name".as("map_name"), $"mp.description".as("map_description"))

matches_joined_maps.show(1)

+--------------------+---------------+-----+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------+----------+
|            match_id|player_gamertag|count|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|   classification|         description|    name|difficulty|
+--------------------+---------------+-----+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------+----------+
|009fdac5-e15c-47c...|       EcZachly|    7|3261908037|https://content.h...|        375|       525|                74|                 74|        1125|          899|WeaponProficiency|Kill an opponent ...|Headshot|        60|
+--------------------+---------------+-----+----------+--------------------+-----------+----------+-

medals_matches_players_joined_medals: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 13 more fields]
matches_joined_maps: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 10 more fields]


In [7]:
// match_details: write the listed columns to a table bucketed into 16 buckets on match_id.

// Columns to persist, in output order.
val matchDetailsColumns = Seq(
  "match_id",
  "player_gamertag",
  "previous_spartan_rank",
  "spartan_rank",
  "previous_total_xp",
  "total_xp",
  "previous_csr_tier",
  "previous_csr_designation",
  "previous_csr",
  "previous_csr_percent_to_next_tier",
  "previous_csr_rank",
  "current_csr_tier",
  "current_csr_designation",
  "current_csr",
  "current_csr_percent_to_next_tier",
  "current_csr_rank",
  "player_rank_on_team",
  "player_finished",
  "player_average_life",
  "player_total_kills",
  "player_total_headshots",
  "player_total_weapon_damage",
  "player_total_shots_landed",
  "player_total_melee_kills",
  "player_total_melee_damage",
  "player_total_assassinations",
  "player_total_ground_pound_kills",
  "player_total_shoulder_bash_kills",
  "player_total_grenade_damage",
  "player_total_power_weapon_damage",
  "player_total_power_weapon_grabs",
  "player_total_deaths",
  "player_total_assists",
  "player_total_grenade_kills",
  "did_win",
  "team_id"
)

// Append mode: running this cell again adds the rows a second time instead of replacing them.
match_details.select(matchDetailsColumns.head, matchDetailsColumns.tail: _*)
  .write
  .mode("append")
  .bucketBy(16, "match_id")
  .saveAsTable("bootcamp.match_details_bucketed")

In [8]:
// matches: write the map-enriched matches to a table bucketed into 16 buckets on match_id.

// Columns to persist, in output order (the last two come from the maps join).
val matchesColumns = Seq(
  "match_id",
  "mapid",
  "is_team_game",
  "playlist_id",
  "game_variant_id",
  "is_match_over",
  "completion_date",
  "match_duration",
  "game_mode",
  "map_variant_id",
  "map_name",
  "map_description"
)

// Append mode: running this cell again adds the rows a second time instead of replacing them.
matches_joined_maps.select(matchesColumns.head, matchesColumns.tail: _*)
  .write
  .mode("append")
  .bucketBy(16, "match_id")
  .saveAsTable("bootcamp.matches_bucketed")

In [9]:
// medals_matches_players: write the medal-enriched rows to a table bucketed into
// 16 buckets on match_id.

// Columns to persist, in output order (medal_id onward come from the medals join).
val medalsMatchesPlayersColumns = Seq(
  "match_id",
  "player_gamertag",
  "count",
  "medal_id",
  "sprite_uri",
  "sprite_left",
  "sprite_top",
  "sprite_sheet_width",
  "sprite_sheet_height",
  "sprite_width",
  "sprite_height",
  "classification",
  "description",
  "name",
  "difficulty"
)

// Append mode: running this cell again adds the rows a second time instead of replacing them.
medals_matches_players_joined_medals
  .select(medalsMatchesPlayersColumns.head, medalsMatchesPlayersColumns.tail: _*)
  .write
  .mode("append")
  .bucketBy(16, "match_id")
  .saveAsTable("bootcamp.medals_matches_players_bucketed")

In [11]:
// Bucket join: join match_details, matches and medals_matches_players on match_id
// (each table is written above with 16 buckets on that column) and print the
// physical plan instead of running the query.
//
// The third table must be the one the medals_matches_players write cell saves,
// i.e. bootcamp.medals_matches_players_bucketed; the earlier spelling without the
// "s" names a table this notebook never creates.
//
// NOTE(review): the recorded plan shows an Exchange hashpartitioning(match_id, 200)
// above the match_details_bucketed scan, so the 16-bucket layout is not removing
// the shuffle there. Presumably the catalog needs storage-partitioned-join settings
// for that -- confirm against the Spark / catalog documentation.
spark.sql("""
    SELECT *
    FROM bootcamp.match_details_bucketed mdb
    JOIN bootcamp.matches_bucketed mb ON mdb.match_id = mb.match_id
    JOIN bootcamp.medals_matches_players_bucketed mmp ON mdb.match_id = mmp.match_id
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [match_id#905], [match_id#953], Inner
   :- SortMergeJoin [match_id#905], [match_id#941], Inner
   :  :- Sort [match_id#905 ASC NULLS FIRST], false, 0
   :  :  +- Exchange hashpartitioning(match_id#905, 200), ENSURE_REQUIREMENTS, [plan_id=541]
   :  :     +- BatchScan demo.bootcamp.match_details_bucketed[match_id#905, player_gamertag#906, previous_spartan_rank#907, spartan_rank#908, previous_total_xp#909, total_xp#910, previous_csr_tier#911, previous_csr_designation#912, previous_csr#913, previous_csr_percent_to_next_tier#914, previous_csr_rank#915, current_csr_tier#916, current_csr_designation#917, current_csr#918, current_csr_percent_to_next_tier#919, current_csr_rank#920, player_rank_on_team#921, player_finished#922, player_average_life#923, player_total_kills#924, player_total_headshots#925, player_total_weapon_damage#926, player_total_shots_landed#927, player_total_melee_kills#928, ... 12 more fields] demo.bo