# Bucketing

We already saw that using a prepartitioned DataFrame for joins and grouped aggregations can accelerate execution time, but so far the prepartitioning had to be performed every time data is loaded from disk. It would be really nice, if there was some way to store prepartitioned data, such that Spark understands which columns were used to create the partitions.

This is where bucketing comes into play, which is a special way to store data, such that Spark actually understands the partitioning schema.

### Adjust Spark config
Again, we need to disable automatic broadcast joins and make sure that bucketing is enabled.

In [1]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.sources.bucketing.enabled", True)

# 1 Load Data

First we load the weather data, which consists of the measurement data and some station metadata.

In [2]:
storageLocation = "s3://dimajix-training/data/weather"

## 1.1 Load Measurements

Measurements are stored in multiple directories (one per year). But we will limit ourselves to a single year in the analysis to improve readability of execution plans.

In [3]:
from functools import reduce

from pyspark.sql.functions import *


# Read in all years, store them in an Python array
raw_weather_per_year = [
    spark.read.text(storageLocation + "/" + str(i)).withColumn("year", lit(i))
    for i in range(2003, 2015)
]

# Union all years together
raw_weather = reduce(lambda l, r: l.union(r), raw_weather_per_year)

Use a single year to keep execution plans small

In [4]:
raw_weather = spark.read.text(storageLocation + "/2003").withColumn("year", lit(2003))

### Extract Measurements

Measurements were stored in a proprietary text based format, with some values at fixed positions. We need to extract these values with a simple `SELECT` statement.

In [5]:
weather = raw_weather.select(
    col("year"),
    substring(col("value"), 5, 6).alias("usaf"),
    substring(col("value"), 11, 5).alias("wban"),
    substring(col("value"), 16, 8).alias("date"),
    substring(col("value"), 24, 4).alias("time"),
    substring(col("value"), 42, 5).alias("report_type"),
    substring(col("value"), 61, 3).alias("wind_direction"),
    substring(col("value"), 64, 1).alias("wind_direction_qual"),
    substring(col("value"), 65, 1).alias("wind_observation"),
    (substring(col("value"), 66, 4).cast("float") / lit(10.0)).alias("wind_speed"),
    substring(col("value"), 70, 1).alias("wind_speed_qual"),
    (substring(col("value"), 88, 5).cast("float") / lit(10.0)).alias("air_temperature"),
    substring(col("value"), 93, 1).alias("air_temperature_qual"),
)

## 1.2 Load Station Metadata

We also need to load the weather station meta data containing information about the geo location, country etc of individual weather stations.

In [6]:
stations = spark.read.option("header", True).csv(storageLocation + "/isd-history")

# 2 Bucketing Data

Now we want to create a so called *bucketed table* of the weather measurements data. Bucketing is only possible within Hive, because additional meta data about the bucketing is required. That meta data is not stored on HDFS but persisted in the Hive metastore instead.

## 2.1 Create Hive Table

A bucketed Hive table can easily be created from within Spark by using the `bucketBy` and optionally `sortBy` method of the `DataFrameWriter` class.

In [8]:
weather.write.bucketBy(200, "usaf", "wban").sortBy("usaf", "wban").mode(
    "overwrite"
).saveAsTable("weather_buckets")

## 2.2 Inspect Table

We can inspect the Hive table, which unverils that both bucketing columns and sorting columns are present in the Hive table.

In [9]:
spark.sql("DESCRIBE EXTENDED weather_buckets").show(100)

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                year|                 int|   null|
|                usaf|              string|   null|
|                wban|              string|   null|
|                date|              string|   null|
|                time|              string|   null|
|         report_type|              string|   null|
|      wind_direction|              string|   null|
| wind_direction_qual|              string|   null|
|    wind_observation|              string|   null|
|          wind_speed|              double|   null|
|     wind_speed_qual|              string|   null|
|     air_temperature|              double|   null|
|air_temperature_qual|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|           

### `CREATE TABLE` statement

We could also have used Hive SQL to create the table. Let's retrieve the statement via SQL `SHOW CREATE TABLE`

In [34]:
stmts = spark.sql("SHOW CREATE TABLE weather_buckets").collect()
print(stmts[0][0])

CREATE TABLE `weather_buckets` (`year` INT, `usaf` STRING, `wban` STRING, `date` STRING, `time` STRING, `report_type` STRING, `wind_direction` STRING, `wind_direction_qual` STRING, `wind_observation` STRING, `wind_speed` DOUBLE, `wind_speed_qual` STRING, `air_temperature` DOUBLE, `air_temperature_qual` STRING)
USING parquet
OPTIONS (
  `serialization.format` '1'
)
CLUSTERED BY (usaf, wban)
SORTED BY (usaf, wban)
INTO 200 BUCKETS



### Inspect Files
Of couse there also need to be some files in HDFS now. These are stored in the directory `/user/hive/warehouse/weather_buckets`

In [19]:
!hdfs dfs -ls /user/hive/warehouse/weather_buckets

Found 107 items
-rw-r--r--   1 hadoop hadoop          0 2018-10-20 07:24 /user/hive/warehouse/weather_buckets/_SUCCESS
-rw-r--r--   1 hadoop hadoop      89610 2018-10-20 07:24 /user/hive/warehouse/weather_buckets/part-00000-569127b5-0a94-4e65-ba66-dbea0babe322_00013.c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      70655 2018-10-20 07:24 /user/hive/warehouse/weather_buckets/part-00000-569127b5-0a94-4e65-ba66-dbea0babe322_00016.c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      84538 2018-10-20 07:24 /user/hive/warehouse/weather_buckets/part-00000-569127b5-0a94-4e65-ba66-dbea0babe322_00034.c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      73316 2018-10-20 07:24 /user/hive/warehouse/weather_buckets/part-00000-569127b5-0a94-4e65-ba66-dbea0babe322_00035.c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      80773 2018-10-20 07:24 /user/hive/warehouse/weather_buckets/part-00000-569127b5-0a94-4e65-ba66-dbea0babe322_00046.c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      

# 3 Bucketing & Joins

Now we can perform the same join again, but this time against the bucketed version of the weather table.

## 3.1 Normal Join

To see the effect of bucketing, we first perform a traditional join to see the execution plan as a reference.

In [7]:
result = weather.join(
    stations,
    (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]),
)
result.explain()

== Physical Plan ==
*(5) SortMergeJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner
:- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(usaf#87, wban#88, 200)
:     +- *(1) Project [2003 AS year#84, substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, substring(value#82, 16, 8) AS date#89, substring(value#82, 24, 4) AS time#90, substring(value#82, 42, 5) AS report_type#91, substring(value#82, 61, 3) AS wind_direction#92, substring(value#82, 64, 1) AS wind_direction_qual#93, substring(value#82, 65, 1) AS wind_observation#94, (cast(cast(substring(value#82, 66, 4) as float) as double) / 10.0) AS wind_speed#95, substring(value#82, 70, 1) AS wind_speed_qual#96, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
:        +- *(1) Filter (isnotnull(substring(value#82, 11, 5)) && isnotnull(substring(value#82, 5, 6)))
:    

## 3.2 Bucketed Join

Now we want to replace the original `weather` DataFrame by a bucketed version. This means that first we have to create a bucketed version in HDFS. This is only possible by creating a Hive table, since this is the only way to persist the bucketing information as table properties.

In [10]:
weather_hive = spark.read.table("weather_buckets")
result = weather_hive.join(
    stations,
    (weather_hive["usaf"] == stations["usaf"])
    & (weather_hive["wban"] == stations["wban"]),
)
result.explain()

== Physical Plan ==
*(4) SortMergeJoin [usaf#248, wban#249], [usaf#122, wban#123], Inner
:- *(1) Sort [usaf#248 ASC NULLS FIRST, wban#249 ASC NULLS FIRST], false, 0
:  +- *(1) Project [year#247, usaf#248, wban#249, date#250, time#251, report_type#252, wind_direction#253, wind_direction_qual#254, wind_observation#255, wind_speed#256, wind_speed_qual#257, air_temperature#258, air_temperature_qual#259]
:     +- *(1) Filter (isnotnull(wban#249) && isnotnull(usaf#248))
:        +- *(1) FileScan parquet default.weather_buckets[year#247,usaf#248,wban#249,date#250,time#251,report_type#252,wind_direction#253,wind_direction_qual#254,wind_observation#255,wind_speed#256,wind_speed_qual#257,air_temperature#258,air_temperature_qual#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-200-101-213.eu-central-1.compute.internal:8020/user/hive/warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(wban), IsNotNull(usaf)], ReadSchema: struct<year:int,usaf:string,wban:strin

#### Remarks
The execution plan now looks differently than before.
* The station meta data is still shuffled (we didn't bucketize it)
* The weather data does not require a shuffle any more, the join can be executed almost directly (A sort will still be performed, maybe a bug?)

### Bucketing Strategie

The following attributes have to match
* bucketing columns
* number of buckets = number of partitions

# 4 Bucketing & Aggregation

Similar to `JOIN` operations, grouped aggregations (`GROUP BY`) also require a shuffle operation. Again this can be avoided if a Hive table is used that is already bucketed according to the grouping columns.

## 4.1 Normal Aggregation

First let us analyze the execution plan of a normal grouped aggregation operation without a bucketed table. This will result in an execution plan containing a shuffle operation

In [13]:
result = weather.groupBy(weather["usaf"], weather["wban"]).agg(
    min(when(weather.air_temperature_qual == lit(1), weather.air_temperature)).alias(
        'min_temp'
    ),
    max(when(weather.air_temperature_qual == lit(1), weather.air_temperature)).alias(
        'max_temp'
    ),
)
result.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[usaf#87, wban#88], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- Exchange hashpartitioning(usaf#87, wban#88, 200)
   +- *(1) HashAggregate(keys=[usaf#87, wban#88], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
      +- *(1) Project [substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
         +- *(1) FileScan text [value#82] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct

### Remarks
As expected the execution plan has three steps related to the grouped aggregation:
1. Partial aggregate (`HashAggregate`)
2. Shuffle operation (`Exchange hashpartitioning`)
3. Final aggregate (`HashAggregate`)

## 4.2 Bucketed Aggregation

Now let's perform the same operation, but this time using the bucketed Hive table.

In [11]:
result = weather_hive.groupBy(weather_hive["usaf"], weather_hive["wban"]).agg(
    min(
        when(weather_hive.air_temperature_qual == lit(1), weather_hive.air_temperature)
    ).alias('min_temp'),
    max(
        when(weather_hive.air_temperature_qual == lit(1), weather_hive.air_temperature)
    ).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(1) HashAggregate(keys=[usaf#248, wban#249], functions=[min(CASE WHEN (cast(air_temperature_qual#259 as int) = 1) THEN air_temperature#258 END), max(CASE WHEN (cast(air_temperature_qual#259 as int) = 1) THEN air_temperature#258 END)])
+- *(1) HashAggregate(keys=[usaf#248, wban#249], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#259 as int) = 1) THEN air_temperature#258 END), partial_max(CASE WHEN (cast(air_temperature_qual#259 as int) = 1) THEN air_temperature#258 END)])
   +- *(1) FileScan parquet default.weather_buckets[usaf#248,wban#249,air_temperature#258,air_temperature_qual#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-200-101-213.eu-central-1.compute.internal:8020/user/hive/warehouse..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<usaf:string,wban:string,air_temperature:double,air_temperature_qual:string>


### Remarks
As we hoped for, Spark will not perform a shuffle operation any more, since the data is already partitioned as needed. The execution plan now only contains two steps for implementing the grouped aggregation
1. Partial aggregate (`HashAggregate`)
2. Final aggregate (`HashAggregate`)

# 5 Bucketing & Filtering

Unfortunately Spark does not use bucketing information for filtering yet. Let's prove that by a simple example.

## 5.1 Filter without bucketing

Let read in the raw data and add a filter operation that refers to the bucketing columns.

In [35]:
result = weather.where("usaf = '123' AND wban='456'")
result.explain()

== Physical Plan ==
*(1) Project [2003 AS year#84, substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, substring(value#82, 16, 8) AS date#89, substring(value#82, 24, 4) AS time#90, substring(value#82, 42, 5) AS report_type#91, substring(value#82, 61, 3) AS wind_direction#92, substring(value#82, 64, 1) AS wind_direction_qual#93, substring(value#82, 65, 1) AS wind_observation#94, (cast(cast(substring(value#82, 66, 4) as float) as double) / 10.0) AS wind_speed#95, substring(value#82, 70, 1) AS wind_speed_qual#96, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
+- *(1) Filter ((isnotnull(value#82) && (substring(value#82, 5, 6) = 123)) && (substring(value#82, 11, 5) = 456))
   +- *(1) FileScan text [value#82] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [IsNotNull(value)], ReadSch

In [36]:
result.count()

0

## 5.2 Filter with bucketing

Now let's try the same example, but this time we use the bucketed Hive table instead of the raw data.

In [23]:
result = weather_hive.where("usaf = '123' AND wban='456'")
result.explain()

== Physical Plan ==
*(1) Project [year#247, usaf#248, wban#249, date#250, time#251, report_type#252, wind_direction#253, wind_direction_qual#254, wind_observation#255, wind_speed#256, wind_speed_qual#257, air_temperature#258, air_temperature_qual#259]
+- *(1) Filter (((isnotnull(usaf#248) && isnotnull(wban#249)) && (usaf#248 = 123)) && (wban#249 = 456))
   +- *(1) FileScan parquet default.weather_buckets[year#247,usaf#248,wban#249,date#250,time#251,report_type#252,wind_direction#253,wind_direction_qual#254,wind_observation#255,wind_speed#256,wind_speed_qual#257,air_temperature#258,air_temperature_qual#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-200-101-213.eu-central-1.compute.internal:8020/user/hive/warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(usaf), IsNotNull(wban), EqualTo(usaf,123), EqualTo(wban,456)], ReadSchema: struct<year:int,usaf:string,wban:string,date:string,time:string,report_type:string,wind_direction...


In [24]:
result.count()

0

### Remarks
The execution plan contains *`PushedFilters`*, but the Spark web ui will reveil, that these filters are only pushed down to the parquet reader and Spark still reads all files.