In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, desc, broadcast

In [27]:
# Create (or reuse) the notebook's Spark session with the automatic
# broadcast-join threshold set to -1.
spark = (
    SparkSession.builder
    .appName("Assignment 3 - Spark Job")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
)

24/12/12 12:30:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [29]:
# Read the four source CSVs. Each file is read with a header row and with
# column types inferred from the data.
csv_options = {"header": True, "inferSchema": True}

match_details_df = spark.read.csv("/home/iceberg/data/match_details.csv", **csv_options)
matches_df = spark.read.csv("/home/iceberg/data/matches.csv", **csv_options)
medals_matches_players_df = spark.read.csv(
    "/home/iceberg/data/medals_matches_players.csv", **csv_options
)
medals_df = spark.read.csv("/home/iceberg/data/medals.csv", **csv_options)

In [52]:
# Co-partition the three match-level frames on match_id (16 partitions each)
# ahead of the joins.
#
# repartition() hash-partitions on the key, so equal match_id values land in the
# same partition number in every frame and the joins can reuse that layout.
# repartitionByRange() samples its own range boundaries per DataFrame, so the
# frames are not co-partitioned and the join shuffles them again.
# NOTE: this is in-memory partitioning, not a persisted bucketed table; that
# would require DataFrameWriter.bucketBy(...).saveAsTable(...).
bucketed_match_details = match_details_df.repartition(16, "match_id")
bucketed_matches = matches_df.repartition(16, "match_id")
bucketed_medals_matches_players = medals_matches_players_df.repartition(16, "match_id")

# Preview the medals lookup table.
medals_df.show()

+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------------+----------+
|  medal_id|          sprite_uri|sprite_left|sprite_top|sprite_sheet_width|sprite_sheet_height|sprite_width|sprite_height|   classification|         description|          name|difficulty|
+----------+--------------------+-----------+----------+------------------+-------------------+------------+-------------+-----------------+--------------------+--------------+----------+
|2315448068|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|             NULL|                NULL|          NULL|      NULL|
|3565441934|                NULL|       NULL|      NULL|              NULL|               NULL|        NULL|         NULL|             NULL|                NULL|          NULL|      NULL|
|4162659350|https://content.h...|        750|       750|    

In [36]:
# Inner-join the three match_id-partitioned frames.
# NOTE(review): the join key is match_id only. If match_details and
# medals_matches_players each hold several rows per match, the joined rows
# multiply -- confirm this is intended before trusting absolute counts.
matches_with_details = bucketed_match_details.join(
    bucketed_matches, on="match_id", how="inner"
)
complete_data = matches_with_details.join(
    bucketed_medals_matches_players, on="match_id", how="inner"
)

# Attach medal metadata. medals_df is wrapped in broadcast(), and the left join
# keeps rows whose medal_id has no match in medals_df.
medal_lookup = broadcast(medals_df)
complete_data_with_medals = complete_data.join(medal_lookup, on="medal_id", how="left")

In [56]:
# Aggregations: each result is ordered descending and limited to its top row.

# 1. Which player averages the most kills per game?
kills_per_player = match_details_df.groupBy("player_gamertag").agg(
    avg("player_total_kills").alias("avg_kills_per_game")
)
player_avg_kills = kills_per_player.orderBy(desc("avg_kills_per_game")).limit(1)

# 2. Which playlist gets played the most?
matches_per_playlist = matches_df.groupBy("playlist_id").agg(
    count("match_id").alias("total_matches")
)
most_played_playlist = matches_per_playlist.orderBy(desc("total_matches")).limit(1)

# 3. Which map gets played the most?
matches_per_map = matches_df.groupBy("mapid").agg(
    count("match_id").alias("total_matches")
)
most_played_map = matches_per_map.orderBy(desc("total_matches")).limit(1)

# 4. Which map do players get the most Killing Spree medals on?
killing_spree_rows = complete_data_with_medals.filter(col("name") == "Killing Spree")
killing_spree_per_map = killing_spree_rows.groupBy("mapid").agg(
    count("medal_id").alias("total_killing_spree_medals")
)
killing_spree_medals = killing_spree_per_map.orderBy(
    desc("total_killing_spree_medals")
).limit(1)

# # Optimize with sortWithinPartitions
# player_avg_kills_sorted = player_avg_kills.sortWithinPartitions("avg_kills_per_game")
# most_played_playlist_sorted = most_played_playlist.sortWithinPartitions("playlist")
# most_played_map_sorted = most_played_map.sortWithinPartitions("map_id")

In [46]:
# Q1: display the player with the highest average kills per game.
player_avg_kills.show()



+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+



                                                                                

In [49]:
# Q2: display the playlist with the most matches.
most_played_playlist.show()

+--------------------+-------------+
|         playlist_id|total_matches|
+--------------------+-------------+
|f72e0ef0-7c4a-430...|         9350|
+--------------------+-------------+



In [51]:
# Q3: display the map with the most matches.
most_played_map.show()

+--------------------+-------------+
|               mapid|total_matches|
+--------------------+-------------+
|c7edbf0f-f206-11e...|         8587|
+--------------------+-------------+



In [57]:
# Q4: display the map with the most "Killing Spree" medal rows.
killing_spree_medals.show()



+--------------------+--------------------------+
|               mapid|total_killing_spree_medals|
+--------------------+--------------------------+
|c74c9d0f-f206-11e...|                     56908|
+--------------------+--------------------------+



                                                                                

In [60]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, desc, broadcast

def initialize_spark(app_name: str) -> SparkSession:
    """Build (or reuse) a SparkSession named ``app_name``.

    The session is configured with ``spark.sql.autoBroadcastJoinThreshold``
    set to ``"-1"``.
    """
    builder = SparkSession.builder.appName(app_name)
    builder = builder.config("spark.sql.autoBroadcastJoinThreshold", "-1")
    return builder.getOrCreate()

def load_data(spark: SparkSession, path: str):
    """Read the CSV at ``path`` into a DataFrame.

    The file is read with a header row and inferred column types.
    """
    reader = spark.read
    return reader.csv(path, inferSchema=True, header=True)

def bucket_data(df, bucket_column: str, num_buckets: int):
    """Hash-partition ``df`` into ``num_buckets`` partitions on ``bucket_column``.

    Hash partitioning (``repartition``) is used instead of
    ``repartitionByRange``: range boundaries are sampled separately for every
    DataFrame, so range-partitioned inputs are not co-partitioned and a join on
    ``bucket_column`` has to shuffle them again. With hash partitioning, frames
    partitioned with the same column and partition count line up.

    Args:
        df: DataFrame to repartition.
        bucket_column: Name of the column to partition on.
        num_buckets: Target number of partitions.

    Returns:
        The repartitioned DataFrame (same rows, new partition layout).
    """
    return df.repartition(num_buckets, bucket_column)

def join_with_broadcast(df_left, df_right, join_column: str, join_type: str = "inner"):
    """Join ``df_left`` to ``df_right`` with the right side wrapped in ``broadcast``.

    ``join_column`` is the join key and ``join_type`` the join kind
    (``"inner"`` unless overridden).
    """
    broadcast_side = broadcast(df_right)
    return df_left.join(broadcast_side, on=join_column, how=join_type)

def calculate_top_player_avg_kills(match_details_df):
    """Return the player with the highest mean ``player_total_kills``.

    Rows are grouped by ``player_gamertag``; the result holds at most one row
    with columns ``player_gamertag`` and ``avg_kills_per_game``.
    """
    mean_kills = avg("player_total_kills").alias("avg_kills_per_game")
    per_player = match_details_df.groupBy("player_gamertag").agg(mean_kills)
    ranked = per_player.orderBy(desc("avg_kills_per_game"))
    return ranked.limit(1)

def calculate_top_most_played_playlist(matches_df):
    """Return the ``playlist_id`` with the largest ``match_id`` count.

    The result holds at most one row with columns ``playlist_id`` and
    ``total_matches``.
    """
    match_total = count("match_id").alias("total_matches")
    per_playlist = matches_df.groupBy("playlist_id").agg(match_total)
    ranked = per_playlist.orderBy(desc("total_matches"))
    return ranked.limit(1)

def calculate_top_most_played_map(matches_df):
    """Return the ``mapid`` with the largest ``match_id`` count.

    The result holds at most one row with columns ``mapid`` and
    ``total_matches``.
    """
    match_total = count("match_id").alias("total_matches")
    per_map = matches_df.groupBy("mapid").agg(match_total)
    ranked = per_map.orderBy(desc("total_matches"))
    return ranked.limit(1)

def calculate_top_killing_spree_medals(complete_data_with_medals):
    """Return the ``mapid`` with the most rows whose ``name`` is "Killing Spree".

    The result holds at most one row with columns ``mapid`` and
    ``total_killing_spree_medals`` (a count of ``medal_id`` per map).
    """
    killing_spree_rows = complete_data_with_medals.filter(col("name") == "Killing Spree")
    per_map = killing_spree_rows.groupBy("mapid").agg(
        count("medal_id").alias("total_killing_spree_medals")
    )
    return per_map.orderBy(desc("total_killing_spree_medals")).limit(1)

# def save_results(df, output_path: str):
#     """Save DataFrame results to disk."""
#     df.write.csv(output_path, header=True, mode="overwrite")

def main():
    """Run the Spark job and print the four top-1 aggregation results.

    Steps: load the four CSVs, partition the three match-level frames on
    ``match_id`` (16 partitions), inner-join them, attach medal metadata via a
    broadcast join, then compute and show each aggregation. The Spark session
    is stopped on exit, including when a step raises.
    """
    spark = initialize_spark("Assignment 3 - Functional Programming")
    try:
        # Load datasets
        match_details_df = load_data(spark, "/home/iceberg/data/match_details.csv")
        matches_df = load_data(spark, "/home/iceberg/data/matches.csv")
        medals_matches_players_df = load_data(spark, "/home/iceberg/data/medals_matches_players.csv")
        medals_df = load_data(spark, "/home/iceberg/data/medals.csv")

        # Bucket data
        bucketed_match_details = bucket_data(match_details_df, "match_id", 16)
        bucketed_matches = bucket_data(matches_df, "match_id", 16)
        bucketed_medals_matches_players = bucket_data(medals_matches_players_df, "match_id", 16)

        # Perform bucketed joins
        matches_with_details = bucketed_match_details.join(bucketed_matches, "match_id", "inner")
        complete_data = matches_with_details.join(bucketed_medals_matches_players, "match_id", "inner")

        # Broadcast join with medals
        complete_data_with_medals = join_with_broadcast(complete_data, medals_df, "medal_id")

        # Aggregations with top 1 results
        top_player_avg_kills = calculate_top_player_avg_kills(match_details_df)
        top_most_played_playlist = calculate_top_most_played_playlist(matches_df)
        top_most_played_map = calculate_top_most_played_map(matches_df)
        top_killing_spree_medals = calculate_top_killing_spree_medals(complete_data_with_medals)

        top_player_avg_kills.show()
        top_most_played_playlist.show()
        top_most_played_map.show()
        top_killing_spree_medals.show()

        # Save results
        # save_results(top_player_avg_kills, "/path/to/output/top_player_avg_kills")
        # save_results(top_most_played_playlist, "/path/to/output/top_most_played_playlist")
        # save_results(top_most_played_map, "/path/to/output/top_most_played_map")
        # save_results(top_killing_spree_medals, "/path/to/output/top_killing_spree_medals")
    finally:
        # Stop the Spark session even if a step above raised.
        spark.stop()

if __name__ == "__main__":
    main()


24/12/12 14:13:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+

+--------------------+-------------+
|         playlist_id|total_matches|
+--------------------+-------------+
|f72e0ef0-7c4a-430...|         9350|
+--------------------+-------------+

+--------------------+-------------+
|               mapid|total_matches|
+--------------------+-------------+
|c7edbf0f-f206-11e...|         8587|
+--------------------+-------------+



                                                                                

+--------------------+--------------------------+
|               mapid|total_killing_spree_medals|
+--------------------+--------------------------+
|c74c9d0f-f206-11e...|                     56908|
+--------------------+--------------------------+



In [61]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, desc, broadcast

def initialize_spark(app_name: str) -> SparkSession:
    """Build (or reuse) a SparkSession named ``app_name``.

    The session is configured with ``spark.sql.autoBroadcastJoinThreshold``
    set to ``"-1"``.
    """
    session_builder = SparkSession.builder.appName(app_name)
    session_builder = session_builder.config("spark.sql.autoBroadcastJoinThreshold", "-1")
    return session_builder.getOrCreate()

def load_data(spark: SparkSession, path: str):
    """Read the CSV at ``path`` into a DataFrame.

    The file is read with a header row and inferred column types.
    """
    csv_reader = spark.read
    return csv_reader.csv(path, inferSchema=True, header=True)

def bucket_data(df, bucket_column: str, num_buckets: int):
    """Repartition ``df`` into ``num_buckets`` partitions on ``bucket_column``.

    Delegates to ``df.repartition`` and returns its result.
    """
    repartitioned = df.repartition(num_buckets, bucket_column)
    return repartitioned

def sort_within_partitions(df, sort_column: str):
    """Sort each partition of ``df`` by ``sort_column``.

    Delegates to ``df.sortWithinPartitions`` and returns its result.
    """
    locally_sorted = df.sortWithinPartitions(sort_column)
    return locally_sorted

def join_with_broadcast(df_left, df_right, join_column: str, join_type: str = "inner"):
    """Join ``df_left`` to ``df_right`` with the right side wrapped in ``broadcast``.

    ``join_column`` is the join key and ``join_type`` the join kind
    (``"inner"`` unless overridden).
    """
    small_side = broadcast(df_right)
    return df_left.join(small_side, on=join_column, how=join_type)

def calculate_top_player_avg_kills(match_details_df):
    """Return the player with the highest mean ``player_total_kills``.

    Rows are grouped by ``player_gamertag``; the result holds at most one row
    with columns ``player_gamertag`` and ``avg_kills_per_game``.
    """
    grouped = match_details_df.groupBy("player_gamertag")
    averaged = grouped.agg(avg("player_total_kills").alias("avg_kills_per_game"))
    return averaged.orderBy(desc("avg_kills_per_game")).limit(1)

def calculate_top_most_played_playlist(matches_df):
    """Return the ``playlist_id`` with the largest ``match_id`` count.

    The result holds at most one row with columns ``playlist_id`` and
    ``total_matches``.
    """
    grouped = matches_df.groupBy("playlist_id")
    counted = grouped.agg(count("match_id").alias("total_matches"))
    return counted.orderBy(desc("total_matches")).limit(1)

def calculate_top_most_played_map(matches_df):
    """Return the ``mapid`` with the largest ``match_id`` count.

    The result holds at most one row with columns ``mapid`` and
    ``total_matches``.
    """
    grouped = matches_df.groupBy("mapid")
    counted = grouped.agg(count("match_id").alias("total_matches"))
    return counted.orderBy(desc("total_matches")).limit(1)

def calculate_top_killing_spree_medals(complete_data_with_medals):
    """Return the ``mapid`` with the most rows whose ``name`` is "Killing Spree".

    The result holds at most one row with columns ``mapid`` and
    ``total_killing_spree_medals`` (a count of ``medal_id`` per map).
    """
    is_killing_spree = col("name") == "Killing Spree"
    grouped = complete_data_with_medals.filter(is_killing_spree).groupBy("mapid")
    counted = grouped.agg(count("medal_id").alias("total_killing_spree_medals"))
    return counted.orderBy(desc("total_killing_spree_medals")).limit(1)

def save_results(df, output_path: str):
    """Write ``df`` as CSV to ``output_path``.

    A header row is written and any existing output at the path is
    overwritten. Returns ``None``.
    """
    writer = df.write
    writer.csv(output_path, mode="overwrite", header=True)

def main():
    """Run the Spark job, print the four top-1 results, and save them as CSV.

    Steps: load the four CSVs, partition the three match-level frames on
    ``match_id`` (16 partitions), inner-join them, attach medal metadata via a
    broadcast join, sort two inputs within their partitions, then compute, show
    and save each aggregation. The Spark session is stopped on exit, including
    when a step raises.
    """
    spark = initialize_spark("Assignment 3 - Functional Programming with Optimizations")
    try:
        # Load datasets
        match_details_df = load_data(spark, "/home/iceberg/data/match_details.csv")
        matches_df = load_data(spark, "/home/iceberg/data/matches.csv")
        medals_matches_players_df = load_data(spark, "/home/iceberg/data/medals_matches_players.csv")
        medals_df = load_data(spark, "/home/iceberg/data/medals.csv")

        # Bucket data
        bucketed_match_details = bucket_data(match_details_df, "match_id", 16)
        bucketed_matches = bucket_data(matches_df, "match_id", 16)
        bucketed_medals_matches_players = bucket_data(medals_matches_players_df, "match_id", 16)

        # Perform bucketed joins
        matches_with_details = bucketed_match_details.join(bucketed_matches, "match_id", "inner")
        complete_data = matches_with_details.join(bucketed_medals_matches_players, "match_id", "inner")

        # Broadcast join with medals
        complete_data_with_medals = join_with_broadcast(complete_data, medals_df, "medal_id")

        # Sorting within partitions. Only the two frames that feed an
        # aggregation below are sorted.
        # NOTE(review): groupBy re-shuffles its input, so these per-partition
        # sorts probably do not speed up the aggregations -- confirm with explain().
        sorted_match_details = sort_within_partitions(match_details_df, "player_gamertag")
        sorted_matches = sort_within_partitions(matches_df, "playlist_id")

        # Aggregations with top 1 results
        top_player_avg_kills = calculate_top_player_avg_kills(sorted_match_details)
        top_most_played_playlist = calculate_top_most_played_playlist(sorted_matches)
        top_most_played_map = calculate_top_most_played_map(sorted_matches)
        top_killing_spree_medals = calculate_top_killing_spree_medals(complete_data_with_medals)

        # Show results
        top_player_avg_kills.show()
        top_most_played_playlist.show()
        top_most_played_map.show()
        top_killing_spree_medals.show()

        # Save results
        save_results(top_player_avg_kills, "/path/to/output/top_player_avg_kills")
        save_results(top_most_played_playlist, "/path/to/output/top_most_played_playlist")
        save_results(top_most_played_map, "/path/to/output/top_most_played_map")
        save_results(top_killing_spree_medals, "/path/to/output/top_killing_spree_medals")
    finally:
        # Stop the Spark session even if a step above raised.
        spark.stop()

if __name__ == "__main__":
    main()







24/12/12 14:28:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+

+--------------------+-------------+
|         playlist_id|total_matches|
+--------------------+-------------+
|f72e0ef0-7c4a-430...|         9350|
+--------------------+-------------+

+--------------------+-------------+
|               mapid|total_matches|
+--------------------+-------------+
|c7edbf0f-f206-11e...|         8587|
+--------------------+-------------+



                                                                                

+--------------------+--------------------------+
|               mapid|total_killing_spree_medals|
+--------------------+--------------------------+
|c74c9d0f-f206-11e...|                     56908|
+--------------------+--------------------------+



                                                                                

In [64]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, desc, broadcast

# Initialize Spark Session
def initialize_spark(app_name: str) -> SparkSession:
    """Build (or reuse) a Hive-enabled SparkSession named ``app_name``.

    The session is configured with ``spark.sql.autoBroadcastJoinThreshold``
    set to ``"-1"`` and with Hive support enabled.
    """
    builder = SparkSession.builder.appName(app_name)
    builder = builder.config("spark.sql.autoBroadcastJoinThreshold", "-1")
    return builder.enableHiveSupport().getOrCreate()

# Load data from CSV
def load_data(spark: SparkSession, path: str):
    """Read the CSV at ``path`` into a DataFrame.

    The file is read with a header row and inferred column types.
    """
    source = spark.read
    return source.csv(path, inferSchema=True, header=True)

# Bucket data
def bucket_data(df, bucket_column: str, num_buckets: int):
    """Hash-partition ``df`` into ``num_buckets`` partitions on ``bucket_column``.

    Hash partitioning (``repartition``) is used instead of
    ``repartitionByRange``: range boundaries are sampled separately for every
    DataFrame, so range-partitioned inputs are not co-partitioned and a join on
    ``bucket_column`` has to shuffle them again. With hash partitioning, frames
    partitioned with the same column and partition count line up.

    Args:
        df: DataFrame to repartition.
        bucket_column: Name of the column to partition on.
        num_buckets: Target number of partitions.

    Returns:
        The repartitioned DataFrame (same rows, new partition layout).
    """
    return df.repartition(num_buckets, bucket_column)

# Perform bucketed join
def bucketed_join(df_left, df_right, join_column: str, join_type: str = "inner"):
    """Join ``df_left`` with ``df_right`` on ``join_column``.

    ``join_type`` selects the join kind (``"inner"`` unless overridden). The
    function only issues the join; it does not repartition either input.
    """
    joined = df_left.join(df_right, on=join_column, how=join_type)
    return joined

# Perform broadcast join
def join_with_broadcast(df_left, df_right, join_column: str, join_type: str = "inner"):
    """Join ``df_left`` to ``df_right`` with the right side wrapped in ``broadcast``.

    ``join_column`` is the join key and ``join_type`` the join kind
    (``"inner"`` unless overridden).
    """
    right_hinted = broadcast(df_right)
    return df_left.join(right_hinted, on=join_column, how=join_type)

# Aggregation queries
def calculate_top_player_avg_kills(match_details_df):
    """Return the player with the highest mean ``player_total_kills``.

    Rows are grouped by ``player_gamertag``; the result holds at most one row
    with columns ``player_gamertag`` and ``avg_kills_per_game``.
    """
    avg_kills = avg("player_total_kills").alias("avg_kills_per_game")
    by_player = match_details_df.groupBy("player_gamertag").agg(avg_kills)
    best_first = by_player.orderBy(desc("avg_kills_per_game"))
    return best_first.limit(1)

def calculate_top_most_played_playlist(matches_df):
    """Return the ``playlist_id`` with the largest ``match_id`` count.

    The result holds at most one row with columns ``playlist_id`` and
    ``total_matches``.
    """
    total = count("match_id").alias("total_matches")
    by_playlist = matches_df.groupBy("playlist_id").agg(total)
    busiest_first = by_playlist.orderBy(desc("total_matches"))
    return busiest_first.limit(1)

def calculate_top_most_played_map(matches_df):
    """Return the ``mapid`` with the largest ``match_id`` count.

    The result holds at most one row with columns ``mapid`` and
    ``total_matches``.
    """
    total = count("match_id").alias("total_matches")
    by_map = matches_df.groupBy("mapid").agg(total)
    busiest_first = by_map.orderBy(desc("total_matches"))
    return busiest_first.limit(1)

def calculate_top_killing_spree_medals(complete_data_with_medals):
    """Return the ``mapid`` with the most rows whose ``name`` is "Killing Spree".

    The result holds at most one row with columns ``mapid`` and
    ``total_killing_spree_medals`` (a count of ``medal_id`` per map).
    """
    sprees = complete_data_with_medals.filter(col("name") == "Killing Spree")
    spree_total = count("medal_id").alias("total_killing_spree_medals")
    by_map = sprees.groupBy("mapid").agg(spree_total)
    return by_map.orderBy(desc("total_killing_spree_medals")).limit(1)

# Demonstrate partitioning and sorting
def demonstrate_partitioning_and_sort(df, partition_column: str, num_partitions: int, sort_column: str):
    """Repartition ``df``, sort inside each partition, and report the outcome.

    ``df`` is repartitioned into ``num_partitions`` partitions on
    ``partition_column`` and then sorted within partitions by ``sort_column``.
    The partition count is printed and the extended query plan is shown via
    ``explain(True)``.

    Returns:
        The repartitioned, per-partition-sorted DataFrame.
    """
    partition_key = col(partition_column)
    sorted_df = df.repartition(num_partitions, partition_key).sortWithinPartitions(sort_column)
    print(f"Partitions: {sorted_df.rdd.getNumPartitions()}")
    sorted_df.explain(True)
    return sorted_df

def main():
    """Run the Spark job, demonstrate partition sorting, and print the results.

    Steps: load the four CSVs, partition the three match-level frames on
    ``match_id`` (16 partitions), inner-join them, attach medal metadata via a
    broadcast join, build the four top-1 aggregations, run the
    partition-and-sort demonstration on two inputs, then show the aggregation
    results. The Spark session is stopped on exit, including when a step raises.
    """
    spark = initialize_spark("Assignment 3 - Functional Programming")
    try:
        # Load datasets
        match_details_df = load_data(spark, "/home/iceberg/data/match_details.csv")
        matches_df = load_data(spark, "/home/iceberg/data/matches.csv")
        medals_matches_players_df = load_data(spark, "/home/iceberg/data/medals_matches_players.csv")
        medals_df = load_data(spark, "/home/iceberg/data/medals.csv")

        # Bucket data
        bucketed_match_details = bucket_data(match_details_df, "match_id", 16)
        bucketed_matches = bucket_data(matches_df, "match_id", 16)
        bucketed_medals_matches_players = bucket_data(medals_matches_players_df, "match_id", 16)

        # Perform bucketed joins
        matches_with_details = bucketed_join(bucketed_match_details, bucketed_matches, "match_id")
        complete_data = bucketed_join(matches_with_details, bucketed_medals_matches_players, "match_id")

        # Broadcast join with medals
        complete_data_with_medals = join_with_broadcast(complete_data, medals_df, "medal_id")

        # Aggregations
        top_player_avg_kills = calculate_top_player_avg_kills(match_details_df)
        top_most_played_playlist = calculate_top_most_played_playlist(matches_df)
        top_most_played_map = calculate_top_most_played_map(matches_df)
        top_killing_spree_medals = calculate_top_killing_spree_medals(complete_data_with_medals)

        # Partitioning demonstration. The calls are made for their printed
        # output; the returned frames are not used.
        demonstrate_partitioning_and_sort(match_details_df, "player_gamertag", 8, "player_total_kills")
        demonstrate_partitioning_and_sort(matches_df, "playlist_id", 4, "match_id")

        # Show results
        top_player_avg_kills.show()
        top_most_played_playlist.show()
        top_most_played_map.show()
        top_killing_spree_medals.show()
    finally:
        # Stop the Spark session even if a step above raised.
        spark.stop()

if __name__ == "__main__":
    main()




Partitions: 8
== Parsed Logical Plan ==
'Sort ['player_total_kills ASC NULLS FIRST], false
+- RepartitionByExpression [player_gamertag#5499], 8
   +- Relation [match_id#5498,player_gamertag#5499,previous_spartan_rank#5500,spartan_rank#5501,previous_total_xp#5502,total_xp#5503,previous_csr_tier#5504,previous_csr_designation#5505,previous_csr#5506,previous_csr_percent_to_next_tier#5507,previous_csr_rank#5508,current_csr_tier#5509,current_csr_designation#5510,current_csr#5511,current_csr_percent_to_next_tier#5512,current_csr_rank#5513,player_rank_on_team#5514,player_finished#5515,player_average_life#5516,player_total_kills#5517,player_total_headshots#5518,player_total_weapon_damage#5519,player_total_shots_landed#5520,player_total_melee_kills#5521,... 12 more fields] csv

== Analyzed Logical Plan ==
match_id: string, player_gamertag: string, previous_spartan_rank: int, spartan_rank: int, previous_total_xp: int, total_xp: int, previous_csr_tier: int, previous_csr_designation: int, previous_

                                                                                

+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|   gimpinator14|             109.0|
+---------------+------------------+

+--------------------+-------------+
|         playlist_id|total_matches|
+--------------------+-------------+
|f72e0ef0-7c4a-430...|         9350|
+--------------------+-------------+

+--------------------+-------------+
|               mapid|total_matches|
+--------------------+-------------+
|c7edbf0f-f206-11e...|         8587|
+--------------------+-------------+



                                                                                

+--------------------+--------------------------+
|               mapid|total_killing_spree_medals|
+--------------------+--------------------------+
|c74c9d0f-f206-11e...|                     56908|
+--------------------+--------------------------+

