### Setting Up Environment

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

In [None]:
%env DATA_ENGINEER_IO_WAREHOUSE_CREDENTIAL=t-v9t2--SpkvM:ZaduIIiKlUgXIoCxiXWL8pe5F8M 

%env DATA_ENGINEER_IO_WAREHOUSE=eczachly-academy-warehouse

In [None]:
schema = 'abbad'
if not os.environ['DATA_ENGINEER_IO_WAREHOUSE_CREDENTIAL'] or not os.environ['DATA_ENGINEER_IO_WAREHOUSE']:
    raise ValueError("""You need to set environment variables:
                DATA_ENGINEER_IO_WAREHOUSE_CREDENTIAL, 
                DATA_ENGINEER_IO_WAREHOUSE to run this PySpark job!
    """)

In [None]:
spark = SparkSession.builder \
        .appName("abbadSession") \
        .config("spark.driver.memory", "6g") \
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.defaultCatalog", os.environ['DATA_ENGINEER_IO_WAREHOUSE']) \
        .config("spark.sql.catalog.eczachly-academy-warehouse",
                "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.eczachly-academy-warehouse.catalog-impl",
                "org.apache.iceberg.rest.RESTCatalog") \
        .config("spark.sql.catalog.eczachly-academy-warehouse.uri",
                "https://api.tabular.io/ws/") \
        .config("spark.sql.catalog.eczachly-academy-warehouse.credential",
                os.environ['DATA_ENGINEER_IO_WAREHOUSE_CREDENTIAL']) \
        .config("spark.sql.catalog.eczachly-academy-warehouse.warehouse",
                os.environ['DATA_ENGINEER_IO_WAREHOUSE']) \
        .getOrCreate()

# Query 1

disable the default behavior of broadcast joins

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Query 2

join the `medals` and `maps` tables with an explicitly specified a broadcast join

In [None]:
from pyspark.sql.functions import broadcast, lit, col, date_format

### Medals Broadcast Join

In [None]:
medalsBucketed = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/medals.csv")

spark.sql("""DROP TABLE IF EXISTS abbad.medals_bucketed""")

bucketedDDL = """
 CREATE TABLE IF NOT EXISTS abbad.medals_bucketed (
    medal_id STRING,
    classification STRING,
     name STRING,
     difficulty INTEGER
 )
 USING iceberg
 PARTITIONED BY (difficulty, bucket(16, medal_id));
 """
spark.sql(bucketedDDL)

medalsBucketed.select(
     col("medal_id"), col("classification"), col("name"), col("difficulty")).show()

medalsBucketed.select(
     col("medal_id"), col("classification"), col("name"), col("difficulty")
     ) \
     .write.mode("append")  \
     .partitionBy("difficulty") \
     .bucketBy(16, "medal_id").saveAsTable("abbad.medals_bucketed")

In [None]:
medalsMatchesPlayersBucketed = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/medals_matches_players.csv")

spark.sql("""DROP TABLE IF EXISTS abbad.medals_matches_players_bucketed""")
bucketedDDL = """
 CREATE TABLE IF NOT EXISTS abbad.medals_matches_players_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id STRING,
    count INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, medal_id));
 """
spark.sql(bucketedDDL)

medalsMatchesPlayersBucketed.select(
     col("match_id"), col("player_gamertag"), col("medal_id"), col("count")
     ).show()

medalsMatchesPlayersBucketed.select(
     col("match_id"), col("player_gamertag"), col("medal_id"), col("count")
     ) \
     .write.mode("append")  \
     .bucketBy(16, "medal_id").saveAsTable("abbad.medals_matches_players_bucketed")

In [None]:
explicitBroadcast = medalsMatchesPlayersBucketed.alias("mmp").join(broadcast(medalsBucketed).alias("m"), col("mmp.medal_id") == col("m.medal_id")) \
   .select(col("mmp.*"))

explicitBroadcast.write.mode("overwrite").insertInto("abbad.medals_matches_players_bucketed")

### Maps Broadcast Join

In [None]:
mapsBucketed = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/maps.csv")

spark.sql("""DROP TABLE IF EXISTS abbad.maps_bucketed""")
bucketedDDL = """
 CREATE TABLE IF NOT EXISTS abbad.maps_bucketed (
    mapid STRING,
    name STRING,
    description STRING
 )
 USING iceberg
 PARTITIONED BY (bucket(16, mapid));
 """
spark.sql(bucketedDDL)

mapsBucketed.select(
     col("mapid"), col("name"), col("description")
     ) \
     .show()

mapsBucketed.select(
     col("mapid"), col("name"), col("description")
     ) \
     .write.mode("append")  \
     .bucketBy(16, "mapid").saveAsTable("abbad.maps_bucketed")

In [None]:
matchesBucketed = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/matches.csv")

spark.sql("""DROP TABLE IF EXISTS abbad.matches_bucketed""")
bucketedDDL = """
 CREATE TABLE IF NOT EXISTS abbad.matches_bucketed (
    match_id STRING,
    mapid STRING,
    is_team_game BOOLEAN,
    playlist_id STRING,
    game_variant_id STRING,
    is_match_over BOOLEAN,
    completion_date STRING,
    match_duration STRING,
    game_mode STRING,
    map_variant_id STRING
 )
 USING iceberg
 PARTITIONED BY (completion_date, bucket(16, mapid));
 """
spark.sql(bucketedDDL)

matchesBucketed = matchesBucketed.withColumn("completion_date", date_format(col("completion_date"), "yyyy-MM"))

matchesBucketed.select(
    col("match_id"), col("mapid"), col("is_team_game"), col("playlist_id"), col("game_variant_id"),
    col("is_match_over"), col("completion_date"),
    col("match_duration"), col("game_mode"), col("map_variant_id")
).show()

matchesBucketed.select(
    col("match_id"), col("mapid"), col("is_team_game"), col("playlist_id"), col("game_variant_id"),
    col("is_match_over"), col("completion_date"),
    col("match_duration"), col("game_mode"), col("map_variant_id")
) \
     .write.mode("append")  \
     .partitionBy("completion_date") \
     .bucketBy(16, "mapid").saveAsTable("abbad.matches_bucketed")

In [None]:
explicitBroadcast = matchesBucketed.alias("matches").join(broadcast(mapsBucketed).alias("maps"), col("matches.mapid") == col("maps.mapid")) \
   .select(col("matches.*"))

explicitBroadcast.write.mode("overwrite").insertInto("abbad.matches_bucketed")

# Query 3 

join the `match_details`, `matches` and `medal_matches_players` using a bucket join on `match_id` with 16 buckets

In [None]:
matchDetailsBucketed = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("/home/iceberg/data/match_details.csv")

spark.sql("""DROP TABLE IF EXISTS abbad.match_details_bucketed""")
bucketedDDL = """
 CREATE TABLE IF NOT EXISTS abbad.match_details_bucketed (
    match_id STRING,
    player_gamertag STRING,
    player_total_kills INTEGER
 )
 USING iceberg
 PARTITIONED BY (player_total_kills, bucket(16, match_id));
 """
spark.sql(bucketedDDL)

matchDetailsBucketed.select(
     col("match_id"),col("player_gamertag"), col("player_total_kills") \
     ) \
     .show()

matchDetailsBucketed.select(
     col("match_id"),col("player_gamertag"), col("player_total_kills") \
     ) \
     .write.mode("append")  \
     .partitionBy("player_total_kills") \
     .bucketBy(16, "match_id").saveAsTable("abbad.match_details_bucketed")

In [None]:
joinedDF = matchDetailsBucketed.select(col("match_id"), col("player_gamertag"), col("player_total_kills")) \
    .join(matchesBucketed.select(col("match_id"), col("mapid"), col("playlist_id"), col("completion_date")), "match_id") \
    .join(medalsMatchesPlayersBucketed.select(col("match_id"),col("medal_id"), col("count")), "match_id")

In [None]:
joinedDF.show()

# 4. Queries

In [14]:
from pyspark.sql import functions as F

## Query 4a

which player has the highest average kills per game?

In [15]:
avgKillsPerPlayer = joinedDF.groupBy("player_gamertag") \
                            .agg(F.avg("player_total_kills").alias("average_kills")) \
                            .orderBy(F.desc("average_kills")) \
                            .limit(1)

avgKillsPerPlayer.show()

[Stage 16:>                                                         (0 + 7) / 7]

+---------------+-------------+
|player_gamertag|average_kills|
+---------------+-------------+
|   gimpinator14|        109.0|
+---------------+-------------+



                                                                                

## Query 4b

which playlist has received the most plays?

In [16]:
mostPlayedPlaylist = joinedDF.groupBy("playlist_id") \
                             .count() \
                             .withColumnRenamed("count", "number_of_plays") \
                             .orderBy(F.desc("number_of_plays")) \
                             .limit(1)

mostPlayedPlaylist.show()



+--------------------+---------------+
|         playlist_id|number_of_plays|
+--------------------+---------------+
|f72e0ef0-7c4a-430...|        1565529|
+--------------------+---------------+



                                                                                

## Query 4c

which map was played the most?

In [17]:
mostPlayedMap = joinedDF.groupBy("mapid") \
                        .count() \
                        .withColumnRenamed("count", "number_of_plays") \
                        .orderBy(F.desc("number_of_plays")) \
                        .limit(1)

mostPlayedMap.show()



+--------------------+---------------+
|               mapid|number_of_plays|
+--------------------+---------------+
|c74c9d0f-f206-11e...|        1445545|
+--------------------+---------------+



                                                                                

## Query 4d

on which map do players receive the highest number of Killing Spree medals?

In [18]:
killingSpreeMedalId = "2430242797"

mostKillingSpreeMedalsMap = joinedDF.filter(joinedDF["medal_id"] == killingSpreeMedalId) \
                                    .groupBy("mapid") \
                                    .count() \
                                    .withColumnRenamed("count", "number_of_killing_spree_medals") \
                                    .orderBy(F.desc("number_of_killing_spree_medals")) \
                                    .limit(1)

mostKillingSpreeMedalsMap.show()

[Stage 48:>                                                       (0 + 12) / 12]

+--------------------+------------------------------+
|               mapid|number_of_killing_spree_medals|
+--------------------+------------------------------+
|c74c9d0f-f206-11e...|                         56908|
+--------------------+------------------------------+



                                                                                

# Queries 5

In [24]:
start_df = joinedDF.repartition(4, "playlist_id")
    

first_sort_df = start_df.sortWithinPartitions("playlist_id")

sorted = joinedDF.repartition(10, "playlist_id") \
        .sortWithinPartitions("playlist_id")

start_df.write.mode("overwrite").saveAsTable("abbad.playlist_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("abbad.playlist_sorted")

                                                                                

In [26]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM abbad.playlist_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM abbad.playlist_sorted.files


                                                                                

size,num_files,sorted
6432359,4,sorted
10143255,4,unsorted


In [27]:
start_df = joinedDF.repartition(4, "playlist_id")
    

first_sort_df = start_df.sortWithinPartitions("match_id")

sorted = joinedDF.repartition(10, "playlist_id") \
        .sortWithinPartitions("match_id")

start_df.write.mode("overwrite").saveAsTable("abbad.playlist_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("abbad.playlist_sorted")

                                                                                

In [28]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM abbad.playlist_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM abbad.playlist_sorted.files


                                                                                

size,num_files,sorted
6432359,4,sorted
6528702,4,unsorted


In [29]:
start_df = joinedDF.repartition(4, "mapid")
    

first_sort_df = start_df.sortWithinPartitions("match_id")

sorted = joinedDF.repartition(10, "mapid") \
        .sortWithinPartitions("match_id")

start_df.write.mode("overwrite").saveAsTable("abbad.playlist_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("abbad.playlist_sorted")

                                                                                

In [30]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM abbad.playlist_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM abbad.playlist_sorted.files

                                                                                

size,num_files,sorted
5117695,4,sorted
5034605,4,unsorted


In [31]:
start_df = joinedDF.repartition(4, "playlist_id", "mapid")
    

first_sort_df = start_df.sortWithinPartitions("match_id")

sorted = joinedDF.repartition(10, "playlist_id", "mapid") \
        .sortWithinPartitions("match_id")

start_df.write.mode("overwrite").saveAsTable("abbad.playlist_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("abbad.playlist_sorted")

                                                                                

In [32]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM abbad.playlist_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM abbad.playlist_sorted.files

                                                                                

size,num_files,sorted
5165560,4,sorted
5094272,4,unsorted


In [33]:
start_df = joinedDF.repartition(4, "completion_date")
    

first_sort_df = start_df.sortWithinPartitions("match_id")

sorted = joinedDF.repartition(10, "completion_date") \
        .sortWithinPartitions("match_id")

start_df.write.mode("overwrite").saveAsTable("abbad.playlist_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("abbad.playlist_sorted")

                                                                                

In [34]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM abbad.playlist_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM abbad.playlist_sorted.files

                                                                                

size,num_files,sorted
4607122,4,sorted
4677366,4,unsorted


In [35]:
start_df = joinedDF.repartition(4, "player_total_kills")
    

first_sort_df = start_df.sortWithinPartitions("match_id")

sorted = joinedDF.repartition(10, "player_total_kills") \
        .sortWithinPartitions("match_id")

start_df.write.mode("overwrite").saveAsTable("abbad.playlist_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("abbad.playlist_sorted")

                                                                                

In [36]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM abbad.playlist_unsorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM abbad.playlist_sorted.files

                                                                                

size,num_files,sorted
7501382,4,sorted
7307947,4,unsorted
