# Repartitioning Variables

# 1 Load Data

In [1]:
storageLocation = "s3://dimajix-training/data/weather"

## 1.1 Load Measurements

In [2]:
from pyspark.sql.functions import *
from functools import reduce

# Read in all years, store them in an Python array
raw_weather_per_year = [spark.read.text(storageLocation + "/" + str(i)).withColumn("year", lit(i)) for i in range(2003,2015)]

# Union all years together
raw_weather = reduce(lambda l,r: l.union(r), raw_weather_per_year)                        

Use a single year to keep execution plans small

In [3]:
raw_weather = spark.read.text(storageLocation + "/2003").withColumn("year", lit(2003))

In [4]:
weather = raw_weather.select(
    col("year"),
    substring(col("value"),5,6).alias("usaf"),
    substring(col("value"),11,5).alias("wban"),
    substring(col("value"),16,8).alias("date"),
    substring(col("value"),24,4).alias("time"),
    substring(col("value"),42,5).alias("report_type"),
    substring(col("value"),61,3).alias("wind_direction"),
    substring(col("value"),64,1).alias("wind_direction_qual"),
    substring(col("value"),65,1).alias("wind_observation"),
    (substring(col("value"),66,4).cast("float") / lit(10.0)).alias("wind_speed"),
    substring(col("value"),70,1).alias("wind_speed_qual"),
    (substring(col("value"),88,5).cast("float") / lit(10.0)).alias("air_temperature"),
    substring(col("value"),93,1).alias("air_temperature_qual")
)

## 1.2 Load Station Metadata

In [5]:
stations = spark.read \
    .option("header", True) \
    .csv(storageLocation + "/isd-history")

# 2 Partitions

In [6]:
weather.rdd.getNumPartitions()

5

In [7]:
weather_rep = weather.repartition(10, weather["usaf"], weather["wban"])
weather_rep.rdd.getNumPartitions()

10

## 2.1 Repartition & Joins

In [8]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

### Inspect execution plan of normal JOIN

In [9]:
result = weather.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result.explain()

== Physical Plan ==
*(5) SortMergeJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner
:- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(usaf#87, wban#88, 200)
:     +- *(1) Project [2003 AS year#84, substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, substring(value#82, 16, 8) AS date#89, substring(value#82, 24, 4) AS time#90, substring(value#82, 42, 5) AS report_type#91, substring(value#82, 61, 3) AS wind_direction#92, substring(value#82, 64, 1) AS wind_direction_qual#93, substring(value#82, 65, 1) AS wind_observation#94, (cast(cast(substring(value#82, 66, 4) as float) as double) / 10.0) AS wind_speed#95, substring(value#82, 70, 1) AS wind_speed_qual#96, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
:        +- *(1) Filter (isnotnull(substring(value#82, 11, 5)) && isnotnull(substring(value#82, 5, 6)))
:    

### Pre-partition data (first try)

In [14]:
weather_rep = weather.repartition(200, weather["usaf"], weather["wban"])
weather_rep.cache()

DataFrame[year: int, usaf: string, wban: string, date: string, time: string, report_type: string, wind_direction: string, wind_direction_qual: string, wind_observation: string, wind_speed: double, wind_speed_qual: string, air_temperature: double, air_temperature_qual: string]

In [15]:
result = weather_rep.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result.explain()

== Physical Plan ==
*(4) SortMergeJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner
:- *(1) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
:  +- *(1) Filter (isnotnull(wban#88) && isnotnull(usaf#87))
:     +- InMemoryTableScan [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], [isnotnull(wban#88), isnotnull(usaf#87)]
:           +- InMemoryRelation [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:                 +- Exchange hashpartitioning(usaf#87, wban#88, 200)
:                    +- *(1) Project [2003 AS year#84, substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#

Note the uncached sort and filter!

### Pre-partition data (second try)

In [16]:
weather_rep = weather.repartition(200, weather["usaf"], weather["wban"]) \
    .orderBy(weather["usaf"], weather["wban"])
weather_rep.cache()

DataFrame[year: int, usaf: string, wban: string, date: string, time: string, report_type: string, wind_direction: string, wind_direction_qual: string, wind_observation: string, wind_speed: double, wind_speed_qual: string, air_temperature: double, air_temperature_qual: string]

In [17]:
result = weather_rep.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result.explain()

== Physical Plan ==
*(5) SortMergeJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner
:- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(usaf#87, wban#88, 200)
:     +- *(1) Filter (isnotnull(wban#88) && isnotnull(usaf#87))
:        +- InMemoryTableScan [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], [isnotnull(wban#88), isnotnull(usaf#87)]
:              +- InMemoryRelation [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:                    +- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], true, 0
:                       +- Exchange rang

Note the two sorts!

### Pre-partition data (final try)

In [36]:
weather_rep = weather.repartition(200, weather["usaf"], weather["wban"]) \
    .sortWithinPartitions(weather["usaf"], weather["wban"])
weather_rep.cache()

DataFrame[year: int, usaf: string, wban: string, date: string, time: string, report_type: string, wind_direction: string, wind_direction_qual: string, wind_observation: string, wind_speed: double, wind_speed_qual: string, air_temperature: double, air_temperature_qual: string]

In [22]:
result = weather_rep.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result.explain()

== Physical Plan ==
*(4) SortMergeJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner
:- *(1) Filter (isnotnull(wban#88) && isnotnull(usaf#87))
:  +- InMemoryTableScan [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], [isnotnull(wban#88), isnotnull(usaf#87)]
:        +- InMemoryRelation [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:              +- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(usaf#87, wban#88, 200)
:                    +- *(1) Project [2003 AS year#84, substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS

### Inspect WebUI

Phase 1: Build cache

In [23]:
result.count()

1798753

Phase 2: Use cache

In [24]:
result.count()

1798753

# 3 Repartition & Aggregations

## 3.1 Simple Aggregation

In [53]:
result = weather.groupBy(weather["usaf"], weather["wban"]).agg(
        min(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('min_temp'),
        max(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[usaf#87, wban#88], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- Exchange hashpartitioning(usaf#87, wban#88, 200)
   +- *(1) HashAggregate(keys=[usaf#87, wban#88], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
      +- *(1) Project [substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
         +- *(1) FileScan text [value#82] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct

## 3.2 Aggregation after repartition

In [54]:
result = weather_rep.groupBy(weather["usaf"], weather["wban"]).agg(
        min(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('min_temp'),
        max(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(1) HashAggregate(keys=[usaf#87, wban#88], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- *(1) HashAggregate(keys=[usaf#87, wban#88], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
   +- InMemoryTableScan [usaf#87, wban#88, air_temperature#97, air_temperature_qual#98]
         +- InMemoryRelation [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
                  +- E

# 4 Interaction between Join, Aggregate & Repartition

## 4.1 Aggregation after Join on same key

In [47]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [48]:
joined = weather.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result = joined.groupBy(weather["usaf"], weather["wban"]).agg(
        min(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('min_temp'),
        max(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(5) HashAggregate(keys=[usaf#87, wban#88], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- *(5) HashAggregate(keys=[usaf#87, wban#88], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
   +- *(5) Project [usaf#87, wban#88, air_temperature#97, air_temperature_qual#98]
      +- *(5) SortMergeJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner
         :- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(usaf#87, wban#88, 200)
         :     +- *(1) Project [substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#

## 4.2 Aggregation after Join using repartitioned data

In [51]:
joined = weather_rep.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result = joined.groupBy(weather["usaf"], weather["wban"]).agg(
        min(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('min_temp'),
        max(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[usaf#87, wban#88], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- *(2) HashAggregate(keys=[usaf#87, wban#88], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
   +- *(2) Project [usaf#87, wban#88, air_temperature#97, air_temperature_qual#98]
      +- *(2) BroadcastHashJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner, BuildRight
         :- *(2) Filter (isnotnull(wban#88) && isnotnull(usaf#87))
         :  +- InMemoryTableScan [usaf#87, wban#88, air_temperature#97, air_temperature_qual#98], [isnotnull(wban#88), isnotnull(usaf#87)]
         :        +- InMemoryRelation [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, w

## 4.3 Aggregation after Join with different key

In [52]:
joined = weather.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result = joined.groupBy(stations["ctry"]).agg(
        min(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('min_temp'),
        max(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[ctry#125], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- Exchange hashpartitioning(ctry#125, 200)
   +- *(2) HashAggregate(keys=[ctry#125], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
      +- *(2) Project [air_temperature#97, air_temperature_qual#98, CTRY#125]
         +- *(2) BroadcastHashJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner, BuildRight
            :- *(2) Project [substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
            :  +- *(2) Filter (isnotnull(subst

## 4.4 Aggregation after Broadcast-Join 

In [49]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100*1024*1024)

In [50]:
joined = weather.join(stations, (weather["usaf"] == stations["usaf"]) & (weather["wban"] == stations["wban"]))
result = joined.groupBy(weather["usaf"], weather["wban"]).agg(
        min(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('min_temp'),
        max(when(joined.air_temperature_qual == lit(1), joined.air_temperature)).alias('max_temp'),
)
result.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[usaf#87, wban#88], functions=[min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
+- Exchange hashpartitioning(usaf#87, wban#88, 200)
   +- *(2) HashAggregate(keys=[usaf#87, wban#88], functions=[partial_min(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END), partial_max(CASE WHEN (cast(air_temperature_qual#98 as int) = 1) THEN air_temperature#97 END)])
      +- *(2) Project [usaf#87, wban#88, air_temperature#97, air_temperature_qual#98]
         +- *(2) BroadcastHashJoin [usaf#87, wban#88], [usaf#122, wban#123], Inner, BuildRight
            :- *(2) Project [substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, (cast(cast(substring(value#82, 88, 5) as float) as double) / 10.0) AS air_temperature#97, substring(value#82, 93, 1) AS air_temperature_qual#98]
            : 

# 5 Coalesce

In [38]:
weather_rep.rdd.getNumPartitions()

200

## 5.1 Merge Partitions using coalesce

In [39]:
weather_small = weather_rep.coalesce(16)
weather_small.explain()

== Physical Plan ==
Coalesce 16
+- InMemoryTableScan [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98]
      +- InMemoryRelation [year#84, usaf#87, wban#88, date#89, time#90, report_type#91, wind_direction#92, wind_direction_qual#93, wind_observation#94, wind_speed#95, wind_speed_qual#96, air_temperature#97, air_temperature_qual#98], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(2) Sort [usaf#87 ASC NULLS FIRST, wban#88 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(usaf#87, wban#88, 200)
                  +- *(1) Project [2003 AS year#84, substring(value#82, 5, 6) AS usaf#87, substring(value#82, 11, 5) AS wban#88, substring(value#82, 16, 8) AS date#89, substring(value#82, 24, 4) AS time#90, substring(value#82, 42, 5) AS report_type#91, substring(value#82, 61, 3) AS wind_d

### Inspect WebUI

In [40]:
weather_rep.count()

1798753

## 5.2 Saving files

In [41]:
weather_rep.write.mode("overwrite").parquet("/tmp/weather_rep")

In [43]:
%%bash
hdfs dfs -ls /tmp/weather_rep

Found 91 items
-rw-r--r--   1 hadoop hadoop          0 2018-10-07 07:17 /tmp/weather_rep/_SUCCESS
-rw-r--r--   1 hadoop hadoop       1337 2018-10-07 07:16 /tmp/weather_rep/part-00000-19014412-a1e6-4348-a41d-49986590b40b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      24241 2018-10-07 07:16 /tmp/weather_rep/part-00003-19014412-a1e6-4348-a41d-49986590b40b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      63340 2018-10-07 07:17 /tmp/weather_rep/part-00005-19014412-a1e6-4348-a41d-49986590b40b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      32695 2018-10-07 07:17 /tmp/weather_rep/part-00006-19014412-a1e6-4348-a41d-49986590b40b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     126661 2018-10-07 07:17 /tmp/weather_rep/part-00011-19014412-a1e6-4348-a41d-49986590b40b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      89610 2018-10-07 07:17 /tmp/weather_rep/part-00013-19014412-a1e6-4348-a41d-49986590b40b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop      73063 2018-10-07

In [42]:
weather_small.write.mode("overwrite").parquet("/tmp/weather_small")

In [44]:
%%bash
hdfs dfs -ls /tmp/weather_small

Found 17 items
-rw-r--r--   1 hadoop hadoop          0 2018-10-07 07:17 /tmp/weather_small/_SUCCESS
-rw-r--r--   1 hadoop hadoop     290888 2018-10-07 07:17 /tmp/weather_small/part-00000-31d2cdf1-532c-4549-b706-d040d0a0921b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     539188 2018-10-07 07:17 /tmp/weather_small/part-00001-31d2cdf1-532c-4549-b706-d040d0a0921b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     490533 2018-10-07 07:17 /tmp/weather_small/part-00002-31d2cdf1-532c-4549-b706-d040d0a0921b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     338415 2018-10-07 07:17 /tmp/weather_small/part-00003-31d2cdf1-532c-4549-b706-d040d0a0921b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     460959 2018-10-07 07:17 /tmp/weather_small/part-00004-31d2cdf1-532c-4549-b706-d040d0a0921b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     358779 2018-10-07 07:17 /tmp/weather_small/part-00005-31d2cdf1-532c-4549-b706-d040d0a0921b-c000.snappy.parquet
-rw-r--r--   1 hadoop hadoop     394