In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast


In [2]:
# start the spark session
spark = (SparkSession
         .builder
         .appName("Assignment")
         .getOrCreate())

25/08/02 00:47:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# disable automatic broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Align Shuffle Partitions to Bucket Count
spark.conf.set("spark.sql.shuffle.partitions", "16")

In [4]:
# tell the location of each .csv file
dir_matches = "/home/iceberg/data/matches.csv"
dir_match_details = "/home/iceberg/data/match_details.csv"
dir_medals_matches_players = "/home/iceberg/data/medals_matches_players.csv"
dir_medals = "/home/iceberg/data/medals.csv"
dir_maps = "/home/iceberg/data/maps.csv"

# read csv, create dataframes
matches_df = spark.read.csv(dir_matches, header=True, inferSchema=True)
match_details_df = spark.read.csv(dir_match_details, header=True, inferSchema=True)
medals_matches_players_df = spark.read.csv(dir_medals_matches_players, header=True, inferSchema=True)
medals_df = spark.read.csv(dir_medals, header=True, inferSchema=True)
maps_df = spark.read.csv(dir_maps, header=True, inferSchema=True)




                                                                                

In [5]:
dfs = [matches_df, match_details_df, medals_matches_players_df, medals_df, maps_df]

In [6]:
def descriptor(df_list):
    for df in df_list:
        df.printSchema()
        print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
        df.show(5)

In [7]:
# some exploration on the data
descriptor(dfs)

root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: boolean (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: boolean (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)

Rows: 24025, Columns: 10
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|    completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+-------------

25/07/30 02:11:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [8]:
# explicitly perform a broadcast join
result_df = medals_df.join(broadcast(maps_df))
result_df.show(5)

+----------+----------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-----------+----+----------+--------------------+-------------------+--------------------+
|  medal_id|sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|classification|description|name|difficulty|               mapid|               name|         description|
+----------+----------+-----------+----------+------------------+-------------------+------------+-------------+--------------+-----------+----+----------+--------------------+-------------------+--------------------+
|2315448068|      NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|          NULL|       NULL|NULL|      NULL|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|2315448068|      NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|          NULL| 

In [10]:
matches_df.select('match_id', 'is_team_game', 'playlist_id', 'completion_date').count()

24025

## Performing a bucket join

To perform a bucket join there are steps to follow, in preparation for it:
1) DDL the tables
2) Repartition the data on different buckets, < 16
3) Write the repartitioned data into your tables
4) Load the data from the tables into dataframes
5) Perform the bucket join of your tables and verify in your execution that there is no shuffling

In [45]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")

matches_bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
     match_id STRING,
     map_id STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(matches_bucketedDDL)


# Repartition to improve shuffle handling even on small data
matches_df = matches_df.repartition(16, "match_id")  # match bucket count
# print(matches_df.rdd.getNumPartitions())  # Should now say 16

# write df to a table
# selectExpr solves the issue of renaming here
(matches_df.selectExpr('match_id', 'mapid as map_id', 'is_team_game', 'playlist_id', 'completion_date')
   .write
   .format("iceberg")
   .mode("append")
   .save("bootcamp.matches_bucketed"))



                                                                                

In [17]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.match_details_bucketed""")

match_detaials_bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(match_detaials_bucketedDDL)


# Repartition to improve shuffle handling even on small data
match_details_df = match_details_df.repartition(16, "match_id")  # match bucket count
# print(matches_df.rdd.getNumPartitions())  # Should now say 16

# write df to a table
(match_details_df.select('match_id', 'player_gamertag', 'player_total_kills', 'player_total_deaths')
   .write
   .format("iceberg")
   .mode("append")
   .save("bootcamp.match_details_bucketed"))


                                                                                

In [43]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_matches_players_bucketed""")

medals_matches_playersbucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players_bucketed (
     match_id STRING,
     player_gamertag STRING,
     medal_id BIGINT,
     medal_count INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(medals_matches_playersbucketedDDL)

# Repartition to improve shuffle handling even on small data
medals_matches_players_df = medals_matches_players_df.repartition(16, "match_id")  # match bucket count
# print(matches_df.rdd.getNumPartitions())  # Should now say 16

# write df to a table
(medals_matches_players_df.selectExpr('match_id', 'player_gamertag', 'medal_id', 'count as medal_count')
   .write
   .format("iceberg")
   .mode("append")
   .save("bootcamp.medals_matches_players_bucketed"))

                                                                                

Bucketed joins only work when data is written to disk as a bucketed table, because Spark only optimizes joins on bucketed + saved data.
Confirm bucket-based joins in the Spark UI → SQL → Physical Plan — it should say SortMergeJoin without shuffle on both sides.
    
Spark will skip shuffling if all bucketed tables:

    Use the same number of buckets,
    Use the same join key,
    Are written with the same sort order (optional, but helps).



In [5]:
# load the bucketed data
df1_bucketed = spark.table("bootcamp.matches_bucketed")
df2_bucketed = spark.table("bootcamp.match_details_bucketed")
df3_bucketed = spark.table("bootcamp.medals_matches_players_bucketed")


# performing the bucket joins, one at time.
# selecting unique columns to avoid duplicates downstream, really important step
df12 = (df1_bucketed.join(df2_bucketed, "match_id")
                    .select(df1_bucketed["*"], 
                            df2_bucketed["player_gamertag"], 
                            df2_bucketed["player_total_kills"],
                            df2_bucketed["player_total_deaths"])
                            
       )

# df12.explain()

df_all = (df12.join(df3_bucketed, (df12.match_id == df3_bucketed.match_id) 
                    & (df12.player_gamertag == df3_bucketed.player_gamertag))
              .select(df12["*"],
                      df3_bucketed["medal_id"],
                      df3_bucketed["medal_count"])
         )
# df_all.explain()

# Look for:
    # SortMergeJoin — which is fine for bucket joins
    # No Exchange or Shuffle steps before the join inputs
    # Optional: InputPartitioning showing HashPartitioning(match_id, 16)

# all is good

## Analytical queries

In [6]:
# register dfs as a temp view, to work with the data for analytical purposes
df_all.createOrReplaceTempView("df_all")
maps_df.createOrReplaceTempView("maps_df")
medals_df.createOrReplaceTempView("medals_df")

# test df_all
dql_1 = """

SELECT
    *
FROM df_all
WHERE
    df_all.match_id = 'a4f925d4-36f8-4d45-ad4d-c2ea2ef7e487'
LIMIT 10

"""


spark.sql(dql_1).show(truncate=False)

[Stage 12:>                                                         (0 + 1) / 1]

+------------------------------------+------------------------------------+------------+------------------------------------+-------------------+---------------+------------------+-------------------+----------+-----------+
|match_id                            |map_id                              |is_team_game|playlist_id                         |completion_date    |player_gamertag|player_total_kills|player_total_deaths|medal_id  |medal_count|
+------------------------------------+------------------------------------+------------+------------------------------------+-------------------+---------------+------------------+-------------------+----------+-----------+
|a4f925d4-36f8-4d45-ad4d-c2ea2ef7e487|cc040aa1-f206-11e4-a3e0-24be05e24f7e|true        |c98949ae-60a8-43dc-85d7-0feb0b92e719|2016-02-23 00:00:00|A DIABETIC LYON|10                |15                 |3261908037|6          |
|a4f925d4-36f8-4d45-ad4d-c2ea2ef7e487|cc040aa1-f206-11e4-a3e0-24be05e24f7e|true        |c98949ae-60a8-43

                                                                                

In [33]:
# get the player that averages the most kills per match
dql_2 = """

SELECT
    player_gamertag,
    AVG(player_total_kills) as avg_player_kills
FROM df_all
GROUP BY player_gamertag
ORDER BY avg_player_kills DESC
LIMIT 10

"""


spark.sql(dql_2).show()



+---------------+-----------------+
|player_gamertag| avg_player_kills|
+---------------+-----------------+
|   gimpinator14|            109.0|
|  I Johann117 I|             96.0|
|BudgetLegendary|             83.0|
|   TameablePoet|             82.5|
|      GsFurreal|             75.0|
|   Sexy is Back|             73.0|
|     Profit TKO|70.92857142857143|
|   killerguy789|             68.0|
|THC GUILTYSPARK|             67.0|
|  DBossCnDTEXAS|             66.2|
+---------------+-----------------+



                                                                                

In [7]:
# which playlist gets played the most
dql_3 = """
WITH
unique_matches_playlist AS (
    SELECT
       match_id, 
       playlist_id
    FROM df_all
    GROUP BY match_id, playlist_id

)

SELECT
    ump.playlist_id,
    COUNT(1) as times_played
FROM unique_matches_playlist AS ump
GROUP BY ump.playlist_id
ORDER BY times_played DESC
LIMIT 10

"""


spark.sql(dql_3).show(truncate=False)



+------------------------------------+------------+
|playlist_id                         |times_played|
+------------------------------------+------------+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|7640        |
|2323b76a-db98-4e03-aa37-e171cfbdd1a4|3171        |
|892189e9-d712-4bdb-afa7-1ccab43fbed4|1961        |
|c98949ae-60a8-43dc-85d7-0feb0b92e719|1816        |
|f27a65eb-2d11-4965-aa9c-daa088fa5c9c|682         |
|d0766624-dbd7-4536-ba39-2d890a6143a9|618         |
|0bcf2be1-3168-4e42-9fb5-3551d7dbce77|535         |
|5728f612-3f20-4459-98bd-3478c79c4861|493         |
|780cc101-005c-4fca-8ce7-6f36d7156ffe|480         |
|355dc154-9809-4edb-8ed4-fff910c6ae9c|269         |
+------------------------------------+------------+



                                                                                

In [12]:
# which map gets played the most
dql_4 = """
WITH
unique_matches_map AS (
    SELECT
       match_id, 
       map_id
    FROM df_all
    GROUP BY match_id, map_id

)

SELECT
    maps_df.name AS map_name,
    COUNT(1) as times_played
FROM unique_matches_map AS umm
LEFT JOIN maps_df -- With this join in here we avoid processing extra data
    ON maps_df.mapid = umm.map_id
GROUP BY maps_df.name
ORDER BY times_played DESC
LIMIT 10

"""


spark.sql(dql_4).show(truncate=False)



+--------------+------------+
|map_name      |times_played|
+--------------+------------+
|Breakout Arena|7032        |
|Alpine        |1358        |
|Empire        |1347        |
|The Rig       |1022        |
|Truth         |970         |
|Plaza         |949         |
|Glacier       |922         |
|Regret        |922         |
|Coliseum      |904         |
|Eden          |870         |
+--------------+------------+



                                                                                

In [42]:
# which map players get the most 'Killing Spree' medal
dql_5 = """
WITH
target_medal_id as (

    SELECT DISTINCT medal_id FROM medals_df where name = 'Killing Spree'
),
test AS (

    SELECT
       maps_df.name AS map_name,
       COUNT(1) as target_medal_count
       
    FROM df_all 
    LEFT JOIN maps_df -- With this join in here we avoid processing extra data
        ON maps_df.mapid = df_all.map_id
    
    WHERE 
        TRUE
        -- AND df_all.match_id = '0000e3cf-727c-491a-9de8-43fe6ea611cc'
        AND df_all.medal_id = (SELECT medal_id FROM target_medal_id)

    GROUP BY map_name
    ORDER BY COUNT(1) DESC

)


SELECT * FROM test LIMIT 10

"""


spark.sql(dql_5).show(truncate=False)

25/08/01 02:06:01 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression


+--------------+------------------+
|map_name      |target_medal_count|
+--------------+------------------+
|Breakout Arena|6553              |
|Alpine        |4317              |
|Glacier       |2611              |
|Empire        |1991              |
|Truth         |1751              |
|The Rig       |1733              |
|Plaza         |1654              |
|Coliseum      |1646              |
|Eden          |1544              |
|Regret        |1540              |
+--------------+------------------+



## sortWithinPartitions
to see which sort gets the most compression of data
It also helps to avoid cost of full sort

In [7]:
print(df_all.rdd.getNumPartitions())

[Stage 26:>                                                         (0 + 5) / 5]

16


In [13]:
df_all.schema

StructType([StructField('match_id', StringType(), True), StructField('map_id', StringType(), True), StructField('is_team_game', BooleanType(), True), StructField('playlist_id', StringType(), True), StructField('completion_date', TimestampType(), True), StructField('player_gamertag', StringType(), True), StructField('player_total_kills', IntegerType(), True), StructField('player_total_deaths', IntegerType(), True), StructField('medal_id', LongType(), True), StructField('medal_count', IntegerType(), True)])

In [22]:
# create two different sortings

sort_a = df_all.sortWithinPartitions('match_id','map_id')
sort_b = df_all.sortWithinPartitions('playlist_id','map_id')
sort_c = df_all.sortWithinPartitions('map_id','playlist_id')

ddls for creating tables for writing the data into

In [15]:
%%sql

-- 
    
CREATE TABLE IF NOT EXISTS bootcamp.all_unsorted (
    match_id            STRING,
    map_id              STRING,
    is_team_game        BOOLEAN,
    playlist_id         STRING,
    completion_date     TIMESTAMP,
    player_gamertag     STRING,
    player_total_kills  INT,
    player_total_deaths INT,
    medal_id            BIGINT,
    medal_count         INT
)
USING iceberg
PARTITIONED BY (map_id);



In [16]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.all_sorted_a (
    match_id            STRING,
    map_id              STRING,
    is_team_game        BOOLEAN,
    playlist_id         STRING,
    completion_date     TIMESTAMP,
    player_gamertag     STRING,
    player_total_kills  INT,
    player_total_deaths INT,
    medal_id            BIGINT,
    medal_count         INT
)
USING iceberg
PARTITIONED BY (map_id);


In [17]:
%%sql
    
CREATE TABLE IF NOT EXISTS bootcamp.all_sorted_b (
    match_id            STRING,
    map_id              STRING,
    is_team_game        BOOLEAN,
    playlist_id         STRING,
    completion_date     TIMESTAMP,
    player_gamertag     STRING,
    player_total_kills  INT,
    player_total_deaths INT,
    medal_id            BIGINT,
    medal_count         INT
)
USING iceberg
PARTITIONED BY (map_id);

In [23]:
%%sql
    
CREATE TABLE IF NOT EXISTS bootcamp.all_sorted_c (
    match_id            STRING,
    map_id              STRING,
    is_team_game        BOOLEAN,
    playlist_id         STRING,
    completion_date     TIMESTAMP,
    player_gamertag     STRING,
    player_total_kills  INT,
    player_total_deaths INT,
    medal_id            BIGINT,
    medal_count         INT
)
USING iceberg
PARTITIONED BY (map_id);

Write the data into the tables

In [24]:
df_all.write.mode('overwrite').saveAsTable('bootcamp.all_unsorted')
sort_a.write.mode('overwrite').saveAsTable('bootcamp.all_sorted_a')
sort_b.write.mode('overwrite').saveAsTable('bootcamp.all_sorted_b')
sort_c.write.mode('overwrite').saveAsTable('bootcamp.all_sorted_c')

                                                                                

In [25]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.all_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted_a' 
FROM demo.bootcamp.all_sorted_a.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted_b' 
FROM demo.bootcamp.all_sorted_b.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted_c' 
FROM demo.bootcamp.all_sorted_c.files


size,num_files,unsorted
6866561,16,unsorted
6866561,16,sorted_a
6387482,16,sorted_b
6414459,16,sorted_c


sorted_b got us a compression of about 7%
