In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast, to_timestamp, to_date
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("Jupyter").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# 
spark

25/09/14 15:04:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
spark.conf.set('spark.sql.sources.v2.bucketing.enabled','true') 
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled','true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping','true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition','false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled','true')

In [3]:
spark.sql("show tables in bootcamp").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
| bootcamp|joined_dataset_bu...|      false|
| bootcamp|joined_dataset_bu...|      false|
| bootcamp|match_details_buc...|      false|
| bootcamp|  match_details_hive|      false|
| bootcamp|    matches_bucketed|      false|
| bootcamp|        matches_hive|      false|
| bootcamp|              medals|      false|
| bootcamp|medals_matches_pl...|      false|
| bootcamp|medals_matches_pl...|      false|
+---------+--------------------+-----------+



In [4]:
%%sql
CREATE DATABASE IF NOT EXISTS bootcamp

In [5]:
%%sql

show tables in bootcamp

namespace,tableName,isTemporary
bootcamp,joined_dataset_bucketed,False
bootcamp,joined_dataset_bucketed_sorted,False
bootcamp,match_details_bucketed,False
bootcamp,match_details_hive,False
bootcamp,matches_bucketed,False
bootcamp,matches_hive,False
bootcamp,medals,False
bootcamp,medals_matches_players_bucketed,False
bootcamp,medals_matches_players_hive,False


In [6]:
%%sql

DROP TABLE if exists bootcamp.match_details_bucketed;

In [7]:
%%sql
CREATE TABLE bootcamp.match_details_bucketed (
    match_id STRING,
    player_gamertag STRING,
    previous_spartan_rank INT,
    spartan_rank INT,
    previous_total_xp BIGINT,
    total_xp BIGINT,
    previous_csr_tier INT,
    previous_csr_designation INT,
    previous_csr INT,
    previous_csr_percent_to_next_tier DOUBLE,
    previous_csr_rank INT,
    current_csr_tier INT,
    current_csr_designation INT,
    current_csr INT,
    current_csr_percent_to_next_tier DOUBLE,
    current_csr_rank INT,
    player_rank_on_team INT,
    player_finished BOOLEAN,
    player_average_life STRING,
    player_total_kills INT,
    player_total_headshots INT,
    player_total_weapon_damage DOUBLE,
    player_total_shots_landed INT,
    player_total_melee_kills INT,
    player_total_melee_damage DOUBLE,
    player_total_assassinations INT,
    player_total_ground_pound_kills INT,
    player_total_shoulder_bash_kills INT,
    player_total_grenade_damage DOUBLE,
    player_total_power_weapon_damage DOUBLE,
    player_total_power_weapon_grabs INT,
    player_total_deaths INT,
    player_total_assists INT,
    player_total_grenade_kills INT,
    did_win BOOLEAN,
    team_id INT
  ) USING iceberg PARTITIONED BY (bucket(16, match_id));

In [8]:
%%sql

describe bootcamp.match_details_bucketed;

col_name,data_type,comment
match_id,string,
player_gamertag,string,
previous_spartan_rank,int,
spartan_rank,int,
previous_total_xp,bigint,
total_xp,bigint,
previous_csr_tier,int,
previous_csr_designation,int,
previous_csr,int,
previous_csr_percent_to_next_tier,double,


In [9]:
match_details = spark.read.option("header", "true").csv("/home/iceberg/data/match_details.csv")

(
    match_details
    .withColumn("match_id", col("match_id").cast("string"))
    .withColumn("player_gamertag", col("player_gamertag").cast("string"))
    .withColumn("previous_spartan_rank", col("previous_spartan_rank").cast("int"))
    .withColumn("spartan_rank", col("spartan_rank").cast("int"))
    .withColumn("previous_total_xp", col("previous_total_xp").cast("bigint"))
    .withColumn("total_xp", col("total_xp").cast("bigint"))
    .withColumn("previous_csr_tier", col("previous_csr_tier").cast("int"))
    .withColumn("previous_csr_designation", col("previous_csr_designation").cast("int"))
    .withColumn("previous_csr", col("previous_csr").cast("int"))
    .withColumn("previous_csr_percent_to_next_tier", col("previous_csr_percent_to_next_tier").cast("double"))
    .withColumn("previous_csr_rank", col("previous_csr_rank").cast("int"))
    .withColumn("current_csr_tier", col("current_csr_tier").cast("int"))
    .withColumn("current_csr_designation", col("current_csr_designation").cast("int"))
    .withColumn("current_csr", col("current_csr").cast("int"))
    .withColumn("current_csr_percent_to_next_tier", col("current_csr_percent_to_next_tier").cast("double"))
    .withColumn("current_csr_rank", col("current_csr_rank").cast("int"))
    .withColumn("player_rank_on_team", col("player_rank_on_team").cast("int"))
    .withColumn("player_finished", col("player_finished").cast("boolean"))
    .withColumn("player_average_life", col("player_average_life").cast("string"))
    .withColumn("player_total_kills", col("player_total_kills").cast("int"))
    .withColumn("player_total_headshots", col("player_total_headshots").cast("int"))
    .withColumn("player_total_weapon_damage", col("player_total_weapon_damage").cast("double"))
    .withColumn("player_total_shots_landed", col("player_total_shots_landed").cast("int"))
    .withColumn("player_total_melee_kills", col("player_total_melee_kills").cast("int"))
    .withColumn("player_total_melee_damage", col("player_total_melee_damage").cast("double"))
    .withColumn("player_total_assassinations", col("player_total_assassinations").cast("int"))
    .withColumn("player_total_ground_pound_kills", col("player_total_ground_pound_kills").cast("int"))
    .withColumn("player_total_shoulder_bash_kills", col("player_total_shoulder_bash_kills").cast("int"))
    .withColumn("player_total_grenade_damage", col("player_total_grenade_damage").cast("double"))
    .withColumn("player_total_power_weapon_damage", col("player_total_power_weapon_damage").cast("double"))
    .withColumn("player_total_power_weapon_grabs", col("player_total_power_weapon_grabs").cast("int"))
    .withColumn("player_total_deaths", col("player_total_deaths").cast("int"))
    .withColumn("player_total_assists", col("player_total_assists").cast("int"))
    .withColumn("player_total_grenade_kills", col("player_total_grenade_kills").cast("int"))
    .withColumn("did_win", col("did_win").cast("boolean"))  # converte 0/1 para False/True
    .withColumn("team_id", col("team_id").cast("int"))
).write.mode("append").bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")

                                                                                

In [10]:
%%sql

select * from bootcamp.match_details_bucketed limit 5;

match_id,player_gamertag,previous_spartan_rank,spartan_rank,previous_total_xp,total_xp,previous_csr_tier,previous_csr_designation,previous_csr,previous_csr_percent_to_next_tier,previous_csr_rank,current_csr_tier,current_csr_designation,current_csr,current_csr_percent_to_next_tier,current_csr_rank,player_rank_on_team,player_finished,player_average_life,player_total_kills,player_total_headshots,player_total_weapon_damage,player_total_shots_landed,player_total_melee_kills,player_total_melee_damage,player_total_assassinations,player_total_ground_pound_kills,player_total_shoulder_bash_kills,player_total_grenade_damage,player_total_power_weapon_damage,player_total_power_weapon_grabs,player_total_deaths,player_total_assists,player_total_grenade_kills,did_win,team_id
7a50f4f6-beec-40cd-b0d8-5e5e7a788c7a,JakeWilson801,17,17,116601,117376,1.0,4.0,0.0,32.0,,1.0,4.0,0.0,62.0,,3,False,PT30.6740704S,12,12,590.0,39,0,0.0,0,0,0,0.0,0.0,0,5,2,0,True,1
7a50f4f6-beec-40cd-b0d8-5e5e7a788c7a,taterbase,5,5,11480,11951,1.0,3.0,0.0,34.0,,1.0,3.0,0.0,64.0,,4,False,PT18.0020486S,6,1,140.0,22,3,135.0,0,0,1,0.0,0.0,0,9,0,0,True,1
7a50f4f6-beec-40cd-b0d8-5e5e7a788c7a,HxC B0ut2k,18,18,133408,134197,1.0,4.0,0.0,34.0,,1.0,4.0,0.0,66.0,,2,False,PT30.6697393S,13,13,595.0,27,0,0.0,0,0,0,0.0,0.0,0,5,0,0,True,1
7a50f4f6-beec-40cd-b0d8-5e5e7a788c7a,NecroLust12,14,14,71925,72388,1.0,4.0,0.0,64.0,,1.0,4.0,0.0,34.0,,8,False,PT16.2777739S,3,2,125.0,13,1,45.0,0,0,0,0.0,0.0,0,10,1,0,False,0
7a50f4f6-beec-40cd-b0d8-5e5e7a788c7a,Centerhead01,3,3,4560,5589,,,,,,,,,,,5,False,PT12.5488965S,11,9,395.0,12,2,75.00899999961257,0,0,0,0.0,0.0,0,14,0,0,False,0


In [11]:
%%sql

select count(*) from bootcamp.match_details_bucketed;

count(1)
151761


In [12]:
%%sql

DROP TABLE if exists bootcamp.matches_bucketed;

In [13]:
%%sql
CREATE TABLE bootcamp.matches_bucketed (
   match_id STRING,
    mapid STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    game_variant_id STRING,
    is_match_over BOOLEAN,
    completion_date TIMESTAMP,
    match_duration STRING,
    game_mode STRING,
    map_variant_id STRING
  ) USING iceberg PARTITIONED BY (bucket(16, match_id));

In [14]:
%%sql

describe bootcamp.matches_bucketed;

col_name,data_type,comment
match_id,string,
mapid,string,
is_team_game,boolean,
playlist_id,string,
game_variant_id,string,
is_match_over,boolean,
completion_date,timestamp,
match_duration,string,
game_mode,string,
map_variant_id,string,


In [15]:
matches = spark.read.option("header", "true").csv("/home/iceberg/data/matches.csv")

(
    matches
    .withColumn("match_id", col("match_id").cast("string"))
    .withColumn("mapid", col("mapid").cast("string"))
    .withColumn("is_team_game", col("is_team_game").cast("boolean"))        
    .withColumn("playlist_id", col("playlist_id").cast("string"))
    .withColumn("game_variant_id", col("game_variant_id").cast("string"))
    .withColumn("is_match_over", col("is_match_over").cast("boolean"))      
    .withColumn("completion_date", to_timestamp(col("completion_date")))    
    .withColumn("match_duration", col("match_duration").cast("string"))
    .withColumn("game_mode", col("game_mode").cast("string"))
    .withColumn("map_variant_id", col("map_variant_id").cast("string"))
).write.mode("append").bucketBy(16, "match_id").saveAsTable("bootcamp.matches_bucketed")

In [16]:
%%sql

select count(*) from bootcamp.matches_bucketed;

count(1)
24025


In [17]:
%%sql

DROP TABLE if exists bootcamp.medals_matches_players_bucketed;

In [18]:
%%sql
CREATE TABLE bootcamp.medals_matches_players_bucketed (
   match_id STRING,
    player_gamertag STRING,
    medal_id BIGINT,
    COUNT INT
  ) USING iceberg PARTITIONED BY (bucket(16, match_id));

In [19]:
medals_matches_players = spark.read.option("header", "true").csv("/home/iceberg/data/medals_matches_players.csv")

(
    medals_matches_players
    .withColumn("match_id", col("match_id").cast("string"))
    .withColumn("player_gamertag", col("player_gamertag").cast("string"))
    .withColumn("medal_id", col("medal_id").cast("bigint"))        
    .withColumn("count", col("count").cast("int"))
).write.mode("append").bucketBy(16, "match_id").saveAsTable("bootcamp.medals_matches_players_bucketed")

In [20]:
%%sql

select count(*) from bootcamp.medals_matches_players_bucketed;

count(1)
755229


In [21]:
%%sql
drop table if exists bootcamp.medals

In [22]:
%%sql
CREATE TABLE bootcamp.medals (
    medal_id BIGINT,
    description STRING,
    name STRING
  ) USING iceberg

In [23]:
medals = spark.read.option("header", "true").csv("/home/iceberg/data/medals.csv").select("medal_id", "description", "name")

(
    medals
    .withColumn("medal_id", col("medal_id").cast("bigint"))        
    .withColumn("description", col("description").cast("string"))
    .withColumn("name", col("name").cast("string"))
).write.mode("append").saveAsTable("bootcamp.medals")

In [24]:
%%sql

-- medals
-- medals_matches_players_bucketed
-- matches_bucketed
-- match_details_bucketed

SELECT medal_id, count(*)
FROM bootcamp.medals
group by 1
having count(*) > 1
;

medal_id,count(1)


In [25]:
%%sql
-- matches_bucketed
-- match_details_bucketed

SELECT match_id, medal_id, player_gamertag, count(*)
FROM bootcamp.medals_matches_players_bucketed
group by 1,2,3
having count(*) > 1
;

match_id,medal_id,player_gamertag,count(1)


In [26]:
%%sql
-- match_details_bucketed

SELECT match_id, count(*)
FROM bootcamp.matches_bucketed
group by 1
having count(*) > 1
;

match_id,count(1)


In [27]:
%%sql

SELECT match_id, player_gamertag, count(*)
FROM bootcamp.match_details_bucketed
group by 1,2
having count(*) > 1
;

match_id,player_gamertag,count(1)


In [28]:
matches_bucketed = spark.table("bootcamp.matches_bucketed")
medals = spark.table("bootcamp.medals")
match_details_bucketed = spark.table("bootcamp.match_details_bucketed")
medals_matches_players_bucketed = spark.table("bootcamp.medals_matches_players_bucketed")

In [29]:
joined_dataset = (
    match_details_bucketed.alias("mdb")
    .join(medals_matches_players_bucketed.alias("mmp"), "match_id", "inner")
    .join(matches_bucketed.alias("mb"), "match_id", "inner")
    .select (
        "mb.match_id"
        , "mdb.player_gamertag"
        , "mdb.player_total_kills"
        , "mb.playlist_id"
        , "mb.mapid"
        , "mmp.medal_id"
        , "mmp.count"
    )
    .groupBy(
        "mb.match_id"
        , "mdb.player_gamertag"
        , "mdb.player_total_kills"
        , "mb.playlist_id"
        , "mb.mapid"
        , "mmp.medal_id"
    )
    .agg(
        F.sum("mmp.count").alias("num_medals")
    )
)

(
    joined_dataset.groupBy("mb.match_id", "mmp.medal_id", "mdb.player_gamertag")
    .agg(
        F.count("*").alias("cnt")
    )
    .filter(F.col("cnt") > 1)
).show(5, truncate=False)


# spark.sql("""
#     SELECT *
#     FROM bootcamp.match_details_bucketed mdb JOIN bootcamp.medals_matches_players_bucketed mmp
#     ON mdb.match_id = mmp.match_id
#     JOIN bootcamp.matches_bucketed mb
#     on mdb.match_id = mb.match_id
# """).explain("FORMATTED")

                                                                                

+--------+--------+---------------+---+
|match_id|medal_id|player_gamertag|cnt|
+--------+--------+---------------+---+
+--------+--------+---------------+---+



In [30]:
(
    joined_dataset.filter (
        (F.col("match_id") == '1f2eb695-282f-42c3-b178-0476157bcc3c')
        & (F.col("medal_id") == '3261908037')
        & (F.col("player_gamertag") == 'Pakacorn')
)
).show(5, truncate=False)

+------------------------------------+---------------+------------------+------------------------------------+------------------------------------+----------+----------+
|match_id                            |player_gamertag|player_total_kills|playlist_id                         |mapid                               |medal_id  |num_medals|
+------------------------------------+---------------+------------------+------------------------------------+------------------------------------+----------+----------+
|1f2eb695-282f-42c3-b178-0476157bcc3c|Pakacorn       |7                 |f72e0ef0-7c4a-4307-af78-8e38dac3fdba|c7805740-f206-11e4-982c-24be05e24f7e|3261908037|31        |
+------------------------------------+---------------+------------------+------------------------------------+------------------------------------+----------+----------+



In [31]:
(
joined_dataset.groupBy("player_gamertag")
    .agg(
        F.avg("player_total_kills").alias("avg_kills")
    )
    .orderBy(F.desc('avg_kills'))
    .limit(1)
).show()

# gimpinator14
# (
# match_details_bucketed.groupBy("player_gamertag")
#     .agg(
#         F.avg("player_total_kills").alias("avg_kills")
#     )
#     .orderBy(F.desc('avg_kills'))
#     .limit(1)
# ).show()

[Stage 59:====>                                                   (1 + 12) / 13]

+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
+---------------+---------+



                                                                                

In [32]:
(
joined_dataset.groupBy("playlist_id")
    .agg(
        F.count("*").alias("num_played")
    )
    .orderBy(F.desc('num_played'))
    .limit(1)
).show(truncate=False)

# f72e0ef0-7c4a-4307-af78-8e38dac3fdba
# (
# matches_bucketed.groupBy("playlist_id")
#     .agg(
#         F.count("*").alias("num_played")
#     )
#     .orderBy(F.desc('num_played'))
#     .limit(1)
# ).show(truncate=False)

[Stage 67:>                                                       (0 + 12) / 13]

+------------------------------------+----------+
|playlist_id                         |num_played|
+------------------------------------+----------+
|f72e0ef0-7c4a-4307-af78-8e38dac3fdba|818439    |
+------------------------------------+----------+



                                                                                

In [33]:
(
joined_dataset.groupBy("mapid")
    .agg(
        F.count("*").alias("num_played")
    )
    .orderBy(F.desc('num_played'))
    .limit(1)
).show(truncate=False)

# c7edbf0f-f206-11e4-aa52-24be05e24f7e
# (
# matches_bucketed.groupBy("mapid")
#     .agg(
#         F.count("*").alias("num_played")
#     )
#     .orderBy(F.desc('num_played'))
#     .limit(1)
# ).show(truncate=False)



+------------------------------------+----------+
|mapid                               |num_played|
+------------------------------------+----------+
|c7edbf0f-f206-11e4-aa52-24be05e24f7e|748924    |
+------------------------------------+----------+



                                                                                

In [34]:
# caacb800-f206-11e4-81ab-24be05e24f7e|1931              |Plaza|Promesa’s streets were no stranger to organized violence.|

(
medals.alias("m").filter("name = 'Killing Spree'")
    .join(joined_dataset, "medal_id")
    .groupBy("mapid")
    .agg(
         F.sum("num_medals").alias("num_medals_per_map")
    )
    .orderBy(F.desc("num_medals_per_map"))
    .join(spark.read.option("header", "true").csv("/home/iceberg/data/maps.csv"), 'mapid')
    .limit(1)
).show(truncate=False)

# (
# medals.filter("name = 'Killing Spree'")
#     .join(medals_matches_players_bucketed, "medal_id")
#     .join(matches_bucketed, "match_id")
#     .groupBy("mapid")
#     .agg(
#         F.sum("count").alias("num_medals_per_map")
#     )
#     .orderBy(F.desc("num_medals_per_map"))
#     .join(spark.read.option("header", "true").csv("/home/iceberg/data/maps.csv"), 'mapid')
#     .limit(1)
# ).show(truncate=False)



+------------------------------------+------------------+-----+---------------------------------------------------------+
|mapid                               |num_medals_per_map|name |description                                              |
+------------------------------------+------------------+-----+---------------------------------------------------------+
|caacb800-f206-11e4-81ab-24be05e24f7e|14380             |Plaza|Promesa’s streets were no stranger to organized violence.|
+------------------------------------+------------------+-----+---------------------------------------------------------+



                                                                                

In [35]:
joined_dataset.dtypes

[('match_id', 'string'),
 ('player_gamertag', 'string'),
 ('player_total_kills', 'int'),
 ('playlist_id', 'string'),
 ('mapid', 'string'),
 ('medal_id', 'bigint'),
 ('num_medals', 'bigint')]

In [36]:
%%sql

drop table if exists bootcamp.joined_dataset_bucketed

In [37]:
%%sql
CREATE TABLE bootcamp.joined_dataset_bucketed (
   match_id string
 ,player_gamertag string
 ,player_total_kills int
 ,playlist_id string
 ,mapid string
 ,medal_id bigint
 ,num_medals bigint
  ) USING iceberg PARTITIONED BY (bucket(16, match_id));

In [38]:
joined_dataset.write.mode("append").bucketBy(16, "match_id").saveAsTable("bootcamp.joined_dataset_bucketed")

                                                                                

In [39]:
%%sql

select count(*) from bootcamp.joined_dataset_bucketed

count(1)
2664603


In [40]:
%%sql

describe bootcamp.joined_dataset_bucketed

col_name,data_type,comment
match_id,string,
player_gamertag,string,
player_total_kills,int,
playlist_id,string,
mapid,string,
medal_id,bigint,
num_medals,bigint,
,,
# Partitioning,,
Part 0,"bucket(16, match_id)",


In [41]:
%%sql

drop table if exists bootcamp.joined_dataset_bucketed_sorted

In [42]:
%%sql
CREATE TABLE bootcamp.joined_dataset_bucketed_sorted (
   match_id string
 ,player_gamertag string
 ,player_total_kills int
 ,playlist_id string
 ,mapid string
 ,medal_id bigint
 ,num_medals bigint
  ) USING iceberg PARTITIONED BY (bucket(16, match_id));

In [43]:
sorted_joined_dataset = joined_dataset.sortWithinPartitions("playlist_id", "mapid")

sorted_joined_dataset.write.mode("append").bucketBy(16, "match_id").saveAsTable("bootcamp.joined_dataset_bucketed_sorted")

                                                                                

In [44]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM bootcamp.joined_dataset_bucketed.files
    union all
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM bootcamp.joined_dataset_bucketed_sorted.files

size,num_files,unsorted
12743414,16,unsorted
12121463,16,sorted
