## Understanding Bucketing
Reference: https://www.youtube.com/watch?v=1kWl6d1yeKA&t=1s

Deciding the optimal number of buckets
- Size of dataset = x
- Optimal bucket size = 128 to 200 MB
- Number of buckets = x / optimal bucket size; e.g. for x = 1000 MB: 1000 MB / 200 MB = 5
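The bucket-count rule above can be written as a small helper. This is a sketch under stated assumptions: the name `num_buckets` and the 200 MB default are illustrative, and rounding up with `math.ceil` (so a dataset that does not divide evenly still gets enough buckets) is an added assumption, since the worked example divides exactly.

```python
import math


def num_buckets(dataset_mb: float, target_bucket_mb: float = 200.0) -> int:
    """Rule-of-thumb bucket count: dataset size / target bucket size, rounded up.

    `num_buckets` is a hypothetical helper, not a Spark API.
    """
    if dataset_mb <= 0 or target_bucket_mb <= 0:
        raise ValueError("sizes must be positive")
    # Round up so that the average bucket stays at or below the target size
    return math.ceil(dataset_mb / target_bucket_mb)


print(num_buckets(1000))       # 5, the worked example above
print(num_buckets(1000, 128))  # 8, using the lower end of the 128 to 200 MB range
```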

Estimating the size of a dataset
- Size in MB = N * V * W / 1024^2
- N = number of records
- V = number of variables (columns)
- W = average width of a variable, in bytes
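The size estimate can be computed the same way. A sketch, assuming a hypothetical helper `estimate_size_mb` and made-up inputs (10 million records, 10 variables, 8 bytes per value on average):

```python
def estimate_size_mb(n_records: int, n_variables: int, avg_width_bytes: float) -> float:
    """Approximate dataset size in MB as N * V * W / 1024^2 (hypothetical helper)."""
    return n_records * n_variables * avg_width_bytes / 1024 ** 2


# Hypothetical inputs: N = 10,000,000 records, V = 10 variables, W = 8 bytes
size_mb = estimate_size_mb(10_000_000, 10, 8)
print(round(size_mb, 1))  # 762.9
```

With a 200 MB target bucket size this would suggest 762.9 / 200 ≈ 3.8, i.e. about 4 buckets. The formula estimates the raw, uncompressed size; compressed on-disk formats such as Parquet are usually smaller, so treat the result as a rough guide.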

In [11]:
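from pyspark.sql import SparkSession
# Import only what is used; note that Spark's `sum` aggregate shadows Python's built-in sum()
from pyspark.sql.functions import col, sum
# Initialize Spark Session
spark = SparkSession.builder.appName("bucketing").getOrCreate()
sc = spark.sparkContext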

In [12]:
orders_df = spark.read.csv("../data/bucketing/orders.csv", header=True, inferSchema=True)
orders_df.show(10, False)

+--------+----------+-----------+--------+-------------------+------------+
|order_id|product_id|customer_id|quantity|order_date         |total_amount|
+--------+----------+-----------+--------+-------------------+------------+
|1       |80        |10         |4       |2023-03-20 00:00:00|1003        |
|2       |69        |30         |3       |2023-12-11 00:00:00|780         |
|3       |61        |20         |4       |2023-04-26 00:00:00|1218        |
|4       |62        |44         |3       |2023-08-26 00:00:00|2022        |
|5       |78        |46         |4       |2023-08-05 00:00:00|1291        |
|6       |57        |34         |1       |2023-09-12 00:00:00|1529        |
|7       |10        |24         |3       |2023-02-26 00:00:00|191         |
|8       |46        |48         |4       |2023-10-15 00:00:00|2170        |
|9       |57        |10         |4       |2023-01-11 00:00:00|1816        |
|10      |31        |39         |1       |2023-01-06 00:00:00|170         |
+--------+----------+-----------+--------+-------------------+------------+
only showing top 10 rows

In [13]:
products_df = spark.read.csv("../data/bucketing/products.csv", header=True, inferSchema=True)
products_df.show(10, False)

+----------+------------+-----------+-------+-----+-----+
|product_id|product_name|category   |brand  |price|stock|
+----------+------------+-----------+-------+-----+-----+
|1         |Product_1   |Electronics|Brand_4|26   |505  |
|2         |Product_2   |Apparel    |Brand_4|489  |15   |
|3         |Product_3   |Apparel    |Brand_4|102  |370  |
|4         |Product_4   |Groceries  |Brand_1|47   |433  |
|5         |Product_5   |Groceries  |Brand_3|244  |902  |
|6         |Product_6   |Apparel    |Brand_1|268  |771  |
|7         |Product_7   |Electronics|Brand_5|300  |229  |
|8         |Product_8   |Groceries  |Brand_1|414  |810  |
|9         |Product_9   |Groceries  |Brand_1|415  |224  |
|10        |Product_10  |Electronics|Brand_5|10   |654  |
+----------+------------+-----------+-------+-----+-----+
only showing top 10 rows



In [5]:
products_df.select('product_id').distinct().count()

100

In [7]:
orders_df.select('order_id').distinct().count()

1000

In [15]:
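# Disable automatic broadcast joins (-1 turns the size threshold off) so that Spark
# falls back to a sort-merge join and the shuffle shows up in the physical plan
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)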

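Join before bucketing: with broadcast joins disabled, Spark plans a sort-merge join and first shuffles the orders data with `Exchange hashpartitioning(product_id, 200)` (200 is the default `spark.sql.shuffle.partitions`).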

In [16]:
po_join_df = orders_df.join(products_df, on='product_id',how='inner')
po_join_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [product_id#187, order_id#186, customer_id#188, quantity#189, order_date#190, total_amount#191, product_name#248, category#249, brand#250, price#251, stock#252]
   +- SortMergeJoin [product_id#187], [product_id#247], Inner
      :- Sort [product_id#187 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(product_id#187, 200), ENSURE_REQUIREMENTS, [plan_id=493]
      :     +- Filter isnotnull(product_id#187)
      :        +- FileScan csv [order_id#186,product_id#187,customer_id#188,quantity#189,order_date#190,total_amount#191] Batched: false, DataFilters: [isnotnull(product_id#187)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/saitejadevalla/Desktop/SaiTeja/GitHub_Repos/All_things_Spa..., PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<order_id:int,product_id:int,customer_id:int,quantity:int,order_date:timestamp,total_amount...
      +- Sort [product_id#247 

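Bucketing the DataFrames: both tables are written as bucketed tables on the join key `product_id`, using the same number of buckets (4).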

In [17]:
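# bucketBy() only works with saveAsTable(); the bucketing spec (4 buckets on product_id) is stored in the catalog
products_df.write.bucketBy(4, "product_id").mode("overwrite").saveAsTable("products_bucketed")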


In [18]:
# Same bucket column and same number of buckets as products_bucketed, so the join can skip the shuffle
orders_df.write.bucketBy(4, "product_id").mode("overwrite").saveAsTable("orders_bucketed")

In [19]:
products_df_bucketed = spark.table('products_bucketed')
orders_df_bucketed = spark.table('orders_bucketed')

bucketed_po_join_df = orders_df_bucketed.join(products_df_bucketed, on='product_id',how='inner')
bucketed_po_join_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [product_id#350, order_id#349, customer_id#351, quantity#352, order_date#353, total_amount#354, product_name#338, category#339, brand#340, price#341, stock#342]
   +- SortMergeJoin [product_id#350], [product_id#337], Inner
      :- Sort [product_id#350 ASC NULLS FIRST], false, 0
      :  +- Filter isnotnull(product_id#350)
      :     +- FileScan parquet spark_catalog.default.orders_bucketed[order_id#349,product_id#350,customer_id#351,quantity#352,order_date#353,total_amount#354] Batched: true, Bucketed: true, DataFilters: [isnotnull(product_id#350)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/saitejadevalla/Desktop/SaiTeja/GitHub_Repos/All_things_Spa..., PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<order_id:int,product_id:int,customer_id:int,quantity:int,order_date:timestamp,total_amount..., SelectedBucketsCount: 4 out of 4
      +- Sort [product_id#337 ASC NUL

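After bucketing, the plan no longer contains an `Exchange hashpartitioning` (shuffle) before the sort-merge join: both tables are bucketed on the join key `product_id` into the same number of buckets, so matching rows are already co-located and Spark can join each bucket with its counterpart. The `Sort` steps are still present; the tables were written without `sortBy`, so Spark has no guarantee that rows within a bucket are ordered.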
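A simplified, pure-Python model of the bucket-to-bucket join. Assumptions to note: `key % NUM_BUCKETS` stands in for Spark's real bucket id (a Murmur3 hash of the column modulo the bucket count), the rows are hypothetical toy data, and each bucket pair is joined with a dictionary lookup rather than Spark's sort-merge join. The only point is that bucket i never needs rows from bucket j.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # both tables use the same bucket count, as in the notebook


def bucket_id(key: int) -> int:
    # Simplified stand-in; Spark uses pmod(murmur3_hash(key), NUM_BUCKETS)
    return key % NUM_BUCKETS


def bucketize(rows, key):
    """Write-time step: place each row in the bucket chosen by hashing `key`."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[key])].append(row)
    return buckets


def bucketed_join(left_buckets, right_buckets, key):
    """Inner join that only ever pairs bucket i with bucket i (no data movement)."""
    out = []
    for b in range(NUM_BUCKETS):
        # Index the right-hand bucket by join key
        index = defaultdict(list)
        for r in right_buckets.get(b, []):
            index[r[key]].append(r)
        # Probe with the left-hand rows of the same bucket only
        for left in left_buckets.get(b, []):
            for right in index.get(left[key], []):
                out.append({**left, **right})
    return out


# Hypothetical toy rows
orders = [{"order_id": 1, "product_id": 80}, {"order_id": 2, "product_id": 69},
          {"order_id": 3, "product_id": 61}, {"order_id": 9, "product_id": 57}]
products = [{"product_id": p, "product_name": f"Product_{p}"} for p in (57, 61, 69, 80)]

joined = bucketed_join(bucketize(orders, "product_id"),
                       bucketize(products, "product_id"), "product_id")
print(sorted(row["order_id"] for row in joined))  # [1, 2, 3, 9]
```

Because each bucket pair is processed independently, the pairs could be handled by separate tasks without exchanging data, which is the property the bucketed plan relies on.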

Bucketing in Aggregations

In [21]:
# Without bucketing

sales_df = orders_df.groupBy("product_id").agg(sum("total_amount").alias("sales"))

sales_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[product_id#187], functions=[sum(total_amount#191)])
   +- Exchange hashpartitioning(product_id#187, 200), ENSURE_REQUIREMENTS, [plan_id=589]
      +- HashAggregate(keys=[product_id#187], functions=[partial_sum(total_amount#191)])
         +- FileScan csv [product_id#187,total_amount#191] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/saitejadevalla/Desktop/SaiTeja/GitHub_Repos/All_things_Spa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:int,total_amount:int>




In [22]:
# With bucketing

sales_df = orders_df_bucketed.groupBy("product_id").agg(sum("total_amount").alias("sales"))

sales_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[product_id#350], functions=[sum(total_amount#354)])
   +- HashAggregate(keys=[product_id#350], functions=[partial_sum(total_amount#354)])
      +- FileScan parquet spark_catalog.default.orders_bucketed[product_id#350,total_amount#354] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/saitejadevalla/Desktop/SaiTeja/GitHub_Repos/All_things_Spa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:int,total_amount:int>, SelectedBucketsCount: 4 out of 4




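With bucketing, the `Exchange hashpartitioning` step disappears: `orders_bucketed` is bucketed on the grouping key `product_id`, so all rows for a given product already sit in the same bucket, and both the partial and the final `HashAggregate` run without a shuffle.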
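The same idea in miniature for aggregation. This sketch uses the `(product_id, total_amount)` pairs from the first ten rows of `orders_df` shown above, with `key % NUM_BUCKETS` again standing in for Spark's Murmur3-based bucket id. Each bucket is summed on its own, and since no key appears in two buckets, the per-bucket results can simply be concatenated.

```python
from collections import defaultdict

NUM_BUCKETS = 4


def bucket_id(key: int) -> int:
    # Simplified stand-in; Spark uses pmod(murmur3_hash(key), NUM_BUCKETS)
    return key % NUM_BUCKETS


# (product_id, total_amount) pairs from the first ten rows of orders_df
rows = [(80, 1003), (69, 780), (61, 1218), (62, 2022), (78, 1291),
        (57, 1529), (10, 191), (46, 2170), (57, 1816), (31, 170)]

# Write time: bucket the rows on product_id
buckets = defaultdict(list)
for pid, amount in rows:
    buckets[bucket_id(pid)].append((pid, amount))

# Query time: aggregate each bucket on its own; no rows move between buckets
per_bucket = []
for b in range(NUM_BUCKETS):
    sales = defaultdict(int)
    for pid, amount in buckets.get(b, []):
        sales[pid] += amount
    per_bucket.append(dict(sales))

# A key never spans two buckets, so concatenating the partial results is the final answer
sales_by_product = {}
for part in per_bucket:
    assert sales_by_product.keys().isdisjoint(part.keys())
    sales_by_product.update(part)

print(sales_by_product[57])  # 3345, the two sample orders for product 57 combined
```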

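Bucket pruning

When a query filters on the bucketing column with an equality predicate, Spark can work out which bucket can contain the matching rows and skip the rest. The `SelectedBucketsCount: 1 out of 4` entry in the plan below shows that only one bucket is scanned for `product_id == 1`.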

In [23]:
product_sales_bucket_pruning = (
    orders_df_bucketed
    .filter(col("product_id") == 1)
    .groupBy("product_id")
    .agg(sum("total_amount").alias("sales"))
)
product_sales_bucket_pruning.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[product_id#350], functions=[sum(total_amount#354)])
   +- HashAggregate(keys=[product_id#350], functions=[partial_sum(total_amount#354)])
      +- Filter (isnotnull(product_id#350) AND (product_id#350 = 1))
         +- FileScan parquet spark_catalog.default.orders_bucketed[product_id#350,total_amount#354] Batched: true, Bucketed: true, DataFilters: [isnotnull(product_id#350), (product_id#350 = 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/saitejadevalla/Desktop/SaiTeja/GitHub_Repos/All_things_Spa..., PartitionFilters: [], PushedFilters: [IsNotNull(product_id), EqualTo(product_id,1)], ReadSchema: struct<product_id:int,total_amount:int>, SelectedBucketsCount: 1 out of 4
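The pruning decision can be modelled in a few lines: map each filter value to its bucket and scan only those bucket ids. As before, `key % NUM_BUCKETS` is a stand-in for Spark's Murmur3-based bucket id and `buckets_to_scan` is a hypothetical helper, so the specific bucket numbers will not match the files Spark actually reads; only the number of selected buckets is the point.

```python
NUM_BUCKETS = 4


def bucket_id(key: int) -> int:
    # Simplified stand-in; Spark uses pmod(murmur3_hash(key), NUM_BUCKETS)
    return key % NUM_BUCKETS


def buckets_to_scan(filter_values):
    """Bucket ids that can hold rows matching `product_id IN filter_values`."""
    return sorted({bucket_id(v) for v in filter_values})


for values in ([1], [1, 2], [1, 5]):
    selected = buckets_to_scan(values)
    print(f"product_id IN {values}: {len(selected)} out of {NUM_BUCKETS} buckets")
```

Values that land in the same bucket (1 and 5 in this toy hash) still cost only one bucket scan, while values in different buckets each add one.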




In [24]:
spark.stop()