In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{broadcast, split, lit}

// Initialize a Spark Session
val sparkSession = SparkSession.builder.appName("Assignment3").getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://477c801eb60a:4041
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1736158604849)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{broadcast, split, lit}
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1ba59706


In [2]:
// Task 1: Disable automatic broadcast join

sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [3]:
// Task 2: Explicitly Broadcast Join medals and maps
// Define required case classes that represent schemas for the data

case class Matches (
    match_id: Option[String],
    mapid: String,
    is_team_game: Boolean,
    playlist_id: String,
    game_variant_id: String,
    is_match_over: Boolean,
    completion_date: String,
    match_duration: String,
    game_mode: String,
    map_variant_id: String
)

case class MatchDetails (
    match_id: Option[String],
    player_gamertag: String,
    previous_spartan_rank: Integer,
    spartan_rank: Integer,
    previous_total_xp: Integer,
    total_xp: Integer,
    previous_csr_tier: Integer,
    previous_csr_designation: Integer,
    previous_csr: Integer,
    previous_csr_percent_to_next_tier: Integer,
    previous_csr_rank: Integer,
    current_csr_tier: Integer,
    current_csr_designation: Integer,
    current_csr: Integer,
    current_csr_percent_to_next_tier: Integer,
    current_csr_rank: Integer,
    player_rank_on_team: Integer,
    player_finished: Boolean,
    player_average_life: String,
    player_total_kills: Integer,
    player_total_headshots: Integer,
    player_total_weapon_damage: Double,
    player_total_shots_landed: Double,
    player_total_melee_kills: Double,
    player_total_melee_damage: Double,
    player_total_assassinations: Double,
    player_total_ground_pound_kills: Double,
    player_total_shoulder_bash_kills: Double,
    player_total_grenade_damage: Double,
    player_total_power_weapon_damage: Double,
    player_total_power_weapon_grabs: Double,
    player_total_deaths: Double,
    player_total_assists: Double,
    player_total_grenade_kills: Double,
    did_win: Double,
    team_id: Double
)

case class Maps (
    mapid: Option[String],
    name: String,
    description: String
)

case class Medals (
    medal_id: Option[Long], // Updated to Long
    sprite_uri: String,
    sprite_left: Integer,
    sprite_top: Integer,
    sprite_sheet_width: Integer,
    sprite_sheet_height: Integer,
    sprite_width: Integer,
    sprite_height: Integer,
    classification: String,
    description: String,
    name: String,
    difficulty: Integer
)

case class MedalsMatchesPlayers (
    match_id: Option[String],
    player_gamertag: String,
    medal_id: Long,
    count: Integer
)

defined class Matches
defined class MatchDetails
defined class Maps
defined class Medals
defined class MedalsMatchesPlayers


In [4]:
// Read all required .csv files into datasets

val matches: Dataset[Matches] = sparkSession.read.option("header", "true")
                                .option("inferSchema", "true")
                                .csv("/home/iceberg/data/matches.csv")
                                .as[Matches]

//matches.show(2)

matches: org.apache.spark.sql.Dataset[Matches] = [match_id: string, mapid: string ... 8 more fields]


In [5]:
val matchDetails: Dataset[MatchDetails] = sparkSession.read.option("header", "true")
                                .option("inferSchema", "true")
                                .csv("/home/iceberg/data/match_details.csv")
                                .as[MatchDetails]

//matchDetails.show(2)

matchDetails: org.apache.spark.sql.Dataset[MatchDetails] = [match_id: string, player_gamertag: string ... 34 more fields]


In [6]:
val maps: Dataset[Maps] = sparkSession.read.option("header", "true")
                                .option("inferSchema", "true")
                                .csv("/home/iceberg/data/maps.csv")
                                .as[Maps]
//maps.show(2)

maps: org.apache.spark.sql.Dataset[Maps] = [mapid: string, name: string ... 1 more field]


In [7]:
val medals: Dataset[Medals] = sparkSession.read.option("header", "true")
                                .option("inferSchema", "true")
                                .csv("/home/iceberg/data/medals.csv")
                                .as[Medals]
//medals.show(2)

medals: org.apache.spark.sql.Dataset[Medals] = [medal_id: bigint, sprite_uri: string ... 10 more fields]


In [8]:
val medalsMatchesPlayers: Dataset[MedalsMatchesPlayers] = sparkSession.read.option("header", "true")
                                .option("inferSchema", "true")
                                .csv("/home/iceberg/data/medals_matches_players.csv")
                                .as[MedalsMatchesPlayers]
//medalsMatchesPlayers.show(2)                            

medalsMatchesPlayers: org.apache.spark.sql.Dataset[MedalsMatchesPlayers] = [match_id: string, player_gamertag: string ... 2 more fields]


In [9]:
// Create Temporary Views for the required datasets

matches.createOrReplaceTempView("matchesView")

matchDetails.createOrReplaceTempView("matchDetailsView")

maps.createOrReplaceTempView("mapsView")

medalsMatchesPlayers.createOrReplaceTempView("medalsMatchesPlayersView")

medals.createOrReplaceTempView("medalsView")

In [10]:
// Create the long table (medals) using sql

val matchesMedalMapsAgg = sparkSession.sql("""
                                            SELECT mmp.medal_id,
                                                    me.name medal_name,
                                                    mmp.match_id,
                                                    ma.mapid,
                                                    COLLECT_LIST(DISTINCT mmp.player_gamertag) as player_gamertag_array
                                                    
                                            FROM medalsMatchesPlayersView mmp
                                            
                                            JOIN matchesView ma
                                            ON mmp.match_id = ma.match_id
                                            
                                            JOIN medalsView me
                                            ON mmp.medal_id = me.medal_id
                                        
                                            GROUP BY 1,2,3,4
                                        """) //.cache()
matchesMedalMapsAgg.show(5)

+---------+----------+--------------------+--------------------+---------------------+
| medal_id|medal_name|            match_id|               mapid|player_gamertag_array|
+---------+----------+--------------------+--------------------+---------------------+
|121048710|   Rampage|073a5745-339b-4f7...|c74c9d0f-f206-11e...|              [Lanqe]|
|121048710|   Rampage|143a653a-27f6-4d2...|c74c9d0f-f206-11e...|     [False EnvisioN]|
|121048710|   Rampage|259119e2-e79c-443...|c7805740-f206-11e...|     [Lvl 61 Scyther]|
|121048710|   Rampage|3453a66d-6a56-434...|cebd854f-f206-11e...|    [BiscuitAnanas31]|
|121048710|   Rampage|35e30ec5-7e71-43c...|c7b7baf0-f206-11e...|        [XCornholeoX]|
+---------+----------+--------------------+--------------------+---------------------+
only showing top 5 rows



matchesMedalMapsAgg: org.apache.spark.sql.DataFrame = [medal_id: bigint, medal_name: string ... 3 more fields]


In [11]:
// Explicitly broadcast join Medals & Maps (Below is a Dataframe. It doesn't conform to a case class)

val medalsMaps = matchesMedalMapsAgg.as("m")
                    .join(broadcast(maps).as("mp"), $"m.mapid" === $"mp.mapid")
                    .select($"m.*", $"mp.name".as("map_name"), $"mp.description".as("map_description"))

medalsMaps.show(5)

+---------+----------+--------------------+--------------------+---------------------+--------+--------------------+
| medal_id|medal_name|            match_id|               mapid|player_gamertag_array|map_name|     map_description|
+---------+----------+--------------------+--------------------+---------------------+--------+--------------------+
|121048710|   Rampage|073a5745-339b-4f7...|c74c9d0f-f206-11e...|              [Lanqe]|  Alpine|These vistas are ...|
|121048710|   Rampage|143a653a-27f6-4d2...|c74c9d0f-f206-11e...|     [False EnvisioN]|  Alpine|These vistas are ...|
|121048710|   Rampage|259119e2-e79c-443...|c7805740-f206-11e...|     [Lvl 61 Scyther]| Glacier|Each of Halo's mi...|
|121048710|   Rampage|3453a66d-6a56-434...|cebd854f-f206-11e...|    [BiscuitAnanas31]|Coliseum|Forerunner Warrio...|
|121048710|   Rampage|35e30ec5-7e71-43c...|c7b7baf0-f206-11e...|        [XCornholeoX]|Parallax|The Orion Arm of ...|
+---------+----------+--------------------+--------------------+

medalsMaps: org.apache.spark.sql.DataFrame = [medal_id: bigint, medal_name: string ... 5 more fields]


In [12]:
// Task 3: Bucket join match_details, matches, and medal_matches_players on match_id with 16 buckets

// Create DDL for bucketed tables

// Matches
sparkSession.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")

val bucketedMatchesDDL = """
                            CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
                                match_id STRING,
                                is_team_game BOOLEAN,
                                playlist_id STRING,
                                mapid STRING
                            )
                            USING iceberg
                            PARTITIONED BY (bucket(16, match_id));
                            """
sparkSession.sql(bucketedMatchesDDL)



// Match Details
sparkSession.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")

val bucketedMatchDetailsDDL = """
                                CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
                                    match_id STRING,
                                    player_gamertag STRING,
                                    player_total_kills INTEGER,
                                    player_total_deaths INTEGER
                                )
                                USING iceberg
                                PARTITIONED BY (bucket(16, match_id));
                                """
sparkSession.sql(bucketedMatchDetailsDDL)



// Medals Matches Players
sparkSession.sql("""DROP TABLE IF EXISTS bootcamp.medal_matches_players_bucketed""")

val bucketedMedalMatchesPlayersDDL = """
                                        CREATE TABLE IF NOT EXISTS bootcamp.medal_matches_players_bucketed (
                                            match_id STRING,
                                            player_gamertag STRING,
                                            medal_id BIGINT
                                        )
                                        USING iceberg
                                        PARTITIONED BY (bucket(16, match_id));
                                        """
sparkSession.sql(bucketedMedalMatchesPlayersDDL)

bucketedMatchesDDL: String =
"
                            CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
                                match_id STRING,
                                is_team_game BOOLEAN,
                                playlist_id STRING,
                                mapid STRING
                            )
                            USING iceberg
                            PARTITIONED BY (bucket(16, match_id));
                            "
bucketedMatchDetailsDDL: String =
"
                                CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
                                    match_id STRING,
                                    player_gamertag STRING,
                                    player_total_kills INTEGER,
        ...


In [13]:
// Write data from the above datasets to the corresponding bucketed tables

// Matches
matches.select($"match_id", $"is_team_game", $"playlist_id", $"mapid")
        .write.mode("append")
        .bucketBy(16, "match_id")
        .saveAsTable("bootcamp.matches_bucketed")

In [14]:
// Match Details
matchDetails.select($"match_id", $"player_gamertag", $"player_total_kills", $"player_total_deaths")
            .write.mode("append")
            .bucketBy(16, "match_id")
            .saveAsTable("bootcamp.match_details_bucketed")

In [15]:
// Medals Matches Players
medalsMatchesPlayers.select($"match_id", $"player_gamertag", $"medal_id")
                    .write.mode("append")
                    .bucketBy(16, "match_id")
                    .saveAsTable("bootcamp.medal_matches_players_bucketed")

In [16]:
val bucketedTest = sparkSession.sql("""
                                                SELECT *
                                                       
                                                FROM bootcamp.medal_matches_players_bucketed mdb
                                                """)

bucketedTest.show(5)

+--------------------+---------------+----------+
|            match_id|player_gamertag|  medal_id|
+--------------------+---------------+----------+
|27d7c16b-b780-4f8...|       EcZachly| 824733727|
|27d7c16b-b780-4f8...|       EcZachly|3261908037|
|27d7c16b-b780-4f8...|       EcZachly|2078758684|
|27d7c16b-b780-4f8...|       EcZachly|1573153198|
|27d7c16b-b780-4f8...|       EcZachly|2782465081|
+--------------------+---------------+----------+
only showing top 5 rows



bucketedTest: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 1 more field]


In [17]:
// Bucket Join match_details, matches, and medal_matches_players

val bucketedMatchesMedals = sparkSession.sql("""
                                                SELECT mb.match_id,
                                                        mb.playlist_id,
                                                        mb.mapid,
                                                        mdb.player_gamertag,
                                                        mdb.player_total_kills,
                                                        mdb.player_total_deaths,
                                                        mmpb.medal_id
                                                       
                                                FROM bootcamp.matches_bucketed mb 
                                                            
                                                JOIN bootcamp.match_details_bucketed mdb 
                                                ON mb.match_id = mdb.match_id
                                                            
                                                JOIN bootcamp.medal_matches_players_bucketed mmpb
                                                ON mb.match_id = mmpb.match_id
                                
                                                WHERE mdb.player_gamertag IS NOT NULL
                                                """)

bucketedMatchesMedals.show(5)

+--------------------+--------------------+--------------------+---------------+------------------+-------------------+----------+
|            match_id|         playlist_id|               mapid|player_gamertag|player_total_kills|player_total_deaths|  medal_id|
+--------------------+--------------------+--------------------+---------------+------------------+-------------------+----------+
|00169217-cca6-4b4...|2323b76a-db98-4e0...|cc040aa1-f206-11e...|  King Terror V|                14|                  7|3261908037|
|00169217-cca6-4b4...|2323b76a-db98-4e0...|cc040aa1-f206-11e...|  King Terror V|                14|                  7|3001183151|
|00169217-cca6-4b4...|2323b76a-db98-4e0...|cc040aa1-f206-11e...|  King Terror V|                14|                  7| 824733727|
|00169217-cca6-4b4...|2323b76a-db98-4e0...|cc040aa1-f206-11e...|  King Terror V|                14|                  7|2078758684|
|00169217-cca6-4b4...|2323b76a-db98-4e0...|cc040aa1-f206-11e...|  King Terror V|   

bucketedMatchesMedals: org.apache.spark.sql.DataFrame = [match_id: string, playlist_id: string ... 5 more fields]


In [18]:
// Join match_details, matches, and medal_matches_players temporary views (To see the difference between the one bucket joined)

sparkSession.sql("""
                SELECT mb.match_id,
                        mb.playlist_id,
                        mb.mapid,
                        mdb.player_gamertag,
                        mdb.player_total_kills,
                        mdb.player_total_deaths,
                        mmpb.medal_id
                
                FROM matchesView mb
                
                JOIN matchDetailsView mdb 
                ON mb.match_id = mdb.match_id
                
                JOIN medalsMatchesPlayersView mmpb
                ON mb.match_id = mmpb.match_id

                WHERE mdb.player_gamertag IS NOT NULL
                -- AND mb.completion_date = DATE('2016-01-01') 
                """).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#17, playlist_id#20, mapid#18, player_gamertag#76, player_total_kills#94, player_total_deaths#106, medal_id#335L]
   +- SortMergeJoin [match_id#17], [match_id#333], Inner
      :- Project [match_id#17, mapid#18, playlist_id#20, player_gamertag#76, player_total_kills#94, player_total_deaths#106]
      :  +- SortMergeJoin [match_id#17], [match_id#75], Inner
      :     :- Sort [match_id#17 ASC NULLS FIRST], false, 0
      :     :  +- Exchange hashpartitioning(match_id#17, 200), ENSURE_REQUIREMENTS, [plan_id=1492]
      :     :     +- Filter isnotnull(match_id#17)
      :     :        +- FileScan csv [match_id#17,mapid#18,playlist_id#20] Batched: false, DataFilters: [isnotnull(match_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/matches.csv], PartitionFilters: [], PushedFilters: [IsNotNull(match_id)], ReadSchema: struct<match_id:string,mapid:string,playlist_id:string>
      :   

In [19]:
// Task 4: Aggregate the joined data frame to figure out questions like:

// Saved the bucketed table as a temporary view

bucketedMatchesMedals.createOrReplaceTempView("bucketed_matches_medals")

In [38]:
// I. Which player averages the most kills per game?

val playersAvgKills = sparkSession.sql("""
                                        SELECT player_gamertag, 
                                               AVG(player_total_kills) average_kills 
                                        
                                        FROM bucketed_matches_medals
                                        GROUP BY player_gamertag
                                        ORDER BY average_kills DESC
                                        """)

playersAvgKills.show(1)

+---------------+-------------+
|player_gamertag|average_kills|
+---------------+-------------+
|   gimpinator14|        109.0|
+---------------+-------------+
only showing top 1 row



playersAvgKills: org.apache.spark.sql.DataFrame = [player_gamertag: string, average_kills: double]


In [39]:
// II. Which playlist gets played the most?

val playlistMostPlayed = sparkSession.sql("""
                                        SELECT playlist_id, 
                                               COUNT(match_id) playlist_plays 
                                        
                                        FROM bucketed_matches_medals
                                        GROUP BY playlist_id
                                        ORDER BY playlist_plays DESC
                                        """)

playlistMostPlayed.show(1)

+--------------------+--------------+
|         playlist_id|playlist_plays|
+--------------------+--------------+
|f72e0ef0-7c4a-430...|       1565529|
+--------------------+--------------+
only showing top 1 row



playlistMostPlayed: org.apache.spark.sql.DataFrame = [playlist_id: string, playlist_plays: bigint]


In [40]:
// III. Which map gets played the most?

val mapMostPlayed = sparkSession.sql("""
                                        SELECT map.name map, 
                                               COUNT(match.match_id) map_plays 
                                        
                                        FROM bucketed_matches_medals match
                                        JOIN mapsView map
                                        on match.mapid = map.mapid
                                        GROUP BY map.name
                                        ORDER BY map_plays DESC
                                        """)

mapMostPlayed.show(1)

+------+---------+
|   map|map_plays|
+------+---------+
|Alpine|  1445545|
+------+---------+
only showing top 1 row



mapMostPlayed: org.apache.spark.sql.DataFrame = [map: string, map_plays: bigint]


In [41]:
// IV. Which map do players get the most Killing Spree medals on?

val mapMostKillingMedals = sparkSession.sql("""
                                            SELECT map.name map, 
                                               SUM(match.player_total_kills) map_player_killings 
                                        
                                            FROM bucketed_matches_medals match
                                            JOIN mapsView map
                                            on match.mapid = map.mapid
                                            GROUP BY map.name
                                            ORDER BY map_player_killings DESC
                                            """)

mapMostKillingMedals.show(1)

+------+-------------------+
|   map|map_player_killings|
+------+-------------------+
|Alpine|           17599171|
+------+-------------------+
only showing top 1 row



mapMostKillingMedals: org.apache.spark.sql.DataFrame = [map: string, map_player_killings: bigint]


In [42]:
// Task 5: With the aggregated data set, try different .sortWithinPartitions to see which has the smallest data size (hint: playlists and maps are both very low cardinality)
// I. playlistMostPlayed

val sortPartitionPlaylistMostPlayed = playlistMostPlayed.repartition(col("playlist_id")).sortWithinPartitions(col("playlist_plays").desc)

sortPartitionPlaylistMostPlayed.show(1)

+--------------------+--------------+
|         playlist_id|playlist_plays|
+--------------------+--------------+
|f72e0ef0-7c4a-430...|       1565529|
+--------------------+--------------+
only showing top 1 row



sortPartitionPlaylistMostPlayed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [playlist_id: string, playlist_plays: bigint]


In [43]:
// II. mapMostPlayed

val sortPartitionPMapMostPlayed = mapMostPlayed.repartition(col("map")).sortWithinPartitions(col("map_plays").desc)

sortPartitionPMapMostPlayed.show(1)

+------+---------+
|   map|map_plays|
+------+---------+
|Alpine|  1445545|
+------+---------+
only showing top 1 row



sortPartitionPMapMostPlayed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [map: string, map_plays: bigint]


In [44]:
// III. mapMostKillingMedals

val sortPartitionPMapMostKillingMedals = mapMostKillingMedals.repartition(col("map")).sortWithinPartitions(col("map_player_killings").desc)

sortPartitionPMapMostKillingMedals.show(1)

+------+-------------------+
|   map|map_player_killings|
+------+-------------------+
|Alpine|           17599171|
+------+-------------------+
only showing top 1 row



sortPartitionPMapMostKillingMedals: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [map: string, map_player_killings: bigint]
