## Bucket Pruning: Optimizing Filters on a Bucketed Column (Reducing Bucket Files to Scan)
As of Spark 2.4, Spark SQL supports bucket pruning to optimize filtering on a bucketed column (by reducing the number of bucket files to scan).

Bucket pruning supports the following predicate expressions:

- `EqualTo` (`=`)
- `EqualNullSafe` (`<=>`)
- `In`
- `InSet`
- `And` and `Or` of the above
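To make the idea behind the predicates listed above concrete, here is a minimal pure-Python sketch of how a set of buckets to scan could be derived from a filter on the bucketed column. It is an illustration under stated assumptions, not Spark's implementation: `bucket_of`, `buckets_for_in`, `buckets_for_or`, and `buckets_for_and` are hypothetical helper names, and `zlib.crc32` is a stand-in for Spark's own Murmur3-based hash, so the bucket ids it produces will not match the ones Spark assigns.

```python
import zlib

NUM_BUCKETS = 4  # same bucket count as bucketBy(4, "id") later in this notebook


def bucket_of(value: str, num_buckets: int = NUM_BUCKETS) -> int:
    # Stand-in hash (assumption): Spark uses a Murmur3-based hash instead,
    # so real bucket ids will differ from the ones computed here.
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def buckets_for_in(values, num_buckets: int = NUM_BUCKETS) -> set:
    # In / InSet: union of the buckets of all literals.
    # EqualTo is the single-literal case.
    return {bucket_of(v, num_buckets) for v in values}


def buckets_for_or(left: set, right: set) -> set:
    # Or: a row may satisfy either side, so both bucket sets must be scanned.
    return left | right


def buckets_for_and(left: set, right: set) -> set:
    # And: a row must satisfy both sides, so only shared buckets can hold matches.
    return left & right


eq = buckets_for_in(["1000"])
print(len(eq), "of", NUM_BUCKETS, "buckets to scan for id = '1000'")
```

An equality filter on the bucketed column always maps to exactly one bucket, which is why a plan for such a filter can skip the remaining bucket files.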

In [1]:
# Build 1,000 sample rows of the form [id, name]; id is stored as a string
lst = [[str(i), "name" + str(i)] for i in range(1000)]

In [3]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName("Bucket Pruning").getOrCreate()
sc = spark.sparkContext

In [9]:
rdd_in = sc.parallelize(lst)

In [13]:
rdd_in.collect()[:5]

[['0', 'name0'],
 ['1', 'name1'],
 ['2', 'name2'],
 ['3', 'name3'],
 ['4', 'name4']]

In [14]:
df_in = spark.createDataFrame(rdd_in,["id","name"])

In [15]:
df_in.show()

+---+------+
| id|  name|
+---+------+
|  0| name0|
|  1| name1|
|  2| name2|
|  3| name3|
|  4| name4|
|  5| name5|
|  6| name6|
|  7| name7|
|  8| name8|
|  9| name9|
| 10|name10|
| 11|name11|
| 12|name12|
| 13|name13|
| 14|name14|
| 15|name15|
| 16|name16|
| 17|name17|
| 18|name18|
| 19|name19|
+---+------+
only showing top 20 rows



In [16]:
df_in.rdd.getNumPartitions()

8

### Merging all partitions into a single partition, then writing it to the unbucket_demo1 table

In [17]:
df_in.coalesce(1).write.mode("overwrite").saveAsTable("unbucket_demo1")

In [19]:
df_in.coalesce(1).write.bucketBy(4,"id").sortBy("id").mode("overwrite").saveAsTable("bucket_demo1")
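The `coalesce(1)` before the bucketed write matters for the number of files produced. As a rough sketch, assuming default write settings where each write task emits at most one file per bucket it holds rows for, the upper bound on bucket files is the number of write tasks times the number of buckets. The helper name `max_bucket_files` is hypothetical and only illustrates the arithmetic.

```python
def max_bucket_files(num_write_tasks: int, num_buckets: int) -> int:
    # Assumption: each write task emits at most one file per bucket it has rows for.
    return num_write_tasks * num_buckets


# df_in has 8 partitions, so writing it directly could produce up to 32 files.
print(max_bucket_files(8, 4))  # 32

# After coalesce(1) there is a single write task, so at most 4 files.
print(max_bucket_files(1, 4))  # 4
```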

In [20]:
df_read_unbucket_table = spark.table("unbucket_demo1")

In [21]:
df_read_unbucket_table.where("id ='1000'").explain()

== Physical Plan ==
*(1) Project [id#25, name#26]
+- *(1) Filter (isnotnull(id#25) AND (id#25 = 1000))
   +- *(1) ColumnarToRow
      +- FileScan parquet default.unbucket_demo1[id#25,name#26] Batched: true, DataFilters: [isnotnull(id#25), (id#25 = 1000)], Format: Parquet, Location: InMemoryFileIndex[file:/home/atif/PycharmProjects/test/spark_codes/spark_learn/spark-warehouse/un..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1000)], ReadSchema: struct<id:string,name:string>




In [22]:
df_read_bucket_table = spark.table("bucket_demo1")
df_read_bucket_table.where("id ='1000'").explain()

== Physical Plan ==
*(1) Project [id#29, name#30]
+- *(1) Filter (isnotnull(id#29) AND (id#29 = 1000))
   +- *(1) ColumnarToRow
      +- FileScan parquet default.bucket_demo1[id#29,name#30] Batched: true, DataFilters: [isnotnull(id#29), (id#29 = 1000)], Format: Parquet, Location: InMemoryFileIndex[file:/home/atif/PycharmProjects/test/spark_codes/spark_learn/spark-warehouse/bu..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1000)], ReadSchema: struct<id:string,name:string>, SelectedBucketsCount: 1 out of 4
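The difference between the two plans is the trailing `SelectedBucketsCount: 1 out of 4` on the bucketed scan. When comparing many plans, it can help to pull that value out programmatically. The following is a small sketch that assumes the plan text has already been captured as a string and that it uses the same wording as the output above; `selected_buckets` is a hypothetical helper, not part of PySpark.

```python
import re
from typing import Optional, Tuple


def selected_buckets(plan_text: str) -> Optional[Tuple[int, int]]:
    # Looks for the "SelectedBucketsCount: X out of Y" fragment shown in the plan above.
    match = re.search(r"SelectedBucketsCount:\s*(\d+)\s+out of\s+(\d+)", plan_text)
    if match is None:
        # No fragment found: the scan did not report any bucket pruning.
        return None
    return int(match.group(1)), int(match.group(2))


# Tail of the bucketed plan from the cell above.
bucketed_tail = "ReadSchema: struct<id:string,name:string>, SelectedBucketsCount: 1 out of 4"
# Tail of the unbucketed plan, which has no such fragment.
unbucketed_tail = "ReadSchema: struct<id:string,name:string>"

print(selected_buckets(bucketed_tail))    # (1, 4)
print(selected_buckets(unbucketed_tail))  # None
```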




In [27]:
spark.sql("DESCRIBE EXTENDED bucket_demo1").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|              string|   null|
|                name|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|        bucket_demo1|       |
|        Created Time|Wed Nov 18 22:52:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.0.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|         Num Buckets|                   4|       |
|      Bucket Columns|              [`id`]|       |
|        Sort Columns|              [`id`]|       |
|            Location|file:/home/atif/P...|       |
+--------------------+--------------------+-------+



In [25]:
spark.sql("DESCRIBE EXTENDED unbucket_demo1").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|              string|   null|
|                name|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|      unbucket_demo1|       |
|        Created Time|Wed Nov 18 22:50:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.0.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/home/atif/P...|       |
+--------------------+--------------------+-------+



In [30]:
spark.sql("SHOW CREATE TABLE UNBUCKET_DEMO1").show(truncate=0)

+----------------------------------------------------------------------------------------+
|createtab_stmt                                                                          |
+----------------------------------------------------------------------------------------+
|CREATE TABLE `default`.`UNBUCKET_DEMO1` (
  `id` STRING,
  `name` STRING)
USING parquet
|
+----------------------------------------------------------------------------------------+



In [31]:
spark.sql("SHOW CREATE TABLE BUCKET_DEMO1").show(truncate=0)

+--------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE `default`.`BUCKET_DEMO1` (
  `id` STRING,
  `name` STRING)
USING parquet
CLUSTERED BY (id)
SORTED BY (id)
INTO 4 BUCKETS
|
+--------------------------------------------------------------------------------------------------------------------------------------+

