- Date: 2020-09-05 13:04:03
- Author: Ben Du
- Date: 2020-09-01 16:31:49
- Title: Using Bucketing in Spark
- Slug: using-bucketing-in-spark
- Category: Computer Science
- Tags: Computer Science, Spark, big data, bucket, partition

## Tips and Traps

1. Bucketed columns are currently supported only in Hive tables.

2. Bucket-based filter optimization is available in Spark 2.4+.
    For example,
    if the table `person` has a bucketed column `id` with an integer-compatible type,
    then the following query in Spark 2.4+ is optimized to avoid scanning the whole table.
    There are a few things to be aware of here.
    First,
    you will still see a number of tasks close to the number of buckets in your Spark application.
    This is because the optimized job still has to check every bucket of the table
    to see whether it is the bucket corresponding to `id = 123`.
    (If it is, Spark scans all rows in the bucket to filter records.
    If not, the bucket is skipped to save time.)
    Second,
    the type of the value being compared must be compatible with the column type
    for Spark SQL to leverage bucket filtering.
    For example,
    if the `id` column in the `person` table is of the BigInt type
    and `id = 123` is changed to `id = "123"` in the following query,
    Spark has to do a full table scan (however wasteful that sounds).

        :::sql
        SELECT *
        FROM person
        WHERE id = 123
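
The idea behind bucket filtering can be sketched with a toy model in plain Python. This is not Spark's implementation: the helper names (`bucket_of`, `write_bucketed`, `lookup`) are hypothetical, and a simple modulo stands in for the hash function Spark applies to the bucketed column. The sketch only shows why an equality filter on the bucketed column needs to read a single bucket.

```python
from collections import defaultdict

NUM_BUCKETS = 10  # assumed bucket count, chosen for illustration


def bucket_of(key: int, num_buckets: int = NUM_BUCKETS) -> int:
    """Toy stand-in for a bucketing hash: map an integer key to a bucket index."""
    return key % num_buckets


def write_bucketed(rows, num_buckets: int = NUM_BUCKETS):
    """Group rows by the bucket of their `id`, like writing a bucketed table."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row["id"], num_buckets)].append(row)
    return buckets


def lookup(buckets, wanted_id: int, num_buckets: int = NUM_BUCKETS):
    """Answer `WHERE id = wanted_id` by scanning only the matching bucket."""
    target = bucket_of(wanted_id, num_buckets)
    scanned = buckets.get(target, [])
    matches = [row for row in scanned if row["id"] == wanted_id]
    return matches, len(scanned)


rows = [{"id": i, "name": f"person_{i}"} for i in range(1000)]
buckets = write_bucketed(rows)

# Only the rows of one bucket (100 of the 1000) are scanned to find id = 123.
matches, rows_scanned = lookup(buckets, 123)
print(matches, rows_scanned)
```

Every row in the selected bucket is still compared against the filter, which matches the description above: the bucket narrows the scan, it does not replace it.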

## Bug in `DataFrame.write.partitionBy`

There is currently a bug in `DataFrame.write.partitionBy(col)`.
If a DataFrame `df` has `p` partitions and the column `col` in `df` has `c` distinct values,
then after calling `df.write.partitionBy(col).parquet("/path/to/output.parquet")`
there will be `c` partition directories, each containing up to `p` files.
This means that the written table effectively has up to `c * p` partitions, which is probably not what the user wants.
The following examples illustrate this issue.

In [1]:
import findspark
findspark.init("/opt/spark-3.0.0-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark_Union") \
    .enableHiveSupport().getOrCreate()

In [15]:
df = spark.read.option("header", "true").csv("../../home/media/data/daily.csv")
df = df.select(
    year("date").alias("year"),
    "date",
    "x",
    "y",
    "z"
).repartition(2)
df.show()

+----+----------+------------------+------------------+------------------+
|year|      date|                 x|                 y|                 z|
+----+----------+------------------+------------------+------------------+
|2018|2018-10-17|11101.180000000006|243019.40156300002|            150.84|
|2018|2018-11-16|32534.199999999993|      322261.41246|191.16000000000003|
|2018|2018-11-29|39085.419999999984|454028.14863700006|            245.46|
|2018|2018-10-18|          10295.15|     230995.140043|122.76000000000002|
|2018|2018-11-02|          26508.74|326394.90205799986| 189.6599999999999|
|2018|2018-12-08|           15176.5|     309785.497157|            119.64|
|2018|2018-12-01| 38378.45999999999|     458313.700681|            231.66|
|2018|2018-11-15| 26415.35000000001|318606.79919499985|188.45999999999995|
|2018|2018-11-20|          30483.56|     336089.788803|144.96000000000004|
|2019|2019-01-07|          29843.17|     375139.756514|172.62000000000003|
|2018|2018-10-20|        

In [19]:
df.rdd.getNumPartitions()

2

In [21]:
df.write.mode("overwrite").partitionBy("year").parquet("daily.parquet")

In [23]:
!ls daily.parquet/year=2018

part-00000-261c528f-ef8b-414f-86a9-c38aa5fd736a.c000.snappy.parquet
part-00001-261c528f-ef8b-414f-86a9-c38aa5fd736a.c000.snappy.parquet


In [25]:
spark.read.parquet("daily.parquet").rdd.getNumPartitions()

4

In [26]:
df.write.mode("overwrite").partitionBy("year").saveAsTable("daily_hive")

In [28]:
spark.table("daily_hive").rdd.getNumPartitions()

4

In [29]:
df.createOrReplaceTempView("df")

In [35]:
spark.sql("""
    create table daily_hive_2
    using parquet     
    partitioned by (year) as
    select * from df
    """)

DataFrame[]

In [37]:
spark.table("daily_hive_2").rdd.getNumPartitions()

4
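
The counts observed above (2 DataFrame partitions, 2 distinct years, 4 partitions when reading back) can be reproduced with a small plain-Python model. It is only a sketch under stated assumptions: `count_output_files` and `repartition_by_key` are hypothetical helpers, each in-memory partition is assumed to write one file per distinct key value it holds, and a toy hash stands in for Spark's shuffle.

```python
def count_output_files(partitions, key):
    """Count files, assuming one file per (partition index, key value) pair."""
    return len({(i, key(row)) for i, part in enumerate(partitions) for row in part})


def repartition_by_key(partitions, key, num_partitions):
    """Toy hash repartition: rows with the same key land in the same partition."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for row in part:
            out[hash(key(row)) % num_partitions].append(row)
    return out


def by_year(row):
    return row["year"]


# Two partitions, each holding rows from both years (as after repartition(2)).
partitions = [
    [{"year": 2018}, {"year": 2019}, {"year": 2018}],
    [{"year": 2018}, {"year": 2019}],
]

files_before = count_output_files(partitions, by_year)
files_after = count_output_files(repartition_by_key(partitions, by_year, 2), by_year)
print(files_before, files_after)
```

In this model, grouping rows by the partition column before writing brings the file count down from `c * p` to `c`. In PySpark the analogous step would presumably be calling `df.repartition("year")` before `write.partitionBy("year")`, though that was not tested in the notebook above.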

## Filtering Optimization Leveraging Bucketed Columns

### Spark 3

In [4]:
!/opt/pyenv/versions/3.7.8/bin/python -m pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Using cached findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2
You should consider upgrading via the '/opt/pyenv/versions/3.7.8/bin/python -m pip install --upgrade pip' command.


In [1]:
import findspark
findspark.init("/opt/spark-3.0.0-bin-hadoop3.2/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark = SparkSession.builder.appName("PySpark_Union") \
    .enableHiveSupport().getOrCreate()

In [2]:
df = spark.read.option("header", "true").csv("../../home/media/data/daily.csv")
df.show()

+----------+------------------+------------------+------------------+
|      date|                 x|                 y|                 z|
+----------+------------------+------------------+------------------+
|2019-01-11|               0.0|               0.0|               0.0|
|2019-01-10| 30436.96000000001|               0.0|               0.0|
|2019-01-09|          30132.28|     212952.094433|            128.52|
|2019-01-08|29883.240000000005|      352014.45016|            192.18|
|2019-01-07|          29843.17|     375139.756514|172.62000000000003|
|2019-01-06|          29520.23| 420714.7821390001|            217.98|
|2019-01-05|          29308.36|376970.94769900007|             183.3|
|2019-01-04|31114.940000000013|339321.70448899985|174.59999999999997|
|2019-01-03|          30953.24|383834.70136999997|            197.52|
|2019-01-02|          29647.83|     379943.385348|             199.2|
|2019-01-01| 9098.830000000004|     221854.328826|             88.26|
|2018-12-31|3522.929

In [3]:
df.write.bucketBy(10, "date").saveAsTable("daily_b")

In [4]:
spark.table("daily_b").rdd.getNumPartitions()

10

Notice that the execution plan leverages the bucketed column: the `FileScan` node reports `SelectedBucketsCount: 1 out of 10`, so only one of the 10 buckets is read.

In [5]:
spark.sql("""
    select 
        * 
    from 
        daily_b
    where
        date = "2019-01-11"
    """).explain()

== Physical Plan ==
*(1) Project [date#53, x#54, y#55, z#56]
+- *(1) Filter (isnotnull(date#53) AND (date#53 = 2019-01-11))
   +- *(1) ColumnarToRow
      +- FileScan parquet default.daily_b[date#53,x#54,y#55,z#56] Batched: true, DataFilters: [isnotnull(date#53), (date#53 = 2019-01-11)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-3.0.0-bin-hadoop3.2/warehouse/daily_b], PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,2019-01-11)], ReadSchema: struct<date:string,x:string,y:string,z:string>, SelectedBucketsCount: 1 out of 10




### Spark 2.3

In [2]:
import findspark
findspark.init("/opt/spark-2.3.4-bin-hadoop2.7/")

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StructType
spark23 = SparkSession.builder.appName("PySpark_Union") \
    .enableHiveSupport().getOrCreate()

In [5]:
df = spark23.read.option("header", "true").csv("../../home/media/data/daily.csv")
df.show()

+----------+------------------+------------------+------------------+
|      date|                 x|                 y|                 z|
+----------+------------------+------------------+------------------+
|2019-01-11|               0.0|               0.0|               0.0|
|2019-01-10| 30436.96000000001|               0.0|               0.0|
|2019-01-09|          30132.28|     212952.094433|            128.52|
|2019-01-08|29883.240000000005|      352014.45016|            192.18|
|2019-01-07|          29843.17|     375139.756514|172.62000000000003|
|2019-01-06|          29520.23| 420714.7821390001|            217.98|
|2019-01-05|          29308.36|376970.94769900007|             183.3|
|2019-01-04|31114.940000000013|339321.70448899985|174.59999999999997|
|2019-01-03|          30953.24|383834.70136999997|            197.52|
|2019-01-02|          29647.83|     379943.385348|             199.2|
|2019-01-01| 9098.830000000004|     221854.328826|             88.26|
|2018-12-31|3522.929

In [6]:
df.write.bucketBy(10, "date").saveAsTable("daily_b")

In [8]:
spark23.table("daily_b").rdd.getNumPartitions()

10

Notice that the execution plan does not leverage the bucketed column: the `FileScan` node has no `SelectedBucketsCount` entry, which suggests that all 10 buckets are scanned.

In [9]:
spark23.sql("""
    select 
        * 
    from 
        daily_b
    where
        date = "2019-01-11"
    """).explain()

== Physical Plan ==
*(1) Project [date#44, x#45, y#46, z#47]
+- *(1) Filter (isnotnull(date#44) && (date#44 = 2019-01-11))
   +- *(1) FileScan parquet default.daily_b[date#44,x#45,y#46,z#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-2.3.4-bin-hadoop2.7/warehouse/daily_b], PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,2019-01-11)], ReadSchema: struct<date:string,x:string,y:string,z:string>
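
The difference between the two plans can also be checked programmatically by searching the plan text for the `SelectedBucketsCount` entry. The following is a minimal sketch: `selected_buckets` is a hypothetical helper, and the two plan strings are abbreviated excerpts of the outputs shown above rather than full plans.

```python
import re
from typing import Optional, Tuple

# Matches the entry printed by the Spark 3 plan, e.g. "SelectedBucketsCount: 1 out of 10".
_BUCKETS = re.compile(r"SelectedBucketsCount:\s*(\d+)\s+out of\s+(\d+)")


def selected_buckets(plan_text: str) -> Optional[Tuple[int, int]]:
    """Return (selected, total) buckets from a plan string, or None if absent."""
    match = _BUCKETS.search(plan_text)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


# Abbreviated excerpts of the FileScan lines from the two plans above.
spark3_plan = (
    "FileScan parquet default.daily_b[...] "
    "PushedFilters: [IsNotNull(date), EqualTo(date,2019-01-11)], "
    "SelectedBucketsCount: 1 out of 10"
)
spark23_plan = (
    "FileScan parquet default.daily_b[...] "
    "PushedFilters: [IsNotNull(date), EqualTo(date,2019-01-11)]"
)

print(selected_buckets(spark3_plan))
print(selected_buckets(spark23_plan))
```

A result of `None` only means the entry is missing from the text that was passed in; it is a quick check for whether bucket filtering shows up in a plan, not a measurement of how much data was read.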
