In [120]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, broadcast, when, isnan, count, sum as sparksum, desc, max as sparkmax


ROOT = "/home/iceberg/data"

# Start spark session and add configuration
spark = SparkSession.builder.appName("matches").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Read csv files
match_details = spark.read.option("header", "true").csv(f"{ROOT}/match_details.csv")
matches = spark.read.option("header", "true").csv(f"{ROOT}/matches.csv")
medals_matches_players = spark.read.option("header", "true").csv(f"{ROOT}/medals_matches_players.csv")
medals = spark.read.option("header", "true").csv(f"{ROOT}/medals.csv")
maps = spark.read.option("header", "true").csv(f"{ROOT}/maps.csv")

medals = broadcast(medals)
maps = broadcast(maps)

In [121]:
# DROP Queries

drop_match_details_bucketed = "DROP TABLE IF EXISTS bootcamp.match_details_bucketed"
drop_matches_bucketed = "DROP TABLE IF EXISTS bootcamp.matches_bucketed"
drop_medal_matches_players_bucketed = "DROP TABLE IF EXISTS bootcamp.medal_matches_players_bucketed"

spark.sql(drop_match_details_bucketed)
spark.sql(drop_matches_bucketed)
spark.sql(drop_medal_matches_players_bucketed)

DataFrame[]

In [122]:
# Create DDL Statements
match_details_bucketed_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

matches_bucketed_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
     match_id STRING,
     map_id STRING,
     playlist_id STRING
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

medal_matches_player_bucketed_ddl = """
CREATE TABLE IF NOT EXISTS bootcamp.medal_matches_players_bucketed (
     match_id STRING,
     player_game_tag STRING,
     medal_id STRING,
     count INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """

spark.sql(match_details_bucketed_ddl)
spark.sql(matches_bucketed_ddl)
spark.sql(medal_matches_player_bucketed_ddl)

DataFrame[]

In [123]:
# Save data to iceberg tables
match_details \
    .select(
        col('match_id'), 
        col('player_gamertag'), 
        col("player_total_kills").cast("int").alias("player_total_kills")
    ) \
    .sortWithinPartitions(col("match_id"), col("player_gamertag")) \
    .write.mode('overwrite') \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.match_details_bucketed")

matches \
    .select(
        col('match_id'), 
        col('mapid').alias("map_id"), 
        col('playlist_id'),
    ) \
    .sortWithinPartitions(col("match_id"), col("map_id")) \
    .write.mode('overwrite') \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.matches_bucketed")

medals_matches_players \
    .select(
        col('match_id'), 
        col('player_gamertag'), 
        col('medal_id'), 
        col('count'), 
    ) \
    .sortWithinPartitions(col("match_id"), col("player_gamertag"), col("medal_id")) \
    .write.mode('overwrite') \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.medal_matches_players_bucketed")

In [129]:
# Bucketed Join
match_details_bucketed = spark.table("bootcamp.match_details_bucketed")
matches_bucketed = spark.table("bootcamp.matches_bucketed")
medal_matches_players_bucketed = spark.table("bootcamp.medal_matches_players_bucketed")

result_df = match_details_bucketed \
    .join(matches_bucketed, match_details_bucketed.match_id == matches_bucketed.match_id) \
    .join(medal_matches_players_bucketed, medal_matches_players_bucketed.match_id == matches_bucketed.match_id) \
    .join(medals, medal_matches_players_bucketed.medal_id == medals.medal_id) \
    .join(maps, maps.mapid == matches_bucketed.map_id) \
    .select(
        matches_bucketed.match_id,
        matches_bucketed.map_id,
        matches_bucketed.playlist_id,
        match_details_bucketed.player_gamertag,
        match_details_bucketed.player_total_kills,
        medal_matches_players_bucketed.medal_id,
        medal_matches_players_bucketed["count"].alias("medal_count"),
        medals.name.alias("medal_name"),
        maps.name.alias("map_name")
    )

# Show the result
result_df.show()


+--------------------+--------------------+--------------------+---------------+------------------+----------+-----------+-------------+--------+
|            match_id|              map_id|         playlist_id|player_gamertag|player_total_kills|  medal_id|medal_count|   medal_name|map_name|
+--------------------+--------------------+--------------------+---------------+------------------+----------+-----------+-------------+--------+
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|       DJ RAHHH|                13|2078758684|          1|  Double Kill|  Fathom|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|       DJ RAHHH|                13|2782465081|          1|     Reversal|  Fathom|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|       DJ RAHHH|                13|3261908037|         12|     Headshot|  Fathom|
|00169217-cca6-4b4...|cc040aa1-f206-11e...|2323b76a-db98-4e0...|       DJ RAHHH|                13|3270120991|          1|  

In [159]:
# - Which player averages the most kills per game?

a = result_df.groupBy("player_gamertag") \
    .agg(
        sparksum(col("player_total_kills")).alias("total_kills")
    ) \
    .orderBy(desc("total_kills")) \
    .limit(1) \
    .show()

#+---------------+-----------+
#|player_gamertag|total_kills|
#+---------------+-----------+
#|       EcZachly|    1503498|
#+---------------+-----------+

# - Which playlist gets played the most?
result_df.groupBy("playlist_id") \
    .count() \
    .orderBy(desc("count")) \
    .limit(1) \
    .show(truncate=False)

# +------------------------------------+-------+
# |playlist_id                         |count  |
# +------------------------------------+-------+
# |f72e0ef0-7c4a-4307-af78-8e38dac3fdba|1565529|
# +------------------------------------+-------+

#     - Which map gets played the most?
result_df.groupBy("map_id", "map_name") \
    .count() \
    .orderBy(desc("count")) \
    .limit(1) \
    .show(truncate=False)

# +------------------------------------+--------+-------+
# |map_id                              |map_name|count  |
# +------------------------------------+--------+-------+
# |c74c9d0f-f206-11e4-8330-24be05e24f7e|Alpine  |1445545|
# +------------------------------------+--------+-------+

#  - Which map do players get the most Killing Spree medals on?

result_df.where(col("medal_name") == "Killing Frenzy") \
    .groupBy("map_id", "map_name", "medal_name") \
    .count() \
    .orderBy(desc("count")) \
    .limit(1) \
    .show(truncate=False)

# +------------------------------------+--------+--------------+-----+
# |map_id                              |map_name|medal_name    |count|
# +------------------------------------+--------+--------------+-----+
# |c74c9d0f-f206-11e4-8330-24be05e24f7e|Alpine  |Killing Frenzy|5872 |
# +------------------------------------+--------+--------------+-----+

+---------------+-----------+
|player_gamertag|total_kills|
+---------------+-----------+
|       EcZachly|    1503498|
+---------------+-----------+

+------------------------------------+-------+
|playlist_id                         |count  |
+------------------------------------+-------+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|1565529|
+------------------------------------+-------+

+------------------------------------+--------+-------+
|map_id                              |map_name|count  |
+------------------------------------+--------+-------+
|c74c9d0f-f206-11e4-8330-24be05e24f7e|Alpine  |1445545|
+------------------------------------+--------+-------+

+------------------------------------+--------+--------------+-----+
|map_id                              |map_name|medal_name    |count|
+------------------------------------+--------+--------------+-----+
|c74c9d0f-f206-11e4-8330-24be05e24f7e|Alpine  |Killing Frenzy|5872 |
+------------------------------------+--------+---------

In [154]:
# medals.createTempView("medals")
spark.sql("SELECT DISTINCT name FROM medals WHERE name like '%ill%'").show(20, False)


+--------------+
|name          |
+--------------+
|Ball Kill     |
|Flag Kill     |
|Carrier Kill  |
|Beam Kill     |
|Kill          |
|Soldier Kill  |
|Killamanjaro  |
|Starkiller    |
|Caster Kill   |
|Splaser Kill  |
|Knight Kill   |
|Marine Kill   |
|Jackal Kill   |
|Clutch Kill   |
|Hydra Kill    |
|Crawler Kill  |
|Watcher Kill  |
|Killtrocity   |
|Killing Frenzy|
|Melee Kill    |
+--------------+
only showing top 20 rows



25/01/27 19:01:57 WARN HintErrorLogger: A join hint (strategy=broadcast) is specified but it is not part of a join relation.


In [68]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.matches_bucketed.files

size,num_files,sorted
521497,16,sorted


In [70]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.matches_bucketed.files

size,num_files,sorted
519192,16,sorted


In [71]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.match_details_bucketed.files

size,num_files,sorted
1852087,16,sorted


In [73]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.match_details_bucketed.files

size,num_files,sorted
1850057,16,sorted
