
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# 2.6 Broadcast Joins

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this notebook you:<br>
* Compare and contrast broadcast and shuffle joins
* Examine the Physical Plan that is generated for your queries
* Optimize joins using Broadcast joins

In [0]:
%run ../Includes/Classroom-Setup

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Broadcast and Shuffle Joins

## Standard Join

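* In a standard (shuffle) join, **ALL** the data on both sides is shuffled across the network so that rows with the same join key end up in the same partition.
* This can be really expensive.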

<img src="https://files.training.databricks.com/images/eLearning/ucdavis/join-shuffle.png" alt="Join" style="max-height:500px"/>
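To make the shuffle concrete, here is a minimal pure-Python sketch of a shuffle join. It is an illustration, not Spark's implementation: Python lists stand in for partitions on different executors, and the function names, the `call` key and the sample rows are hypothetical. Both inputs are hash-partitioned on the join key so that matching rows meet in the same partition, and each partition is then joined locally.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Route every row to a partition chosen by hashing its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def shuffle_join(left, right, key, num_partitions=4):
    """Inner join that repartitions BOTH inputs by key, then joins partition by partition."""
    left_parts = hash_partition(left, key, num_partitions)    # every left row is repartitioned
    right_parts = hash_partition(right, key, num_partitions)  # every right row is repartitioned
    result = []
    for left_part, right_part in zip(left_parts, right_parts):
        # Equal keys hash to the same partition index, so a local lookup is enough.
        lookup = defaultdict(list)
        for right_row in right_part:
            lookup[right_row[key]].append(right_row)
        for left_row in left_part:
            for right_row in lookup.get(left_row[key], []):
                result.append({**left_row, **right_row})
    return result


# Hypothetical sample data keyed on a call number.
calls = [{"call": 1, "unit": "E01"}, {"call": 2, "unit": "M14"}, {"call": 3, "unit": "T02"}]
details = [{"call": 1, "priority": "3"}, {"call": 3, "priority": "2"}]
joined = sorted(shuffle_join(calls, details, "call"), key=lambda row: row["call"])
```

The key point is in `shuffle_join`: both `left` and `right` pass through `hash_partition`, which is the step that corresponds to moving **ALL** the data.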

## Broadcast Join
* In a Broadcast Join, only the "small" data is moved.
* It duplicates the "small" data across all executors.
* But the "big" data is left untouched.
* If the "small" data is small enough, this can be **VERY** efficient.

<img src="https://files.training.databricks.com/images/eLearning/ucdavis/join-broadcast.png" alt="Broadcast Join" style="max-height:500px"/>
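A broadcast hash join can be sketched the same way. Again this is a hypothetical pure-Python illustration rather than Spark's code: a hash table is built once over the small input, a copy is handed to every partition of the big input, and the big rows are probed where they already are.

```python
def broadcast_hash_join(big_partitions, small, key):
    """Inner join that copies the small input to every partition of the big input."""
    # Build a hash table over the small side once (in Spark, the small side is collected to the driver).
    table = {}
    for row in small:
        table.setdefault(row[key], []).append(row)

    result = []
    for partition in big_partitions:
        local_table = dict(table)  # stands in for the copy each executor receives
        # The big side's rows are probed in place; none of them are moved.
        for row in partition:
            for match in local_table.get(row[key], []):
                result.append({**row, **match})
    return result


# Hypothetical data: the big side is already split across two "executors".
big_partitions = [
    [{"call": 1, "unit": "E01"}, {"call": 2, "unit": "M14"}],
    [{"call": 3, "unit": "T02"}],
]
small = [{"call": 1, "priority": "3"}, {"call": 3, "priority": "2"}]
joined = broadcast_hash_join(big_partitions, small, "call")
```

Only `small` is copied (once per partition); `big_partitions` is never repartitioned.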

Joins are _very_ expensive operations in general, and the cost becomes increasingly apparent in big data environments, where a join usually means transferring data across the network.
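A back-of-the-envelope model shows why moving less data matters. The sketch below is a deliberate simplification (evenly distributed data, no compression, no spill), and the function names and sizes are hypothetical; units are whatever you pass in, as long as they are consistent.

```python
def shuffle_join_traffic(big_size, small_size, num_executors):
    """Approximate data sent over the network when BOTH sides are hash-partitioned.

    Assumes rows are spread evenly, so about (n - 1) / n of each side
    is routed to a different executor than the one holding it.
    """
    return (big_size + small_size) * (num_executors - 1) / num_executors


def broadcast_join_traffic(small_size, num_executors):
    """Approximate data received when the small side is copied to every executor.

    Ignores the initial collect of the small side to the driver.
    """
    return small_size * num_executors


# Hypothetical sizes in MB: a 100,000 MB table joined to a 5 MB table on 20 executors.
shuffle_mb = shuffle_join_traffic(100_000, 5, 20)  # 95004.75
broadcast_mb = broadcast_join_traffic(5, 20)       # 100
```

Under this model the broadcast cost grows with the size of the small side times the number of executors, so it only wins while that side stays genuinely small.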

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Examining Physical Plans

Let's make sure our data is accessible.

In [0]:
%sql
USE databricks;

DESCRIBE fireCalls

| col_name | data_type | comment |
|---|---|---|
| Call Number | int | |
| Unit ID | string | |
| Incident Number | int | |
| Call Type | string | |
| Call Date | string | |
| Watch Date | string | |
| Received DtTm | string | |
| Entry DtTm | string | |
| Dispatch DtTm | string | |
| Response DtTm | string | |


Now create a temporary view named `fireCallsParquet` over the Parquet files at `/mnt/davis/fire-calls/fire-calls-8p.parquet`.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fireCallsParquet
USING Parquet 
OPTIONS (
    path "/mnt/davis/fire-calls/fire-calls-8p.parquet"
  )

We can join these two datasets and use `EXPLAIN` to examine the physical plan, i.e., the concrete operators Spark will run to execute the query (file scans, shuffles, sorts and the join itself).

In [0]:
%sql
EXPLAIN 
  SELECT * 
  FROM fireCalls 
  JOIN fireCallsParquet on fireCalls.`Call Number` = fireCallsParquet.`Call_Number`

plan
"== Physical Plan == *(3) SortMergeJoin [Call Number#1833], [Call_Number#1762], Inner :- Sort [Call Number#1833 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(Call Number#1833, 200) : +- *(1) Project [Call Number#1833, Unit ID#1834, Incident Number#1835, Call Type#1836, Call Date#1837, Watch Date#1838, Received DtTm#1839, Entry DtTm#1840, Dispatch DtTm#1841, Response DtTm#1842, On Scene DtTm#1843, Transport DtTm#1844, Hospital DtTm#1845, Call Final Disposition#1846, Available DtTm#1847, Address#1848, City#1849, Zipcode of Incident#1850, Battalion#1851, Station Area#1852, Box#1853, Original Priority#1854, Priority#1855, Final Priority#1856, ... 10 more fields] : +- *(1) Filter isnotnull(Call Number#1833) : +- *(1) FileScan csv databricks.firecalls[Call Number#1833,Unit ID#1834,Incident Number#1835,Call Type#1836,Call Date#1837,Watch Date#1838,Received DtTm#1839,Entry DtTm#1840,Dispatch DtTm#1841,Response DtTm#1842,On Scene DtTm#1843,Transport DtTm#1844,Hospital DtTm#1845,Call Final Disposition#1846,Available DtTm#1847,Address#1848,City#1849,Zipcode of Incident#1850,Battalion#1851,Station Area#1852,Box#1853,Original Priority#1854,Priority#1855,Final Priority#1856,... 10 more fields] Batched: false, DataFilters: [isnotnull(Call Number#1833)], Format: CSV, Location: InMemoryFileIndex[dbfs:/mnt/davis/fire-calls/fire-calls-truncated-comma.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Call Number)], ReadSchema: struct"
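Read from the bottom up, this plan scans the CSV file, drops rows whose join key is null (`Filter isnotnull`), hash-partitions the rows on `Call Number` into 200 partitions (`Exchange hashpartitioning`; 200 is the default value of `spark.sql.shuffle.partitions`), sorts each partition, and finally merges the sorted inputs in `SortMergeJoin`. The pure-Python sketch below imitates the filter, sort and merge steps on a single partition. It is an illustration, not Spark's code, and the function name and sample rows are hypothetical.

```python
def sort_merge_join(left, right, key):
    """Inner join of two lists of dicts: drop null keys, sort both sides, then merge."""
    # Like the plan's `Filter isnotnull(...)`: null keys can never match in an inner join.
    left = sorted((r for r in left if r[key] is not None), key=lambda r: r[key])
    right = sorted((r for r in right if r[key] is not None), key=lambda r: r[key])

    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of right rows sharing this key...
            run_end = j
            while run_end < len(right) and right[run_end][key] == left_key:
                run_end += 1
            # ...and pair it with every left row that has the same key.
            while i < len(left) and left[i][key] == left_key:
                for right_row in right[j:run_end]:
                    result.append({**left[i], **right_row})
                i += 1
            j = run_end
    return result


calls = [{"call": 3, "unit": "T02"}, {"call": 1, "unit": "E01"},
         {"call": None, "unit": "X"}, {"call": 1, "unit": "E05"}]
details = [{"call": 3, "priority": "2"}, {"call": 1, "priority": "3"}]
joined = sort_merge_join(calls, details, "call")
```

Because both sides are sorted, the merge walks each input once; the expensive parts are the shuffle and the sorts that precede it.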


#### Automatic and Manual Broadcasting

- Depending on the size of the data being loaded into Spark, Spark uses internal heuristics to decide how to join that data to other data.
- Automatic broadcast depends on `spark.sql.autoBroadcastJoinThreshold`
    - The setting configures the **maximum size in bytes** for a table that will be broadcast to all worker nodes when performing a join.
    - Default is 10MB (10485760 bytes); setting it to `-1` disables automatic broadcasting.

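- The `broadcast` function (or, in SQL, the `BROADCAST` hint) tells Catalyst to broadcast one of the tables being joined. It is a hint rather than a guarantee: Spark honors it when the join type allows that side to be broadcast, even if the table is larger than the threshold.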

If no `broadcast` hint is used but one side of the join is small enough (i.e., its estimated size is at or below the threshold), Spark reads that side into the Driver and broadcasts it to all Executors automatically.
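These rules can be summarized as a small decision function. This is a hypothetical simplification for an inner equi-join only: Spark's real planner also weighs the join type, other join strategies (such as shuffled hash joins) and other hints, and it works from Catalyst's size estimates rather than actual file sizes. When both sides qualify, this sketch simply broadcasts the smaller one.

```python
TEN_MB = 10 * 1024 * 1024  # the 10 MB default threshold, i.e. 10485760 bytes


def choose_join_strategy(left_bytes, right_bytes, threshold_bytes=TEN_MB, hint=None):
    """Simplified strategy choice for an inner equi-join.

    hint is None, "left" or "right" (the side named in a BROADCAST hint).
    A threshold of -1 makes the size test impossible to pass,
    which disables automatic broadcasting.
    """
    if hint in ("left", "right"):
        # A hint wins regardless of the threshold in this sketch.
        return f"BroadcastHashJoin (build {hint})"
    left_ok = 0 <= left_bytes <= threshold_bytes
    right_ok = 0 <= right_bytes <= threshold_bytes
    if left_ok and right_ok:
        side = "right" if right_bytes <= left_bytes else "left"  # broadcast the smaller side
    elif left_ok or right_ok:
        side = "left" if left_ok else "right"
    else:
        return "SortMergeJoin"
    return f"BroadcastHashJoin (build {side})"
```

For example, with the default threshold a 500 MB table joined to a 2 MB table yields `"BroadcastHashJoin (build right)"`, while the same call with `threshold_bytes=-1` falls back to `"SortMergeJoin"`.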

In [0]:
%python
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
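`spark.conf.get` returns the setting as a string, and depending on the Spark version it may be a bare number or carry a unit suffix (for example `10485760b`). To compare it with table sizes in Python, a hypothetical helper such as `to_bytes` below can normalize it. It is a sketch that assumes binary (1024-based) size suffixes and only handles `b`, `k`, `m` and `g` (with or without a trailing `b`).

```python
import re

# Binary (1024-based) multipliers for the unit suffixes this sketch accepts.
_UNITS = {
    "": 1, "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
}


def to_bytes(value):
    """Convert a size such as '10485760', '10485760b' or '10m' into an int number of bytes."""
    match = re.fullmatch(r"\s*(-?\d+)\s*([A-Za-z]*)\s*", str(value))
    if match is None:
        raise ValueError(f"Unrecognized size: {value!r}")
    number, unit = match.groups()
    multiplier = _UNITS.get(unit.lower())
    if multiplier is None:
        raise ValueError(f"Unknown unit in size: {value!r}")
    return int(number) * multiplier
```

In the notebook you could then call `to_bytes(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))`; a result of `-1` means automatic broadcasting is turned off.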

Now take a look at the physical plan when we broadcast one of the datasets. The hint `/*+ BROADCAST(fireCalls) */` uses SQL hint syntax: even though it looks like a comment, Spark parses it and applies it as a join hint.

In [0]:
%sql
EXPLAIN 
  SELECT /*+ BROADCAST(fireCalls) */ * 
  FROM fireCalls 
  JOIN fireCallsParquet on fireCalls.`Call Number` = fireCallsParquet.`Call_Number`


plan
"== Physical Plan == *(2) BroadcastHashJoin [Call Number#1833], [Call_Number#1762], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) : +- *(1) Project [Call Number#1833, Unit ID#1834, Incident Number#1835, Call Type#1836, Call Date#1837, Watch Date#1838, Received DtTm#1839, Entry DtTm#1840, Dispatch DtTm#1841, Response DtTm#1842, On Scene DtTm#1843, Transport DtTm#1844, Hospital DtTm#1845, Call Final Disposition#1846, Available DtTm#1847, Address#1848, City#1849, Zipcode of Incident#1850, Battalion#1851, Station Area#1852, Box#1853, Original Priority#1854, Priority#1855, Final Priority#1856, ... 10 more fields] : +- *(1) Filter isnotnull(Call Number#1833) : +- *(1) FileScan csv databricks.firecalls[Call Number#1833,Unit ID#1834,Incident Number#1835,Call Type#1836,Call Date#1837,Watch Date#1838,Received DtTm#1839,Entry DtTm#1840,Dispatch DtTm#1841,Response DtTm#1842,On Scene DtTm#1843,Transport DtTm#1844,Hospital DtTm#1845,Call Final Disposition#1846,Available DtTm#1847,Address#1848,City#1849,Zipcode of Incident#1850,Battalion#1851,Station Area#1852,Box#1853,Original Priority#1854,Priority#1855,Final Priority#1856,... 10 more fields] Batched: false, DataFilters: [isnotnull(Call Number#1833)], Format: CSV, Location: InMemoryFileIndex[dbfs:/mnt/davis/fire-calls/fire-calls-truncated-comma.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Call Number)], ReadSchema: struct"
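Compared with the previous plan, `SortMergeJoin` has become `BroadcastHashJoin`, and the `Exchange hashpartitioning` and `Sort` above the `fireCalls` scan have been replaced by a `BroadcastExchange`. `BuildLeft` indicates that the left input (`fireCalls`, the table named in the hint) is the side turned into the broadcast hash table. To check this programmatically, the hypothetical helper below scans plan text for the first join operator and its build side. It is plain string matching over the plan format shown above, not a Spark API, and the operator list is an assumption that may not cover every Spark version.

```python
import re

# Physical join operators this sketch recognizes (an assumed, non-exhaustive list).
JOIN_OPERATORS = ("BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin",
                  "BroadcastNestedLoopJoin", "CartesianProduct")


def join_strategy(plan_text):
    """Return (operator, build_side) for the first join operator in EXPLAIN output."""
    match = re.search(r"\b(" + "|".join(JOIN_OPERATORS) + r")\b", plan_text)
    if match is None:
        return None, None
    # Only look at the join's own line (or, in flattened output, up to its first child ":-").
    header = re.split(r"\n|:-", plan_text[match.end():], maxsplit=1)[0]
    build = re.search(r"\bBuild(Left|Right)\b", header)
    return match.group(1), (build.group(1).lower() if build else None)


# Abbreviated copies of the two plans printed in this notebook.
smj_plan = ("== Physical Plan == *(3) SortMergeJoin [Call Number#1833], "
            "[Call_Number#1762], Inner :- Sort [Call Number#1833 ASC NULLS FIRST], false, 0")
bhj_plan = ("== Physical Plan == *(2) BroadcastHashJoin [Call Number#1833], "
            "[Call_Number#1762], Inner, BuildLeft :- BroadcastExchange ...")
```

Here `join_strategy(smj_plan)` gives `("SortMergeJoin", None)` and `join_strategy(bhj_plan)` gives `("BroadcastHashJoin", "left")`. In a notebook, the plan text should be available as the `plan` column of the `EXPLAIN` result, for example `spark.sql("EXPLAIN ...").first()["plan"]`.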


&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>