In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("SparkHomework")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.sql.debug.maxToStringFields", "1000")
    .config("spark.sql.iceberg.planning.distribution-mode", "hash")
    .appName("SparkIcebergOptimized")
    # Tell Spark to use the same number of partitions as our buckets
    .config("spark.sql.shuffle.partitions", "16") 
    # --- Configuration for REST Catalog ---
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "rest")
    .config("spark.sql.catalog.spark_catalog.uri", "http://rest:8181")
    # --- NEW: S3/MinIO Configuration for Spark and Iceberg ---
    .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.spark_catalog.s3.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

25/06/26 21:31:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# %%
# Forcefully set the S3 configuration on the underlying Hadoop configuration
# This can sometimes fix issues where SparkConf doesn't propagate correctly.

sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://minio:9000")
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")

In [3]:
maps = spark.read.option("header", "true").csv("/home/iceberg/data/maps.csv")
matches = spark.read.option("header", "true").csv("/home/iceberg/data/matches.csv")

maps.createOrReplaceTempView("maps")
matches.createOrReplaceTempView("matches")

In [4]:
%%sql
SELECT * FROM maps

25/06/26 21:31:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


mapid,name,description
c93d708f-f206-11e4-a815-24be05e24f7e,Urban,Andesia was the crucible for countless heroes and villains caught in the throes of seething rebellion and righteous excess.
cb251c51-f206-11e4-8541-24be05e24f7e,Raid on Apex 7,This unbroken ring is a symbol of the discovery that shook the galaxy and changed the course of both human and Covenant destiny.
c854e54f-f206-11e4-bddc-24be05e24f7e,March on Stormbreak,
c8d69870-f206-11e4-b477-24be05e24f7e,Escape from A.R.C.,Scientists flocked to this Forerunner excavation in search of new beginnings. What they unearthed will lead to their inevitable end.
73ed1fd0-45e5-4bb9-ab6a-d2852c04ea7d,Osiris,
96c3e3dd-7703-4086-9e64-e3a23932bdc4,Blue Team,
1c4f8e19-b046-4f78-9e2d-959cba84663d,Glassed,
825065cf-df57-42e3-b845-830e7340ea43,Unconfirmed,
9a188f67-1664-4d7b-83ca-1d74f714f764,Alliance,
2702ea83-2c3e-4fd5-8370-60d9a6e0422f,Before the Storm,


In [5]:
%%sql
SELECT * FROM matches

match_id,mapid,is_team_game,playlist_id,game_variant_id,is_match_over,completion_date,match_duration,game_mode,map_variant_id
11de1a94-8d07-4162-9f5f-d3cc753c811c,c7edbf0f-f206-11e4-aa52-24be05e24f7e,True,f72e0ef0-7c4a-4307-af78-8e38dac3fdba,1e473914-46e4-408d-af26-178fb115de76,True,2016-02-22 00:00:00.000000,,,
d3643e71-3e51-43e6-a200-f4a7f306ac12,cb914b9e-f206-11e4-b447-24be05e24f7e,False,d0766624-dbd7-4536-ba39-2d890a6143a9,257a305e-4dd3-41f1-9824-dfe7e8bd59e1,True,2016-02-14 00:00:00.000000,,,
d78d2aae-36e4-48ac-a3b5-6d4d90f90ace,c7edbf0f-f206-11e4-aa52-24be05e24f7e,True,f72e0ef0-7c4a-4307-af78-8e38dac3fdba,1e473914-46e4-408d-af26-178fb115de76,True,2016-03-24 00:00:00.000000,,,55e5ee2e-88df-4657-b9ae-b6ec7ca64614
b440069e-ec5f-4f51-bdd1-bc0bc7fe1195,c7edbf0f-f206-11e4-aa52-24be05e24f7e,True,f72e0ef0-7c4a-4307-af78-8e38dac3fdba,1e473914-46e4-408d-af26-178fb115de76,True,2015-12-23 00:00:00.000000,,,ec3eef73-13e3-4d4b-a922-cc195109a842
1dd475fc-ee6b-4e1d-8140-c44d03812076,c93d708f-f206-11e4-a815-24be05e24f7e,True,0e39ead4-383b-4452-bbd4-babb7becd82e,42f97cca-2cb4-497a-a0fd-ceef1ba46bcc,True,2016-04-07 00:00:00.000000,,,
848f02ad-72ef-4792-9914-9673245c5f07,cbcea2c0-f206-11e4-8c4a-24be05e24f7e,True,2323b76a-db98-4e03-aa37-e171cfbdd1a4,257a305e-4dd3-41f1-9824-dfe7e8bd59e1,True,2016-03-17 00:00:00.000000,,,
e207adc1-4d7a-43ab-9854-071d7e7b68ba,cc74f4e1-f206-11e4-ad66-24be05e24f7e,True,2323b76a-db98-4e03-aa37-e171cfbdd1a4,257a305e-4dd3-41f1-9824-dfe7e8bd59e1,True,2016-04-05 00:00:00.000000,,,
1fb5c2ec-ca60-4342-826f-1ab60ea06ca2,ca737f8f-f206-11e4-a7e2-24be05e24f7e,True,bc0f8ad6-31e6-4a18-87d9-ad5a2dbc8212,257a305e-4dd3-41f1-9824-dfe7e8bd59e1,True,2015-12-16 00:00:00.000000,,,
54f1cbd2-2be6-4d5f-bd9e-24f1361d66f7,cbcea2c0-f206-11e4-8c4a-24be05e24f7e,,892189e9-d712-4bdb-afa7-1ccab43fbed4,257a305e-4dd3-41f1-9824-dfe7e8bd59e1,,2016-02-04 00:00:00.000000,,,7108c409-6d1e-41d1-aca2-53b5218fbc3d
9e079488-1355-4c61-8acd-b8667bc48caf,c74c9d0f-f206-11e4-8330-24be05e24f7e,True,0bcf2be1-3168-4e42-9fb5-3551d7dbce77,b45854a7-e6e1-4a9c-9104-139934511779,True,2015-11-22 00:00:00.000000,,,1c632c30-3994-4443-aa6b-41d58352eb60


In [6]:
from pyspark.sql.functions import broadcast

matchesAndMaps = (
    matches.join(
        broadcast(maps),
        on=[matches.mapid == maps.mapid],
        how="inner"
    )
)
matchesAndMaps.createOrReplaceTempView("matchesAndMaps")

matchesAndMapsPlan = (
    matches.join(
        broadcast(maps),
        on=[matches.mapid == maps.mapid],
        how="inner"
    )
).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [mapid#41], [mapid#17], Inner, BuildRight, false
   :- Filter isnotnull(mapid#41)
   :  +- FileScan csv [match_id#40,mapid#41,is_team_game#42,playlist_id#43,game_variant_id#44,is_match_over#45,completion_date#46,match_duration#47,game_mode#48,map_variant_id#49] Batched: false, DataFilters: [isnotnull(mapid#41)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/matches.csv], PartitionFilters: [], PushedFilters: [IsNotNull(mapid)], ReadSchema: struct<match_id:string,mapid:string,is_team_game:string,playlist_id:string,game_variant_id:string...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=80]
      +- Filter isnotnull(mapid#17)
         +- FileScan csv [mapid#17,name#18,description#19] Batched: false, DataFilters: [isnotnull(mapid#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/maps.csv], PartitionFilters

In [7]:
%%sql
SELECT COUNT(1) FROM matchesAndMaps

count(1)
24025


In [8]:
matches = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/matches.csv")
match_details = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/match_details.csv")
medal_matches_players = spark.read.option("header", "true").option("inferSchema", "true").csv("/home/iceberg/data/medals_matches_players.csv")

                                                                                

In [9]:
medal_matches_players.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- medal_id: long (nullable = true)
 |-- count: integer (nullable = true)



In [10]:
%%sql
CREATE DATABASE IF NOT EXISTS spark_catalog.bootcamp;

In [11]:
%%sql
DROP TABLE IF EXISTS spark_catalog.bootcamp.matchesBucketed;

In [12]:
%%sql
CREATE TABLE IF NOT EXISTS spark_catalog.bootcamp.matchesBucketed (
    match_id STRING,
    mapid STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    game_variant_id STRING,
    is_match_over BOOLEAN,
    completion_date TIMESTAMP,
    match_duration STRING,
    game_mode STRING,
    map_variant_id STRING
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
TBLPROPERTIES (
    'write.sort-order'='match_id ASC',
    'write.distribution-mode'='hash'
);

In [13]:
%%sql
DROP TABLE IF EXISTS spark_catalog.bootcamp.matchDetailsBucketed;

In [14]:
%%sql
CREATE TABLE IF NOT EXISTS spark_catalog.bootcamp.matchDetailsBucketed (
    match_id STRING,
    player_gamertag STRING,
    previous_spartan_rank INT,
    spartan_rank INT,
    previous_total_xp INT,
    total_xp INT,
    previous_csr_tier INT,
    previous_csr_designation INT,
    previous_csr INT,
    previous_csr_percent_to_next_tier INT,
    previous_csr_rank INT,
    current_csr_tier INT,
    current_csr_designation INT,
    current_csr INT,
    current_csr_percent_to_next_tier INT,
    current_csr_rank INT,
    player_rank_on_team INT,
    player_finished BOOLEAN,
    player_average_life STRING,
    player_total_kills INT,
    player_total_headshots INT,
    player_total_weapon_damage DOUBLE,
    player_total_shots_landed INT,
    player_total_melee_kills INT,
    player_total_melee_damage DOUBLE,
    player_total_assassinations INT,
    player_total_ground_pound_kills INT,
    player_total_shoulder_bash_kills INT,
    player_total_grenade_damage DOUBLE,
    player_total_power_weapon_damage DOUBLE,
    player_total_power_weapon_grabs INT,
    player_total_deaths INT,
    player_total_assists INT,
    player_total_grenade_kills INT,
    did_win INT,
    team_id INT
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
TBLPROPERTIES (
    'write.sort-order'='match_id ASC',
    'write.distribution-mode'='hash'
);

In [15]:
%%sql
DROP TABLE IF EXISTS spark_catalog.bootcamp.medalsMatchesPlayersBucketed;

In [16]:
%%sql
CREATE TABLE IF NOT EXISTS spark_catalog.bootcamp.medalsMatchesPlayersBucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id BIGINT,
    count INT
)
USING iceberg
PARTITIONED BY (bucket(16, match_id))
TBLPROPERTIES (
    'write.sort-order'='match_id ASC',
    'write.distribution-mode'='hash'
);

In [None]:
matchDetailsBucketed = (
    match_details
    .write
    .bucketBy(16, "match_id")
    # .sortBy("match_id")
    .format("iceberg")
    .mode("overwrite")
    .saveAsTable("spark_catalog.bootcamp.matchDetailsBucketed")
)

matchesBucketed = (
    matches
    .write
    .bucketBy(16, "match_id")
    # .sortBy("match_id")
    .format("iceberg")
    .mode("overwrite")
    .saveAsTable("spark_catalog.bootcamp.matchesBucketed")
)

medalsMatchesPlayersBucketed = (
    medal_matches_players
    .write
    .bucketBy(16, "match_id")
    # .sortBy("match_id")
    .format("iceberg")
    .mode("overwrite")
    .saveAsTable("spark_catalog.bootcamp.medalsMatchesPlayersBucketed")
)

                                                                                

In [18]:
matchDetailsBucketed = spark.table("spark_catalog.bootcamp.matchDetailsBucketed")
matchesBucketed = spark.table("spark_catalog.bootcamp.matchesBucketed")
medalsMatchesPlayersBucketed = spark.table("spark_catalog.bootcamp.medalsMatchesPlayersBucketed")

In [19]:
# Join match_details, matches, and medal_matches_players on 'match_id'
joined_df = (
    matchDetailsBucketed
    .join(matchesBucketed, on="match_id", how="inner")
    .join(medalsMatchesPlayersBucketed, on="match_id", how="inner")
).explain()

joined_df = (
    matchDetailsBucketed
    .join(matchesBucketed, on="match_id", how="inner")
    .join(medalsMatchesPlayersBucketed, on="match_id", how="inner")
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#501, player_gamertag#502, previous_spartan_rank#503, spartan_rank#504, previous_total_xp#505, total_xp#506, previous_csr_tier#507, previous_csr_designation#508, previous_csr#509, previous_csr_percent_to_next_tier#510, previous_csr_rank#511, current_csr_tier#512, current_csr_designation#513, current_csr#514, current_csr_percent_to_next_tier#515, current_csr_rank#516, player_rank_on_team#517, player_finished#518, player_average_life#519, player_total_kills#520, player_total_headshots#521, player_total_weapon_damage#522, player_total_shots_landed#523, player_total_melee_kills#524, player_total_melee_damage#525, player_total_assassinations#526, player_total_ground_pound_kills#527, player_total_shoulder_bash_kills#528, player_total_grenade_damage#529, player_total_power_weapon_damage#530, player_total_power_weapon_grabs#531, player_total_deaths#532, player_total_assists#533, player_total_grenade_kills#534, did_win#

In [24]:
joined_df.show(5)
joined_df.createOrReplaceTempView("joined_df")

+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------+---------------+----------+-----+
|            match_id|player_game

In [None]:
from pyspark.sql.functions import col, avg, desc, count

player_avg_kills = (
    joined_df
    .groupBy("player_gamertag")
    .agg(avg("player_total_kills").alias("average_kills"))
    .orderBy(desc("average_kills"))
)
player_avg_kills.show(5)

In [None]:
most_played_playlist = (
    joined_df
    .groupBy("playlist_name")
    .agg(count("*").alias("play_count"))
    .orderBy(desc("play_count"))
)
most_played_playlist.show(5)

In [None]:
most_played_map = (
    joined_df
    .groupBy("name") # 'name' is the map name column from the maps table
    .agg(count("*").alias("play_count"))
    .orderBy(desc("play_count"))
)
most_played_map.show(5)

In [None]:
killing_spree_map = (
    joined_df
    .filter(col("name_medal") == "Killing Spree")
    .groupBy("name") # 'name' is the map name
    .agg(count("*").alias("killing_spree_count"))
    .orderBy(desc("killing_spree_count"))
)
killing_spree_map.show(5)

In [None]:
partitioned_df = joined_df.repartition(8)
sorted_by_playlist = partitioned_df.sortWithinPartitions("playlist_id")
sorted_by_map = partitioned_df.sortWithinPartitions("name")

In [None]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM spark_catalog.bootcamp.joined_df.files;