In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, broadcast, to_timestamp

# Build spark session
spark = SparkSession.builder \
    .appName("Jupyter") \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()

# Deactivate automatic broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

spark

25/01/28 20:20:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Data
**match_details**
- a row for every players performance in a match
  
**matches**
- a row for every match

**medals_matches_players**
- a row for every medal type a player gets in a match
     
**medals**
- a row for every medal type

## Homework Assignment

1. Build a Spark job that
- Disables automatic broadcast join with `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")`
- Explicitly broadcast JOINs `matches` and `maps`
- Bucket join `match_details`, `matches`, and `medals_matches_players` on `match_id` with 16 buckets

2. Aggregate the joined data frame to figure out questions like:
- Which player averages the most kills per game?
- Which playlist gets played the most?
- Which map gets played the most?
- Which map do players get the most Killing Spree medals on?

3. Use the aggregated data set to:
- Try different `.sortWithinPartitions` to see which has the smallest data size (hint: `playlists` and `maps` are both very low cardinality)


In [2]:
# Load data
medals = spark.read.option("header", "true") \
                .csv("/home/iceberg/data/medals.csv")

maps = spark.read.option("header", "true") \
                .csv("/home/iceberg/data/maps.csv")

match_details = spark.read.option("header", "true") \
                .csv("/home/iceberg/data/match_details.csv")

matches = spark.read.option("header", "true") \
                .csv("/home/iceberg/data/matches.csv") \
                .withColumn("event_ts", to_timestamp("completion_date")) \
                .withColumn("event_date", expr("DATE_TRUNC('DAY', completion_date)"))

medals_matches_players = spark.read.option("header", "true") \
                .csv("/home/iceberg/data/medals_matches_players.csv")

matches.take(5)
            

[Row(match_id='11de1a94-8d07-4162-9f5f-d3cc753c811c', mapid='c7edbf0f-f206-11e4-aa52-24be05e24f7e', is_team_game='true', playlist_id='f72e0ef0-7c4a-4307-af78-8e38dac3fdba', game_variant_id='1e473914-46e4-408d-af26-178fb115de76', is_match_over='true', completion_date='2016-02-22 00:00:00.000000', match_duration=None, game_mode=None, map_variant_id=None, event_ts=datetime.datetime(2016, 2, 22, 0, 0), event_date=datetime.datetime(2016, 2, 22, 0, 0)),
 Row(match_id='d3643e71-3e51-43e6-a200-f4a7f306ac12', mapid='cb914b9e-f206-11e4-b447-24be05e24f7e', is_team_game='false', playlist_id='d0766624-dbd7-4536-ba39-2d890a6143a9', game_variant_id='257a305e-4dd3-41f1-9824-dfe7e8bd59e1', is_match_over='true', completion_date='2016-02-14 00:00:00.000000', match_duration=None, game_mode=None, map_variant_id=None, event_ts=datetime.datetime(2016, 2, 14, 0, 0), event_date=datetime.datetime(2016, 2, 14, 0, 0)),
 Row(match_id='d78d2aae-36e4-48ac-a3b5-6d4d90f90ace', mapid='c7edbf0f-f206-11e4-aa52-24be05e24f

In [3]:
## Broadcast JOIN `matches` and `maps`

matches_df = matches.join(
    broadcast(maps),
    matches["mapid"] == maps["mapid"],
    how = "inner"
)#.explain()

# matches_df.take(5)

### Bucket join `match_details`, `matches`, and `medals_matches_players` on `match_id` with 16 buckets

In [4]:
medals_matches_players.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- medal_id: string (nullable = true)
 |-- count: string (nullable = true)



In [5]:
%%sql
CREATE DATABASE IF NOT EXISTS bootcamp_hw

In [6]:
%%sql
DROP TABLE IF EXISTS bootcamp_hw.match_details_bucketed

In [7]:
%%sql
DROP TABLE IF EXISTS bootcamp_hw.matches_bucketed

In [8]:
%%sql
DROP TABLE IF EXISTS bootcamp_hw.medals_matches_players_bucketed

In [9]:
%%sql
DROP TABLE IF EXISTS bootcamp_hw.matches_full_details_bucketed

In [10]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp_hw.match_details_bucketed(
    match_id STRING,
    player_gamertag STRING,
    previous_spartan_rank STRING,
    spartan_rank STRING,
    previous_total_xp STRING,
    total_xp STRING,
    previous_csr_tier STRING,
    previous_csr_designation STRING,
    previous_csr STRING,
    previous_csr_percent_to_next_tier STRING,
    previous_csr_rank STRING,
    current_csr_tier STRING,
    current_csr_designation STRING,
    current_csr STRING,
    current_csr_percent_to_next_tier STRING,
    current_csr_rank STRING,
    player_rank_on_team STRING,
    player_finished STRING,
    player_average_life STRING,
    player_total_kills STRING,
    player_total_headshots STRING,
    player_total_weapon_damage STRING,
    player_total_shots_landed STRING,
    player_total_melee_kills STRING,
    player_total_melee_damage STRING,
    player_total_assassinations STRING,
    player_total_ground_pound_kills STRING,
    player_total_shoulder_bash_kills STRING,
    player_total_grenade_damage STRING,
    player_total_power_weapon_damage STRING,
    player_total_power_weapon_grabs STRING,
    player_total_deaths STRING,
    player_total_assists STRING,
    player_total_grenade_kills STRING,
    did_win STRING,
    team_id STRING
)
USING parquet
PARTITIONED BY (bucket(16, match_id));


25/01/28 20:20:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [11]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp_hw.matches_bucketed(
    match_id STRING,
    mapid STRING,
    is_team_game STRING,
    playlist_id STRING,
    game_variant_id STRING,
    is_match_over STRING,
    completion_date TIMESTAMP,
    match_duration STRING,
    game_mode STRING,
    map_variant_id STRING,
    event_date DATE
)
USING parquet
PARTITIONED BY (bucket(16, match_id));

In [12]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp_hw.medals_matches_players_bucketed(
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    count STRING
)
USING parquet
PARTITIONED BY (bucket(16, match_id));

In [13]:
# Declare bucket number
num_buckets = 16

# Create bucketed tables
match_details.write.format("iceberg") \
                            .bucketBy(num_buckets, "match_id") \
                            .partitionBy("match_id") \
                            .mode("overwrite") \
                            .saveAsTable("bootcamp_hw.match_details_bucketed")

matches.write.format("iceberg") \
                            .bucketBy(num_buckets, "match_id") \
                            .partitionBy("match_id") \
                            .mode("overwrite") \
                            .saveAsTable("bootcamp_hw.matches_bucketed")

medals_matches_players.write.format("iceberg") \
                            .bucketBy(num_buckets, "match_id") \
                            .partitionBy("match_id") \
                            .mode("overwrite") \
                            .saveAsTable("bootcamp_hw.medals_matches_players_bucketed")

25/01/28 20:20:46 ERROR Utils: Aborting task                      (0 + 11) / 11]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:158)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
	at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
	at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:352)
	at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:734)
	at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
	at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
	at org.apache.iceberg.io.RollingDataWriter.newWrit



25/01/28 20:20:46 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
25/01/28 20:20:46 ERROR DataWritingSparkTask: Aborting commit for partition 5 (task 18, attempt 0, stage 8.0)
25/01/28 20:20:46 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
25/01/28 20:20:46 ERROR DataWritingSparkTask: Aborting commit for partition 2 (task 15, attempt 0, stage 8.0)
25/01/28 20:20:46 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
25/01/28 20:20:46 ERROR DataWritingSparkTask: Aborting commit for partition 10 (task 23, attempt 0, stage 8.0)
25/01/28 20:20:47 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
25/01/28 20:20:47 ERROR DataWritingSparkTask: Aborting commit for partition 4 (task 17, attempt 0, stage 8.0)
25/01/28 20:20:47 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
25/01/28 20:20:47 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
25/01/28 20:20:47 ERROR D

Py4JError: py4j does not exist in the JVM

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [14]:
# Declare bucket number
num_buckets = 16

# Create bucketed tables
match_details.writeTo("bootcamp_hw.match_details_bucketed") \
    .option("write.format.default", "parquet") \
    .option("bucketby", "match_id") \
    .option("num-buckets", num_buckets) \
    .createOrReplace()

matches.writeTo("bootcamp_hw.matches_bucketed") \
    .option("write.format.default", "parquet") \
    .option("bucketby", "match_id") \
    .option("num-buckets", num_buckets) \
    .createOrReplace()

medals_matches_players.writeTo("bootcamp_hw.medals_matches_players_bucketed") \
    .option("write.format.default", "parquet") \
    .option("bucketby", "match_id") \
    .option("num-buckets", num_buckets) \
    .createOrReplace()


ConnectionRefusedError: [Errno 111] Connection refused

In [15]:
spark.conf.set("spark.sql.shuffle.partitions", num_buckets)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")

ConnectionRefusedError: [Errno 111] Connection refused

In [16]:
# spark.sql("""
#   SELECT * 
#   FROM bootcamp_hw.matches_bucketed mb
#   JOIN bootcamp_hw.match_details_bucketed mdb
#     ON mb.match_id = mdb.match_id
# """).explain()

In [17]:
match_details_bucketed = spark.table("bootcamp_hw.match_details_bucketed")
matches_bucketed = spark.table("bootcamp_hw.matches_bucketed")
medals_matches_players_bucketed = spark.table("bootcamp_hw.medals_matches_players_bucketed")



matches_bucketed \
    .join(
        match_details_bucketed,
        on = "match_id",
        how = "inner"
    ) \
    .explain()

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
val matches_bucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/matches.csv")
                        .withColumn("event_ts", to_timestamp(col("completion_date")))
                        .withColumn("event_date", expr("DATE_TRUNC('DAY', completion_date)"))

val match_details_bucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/match_details.csv")

val medals_matches_players_bucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/medals_matches_players.csv")

In [None]:
spark.sql("""DROP TABLE IF EXISTS bootcamp_hw.matches_bucketed""")

val matches_bucketed_DDL = """
    CREATE TABLE IF NOT EXISTS bootcamp_hw.matches_bucketed (
      match_id STRING,
      mapid STRING,
      is_team_game BOOLEAN,
      playlist_id STRING,
      game_variant_id STRING,
      is_match_over BOOLEAN,
      completion_date TIMESTAMP,
      match_duration STRING,
      game_mode STRING,
      map_variant_id STRING,
      event_date DATE
    )
    USING iceberg
    PARTITIONED BY (completion_date, bucket(16, match_id));
"""
spark.sql(matches_bucketed_DDL)

In [None]:
matches_bucketed.select(
  $"match_id",
  $"mapid",
  $"is_team_game",
  $"playlist_id",
  $"game_variant_id",
  $"is_match_over",
  $"completion_date",
  $"match_duration",
  $"game_mode",
  $"map_variant_id",
  $"event_date",
)
    .write.mode("append")
    .partitionBy("completion_date")
    .bucketBy(16, "match_id")
    .saveAsTable("bootcamp_hw.matches_bucketed")
    

In [None]:
spark.sql("""DROP TABLE IF EXISTS bootcamp_hw.match_details_bucketed""")

val match_details_bucketed_DDL = """
    CREATE TABLE IF NOT EXISTS bootcamp_hw.match_details_bucketed (
      match_id STRING,
      player_gamertag STRING,
      player_total_kills INTEGER,
      player_total_headshots INTEGER,
      player_total_deaths INTEGER,
      did_win INTEGER,
      team_id STRING
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(match_details_bucketed_DDL)

In [None]:
match_details_bucketed.select(
  $"match_id",
  $"player_gamertag",
  $"player_total_kills",
  $"player_total_headshots",
  $"player_total_deaths",
  $"did_win",
  $"team_id",
)
    .write.mode("append")
    .bucketBy(16, "match_id")
    .saveAsTable("bootcamp_hw.match_details_bucketed")
    

In [None]:
spark.sql("""DROP TABLE IF EXISTS bootcamp_hw.medals_matches_players_bucketed""")

val medals_matches_players_bucketed_DDL = """
    CREATE TABLE IF NOT EXISTS bootcamp_hw.medals_matches_players_bucketed(
      match_id STRING,
      player_gamertag STRING,
      medal_id LONG,
      count INTEGER
    )
    USING iceberg
    PARTITIONED BY (bucket(16, match_id));
"""
spark.sql(medals_matches_players_bucketed_DDL)

In [None]:
medals_matches_players_bucketed.select(
  $"match_id",
  $"player_gamertag",
  $"medal_id",
  $"count",
)
    .write.mode("append")
    .bucketBy(16, "match_id")
    .saveAsTable("bootcamp_hw.medals_matches_players_bucketed")

In [None]:
spark.sql("""DROP TABLE IF EXISTS bootcamp_hw.matches_full_details_bucketed""")


In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

spark.sql("""
  SELECT 
    mb.match_id
  , mb.mapid
  , mb.is_team_game
  , mb.playlist_id
  , mb.game_variant_id
  , mb.is_match_over
  , mb.completion_date
  , mb.match_duration
  , mb.game_mode
  , mb.map_variant_id
  , mb.event_date
  , mdb.player_gamertag
  , mdb.player_total_kills
  , mdb.player_total_headshots
  , mdb.player_total_deaths
  , mdb.did_win
  , mdb.team_id
  , mmpb.medal_id
  , mmpb.count
  FROM bootcamp_hw.matches_bucketed mb
  LEFT JOIN bootcamp_hw.match_details_bucketed mdb
    ON mb.match_id = mdb.match_id
  LEFT JOIN bootcamp_hw.medals_matches_players_bucketed mmpb
    ON mdb.match_id = mmpb.match_id AND mdb.player_gamertag = mmpb.player_gamertag
"""
).explain()
      

Which player averages the most kills per game?

In [None]:
spark.sql("""
  WITH match_summary AS (
    SELECT 
      player_gamertag
    , COUNT(match_id) AS num_matches
    , COUNT(DISTINCT match_id) AS num_distinct_matches
    , SUM(player_total_kills) AS total_kills
    FROM matches_full_details_bucketed
    WHERE 1=1
    GROUP BY 1
  )

  , avg_kills_top_10 AS (
    SELECT 
      player_gamertag
    , total_kills / COALESCE(num_matches, 1) AS avg_kills
    FROM match_summary
    WHERE 1=1
    ORDER BY 2 DESC
    LIMIT 10
  )

  SELECT *
  FROM avg_kills_top_10
"""
)

Which playlist gets played the most?

In [None]:
spark.sql("""
  WITH match_summary AS (
    SELECT 
      playlist_id
    , COUNT(DISTINCT match_id) AS num_distinct_matches
    FROM matches_full_details_bucketed
    WHERE 1=1
    GROUP BY 1
  )

  SELECT *
  FROM match_summary
  WHERE 1=1
  ORDER BY 2 DESC
  LIMIT 10
"""
)

Which map gets played the most?

In [None]:
spark.sql("""
  WITH match_summary AS (
    SELECT 
      mapid AS map_id
    , COUNT(DISTINCT match_id) AS num_distinct_matches
    FROM matches_full_details_bucketed
    WHERE 1=1
    GROUP BY 1
  )

  SELECT *
  FROM match_summary
  WHERE 1=1
  ORDER BY 2 DESC
  LIMIT 10
"""
)

Which map do players get the most Killing Spree medals on?