In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, broadcast, mean, count_distinct, sum
spark = SparkSession.builder.appName("hw").getOrCreate()

# Disabled automatic broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

24/12/18 21:02:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Load data from csv files
matches_df = spark.read.option("header", "true") \
                .option("inferSchema", "true") \
                .csv("../data/matches.csv")
maps_df = spark.read.option("header", "true") \
                .option("inferSchema", "true") \
                .csv("../data/maps.csv")
medals_df = spark.read.option("header", "true") \
                .option("inferSchema", "true") \
                .csv("../data/medals.csv")
match_details_df = spark.read.option("header", "true") \
                .option("inferSchema", "true") \
                .csv("../data/match_details.csv")
medals_matches_players_df = spark.read.option("header", "true") \
                .option("inferSchema", "true") \
                .csv("../data/medals_matches_players.csv")

                                                                                

In [3]:
# Broadcast join
# btw matches and maps on mapid
matches_maps_df = matches_df.join(broadcast(maps_df.withColumnRenamed("name", "map_name").withColumnRenamed("description", "map_description")), "mapid", how="left")
matches_maps_df.explain()

# btw medals_matches_players and medals on medal_id
medals_matches_players_medals_df = medals_matches_players_df.join(broadcast(medals_df.withColumnRenamed("name", "medal_name").withColumnRenamed("description", "medal_description")), "medal_id", how="left")
matches_maps_df.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [mapid#18, match_id#17, is_team_game#19, playlist_id#20, game_variant_id#21, is_match_over#22, completion_date#23, match_duration#24, game_mode#25, map_variant_id#26, map_name#215, map_description#220]
   +- BroadcastHashJoin [mapid#18], [mapid#54], LeftOuter, BuildRight, false
      :- FileScan csv [match_id#17,mapid#18,is_team_game#19,playlist_id#20,game_variant_id#21,is_match_over#22,completion_date#23,match_duration#24,game_mode#25,map_variant_id#26] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/notebooks/data/matches.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<match_id:string,mapid:string,is_team_game:boolean,playlist_id:string,game_variant_id:strin...
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=115]
         +- Project [mapid#54, name#55 AS map_name#215, description#56 AS map_descr

In [4]:
# Bucket join
# Before bucketing, the plan uses SortMergeJoin with FileScan
match_details_df.join(matches_maps_df, "match_id", how="left").join(medals_matches_players_df, ["match_id", "player_gamertag"], how="left").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#118, player_gamertag#119, previous_spartan_rank#120, spartan_rank#121, previous_total_xp#122, total_xp#123, previous_csr_tier#124, previous_csr_designation#125, previous_csr#126, previous_csr_percent_to_next_tier#127, previous_csr_rank#128, current_csr_tier#129, current_csr_designation#130, current_csr#131, current_csr_percent_to_next_tier#132, current_csr_rank#133, player_rank_on_team#134, player_finished#135, player_average_life#136, player_total_kills#137, player_total_headshots#138, player_total_weapon_damage#139, player_total_shots_landed#140, player_total_melee_kills#141, ... 25 more fields]
   +- SortMergeJoin [match_id#118, player_gamertag#119], [match_id#207, player_gamertag#208], LeftOuter
      :- Sort [match_id#118 ASC NULLS FIRST, player_gamertag#119 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(match_id#118, player_gamertag#119, 200), ENSURE_REQUIREMENTS, [plan_id=190]
      : 

24/12/18 21:02:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
# Bucket tables
matches_maps_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .bucketBy(16, 'match_id') \
    .saveAsTable('bootcamp.matches_maps_bucketed')
match_details_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .bucketBy(16, 'match_id') \
    .saveAsTable('bootcamp.match_details_bucketed')
medals_matches_players_medals_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .bucketBy(16, 'match_id') \
    .saveAsTable('bootcamp.medals_matches_players_medals_bucketed')

                                                                                

In [6]:
# Read the data again but from bucketed tables
matches_maps_bucketed_df = spark.table('bootcamp.matches_maps_bucketed')
match_details_bucketed_df = spark.table('bootcamp.match_details_bucketed')
medals_matches_players_medals_bucketed_df = spark.table('bootcamp.medals_matches_players_medals_bucketed')

In [7]:
# Join three tables on match_id, the plan now has SortMergeJoin, but with BatchScan
matches_obt_df = match_details_bucketed_df.join(matches_maps_bucketed_df, "match_id", how="left") \
    .join(medals_matches_players_medals_bucketed_df, ["match_id", "player_gamertag"], how="left")
matches_obt_df.explain()
matches_obt_df.cache()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#658, player_gamertag#659, previous_spartan_rank#660, spartan_rank#661, previous_total_xp#662, total_xp#663, previous_csr_tier#664, previous_csr_designation#665, previous_csr#666, previous_csr_percent_to_next_tier#667, previous_csr_rank#668, current_csr_tier#669, current_csr_designation#670, current_csr#671, current_csr_percent_to_next_tier#672, current_csr_rank#673, player_rank_on_team#674, player_finished#675, player_average_life#676, player_total_kills#677, player_total_headshots#678, player_total_weapon_damage#679, player_total_shots_landed#680, player_total_melee_kills#681, ... 36 more fields]
   +- SortMergeJoin [match_id#658, player_gamertag#659], [match_id#731, player_gamertag#732], LeftOuter
      :- Sort [match_id#658 ASC NULLS FIRST, player_gamertag#659 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(match_id#658, player_gamertag#659, 200), ENSURE_REQUIREMENTS, [plan_id=465]
      : 

DataFrame[match_id: string, player_gamertag: string, previous_spartan_rank: int, spartan_rank: int, previous_total_xp: int, total_xp: int, previous_csr_tier: int, previous_csr_designation: int, previous_csr: int, previous_csr_percent_to_next_tier: int, previous_csr_rank: int, current_csr_tier: int, current_csr_designation: int, current_csr: int, current_csr_percent_to_next_tier: int, current_csr_rank: int, player_rank_on_team: int, player_finished: boolean, player_average_life: string, player_total_kills: int, player_total_headshots: int, player_total_weapon_damage: double, player_total_shots_landed: int, player_total_melee_kills: int, player_total_melee_damage: double, player_total_assassinations: int, player_total_ground_pound_kills: int, player_total_shoulder_bash_kills: int, player_total_grenade_damage: double, player_total_power_weapon_damage: double, player_total_power_weapon_grabs: int, player_total_deaths: int, player_total_assists: int, player_total_grenade_kills: int, did_win

In [8]:
# Which player averages the most kills per game?
result_1_df = matches_obt_df.groupby("player_gamertag").agg(mean("player_total_kills").alias("avg_kills_per_game")).sort("avg_kills_per_game", ascending=False)
result_1_df.show(1)



+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+
only showing top 1 row



                                                                                

In [9]:
# Which playlist gets played the most?
result_2_df = matches_obt_df.groupby("playlist_id").agg(count_distinct("match_id").alias("num_unique_matches")).sort("num_unique_matches", ascending=False)
result_2_df.show(1)



+--------------------+------------------+
|         playlist_id|num_unique_matches|
+--------------------+------------------+
|f72e0ef0-7c4a-430...|              7657|
+--------------------+------------------+
only showing top 1 row



                                                                                

In [10]:
# Which map gets played the most?
result_3_df = matches_obt_df.groupby("map_name").agg(count_distinct("match_id").alias("num_unique_matches")).sort("num_unique_matches", ascending=False)
result_3_df.show(1)



+--------------+------------------+
|      map_name|num_unique_matches|
+--------------+------------------+
|Breakout Arena|              7049|
+--------------+------------------+
only showing top 1 row



                                                                                

In [11]:
# Which map do players get the most Killing Spree medals on?
result_4_df = matches_obt_df.filter(matches_obt_df.medal_name == "Killing Spree").groupby("map_name").agg(sum("count").alias("medal_count")).sort("medal_count", ascending=False)
result_4_df.show(1)



+--------------+-----------+
|      map_name|medal_count|
+--------------+-----------+
|Breakout Arena|       6738|
+--------------+-----------+
only showing top 1 row



                                                                                

In [13]:
# With the aggregated data set, try different .sortWithinPartitions to see which has the smallest data size
start_df = matches_obt_df.repartition(4, col("completion_date"))
mapid_sort_df = start_df.sortWithinPartitions(col("mapid"))
mapid_playlistid_sort_df = start_df.sortWithinPartitions(col("mapid"), col("playlist_id"))
mapid_playlistid_player_gamertag_sort_df = start_df.sortWithinPartitions(col("mapid"), col("playlist_id"), col("player_gamertag"))

In [14]:
start_df.write.mode("overwrite").saveAsTable("bootcamp.matches_unsorted")
mapid_sort_df.write.mode("overwrite").saveAsTable("bootcamp.matches_sorted_mapid")
mapid_playlistid_sort_df.write.mode("overwrite").saveAsTable("bootcamp.matches_sorted_mapid_playlistid")
mapid_playlistid_player_gamertag_sort_df.write.mode("overwrite").saveAsTable("bootcamp.matches_sorted_mapid_playlistid_player_gamertag")

                                                                                

In [15]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.matches_unsorted.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted_mapid' 
FROM demo.bootcamp.matches_sorted_mapid.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted_mapid_playlistid' 
FROM demo.bootcamp.matches_sorted_mapid_playlistid.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted_mapid_playlistid_player_gamertag' 
FROM demo.bootcamp.matches_sorted_mapid_playlistid_player_gamertag.files

24/12/18 21:05:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


size,num_files,unsorted
21769794,4,unsorted
21968240,4,sorted_mapid
21189170,4,sorted_mapid_playlistid
19815602,4,sorted_mapid_playlistid_player_gamertag
