In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Optimized Bucket Join") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.569") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://warehouse/") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9001") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

25/02/16 18:13:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Test Bucket

In [1]:
pip install Faker

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [3]:
from faker import Faker
import random

In [4]:

# Sử dụng thư viện Faker để tạo dữ liệu giả
fake = Faker()

# Hàm tạo dữ liệu giả
def generate_fake_data(num_records):
    data = []
    for _ in range(num_records):
        match_id = f"M{random.randint(1, 100)}"  # Match ID từ M1 đến M100
        player = fake.name()  # Tên người chơi giả
        score = random.randint(0, 100)  # Điểm số ngẫu nhiên từ 0 đến 100
        data.append((match_id, player, score))
    return data

# Số lượng bản ghi
num_records = 1000000

# Tạo dữ liệu
columns = ["match_id", "player", "score"]
data = generate_fake_data(num_records)

# Tạo DataFrame
df = spark.createDataFrame(data, columns)

# Hiển thị một phần dữ liệu
df.show(5)

25/02/16 14:59:25 WARN TaskSetManager: Stage 0 contains a task of very large size (1607 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+----------------+-----+
|match_id|          player|score|
+--------+----------------+-----+
|     M68| Joseph Phillips|   22|
|     M13|    Olivia Avila|   20|
|     M65|       Jack Long|   84|
|     M16|   Gerald Gamble|   29|
|     M91|Gabrielle Walker|   75|
+--------+----------------+-----+
only showing top 5 rows



In [5]:
# Lưu bảng với bucketing
(df.write
  .mode('overwrite')
  .format("iceberg")
  .bucketBy(16, "match_id")  # Bucket theo cột match_id với 16 buckets
  .saveAsTable("bootcamp.df_bucketed"))  # Lưu vào catalog 'bootcamp'

# Kiểm tra dữ liệu
spark.sql("SELECT * FROM bootcamp.df_bucketed LIMIT 10").show()

25/02/16 14:59:29 WARN TaskSetManager: Stage 1 contains a task of very large size (1607 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+----------------+-----+
|match_id|          player|score|
+--------+----------------+-----+
|    M100|William Robinson|   13|
|     M55|   Kelly Johnson|   44|
|     M95|  Kevin Anderson|   69|
|     M55|   Joseph Martin|   27|
|    M100| Latasha Edwards|   91|
|     M95|     Adam Holden|   36|
|     M25|  Rebecca Romero|   86|
|     M87|      Emily Ruiz|   33|
|     M25|     Erik Martin|   28|
|     M52|   Pamela Miller|   79|
+--------+----------------+-----+



                                                                                

In [6]:
# Đọc lại bảng Iceberg
df_bucketed = spark.read.format("iceberg").load("bootcamp.df_bucketed")
df_bucketed.limit(5).toPandas()

Unnamed: 0,match_id,player,score
0,M100,William Robinson,13
1,M55,Kelly Johnson,44
2,M95,Kevin Anderson,69
3,M55,Joseph Martin,27
4,M100,Latasha Edwards,91


In [15]:
bucket_path = "s3a://warehouse/bootcamp/df_bucketed/data/match_id_bucket=0"
df_bucket_0 = spark.read.format("parquet").load(bucket_path)
df_bucket_0.show()

+--------+------------------+-----+
|match_id|            player|score|
+--------+------------------+-----+
|     M11|Mr. Anthony Hudson|  100|
|     M83|        Lisa Riggs|   20|
|     M11|    Jeremy Hensley|   38|
|     M90|  Thomas Carpenter|   11|
|     M62|     Micheal Smith|   46|
|     M31|     Jasmine Allen|   48|
|     M75|  Christina Duncan|   65|
|     M90|       Jamie Tyler|   51|
|     M11|     Mark Williams|   38|
|     M11|Christopher Holmes|   29|
|     M11|      Stacy Larson|   27|
|     M75|Elizabeth Phillips|    6|
|      M4|      George Scott|   15|
|     M11|       Joseph Wood|   16|
|      M4|       James Henry|   93|
|     M75|   William Lindsey|   67|
|     M90|      Amanda Huang|   40|
|     M11|     Steven Flores|   21|
|      M4|Elizabeth Williams|   17|
|     M21|      Stacey Cross|   78|
+--------+------------------+-----+
only showing top 20 rows



In [7]:
# Thêm cột bucket_id để kiểm tra bucket của mỗi dòng
df_with_bucket = df_bucketed.withColumn("bucket_id", (abs(hash(col("match_id"))) % 16))

# Kiểm tra các bucket_id duy nhất
unique_buckets = df_with_bucket.select('bucket_id').dropDuplicates().toPandas()

# Hiển thị kết quả
print(unique_buckets)

    bucket_id
0          12
1          13
2           6
3           3
4          15
5           9
6           4
7           8
8           7
9          10
10         14
11          0
12          1
13          5
14         11
15          2


In [14]:
df_bucketed\
.filter(col('player')=='William Robinson')\
.limit(5).toPandas()

Unnamed: 0,match_id,player,score
0,M100,William Robinson,13
1,M100,William Robinson,71
2,M19,William Robinson,69
3,M100,William Robinson,83
4,M36,William Robinson,42


In [8]:
# Lưu bảng không bucket
(df.write
  .mode('overwrite')
  .format("iceberg")
  .saveAsTable("bootcamp.df_non_bucketed"))

25/02/16 14:59:41 WARN TaskSetManager: Stage 9 contains a task of very large size (1607 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [9]:
import time

In [10]:
# Truy vấn trên bảng đã bucket
start_time = time.time()
spark.sql("""
    SELECT match_id, AVG(score) AS avg_score
    FROM bootcamp.df_bucketed
    WHERE match_id = 'M1'
    GROUP BY match_id
""").show()
print(f"Query on bucketed table took {time.time() - start_time:.2f} seconds")

# Truy vấn trên bảng không bucket
start_time = time.time()
spark.sql("""
    SELECT match_id, AVG(score) AS avg_score
    FROM bootcamp.df_non_bucketed
    WHERE match_id = 'M1'
    GROUP BY match_id
""").show()
print(f"Query on non-bucketed table took {time.time() - start_time:.2f} seconds")

+--------+-----------------+
|match_id|        avg_score|
+--------+-----------------+
|      M1|49.88741721854305|
+--------+-----------------+

Query on bucketed table took 1.15 seconds
+--------+-----------------+
|match_id|        avg_score|
+--------+-----------------+
|      M1|49.88741721854305|
+--------+-----------------+

Query on non-bucketed table took 1.11 seconds


# Homework

In [2]:
match_details_path = '/home/iceberg/data/match_details.csv'
matches_path = '/home/iceberg/data/matches.csv'
medals_matches_players_path = '/home/iceberg/data/medals_matches_players.csv'
medals_path = '/home/iceberg/data/medals.csv'
maps_path = '/home/iceberg/data/maps.csv'

match_details_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .load(match_details_path) \
    .select(
        'match_id',
        'player_gamertag',
        col('player_total_kills').cast('integer').alias('player_total_kills'),
        col('player_total_deaths').cast('integer').alias('player_total_deaths')
    )
matches_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(matches_path)\
    .withColumn("is_team_game", when(col("is_team_game") == "true", True).otherwise(False)) \
    .withColumn("completion_date", to_timestamp(col("completion_date")))\
    .select(
        'match_id',
        col('mapid').alias('map_id'),
        'is_team_game',
        'playlist_id',
        'completion_date'
    )
medals_matches_players_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(medals_matches_players_path)\
    .select('match_id',
            'player_gamertag',
            'medal_id',
            (col('count').cast('integer')).alias('medal_count')
    )
medals_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(medals_path)\
    .select('medal_id',
            col('name').alias('medal_name')
    )
maps_df = spark.read.format("csv")\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(maps_path)\
    .select(col('mapid').alias('map_id'),
            col('name').alias('map_name')
    )

                                                                                

## Task 1

In [3]:
# Broadcast join 'maps'
matches_mapped_df = matches_df.join(
    broadcast(maps_df),
    "map_id",
    "inner"
)

# Broadcast join 'medals'
medals_matches_players_mapped_df = medals_matches_players_df.join(
    broadcast(medals_df),
    "medal_id",
    "inner"
)

In [4]:
# Lưu match_details với bucketing
match_details_df.write \
    .format("iceberg") \
    .mode("overwrite") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.match_details_bucketed")

# Lưu matches với bucketing
matches_mapped_df.write \
    .format("iceberg") \
    .mode("overwrite") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.matches_bucketed")

# Lưu medals_matches_players với bucketing
medals_matches_players_mapped_df.write \
    .format("iceberg") \
    .mode("overwrite") \
    .bucketBy(16, "match_id") \
    .saveAsTable("bootcamp.medals_matches_players_bucketed")

                                                                                

In [5]:
# Đọc các bảng đã bucketed
match_details_bucketed = spark.read.format("iceberg").load("bootcamp.match_details_bucketed")
matches_bucketed = spark.read.format("iceberg").load("bootcamp.matches_bucketed")
medals_matches_players_bucketed = spark.read.format("iceberg").load("bootcamp.medals_matches_players_bucketed")

# Thực hiện join giữa các bảng
# Step 1: Join match_details và matches trên match_id
step1 = match_details_bucketed.join(
    matches_bucketed,
    'match_id',
    "inner"
)

# Step 2: Join kết quả với medals_matches_players trên match_id
final_result = step1.join(
    medals_matches_players_bucketed,
    ['match_id', 'player_gamertag'],
    "inner"
)

# Hiển thị kết quả
final_result.limit(5).toPandas()

                                                                                

Unnamed: 0,match_id,player_gamertag,player_total_kills,player_total_deaths,map_id,is_team_game,playlist_id,completion_date,map_name,medal_id,medal_count,medal_name
0,0001a1c4-83dc-4f40-a97e-7910a765c96a,ILLICIT 117,23,28,c7805740-f206-11e4-982c-24be05e24f7e,True,780cc101-005c-4fca-8ce7-6f36d7156ffe,2016-01-06,Glacier,3565443938,4,Stronghold Captured
1,0001a1c4-83dc-4f40-a97e-7910a765c96a,ILLICIT 117,23,28,c7805740-f206-11e4-982c-24be05e24f7e,True,780cc101-005c-4fca-8ce7-6f36d7156ffe,2016-01-06,Glacier,3261908037,8,Headshot
2,0001a1c4-83dc-4f40-a97e-7910a765c96a,ILLICIT 117,23,28,c7805740-f206-11e4-982c-24be05e24f7e,True,780cc101-005c-4fca-8ce7-6f36d7156ffe,2016-01-06,Glacier,824733727,1,Distraction
3,0001a1c4-83dc-4f40-a97e-7910a765c96a,ILLICIT 117,23,28,c7805740-f206-11e4-982c-24be05e24f7e,True,780cc101-005c-4fca-8ce7-6f36d7156ffe,2016-01-06,Glacier,298813630,2,Spartan Charge
4,0001a1c4-83dc-4f40-a97e-7910a765c96a,ILLICIT 117,23,28,c7805740-f206-11e4-982c-24be05e24f7e,True,780cc101-005c-4fca-8ce7-6f36d7156ffe,2016-01-06,Glacier,1351381581,3,Stronghold Defense


In [6]:
# ===========================
# PHÂN TÍCH DỮ LIỆU
# ===========================

# 1. Người chơi nào có trung bình số lần giết cao nhất mỗi trận?
player_avg_kills = final_result.groupBy("player_gamertag") \
    .agg(
        (sum("player_total_kills") / countDistinct("match_id")).alias("avg_kills_per_game")
    ) \
    .orderBy(col("avg_kills_per_game").desc())

print("Top players by average kills per game:")
player_avg_kills.show(5)

# 2. Playlist nào được chơi nhiều nhất?
most_played_playlist = final_result.groupBy("playlist_id") \
    .agg(count("match_id").alias("match_count")) \
    .orderBy(col("match_count").desc())

print("Most played playlists:")
most_played_playlist.show(5)

# 3. Bản đồ nào được chơi nhiều nhất?
most_played_map = final_result.groupBy("map_name") \
    .agg(count("match_id").alias("match_count")) \
    .orderBy(col("match_count").desc())

print("Most played maps:")
most_played_map.show(5)

# 4. Bản đồ nào có nhiều huy chương "Killing Spree" nhất?
killing_spree_medals = final_result.filter(col("medal_name") == "Killing Spree")

most_killing_spree_map = killing_spree_medals.groupBy("map_name") \
    .agg(sum("medal_count").alias("total_killing_spree_medals")) \
    .orderBy(col("total_killing_spree_medals").desc())

print("Maps with the most Killing Spree medals:")
most_killing_spree_map.show(5)

Top players by average kills per game:


                                                                                

+---------------+------------------+
|player_gamertag|avg_kills_per_game|
+---------------+------------------+
|  I Johann117 I|            1440.0|
|   gimpinator14|            1308.0|
|BudgetLegendary|            1162.0|
|      GsFurreal|            1125.0|
|    ManicZ0mb1e|            1098.0|
+---------------+------------------+
only showing top 5 rows

Most played playlists:


                                                                                

+--------------------+-----------+
|         playlist_id|match_count|
+--------------------+-----------+
|f72e0ef0-7c4a-430...|     202489|
|c98949ae-60a8-43d...|     107422|
|2323b76a-db98-4e0...|      92148|
|892189e9-d712-4bd...|      86496|
|0bcf2be1-3168-4e4...|      66477|
+--------------------+-----------+
only showing top 5 rows

Most played maps:


                                                                                

+--------------+-----------+
|      map_name|match_count|
+--------------+-----------+
|Breakout Arena|     186118|
|        Alpine|     105658|
|       Glacier|      70182|
|        Empire|      51845|
|       The Rig|      41098|
+--------------+-----------+
only showing top 5 rows

Maps with the most Killing Spree medals:
+--------------+--------------------------+
|      map_name|total_killing_spree_medals|
+--------------+--------------------------+
|Breakout Arena|                      6738|
|        Alpine|                      5359|
|       Glacier|                      3402|
|        Empire|                      2233|
|         Truth|                      2061|
+--------------+--------------------------+
only showing top 5 rows

