In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast
from pyspark.sql.types import BooleanType, IntegerType, TimestampType
from pyspark.sql import functions as F

# Создание Spark-сессии
spark = SparkSession.builder \
    .appName("Iceberg Data Load") \
    .getOrCreate()

# Загрузка matches.csv
matches = spark.read.option("header", "true") \
    .csv("/home/iceberg/data/matches.csv") \
    .withColumn("is_team_game", col("is_team_game").cast(BooleanType())) \
    .withColumn("completion_date", col("completion_date").cast(TimestampType()))

# Загрузка match_details.csv
match_details = spark.read.option("header", "true") \
    .csv("/home/iceberg/data/match_details.csv") \
    .withColumn("player_total_kills", col("player_total_kills").cast(IntegerType())) \
    .withColumn("player_total_deaths", col("player_total_deaths").cast(IntegerType()))

# Загрузка medals.csv
medals = spark.read.option("header", "true") \
    .csv("/home/iceberg/data/medals.csv")

# Загрузка medals_matches_players.csv
medals_matches_players = spark.read.option("header", "true") \
    .csv("/home/iceberg/data/medals_matches_players.csv") \
    .withColumn("count", col("count").cast(IntegerType()))

# Загрузка maps.csv
maps = spark.read.option("header", "true") \
    .csv("/home/iceberg/data/maps.csv")

In [32]:
matches.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: boolean (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: string (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)



In [33]:
match_details.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- previous_spartan_rank: string (nullable = true)
 |-- spartan_rank: string (nullable = true)
 |-- previous_total_xp: string (nullable = true)
 |-- total_xp: string (nullable = true)
 |-- previous_csr_tier: string (nullable = true)
 |-- previous_csr_designation: string (nullable = true)
 |-- previous_csr: string (nullable = true)
 |-- previous_csr_percent_to_next_tier: string (nullable = true)
 |-- previous_csr_rank: string (nullable = true)
 |-- current_csr_tier: string (nullable = true)
 |-- current_csr_designation: string (nullable = true)
 |-- current_csr: string (nullable = true)
 |-- current_csr_percent_to_next_tier: string (nullable = true)
 |-- current_csr_rank: string (nullable = true)
 |-- player_rank_on_team: string (nullable = true)
 |-- player_finished: string (nullable = true)
 |-- player_average_life: string (nullable = true)
 |-- player_total_kills: integer (nullable = true)
 

In [34]:
medals.printSchema()

root
 |-- medal_id: string (nullable = true)
 |-- sprite_uri: string (nullable = true)
 |-- sprite_left: string (nullable = true)
 |-- sprite_top: string (nullable = true)
 |-- sprite_sheet_width: string (nullable = true)
 |-- sprite_sheet_height: string (nullable = true)
 |-- sprite_width: string (nullable = true)
 |-- sprite_height: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- difficulty: string (nullable = true)



In [35]:
# creating a table bootcamp.matches
matchesDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches (
     match_id STRING,
     mapid STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(matchesDDL)

#saving data from a dataframe to a table
matches.select(
    F.col("match_id"), F.col("mapid"), F.col("is_team_game"), F.col("playlist_id"), F.col("completion_date")
).write \
    .mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.matches")

# creating a table bootcamp.match_details
matchDetailsDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.match_details (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INT,
     player_total_deaths INT
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(matchDetailsDDL)

#saving data from a dataframe to a table
match_details.select(
     F.col("match_id"), F.col("player_gamertag"), F.col("player_total_kills"), F.col("player_total_deaths")
).write \
    .mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.match_details")

# creating a table bootcamp.medals
medalsDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals (
     medal_id STRING,
     classification STRING,
     name STRING
 )
 USING iceberg;
 """
spark.sql(medalsDDL)

#saving data from a dataframe to a table
medals.select(
    F.col("medal_id"), F.col("classification"), F.col("name")
).write \
    .mode("append") \
    .saveAsTable("bootcamp.medals")

# creating a table bootcamp.medals_matches_players
medals_matches_playersDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_matches_players (
     match_id STRING,
     player_gamertag STRING,
     medal_id STRING,
     count INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id)); 
 """
spark.sql(medals_matches_playersDDL)

#saving data from a dataframe to a table
medals_matches_players.select(
    F.col("match_id"), F.col("player_gamertag"), F.col("medal_id"), F.col("count")
).write \
    .mode("append") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.medals_matches_players")

# creating a table bootcamp.maps
mapsDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.maps (
     mapid STRING,
     name STRING
 )
 USING iceberg;
 """
spark.sql(mapsDDL)

#saving data from a dataframe to a table
maps.select(
    "mapid", "name"
).write.mode("append").saveAsTable("bootcamp.maps")

                                                                                

In [36]:
spark.sql("""
    SELECT * FROM bootcamp.matches
        
""").show()

+--------------------+--------------------+------------+--------------------+-------------------+
|            match_id|               mapid|is_team_game|         playlist_id|    completion_date|
+--------------------+--------------------+------------+--------------------+-------------------+
|f44c9997-eb6f-4d6...|ce1dc2de-f206-11e...|        true|0504ca3c-de41-48f...|2016-02-28 00:00:00|
|f0f2daf2-52f3-4ff...|cbcea2c0-f206-11e...|        NULL|2323b76a-db98-4e0...|2016-02-04 00:00:00|
|8aec419e-2bfa-4fc...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|2016-01-07 00:00:00|
|c6f24b65-bb73-489...|cebd854f-f206-11e...|        NULL|c98949ae-60a8-43d...|2016-01-26 00:00:00|
|a868eb4e-8b58-4e6...|c7805740-f206-11e...|        true|f72e0ef0-7c4a-430...|2016-02-02 00:00:00|
|a3c79d21-291d-401...|cdb934b0-f206-11e...|        NULL|c98949ae-60a8-43d...|2016-01-26 00:00:00|
|341ede74-f260-450...|c7edbf0f-f206-11e...|        NULL|f72e0ef0-7c4a-430...|2016-01-26 00:00:00|
|df9f097f-a074-4ec..

In [37]:
spark.sql("""
    SELECT * FROM bootcamp.match_details
        
""").show()

+--------------------+---------------+------------------+-------------------+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|
+--------------------+---------------+------------------+-------------------+
|f8852913-2ccf-46f...|    OneWingKing|                 7|                  6|
|155cfd23-4f97-4f1...|   BigChubSmith|                15|                 11|
|155cfd23-4f97-4f1...|  JakeWilson801|                18|                  9|
|155cfd23-4f97-4f1...|      taterbase|                 1|                 12|
|155cfd23-4f97-4f1...| BeyondHumanx39|                13|                 14|
|155cfd23-4f97-4f1...|   Twinsnakes05|                16|                 11|
|155cfd23-4f97-4f1...|  Maverick62011|                 9|                 14|
|155cfd23-4f97-4f1...|       EcZachly|                16|                 16|
|155cfd23-4f97-4f1...|      WhiteSpic|                10|                 12|
|b8d81721-befb-427...|  JakeWilson801|                16|       

In [38]:
spark.sql("""
    SELECT * FROM bootcamp.medals
        
""").show()

+----------+-----------------+--------------+
|  medal_id|   classification|          name|
+----------+-----------------+--------------+
|2315448068|             NULL|          NULL|
|3565441934|             NULL|          NULL|
|4162659350|         Breakout| Buzzer Beater|
|1573153198|         Breakout|    Vanquisher|
| 298813630|            Style|Spartan Charge|
|3824002610|         Vehicles|  Ghost Assist|
|3324603383|          Warzone|    Grunt Kill|
| 979431049|         Breakout|       Bifecta|
|3098362934|WeaponProficiency|  Perfect Kill|
|2435743433|          Warzone|  Base Defense|
|2430242797|     KillingSpree| Killing Spree|
|1427531503|            Style| Team Takedown|
|2359847435|         Breakout|    Extinction|
|1691836029|            Style|   Hard Target|
|2766284219|            Style|     Quickdraw|
|3354395650|      Strongholds| Capture Spree|
|2564994165|WeaponProficiency|Big Gun Runner|
|2896365521|            Style| Team Takedown|
|3786961025|            Style|    

In [41]:
spark.sql("""
    SELECT * FROM bootcamp.medals_matches_players
        
""").show()

+--------------------+---------------+----------+-----+
|            match_id|player_gamertag|  medal_id|count|
+--------------------+---------------+----------+-----+
|27d7c16b-b780-4f8...|       EcZachly| 824733727|    1|
|27d7c16b-b780-4f8...|       EcZachly|3261908037|    5|
|27d7c16b-b780-4f8...|       EcZachly|2078758684|    1|
|27d7c16b-b780-4f8...|       EcZachly|1573153198|    1|
|27d7c16b-b780-4f8...|       EcZachly|2782465081|    1|
|27d7c16b-b780-4f8...|       EcZachly|2287626681|    1|
|e39c1eac-a39b-4e0...|       EcZachly| 250435527|    1|
|e39c1eac-a39b-4e0...|       EcZachly|3261908037|    2|
|e39c1eac-a39b-4e0...|       EcZachly|3400287617|    1|
|6128f58a-e42e-472...|       EcZachly|3261908037|    8|
|6128f58a-e42e-472...|       EcZachly|2078758684|    2|
|c2856e2e-d674-409...|       EcZachly|3261908037|   13|
|c2856e2e-d674-409...|       EcZachly|3001183151|    1|
|c2856e2e-d674-409...|       EcZachly|3400287617|    5|
|c2856e2e-d674-409...|       EcZachly|2838259753

In [42]:
spark.sql("""
    SELECT * FROM bootcamp.maps
        
""").show()

+--------------------+-------------------+
|               mapid|               name|
+--------------------+-------------------+
|c93d708f-f206-11e...|              Urban|
|cb251c51-f206-11e...|     Raid on Apex 7|
|c854e54f-f206-11e...|March on Stormbreak|
|c8d69870-f206-11e...| Escape from A.R.C.|
|73ed1fd0-45e5-4bb...|             Osiris|
|96c3e3dd-7703-408...|          Blue Team|
|1c4f8e19-b046-4f7...|            Glassed|
|825065cf-df57-42e...|        Unconfirmed|
|9a188f67-1664-4d7...|           Alliance|
|2702ea83-2c3e-4fd...|   Before the Storm|
|82f8471c-a2ef-408...|            Genesis|
|fcd7caa4-37c9-436...|       The Breaking|
|7dc80b62-dd39-41d...|          Guardians|
|db9f76c4-6f9a-4c1...|   Meridian Station|
|b486c8f6-0b00-4a4...|         Evacuation|
|37c717b8-7ae8-4be...|            Reunion|
|72c98d4a-4ee4-40d...|Sword of Sanghelios|
|f352eafd-d307-434...|        Enemy Lines|
|8d72dd82-3afb-409...|  Battle of Sunaion|
|c822b1c0-f206-11e...|             Summit|
+----------

In [23]:
# Register DataFrameы as a temporary view
# matches.createOrReplaceTempView("matches")
# match_details.createOrReplaceTempView("match_details")
# medals_matches_players.createOrReplaceTempView("medals_matches_players")
# medals.createOrReplaceTempView("medals")
# maps.createOrReplaceTempView("maps")

In [43]:
# Bucket join match_details, matches, and medal_matches_players on match_id with 16 buckets

bucketed_joined_df = spark.sql("""
SELECT mdb.match_id, mdb.player_gamertag, mdb.player_total_kills, mdb.player_total_deaths,
       md.mapid, md.playlist_id, md.completion_date, mmpb.medal_id, mmpb.count AS medal_count
FROM bootcamp.match_details mdb
JOIN bootcamp.matches md 
    ON mdb.match_id = md.match_id
JOIN bootcamp.medals_matches_players mmpb
    ON mdb.match_id = mmpb.match_id AND mdb.player_gamertag = mmpb.player_gamertag
""")

bucketed_joined_df.show()



+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------------------+----------+-----------+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|               mapid|         playlist_id|    completion_date|  medal_id|medal_count|
+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------------------+----------+-----------+
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|c7805740-f206-11e...|780cc101-005c-4fc...|2016-01-06 00:00:00|3565443938|          4|
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|c7805740-f206-11e...|780cc101-005c-4fc...|2016-01-06 00:00:00|3261908037|          8|
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|c7805740-f206-11e...|780cc101-005c-4fc...|2016-01-06 00:00:00| 824733727|          1|
|0001a1c4-83dc-4f4...|

                                                                                

In [50]:
# Turn off the default broadcast join behavior
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Create a temporary view for the main DataFrame
bucketed_joined_df.createOrReplaceTempView("bucketed_joined_df")

# Broadcast the maps and medals DataFrames
maps_broadcasted = broadcast(spark.table("bootcamp.maps"))
medals_broadcasted = broadcast(spark.table("bootcamp.medals"))

# Create temporary views for the broadcasted DataFrames
maps_broadcasted.createOrReplaceTempView("maps_broadcasted")
medals_broadcasted.createOrReplaceTempView("medals_broadcasted")

# Use SQL to join the DataFrames
final_joined_df = spark.sql("""
    SELECT bj.*, m.name AS map_name, ms.name AS medal_name
    FROM bucketed_joined_df bj
    LEFT JOIN maps_broadcasted m 
        ON bj.mapid = m.mapid
    LEFT JOIN medals_broadcasted ms
        ON bj.medal_id = ms.medal_id
""")

# Show the result
final_joined_df.show()



+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------------------+----------+-----------+--------+-------------------+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|               mapid|         playlist_id|    completion_date|  medal_id|medal_count|map_name|         medal_name|
+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------------------+----------+-----------+--------+-------------------+
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|c7805740-f206-11e...|780cc101-005c-4fc...|2016-01-06 00:00:00|3565443938|          4| Glacier|Stronghold Captured|
|0001a1c4-83dc-4f4...|    ILLICIT 117|                23|                 28|c7805740-f206-11e...|780cc101-005c-4fc...|2016-01-06 00:00:00|3565443938|          4| Glacier|Stronghold Captured|
|0001a1c4-83dc-4f4...|    ILLICIT 117|  

                                                                                

In [52]:
# Group by player_gamertag and calculate the average kills
player_avg_kills_df = bucketed_joined_df \
    .groupBy("player_gamertag") \
    .agg(F.avg("player_total_kills").alias("avg_kills")) \
    .orderBy(F.col("avg_kills").desc())

# Show the top result
print("most kills per game")
player_avg_kills_df.show(1)

most kills per game




+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
+---------------+---------+
only showing top 1 row



                                                                                

In [53]:
# Group by playlist_id and calculate the count of distinct match_id
most_played_playlist_df = bucketed_joined_df \
    .groupBy("playlist_id") \
    .agg(F.countDistinct("match_id").alias("playlist_count")) \
    .orderBy(F.col("playlist_count").desc())

# Show the top result
print("most played playlist")
most_played_playlist_df.show(1)

most played playlist




+--------------------+--------------+
|         playlist_id|playlist_count|
+--------------------+--------------+
|f72e0ef0-7c4a-430...|          7640|
+--------------------+--------------+
only showing top 1 row



                                                                                

In [54]:
# Create a temporary view for final_joined_df
final_joined_df.createOrReplaceTempView("final_joined_df")

# Group by mapid and map_name, count distinct match_id, and order by games_played in descending order
most_played_map_df = final_joined_df \
    .groupBy("mapid", "map_name") \
    .agg(F.countDistinct("match_id").alias("games_played")) \
    .orderBy(F.col("games_played").desc())

# Show the top result
print("most played map")
most_played_map_df.show(1)

most played map




+--------------------+--------------+------------+
|               mapid|      map_name|games_played|
+--------------------+--------------+------------+
|c7edbf0f-f206-11e...|Breakout Arena|        7032|
+--------------------+--------------+------------+
only showing top 1 row



                                                                                

In [56]:
# Filter by "Killing Spree" and then group by mapid and map_name, summing the medal_count
most_killing_spree_map_df = final_joined_df \
    .filter(F.col("medal_name") == "Killing Spree") \
    .distinct() \
    .groupBy("mapid", "map_name") \
    .agg(F.sum("medal_count").alias("killing_spree_count")) \
    .orderBy(F.col("killing_spree_count").desc())

# Show the top result
print("most killing spree_map")
most_killing_spree_map_df.show(1)

most killing spree_map




+--------------------+--------------+-------------------+
|               mapid|      map_name|killing_spree_count|
+--------------------+--------------+-------------------+
|c7edbf0f-f206-11e...|Breakout Arena|               6738|
+--------------------+--------------+-------------------+
only showing top 1 row



                                                                                

In [61]:
# Sorting within partitions based on low cardinality columns
sorted_df = final_joined_df \
    .sortWithinPartitions("mapid", "playlist_id")

# Show the result
sorted_df.show()

[Stage 260:>                                                        (0 + 1) / 1]

+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------------------+----------+-----------+--------+-------------------+
|            match_id|player_gamertag|player_total_kills|player_total_deaths|               mapid|         playlist_id|    completion_date|  medal_id|medal_count|map_name|         medal_name|
+--------------------+---------------+------------------+-------------------+--------------------+--------------------+-------------------+----------+-----------+--------+-------------------+
|006634bf-a2ef-45c...|AN EVIL VILLAIN|                 3|                  6|c74c9d0f-f206-11e...|0bcf2be1-3168-4e4...|2015-11-26 00:00:00|3565443938|          1|  Alpine|Stronghold Captured|
|006634bf-a2ef-45c...|AN EVIL VILLAIN|                 3|                  6|c74c9d0f-f206-11e...|0bcf2be1-3168-4e4...|2015-11-26 00:00:00|3565443938|          1|  Alpine|Stronghold Captured|
|006634bf-a2ef-45c...|AN EVIL VILLAIN|  

                                                                                