In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

In [2]:
def create_spark_session():
    """Create and configure Spark session with optimizations"""
    spark = SparkSession.builder \
        .appName("Spark Fundamentals Week Analysis") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()
    
    # Disable automatic broadcast join threshold as required
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    
    return spark

In [3]:
# 1. Initialize Spark
spark = create_spark_session()

# 2. Define Postgres connection info
url = "jdbc:postgresql://localhost:5434/postgres"
properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/28 10:54:51 WARN Utils: Your hostname, dataspirolinux, resolves to a loopback address: 127.0.1.1; using 192.168.1.139 instead (on interface wlp1s0)
25/08/28 10:54:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/28 10:54:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/28 10:54:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


match_details

a row for every players performance in a match
matches

a row for every match
medals_matches_players

a row for every medal type a player gets in a match
medals

a row for every medal type
Your goal is to make the following things happen:

Build a Spark job that;

Disabled automatic broadcast join with spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Explicitly broadcast JOINs medals and maps

Bucket join match_details, matches, and medal_matches_players on match_id with 16 buckets

Aggregate the joined data frame to figure out questions like:
Which player averages the most kills per game?

Which playlist gets played the most?

Which map gets played the most?

Which map do players get the most Killing Spree medals on?

With the aggregated data set

Try different .sortWithinPartitions to see which has the smallest data size (hint: playlists and maps are both very low cardinality)

Save these as .py files and submit them this way!



In [9]:
# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS gaming_analysis")
spark.sql("USE gaming_analysis")


# load csv into dataframe

df_maps = spark.read.csv(
    "/home/dataspiro/data-engineer-handbook/bootcamp/materials/3-spark-fundamentals/data//maps.csv",
    header=True,
    inferSchema=True
)

df_matches = spark.read.csv(
    "/home/dataspiro/data-engineer-handbook/bootcamp/materials/3-spark-fundamentals/data//matches.csv",
    header=True,
    inferSchema=True
)
df_medals = spark.read.csv(
    "/home/dataspiro/data-engineer-handbook/bootcamp/materials/3-spark-fundamentals/data//medals.csv",
    header=True,
    inferSchema=True
)
df_match_details = spark.read.csv(
    "/home/dataspiro/data-engineer-handbook/bootcamp/materials/3-spark-fundamentals/data//match_details.csv",
    header=True,
    inferSchema=True
)
df_medals_matches_players = spark.read.csv(
    "/home/dataspiro/data-engineer-handbook/bootcamp/materials/3-spark-fundamentals/data//medals_matches_players.csv",
    header=True,
    inferSchema=True
)

                                                                                

In [5]:
# inspect all loaded dfs
df_maps.show(5)
df_matches.show(5)
df_medals.show(5)
df_match_details.show(5)
df_medals_matches_players.show(5)

+--------------------+-------------------+--------------------+
|               mapid|               name|         description|
+--------------------+-------------------+--------------------+
|c93d708f-f206-11e...|              Urban|Andesia was the c...|
|cb251c51-f206-11e...|     Raid on Apex 7|This unbroken rin...|
|c854e54f-f206-11e...|March on Stormbreak|                NULL|
|c8d69870-f206-11e...| Escape from A.R.C.|Scientists flocke...|
|73ed1fd0-45e5-4bb...|             Osiris|                NULL|
+--------------------+-------------------+--------------------+
only showing top 5 rows
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|    completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------

25/08/28 10:55:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [None]:
# print("Creating bucketed match_details table...")
# df_matches.write \
#     .mode("overwrite") \
#     .bucketBy(16, "match_id") \
#     .sortBy("match_id") \
#     .option("path", "/tmp/bucketed_tables/match_details") \
#     .saveAsTable("bucketed_match_details")

In [None]:
# print("Creating bucketed match_details table...")
# df_maps.write \
#     .mode("overwrite") \
#     .bucketBy(16, "mapid") \
#     .sortBy("mapid") \
#     .option("path", "/tmp/bucketed_tables/match_details") \
#     .saveAsTable("bucketed_maps")

In [10]:
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")


# Bucket match_details, matches, and medals_matches_players on match_id with 16 buckets
df_match_details.write.bucketBy(16, "match_id").saveAsTable("bucketed_match_details")
df_matches.write.bucketBy(16, "match_id").saveAsTable("bucketed_matches")
df_medals_matches_players.write.bucketBy(16, "match_id").saveAsTable("bucketed_medals_matches_players")

# regular broadcast

df_medals.write.saveAsTable("broadcast_medals")

df_maps.write.saveAsTable("broadcast_maps")


                                                                                

In [11]:
# Load bucketed tables
bucketed_match_details = spark.table("bucketed_match_details")
bucketed_matches = spark.table("bucketed_matches")
bucketed_medals_matches_players = spark.table("bucketed_medals_matches_players")
    

In [12]:
# Load broadcast tables
df_medals = spark.table("broadcast_medals")
df_maps = spark.table("broadcast_maps")

In [13]:
# manually broadcast medals and maps
broadcast_medals = broadcast(df_medals)
broadcast_maps = broadcast(df_maps)

In [60]:
# show table in warehouse
spark.sql("SHOW TABLES").show()

+---------------+--------------------+-----------+
|      namespace|           tableName|isTemporary|
+---------------+--------------------+-----------+
|gaming_analysis|      broadcast_maps|      false|
|gaming_analysis|    broadcast_medals|      false|
|gaming_analysis|bucketed_match_de...|      false|
|gaming_analysis|    bucketed_matches|      false|
|gaming_analysis|bucketed_medals_m...|      false|
|               |  match_player_table|       true|
|               |               tests|       true|
+---------------+--------------------+-----------+



In [None]:
# The optimized join strategy with aliases
optimized_join = bucketed_match_details.alias("md") \
    .join(bucketed_matches.alias("m"), "match_id") \
    .join(bucketed_medals_matches_players.alias("mmp"), ["match_id", "player_gamertag"], "left")

optimized_join.show(5)

In [None]:
# The optimized join strategy with aliases
optimized_join = bucketed_match_details.alias("md") \
    .join(bucketed_matches.alias("m"), "match_id") \
    .join(bucketed_medals_matches_players.alias("mmp"), ["match_id"], "left")

match_player_table = optimized_join.select(
    "match_id",
    "player_gamertag",
    "player_total_kills",
    "mapid",
    "playlist_id",
    "medal_id"
)
match_player_table.show(5)

In [32]:
# join match, match_details and, medals_matches_players buckets

match_player_table = spark.sql("""SELECT  
    m.match_id,
    md.player_gamertag,
    md.player_total_kills,
    m.mapid,
    m.playlist_id,
    mmp.medal_id
FROM bucketed_matches m
LEFT JOIN bucketed_match_details md 
       ON m.match_id = md.match_id
LEFT JOIN bucketed_medals_matches_players mmp 
       ON md.match_id = mmp.match_id""")
    
match_player_table.show(5)

+--------------------+---------------+------------------+--------------------+--------------------+----------+
|            match_id|player_gamertag|player_total_kills|               mapid|         playlist_id|  medal_id|
+--------------------+---------------+------------------+--------------------+--------------------+----------+
|10e1688e-f0ad-4ab...|    Sky Carries|                10|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|3261908037|
|10e1688e-f0ad-4ab...|    Sky Carries|                10|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|2838259753|
|10e1688e-f0ad-4ab...|    Sky Carries|                10|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|3491849182|
|10e1688e-f0ad-4ab...|    Sky Carries|                10|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...|2430242797|
|10e1688e-f0ad-4ab...|    Sky Carries|                10|c7edbf0f-f206-11e...|f72e0ef0-7c4a-430...| 824733727|
+--------------------+---------------+------------------+--------------------+--------------------+----------+
o

In [33]:
# Check if your join is using bucket join
match_player_table.explain()
# Look for "BucketedTableScan" or "BucketJoin" in the plan

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#671, player_gamertag#636, player_total_kills#654, mapid#672, playlist_id#674, medal_id#683L]
   +- SortMergeJoin [match_id#635], [match_id#681], LeftOuter
      :- Sort [match_id#635 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(match_id#635, 16), ENSURE_REQUIREMENTS, [plan_id=2073]
      :     +- SortMergeJoin [match_id#671], [match_id#635], LeftOuter
      :        :- Sort [match_id#671 ASC NULLS FIRST], false, 0
      :        :  +- FileScan parquet spark_catalog.gaming_analysis.bucketed_matches[match_id#671,mapid#672,playlist_id#674] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/dataspiro/spark-warehouse/gaming_analysis.db/bucketed_matches], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<match_id:string,mapid:string,playlist_id:string>, SelectedBucketsCount: 16 out of 16
      :        +- Sort [match_id#635 A

In [34]:
# question 1 - Average kills per player across all matches


match_player_table.createOrReplaceTempView("match_player_table") # Register the DataFrame as a temporary view for SQL queries

ave_kill_per_player = spark.sql("""
    SELECT 
        player_gamertag AS player_id,
        COUNT(DISTINCT match_id) AS games_played,
        ROUND(AVG(player_total_kills), 2) AS avg_total_kills
    FROM match_player_table
    GROUP BY player_gamertag
    ORDER BY avg_total_kills DESC
    LIMIT 10
""")
ave_kill_per_player.show()



+---------------+------------+---------------+
|      player_id|games_played|avg_total_kills|
+---------------+------------+---------------+
|   gimpinator14|           1|          109.0|
|  I Johann117 I|           1|           96.0|
|BudgetLegendary|           1|           83.0|
|      GsFurreal|           1|           75.0|
|   TameablePoet|           2|          74.22|
|   Sexy is Back|           1|           73.0|
|   killerguy789|           1|           68.0|
|THC GUILTYSPARK|           1|           67.0|
|PrimePromethean|           1|           66.0|
|    HisLattice1|           1|           66.0|
+---------------+------------+---------------+



                                                                                

In [None]:
# question 2a  - most played playlist


#match_player_table.createOrReplaceTempView("match_player_table") # Register the DataFrame as a temporary view for SQL queries

most_play_playlist = spark.sql("""
    SELECT 
        playlist_id,
        COUNT(playlist_id) AS no_of_plays
    FROM match_player_table
    GROUP BY playlist_id
    ORDER BY 2 DESC
    LIMIT 10
""")
most_play_playlist.show()



+--------------------+-----------+
|         playlist_id|no_of_plays|
+--------------------+-----------+
|f72e0ef0-7c4a-430...|    1567327|
|780cc101-005c-4fc...|    1116197|
|0bcf2be1-3168-4e4...|    1015907|
|c98949ae-60a8-43d...|     825180|
|2323b76a-db98-4e0...|     692432|
|892189e9-d712-4bd...|     667953|
|f27a65eb-2d11-496...|     167517|
|355dc154-9809-4ed...|     140068|
|d0766624-dbd7-453...|     138500|
|bc0f8ad6-31e6-4a1...|     111186|
+--------------------+-----------+



                                                                                

In [None]:
# question 2b - most played playlist by players and matches

most_play_playlist = spark.sql("""
    SELECT 
        playlist_id,
        COUNT(DISTINCT match_id) as total_matches,
        COUNT(DISTINCT player_gamertag) as unique_players
    FROM match_player_table
    GROUP BY playlist_id
    ORDER BY 2 DESC
    LIMIT 10
""")
most_play_playlist.show()



+--------------------+-------------+--------------+
|         playlist_id|total_matches|unique_players|
+--------------------+-------------+--------------+
|f72e0ef0-7c4a-430...|         9350|         22057|
|2323b76a-db98-4e0...|         3244|          9919|
|892189e9-d712-4bd...|         2159|          9779|
|c98949ae-60a8-43d...|         1984|          8887|
|b50c4dc2-6c86-4d7...|         1462|             0|
|0e39ead4-383b-445...|          909|             0|
|f27a65eb-2d11-496...|          701|          3476|
|d0766624-dbd7-453...|          643|          2537|
|0bcf2be1-3168-4e4...|          564|          6868|
|780cc101-005c-4fc...|          527|          6101|
+--------------------+-------------+--------------+
only showing top 10 rows


                                                                                

In [None]:
# question 3a most played on map

most_played_map= spark.sql ('''SELECT 
        mapid,
        COUNT(mapid) AS no_of_plays
    FROM match_player_table
    GROUP BY mapid
    ORDER BY 2 DESC
    LIMIT 10
                            ''')

most_played_map.show()



+--------------------+-----------+
|               mapid|no_of_plays|
+--------------------+-----------+
|c74c9d0f-f206-11e...|    1446047|
|c7edbf0f-f206-11e...|    1436691|
|c7805740-f206-11e...|     953559|
|cdb934b0-f206-11e...|     396464|
|cb914b9e-f206-11e...|     309154|
|ce1dc2de-f206-11e...|     299858|
|cebd854f-f206-11e...|     299029|
|caacb800-f206-11e...|     291609|
|cd844200-f206-11e...|     261249|
|cc040aa1-f206-11e...|     257047|
+--------------------+-----------+



                                                                                

In [None]:
# question 3b most played on map by players and matches

most_played_map= spark.sql ('''SELECT 
        mapid,
        COUNT(DISTINCT match_id) as total_matches,
        COUNT(DISTINCT player_gamertag) as unique_players
    FROM match_player_table
    GROUP BY mapid
    ORDER BY 2 DESC
    LIMIT 10
                            ''')

most_played_map.show()



+--------------------+-------------+--------------+
|               mapid|total_matches|unique_players|
+--------------------+-------------+--------------+
|c7edbf0f-f206-11e...|         8587|         21356|
|cdb934b0-f206-11e...|         1489|          6996|
|c74c9d0f-f206-11e...|         1461|         12403|
|cb914b9e-f206-11e...|         1088|          5550|
|c7805740-f206-11e...|         1052|          7949|
|ce1dc2de-f206-11e...|         1036|          5209|
|caacb800-f206-11e...|          996|          5293|
|cdee4e70-f206-11e...|          982|          4966|
|cebd854f-f206-11e...|          971|          5331|
|cd844200-f206-11e...|          915|          4780|
+--------------------+-------------+--------------+



                                                                                

In [None]:
# Question 4 - Top 10 killing spree medals by map

killing_spree = spark.sql (''' SELECT 
        mpt.mapid AS map_id, 
        COUNT(*) AS killing_spree_count
        FROM match_player_table mpt
        JOIN broadcast_medals bcm
        ON mpt.medal_id = bcm.medal_id
        WHERE bcm.name = 'Killing Spree'
        GROUP BY mpt.mapid, mpt.player_gamertag 
        ORDER BY killing_spree_count DESC 
        LIMIT 10''')

killing_spree.show()



+--------------------+-------------------+
|              map_id|killing_spree_count|
+--------------------+-------------------+
|c7edbf0f-f206-11e...|               1212|
|c7edbf0f-f206-11e...|               1187|
|c7edbf0f-f206-11e...|                769|
|c7edbf0f-f206-11e...|                652|
|c74c9d0f-f206-11e...|                515|
|c7edbf0f-f206-11e...|                496|
|c7805740-f206-11e...|                482|
|c7edbf0f-f206-11e...|                474|
|cdb934b0-f206-11e...|                422|
|c7edbf0f-f206-11e...|                409|
+--------------------+-------------------+



                                                                                