In [4]:
sc.stop()

In [5]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, split, lit}
import java.sql.Date
val spark = SparkSession.builder().appName("Jupyter").getOrCreate()
//explicitly set the iceberg connection due to connection lost after several minutes
spark.sql("SET spark.sql.catalog.my_catalog.uri=http://192.168.56.1:8181")

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, split, lit}
import java.sql.Date
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@476a0d77
res4: org.apache.spark.sql.DataFrame = [key: string, value: string]


In [27]:
//read the csv files first

val match_detailsBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/match_details.csv")

val matchesBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/matches.csv")
                        
val me_ma_plBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/medals_matches_players.csv")

val medalsBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/medals.csv")
                        
val mapsBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/maps.csv")

// creating tables and populates them with the csv data

// matches details

val matchDetailsDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(4, match_id));
"""
 spark.sql(matchDetailsDDL)

 match_detailsBucketed.select(
     $"match_id", $"player_gamertag", $"player_total_kills", $"player_total_deaths")
     .write.mode("append")
   .bucketBy(4, "match_id").saveAsTable("bootcamp.match_details_bucketed")


// matches table
                        
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")
val matchesDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
    match_id STRING,
    mapid string,
    is_team_game BOOLEAN,
    playlist_id STRING,
    completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (completion_date, bucket(4, match_id));
"""

spark.sql(matchesDDL)

// Break large dataset into smaller chunks
val processedData = matchesBucketed
  .select("match_id", "mapid", "is_team_game", "playlist_id", "completion_date")

// Process in batches if extremely large
val distinctDates = processedData.select("completion_date").distinct().collect()

distinctDates.foreach { dateRow =>
  val specificDate = dateRow.getAs[java.sql.Timestamp](0)
  
  processedData
    .filter(col("completion_date") === specificDate)
    .write
    .mode("append")
    .partitionBy("completion_date")
    .bucketBy(4, "match_id")
    .saveAsTable("bootcamp.matches_bucketed")
}



//medal_maps_players table

spark.sql("""DROP TABLE IF EXISTS bootcamp.me_ma_pl_bucketed""")
val memapldDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.me_ma_pl_bucketed (
    match_id STRING,
    player_gamertag STRING,
    medal_id bigint,
    count integer
 )
 USING iceberg
 partitioned by (bucket(4,match_id));
"""

spark.sql(memapldDDL)

me_ma_plBucketed.select(
     $"match_id", $"player_gamertag", $"medal_id", $"count"
     )
     .write.mode("append")
     .bucketBy(4,"match_id")
   .saveAsTable("bootcamp.me_ma_pl_bucketed")


//medals table

spark.sql("""DROP TABLE IF EXISTS bootcamp.medals_bucketed""")
val medalsDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.medals_bucketed (
    medal_id STRING,
    name STRING
 )
 USING iceberg;
"""

spark.sql(medalsDDL)

medalsBucketed.select(
     $"medal_id", $"name"
     )
     .write.mode("append")
   .saveAsTable("bootcamp.medals_bucketed")


//maps table

spark.sql("""DROP TABLE IF EXISTS bootcamp.maps_bucketed""")
val mapsDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.maps_bucketed (
    mapid STRING,
    name STRING
 )
 USING iceberg;
"""

spark.sql(mapsDDL)

mapsBucketed.select(
     $"mapid", $"name"
     )
     .write.mode("append")
   .saveAsTable("bootcamp.maps_bucketed")





match_detailsBucketed: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]
matchesBucketed: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]
me_ma_plBucketed: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 2 more fields]
medalsBucketed: org.apache.spark.sql.DataFrame = [medal_id: bigint, sprite_uri: string ... 10 more fields]
mapsBucketed: org.apache.spark.sql.DataFrame = [mapid: string, name: string ... 1 more field]
matchDetailsDDL: String =
"
CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(4, match_id));...


In [3]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val bucketedMatches = spark.table("bootcamp.matches_bucketed").sort("match_id")
val bucketedMatchDetails = spark.table("bootcamp.match_details_bucketed").sort("match_id")

val bucketedJoin = bucketedMatches.as("m")
   .join(bucketedMatchDetails.as("md"), $"m.match_id" === $"md.match_id")
   .select($"m.match_id", $"m.completion_date", $"md.player_gamertag", $"md.player_total_kills", $"md.player_total_deaths")
// .take(5)

bucketedJoin.createOrReplaceTempView("bucketed_result")
spark.sql("SELECT * FROM bucketed_result LIMIT 10").show()

+--------------------+-------------------+---------------+------------------+-------------------+
|            match_id|    completion_date|player_gamertag|player_total_kills|player_total_deaths|
+--------------------+-------------------+---------------+------------------+-------------------+
|00169217-cca6-4b4...|2016-03-13 00:00:00|  King Terror V|                14|                  7|
|00169217-cca6-4b4...|2016-03-13 00:00:00|      King Sope|                11|                  5|
|00169217-cca6-4b4...|2016-03-13 00:00:00|       mcnaeric|                10|                 14|
|00169217-cca6-4b4...|2016-03-13 00:00:00|    EXTREMENOVA|                 8|                 10|
|00169217-cca6-4b4...|2016-03-13 00:00:00| Psych0ticCamel|                 8|                 14|
|00169217-cca6-4b4...|2016-03-13 00:00:00|Trap Lord David|                 8|                 12|
|00169217-cca6-4b4...|2016-03-13 00:00:00|       DJ RAHHH|                13|                 11|
|00169217-cca6-4b4..

bucketedMatches: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [match_id: string, is_team_game: boolean ... 2 more fields]
bucketedMatchDetails: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [match_id: string, player_gamertag: string ... 2 more fields]
bucketedJoin: org.apache.spark.sql.DataFrame = [match_id: string, completion_date: timestamp ... 3 more fields]


In [12]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val bucketedMatches = spark.table("bootcamp.matches_bucketed").sort("match_id")
val bucketedMatchDetails = spark.table("bootcamp.match_details_bucketed").sort("match_id")
val bucketedMedalMatchPlayers = spark.table("bootcamp.me_ma_pl_bucketed").sort("match_id")
val bucketedMedals = spark.table("bootcamp.medals_bucketed")
val bucketedMaps = spark.table("bootcamp.maps_bucketed")

val bucketedJoin = bucketedMatches.as("m")
   .join(bucketedMatchDetails.as("md"), $"m.match_id" === $"md.match_id")
   .join(bucketedMedalMatchPlayers.as("mmp"), $"md.match_id" === $"mmp.match_id" && $"md.player_gamertag" === $"mmp.player_gamertag")
   .join(broadcast(bucketedMedals).as("me"), $"mmp.medal_id" === $"me.medal_id")
   .join(broadcast(bucketedMaps).as("mp"), $"m.mapid" === $"mp.mapid")
   .select($"m.match_id", $"m.is_team_game", $"m.playlist_id", $"completion_date", 
       $"md.player_gamertag", $"md.player_total_kills",  $"md.player_total_deaths",
        $"mmp.medal_id", $"me.name".as ("medal_name"), $"mmp.count",
        $"mp.mapid", $"mp.name".as ("map_name"))
//       .explain()

bucketedJoin.createOrReplaceTempView("bucketed_result")
spark.sql("SELECT * FROM bucketed_result LIMIT 3").show()

+--------------------+------------+--------------------+-------------------+---------------+------------------+-------------------+----------+-------------------+-----+--------------------+--------+
|            match_id|is_team_game|         playlist_id|    completion_date|player_gamertag|player_total_kills|player_total_deaths|  medal_id|         medal_name|count|               mapid|map_name|
+--------------------+------------+--------------------+-------------------+---------------+------------------+-------------------+----------+-------------------+-----+--------------------+--------+
|0001a1c4-83dc-4f4...|        true|780cc101-005c-4fc...|2016-01-06 00:00:00|    ILLICIT 117|                23|                 28|3565443938|Stronghold Captured|    4|c7805740-f206-11e...| Glacier|
|0001a1c4-83dc-4f4...|        true|780cc101-005c-4fc...|2016-01-06 00:00:00|    ILLICIT 117|                23|                 28|3261908037|           Headshot|    8|c7805740-f206-11e...| Glacier|
|0001

bucketedMatches: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [match_id: string, mapid: string ... 3 more fields]
bucketedMatchDetails: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [match_id: string, player_gamertag: string ... 2 more fields]
bucketedMedalMatchPlayers: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [match_id: string, player_gamertag: string ... 2 more fields]
bucketedMedals: org.apache.spark.sql.DataFrame = [medal_id: string, name: string]
bucketedMaps: org.apache.spark.sql.DataFrame = [mapid: string, name: string]
bucketedJoin: org.apache.spark.sql.DataFrame = [match_id: string, is_team_game: boolean ... 10 more fields]


In [30]:
spark.sql("SELECT mapid, map_name, sum(count) FROM bucketed_result where medal_name = 'Killing Spree' group by mapid, map_name order by 3 desc").show()

+--------------------+--------------+----------+
|               mapid|      map_name|sum(count)|
+--------------------+--------------+----------+
|c7edbf0f-f206-11e...|Breakout Arena|     26952|
|c74c9d0f-f206-11e...|        Alpine|     21436|
|c7805740-f206-11e...|       Glacier|     13608|
|cdb934b0-f206-11e...|        Empire|      8932|
|ce1dc2de-f206-11e...|         Truth|      8244|
|cb914b9e-f206-11e...|       The Rig|      8012|
|caacb800-f206-11e...|         Plaza|      7648|
|cebd854f-f206-11e...|      Coliseum|      7640|
|cd844200-f206-11e...|          Eden|      7076|
|cdee4e70-f206-11e...|        Regret|      7044|
|cc040aa1-f206-11e...|        Fathom|      6632|
|ca737f8f-f206-11e...|    Overgrowth|      4196|
|cbcea2c0-f206-11e...|      Riptide |      3660|
|cc74f4e1-f206-11e...|          NULL|      3572|
|c7b7baf0-f206-11e...|      Parallax|      2900|
|ce89a40f-f206-11e...|          NULL|      1420|
+--------------------+--------------+----------+



In [27]:
// Which player averages the most kills per game?
//val avgKillPerGame = bucketedJoin.distinct()
//     .groupBy($"player_gamertag")
//      .agg(avg($"player_total_kills").as("avg_kills_per_match"))
//      .orderBy(avg($"player_total_kills").desc)

//Which playlist gets played the most?
//val playlistCount = bucketedJoin.groupBy($"playlist_id")
//  .agg(countDistinct($"match_id").as("total_games_played"))
//  .orderBy(countDistinct($"match_id").desc)

//Which map gets played the most?
//val mapCount = bucketedJoin.filter($"map_name".isNotNull)
//    .groupBy($"map_name")  
//    .agg(countDistinct($"match_id").as("total_games"))
//  .orderBy($"total_games".desc)

//Which map do players get the most Killing Spree medals on?
//val mapKSCount = bucketedJoin.distinct()
//        .filter($"medal_name"=== "Killing Spree")
//        .groupBy($"map_name")
//        .agg(sum($"count").as("total_games"))
//      .orderBy($"total_games".desc)


mapKSCount.show()


+--------------+-----------+
|      map_name|total_games|
+--------------+-----------+
|Breakout Arena|       6738|
|        Alpine|       5359|
|       Glacier|       3402|
|        Empire|       2233|
|         Truth|       2061|
|       The Rig|       2003|
|         Plaza|       1912|
|      Coliseum|       1910|
|          Eden|       1769|
|        Regret|       1761|
|        Fathom|       1658|
|          NULL|       1248|
|    Overgrowth|       1049|
|      Riptide |        915|
|      Parallax|        725|
+--------------+-----------+



mapKSCount: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [map_name: string, total_games: bigint]


In [8]:
spark.sql("SELECT player_gamertag, avg(player_total_kills) over (partition by player_gamertag) as avg_kill FROM bucketed_result LIMIT 10").show()

+---------------+--------+
|player_gamertag|avg_kill|
+---------------+--------+
|  A 29 Delivery|    11.0|
|  A 29 Delivery|    11.0|
|  A 29 Delivery|    11.0|
|    A BOOTY TAP|     2.0|
|    A BOOTY TAP|     2.0|
|    A BOOTY TAP|     2.0|
|    A Baby Lynx|    12.0|
|    A Baby Lynx|    12.0|
|    A Baby Lynx|    12.0|
| A Blind Kid 8P|    29.0|
+---------------+--------+

