In [52]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col}
import org.apache.spark.storage.StorageLevel

//CREATE AND CONFIGURE SPARK SESSION
val spark = SparkSession.builder()
  .appName("SparkFundamentalsWeekHW") 
  .config("spark.executor.memory", "4g")
  .config("spark.driver.memory", "4g")
  .config("spark.sql.shuffle.partitions", "200") // Fine for large datasets
  .config("spark.sql.files.maxPartitionBytes", "134217728") // Optional: 128 MB is default
  .config("spark.dynamicAllocation.enabled", "true") // Helps with resource allocation
  .config("spark.dynamicAllocation.minExecutors", "1") // Ensure minimum resources
  .config("spark.dynamicAllocation.maxExecutors", "50") // Scalable resource allocation
  .getOrCreate()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@61136eaa


In [49]:
//TASK 1: Disabled automatic broadcast join with spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

//Disabled automatic broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
//END OF TASK 1

In [11]:
//TASK 2: Explicitly broadcast JOINs medals and maps
//SAVE DATA INTO VARIABLES 
val medalsSelected = spark.read.option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/iceberg/data/medals.csv")
val mapsSelected = spark.read.option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/iceberg/data/maps.csv")
val matchesSelected = spark.read.option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/iceberg/data/matches.csv")
val matchDetailsSelected = spark.read.option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/iceberg/data/match_details.csv")
val medalsMatchesPlayersSelected = spark.read.option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/iceberg/data/medals_matches_players.csv")

//CREATE TEMP VIEWS FOR EXPLICIT BRODCAST JOIN
medalsSelected.createOrReplaceTempView("medals")
mapsSelected.createOrReplaceTempView("maps")
matchesSelected.createOrReplaceTempView("matches")
medalsMatchesPlayersSelected.createOrReplaceTempView("medals_matches_players")

//DO AND SAVE EXPLICIT BROADCAST INTO VARIABLEs
val explicitBroadcastMedals = medalsMatchesPlayersSelected.as("mmp")
                                                          .join(broadcast(medalsSelected).as("me"),$"mmp.medal_id" === $"me.medal_id")
                                                          .select($"mmp.*",$"me.classification")
val explicitBroadcastMaps = matchesSelected.as("m")
                                           .join(broadcast(mapsSelected).as("ma"),$"m.mapid" === $"ma.mapid")
                                           .select($"m.*",$"ma.name")

//END OF TASK 2

medalsSelected: org.apache.spark.sql.DataFrame = [medal_id: bigint, sprite_uri: string ... 10 more fields]
mapsSelected: org.apache.spark.sql.DataFrame = [mapid: string, name: string ... 1 more field]
matchesSelected: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]
matchDetailsSelected: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]
medalsMatchesPlayersSelected: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 2 more fields]


In [36]:
//TASK 3: Bucket join match_details, matches, and medal_matches_players on match_id with 16 buckets

//CREATE TABLES WITH BUCKETS AND SAVE BUCKETED DATA
//CREATE TABLE medals_matches_players_bucketed
spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed""")
val bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
     match_id STRING,
     player_gamertag STRING,
     medal_id STRING,
     count INTEGER,
     classification STRING
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(bucketedDDL)

//LOAD DATA INTO BUCKETED TABLE
explicitBroadcastMedals.select(
    $"match_id", $"player_gamertag", $"medal_id", $"count", $"classification"
    )
    .write.mode("append")
    .bucketBy(16, "match_id").saveAsTable("bootcamp.medals_matches_players_bucketed")

//CREATE TABLE matches_maps_bucketed
// Get distinct completion dates
val distinctDates = matchesSelected.select("completion_date").distinct().collect()

spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_maps_bucketed""")
val bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_maps_bucketed (
     match_id STRING,
     mapid STRING,
     name STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (completion_date,bucket(16, match_id));
 """
spark.sql(bucketedDDL)

//LOAD DATA INTO BUCKETED TABLE
// Process data in chunks based on completion_date
distinctDates.foreach { row =>
  val date = row.getAs[java.sql.Timestamp]("completion_date")
  val filteredMatches = explicitBroadcastMaps.filter(col("completion_date") === date)
  
  // Repartition and persist the filtered data
  val optimizedMatches = filteredMatches
    .select( $"match_id", $"mapid", $"name", $"is_team_game", $"playlist_id", $"completion_date")
    .repartition(16, $"match_id")
    .persist(StorageLevel.MEMORY_AND_DISK)
    
  optimizedMatches.write
    .mode("append")
    .bucketBy(16, "match_id")
    .partitionBy("completion_date")
    .saveAsTable("bootcamp.matches_maps_bucketed")
}


//CREATE TABLE match_details_bucketed
spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")
val bucketedDDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(bucketedDDL)

//LOAD DATA INTO BUCKETED TABLE
matchDetailsSelected.select(
    $"match_id", $"player_gamertag", $"player_total_kills", $"player_total_deaths")
    .write.mode("append")
    .bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")


//JOIN TABLES TO CREATE DATAFRAME
spark.sql(
"""
    SELECT 
         mmb.match_id,
         mmb.mapid, 
         mmb.name as map_name, 
         mmb.is_team_game, 
         mmb.playlist_id, 
         mmb.completion_date,
         mdb.player_gamertag, 
         mdb.player_total_kills, 
         mdb.player_total_deaths,
         mmpb.medal_id", 
         mmpb.count as medals_count,
         mmpb.classification as medal_classification
    FROM bootcamp.matches_maps_bucketed AS mmb
    JOIN bootcamp.match_details_bucketed AS mdb ON mdb.match_id=mmb.match_id
    JOIN bootcamp.medals_matches_players_bucketed AS mmpb ON mmpb.match_id=mmb.match_id
"""
).createOrReplaceTempView("filteredDF")

//END OF TASK 3

bucketedDDL: String =
"
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
     match_id STRING,
     player_gamertag STRING,
     medal_id STRING,
     count INTEGER,
     classification STRING
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 "


In [71]:
//TASK 4: Aggregate the joined data frame to figure out the following questions

//Which player averages the most kills per game?
val players_avg_kills = spark.sql(
"""
    WITH deduplication as(
    SELECT 
         match_id,
         player_gamertag, 
         player_total_kills
    FROM filteredDF
    GROUP BY match_id,player_gamertag,player_total_kills)
    SELECT 
        match_id,
        player_gamertag,
        sum(player_total_kills)/count(distinct match_id) as avg_kills_per_game
    FROM deduplication
    GROUP BY match_id,player_gamertag
    ORDER BY sum(player_total_kills)/count(distinct match_id) desc
"""
)

players_avg_kills: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 1 more field]


In [62]:


//Which playlist gets played the most?
spark.sql(
"""
    WITH playlist_played as(
    SELECT 
         match_id,
         playlist_id, 
         count(1) as playlist_count
    FROM filteredDF
    GROUP BY match_id,playlist_id)
    SELECT 
        playlist_id,
        sum(playlist_count) num_plays
    FROM playlist_played
    GROUP BY playlist_id
    ORDER BY sum(playlist_count) desc
"""
).show()



+--------------------+---------+
|         playlist_id|num_plays|
+--------------------+---------+
|f72e0ef0-7c4a-430...|  1565529|
|780cc101-005c-4fc...|  1116002|
|0bcf2be1-3168-4e4...|  1015496|
|c98949ae-60a8-43d...|   824932|
|2323b76a-db98-4e0...|   692342|
|892189e9-d712-4bd...|   667670|
|f27a65eb-2d11-496...|   167498|
|355dc154-9809-4ed...|   140006|
|d0766624-dbd7-453...|   138470|
|bc0f8ad6-31e6-4a1...|   111073|
|7b7e892c-d9b7-4b0...|    82723|
|7385b4a1-86bf-4ae...|    76425|
|f0c9ef9a-48bd-4b2...|    47813|
|b5d5a242-ffa5-4d8...|    46411|
|819eb188-1a1c-48b...|    39404|
|d21c8381-26f1-4d6...|    37049|
|4b12472e-2a06-423...|    28733|
|5728f612-3f20-445...|    28543|
|0504ca3c-de41-48f...|    22502|
|88b7de19-113c-4be...|    15860|
+--------------------+---------+
only showing top 20 rows



In [65]:
//Which map gets played the most?
spark.sql(
"""
    WITH maps_played as(
    SELECT 
         match_id,
         mapid,
         map_name,
         count(1) as map_count
    FROM filteredDF
    GROUP BY match_id,mapid,map_name
    )
    SELECT 
        mapid,
        map_name,
        sum(map_count) num_plays
    FROM maps_played
    GROUP BY  mapid,map_name
    ORDER BY sum(map_count) desc
"""
).show()

+--------------------+--------------+---------+
|               mapid|      map_name|num_plays|
+--------------------+--------------+---------+
|c74c9d0f-f206-11e...|        Alpine|  1445545|
|c7edbf0f-f206-11e...|Breakout Arena|  1435048|
|c7805740-f206-11e...|       Glacier|   953278|
|cdb934b0-f206-11e...|        Empire|   396305|
|cb914b9e-f206-11e...|       The Rig|   309045|
|ce1dc2de-f206-11e...|         Truth|   299736|
|cebd854f-f206-11e...|      Coliseum|   298891|
|caacb800-f206-11e...|         Plaza|   291540|
|cd844200-f206-11e...|          Eden|   261162|
|cc040aa1-f206-11e...|        Fathom|   256966|
|cdee4e70-f206-11e...|        Regret|   244295|
|c7b7baf0-f206-11e...|      Parallax|   204568|
|ca737f8f-f206-11e...|    Overgrowth|   156631|
|cbcea2c0-f206-11e...|      Riptide |   135375|
|cc74f4e1-f206-11e...|          NULL|   132392|
|ce89a40f-f206-11e...|          NULL|    65081|
+--------------------+--------------+---------+



In [67]:
//Which map do players get the most Killing Spree medals on?
spark.sql(
"""
    WITH maps_and_medals as(
    SELECT 
         mapid,
         map_name,
         medal_classification,
         medals_count
    FROM filteredDF
    WHERE medal_classification='KillingSpree'
    GROUP BY mapid,map_name,medal_classification,medals_count
    )
    SELECT 
         mapid,
         map_name,
         medal_classification,
         sum(medals_count) num_medals
    FROM maps_and_medals
    GROUP BY  mapid,map_name,medal_classification
    ORDER BY sum(medals_count) desc
"""
).show()
//END OF TASK 4

+--------------------+--------------+--------------------+----------+
|               mapid|      map_name|medal_classification|num_medals|
+--------------------+--------------+--------------------+----------+
|c7805740-f206-11e...|       Glacier|        KillingSpree|        37|
|c74c9d0f-f206-11e...|        Alpine|        KillingSpree|        21|
|c7b7baf0-f206-11e...|      Parallax|        KillingSpree|        15|
|cc74f4e1-f206-11e...|          NULL|        KillingSpree|        11|
|cebd854f-f206-11e...|      Coliseum|        KillingSpree|        11|
|cb914b9e-f206-11e...|       The Rig|        KillingSpree|        10|
|ce1dc2de-f206-11e...|         Truth|        KillingSpree|        10|
|cc040aa1-f206-11e...|        Fathom|        KillingSpree|        10|
|cdee4e70-f206-11e...|        Regret|        KillingSpree|        10|
|ce89a40f-f206-11e...|          NULL|        KillingSpree|        10|
|cd844200-f206-11e...|          Eden|        KillingSpree|        10|
|cdb934b0-f206-11e..

In [None]:
//TASK 5: Try different .sortWithinPartitions to see which has the smallest data size

//CREATE SORTED TABLE
spark.sql("""DROP TABLE IF EXISTS bootcamp.players_avg_kills""")
val sortedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.players_avg_kills (
    match_id STRING,
    player_gamertag STRING,
    avg_kills_per_game REAL
 )
 USING iceberg
 PARTITIONED BY (match_id));
 """
spark.sql(sortedDDL)
//LOAD DATA INTO BUCKETED TABLE
players_avg_kills.select(
    $"match_id", $"player_gamertag", $"avg_kills_per_game")
    .write.mode("append")
    .saveAsTable("bootcamp.players_avg_kills")

In [57]:
spark.sql("""SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM bootcamp.matches_maps_bucketed.files""").show()

+--------+---------+------+
|    size|num_files|sorted|
+--------+---------+------+
|10001308|     3665|sorted|
+--------+---------+------+



In [22]:
//spark.sql("""SELECT * FROM medals me""").columns//medal_id,classification=KillingSpree
//spark.sql("""SELECT * FROM maps me""").columns//mapid,name
//spark.sql("""SELECT * FROM matches me""").columns//match_id,mapid,playlist_id,completion_date
//spark.sql("""SELECT * FROM match_details me""").columns//match_id,player_gamertag,player_total_kills
//spark.sql("""SELECT * FROM medals_matches_players me""").columns//match_id, player_gamertag, medal_id, count

res17: Array[String] = Array(match_id, mapid, is_team_game, playlist_id, game_variant_id, is_match_over, completion_date, match_duration, game_mode, map_variant_id)
