# Spark SQL and Joining Distributed Data Frames

**Alessandro Gagliardi**  
_Lead Professor_  
[GalvanizeU](http://www.galvanizeu.com/) powered by [University of New Haven](http://galvanizeu.newhaven.edu/)  
Twitter: [`@MadDataScience`](https://twitter.com/MadDataScience)

### Overview

1. A (very) brief history of SQL and the relational model
2. Scaling and NoSQL
3. Spark SQL to the rescue
4. The Trouble with Joins

### You are expected to already:

1. Be familiar with the basics of SQL including JOIN
2. Be familiar with the basics of Apache Spark including RDDs and Map and Reduce operations

### By the end of this workshop, you should be able to:

1. Place SQL in its historical context with regard to databases and distributed architectures
2. Explain the difference between NoSQL and NOSQL
3. Identify which JOIN method to use on distributed data

## DB Evolution

#### 1960s - B.C. (Before Codd)

- Hierarchical data structure (IBM IMS)
- Network data structure (CODASYL)

#### 1970s - The Birth of the Relational Model
- [_A Relational Model of Data for Large Shared Data Banks_](https://www.seas.upenn.edu/~zives/03f/cis550/codd.pdf) by E.F. Codd (1970)
- Relational Model: Relations, Tuples, and Attributes
- Structured Query Language (SQL): Tables, Rows, and Columns

#### 1980s - Commercialization of RDBMS
- [_Principles of Transaction-Oriented Database Recovery_](https://web.stanford.edu/class/cs340v/papers/recovery.pdf) by Haerder & Reuter (1983)
- Atomicity, Consistency, Isolation, Durability (ACID)
- Oracle, IBM DB2, Sybase, Microsoft SQL Server, _etc._

#### 1990s - PC and Open Source RDBMS
- [_The Third Manifesto_](http://www09.sigmod.org/sigmod/record/issues/9503/manifesto.ps) by Darwen & Date (1995)
- MySQL and PostgreSQL
- LAMP (Linux, Apache, MySQL, PHP) Stack

#### 2000s - Beginning of Big Data
- [_Towards Robust Distributed Systems_](https://people.eecs.berkeley.edu/~brewer/cs262b-2004/PODC-keynote.pdf) by Eric Brewer (2000)
- Consistency, Availability, and Partition tolerance (CAP)
- Google File System, Bigtable, MapReduce, _etc._

#### 2010s - Open Source Big Data

- Distributed Data Analysis - Hadoop, Spark, Storm, _etc._
- NoSQL (or more appropriately 'NoREL') - Cassandra, Mongo, Riak, _etc._

### The Relational Model in SQL

#### CRUD

- **C**reate - **INSERT**
- **R**ead   - **SELECT**
- **U**pdate - **UPDATE**
- **D**elete - **DELETE**

#### Operations on Relations

1. Permutation
2. Projection
3. Join
4. Composition
5. Restriction

##### SQL Example: 
```sql
SELECT customers.name,        -- permutation and projection
       visits.created_at
FROM visits JOIN customers    -- join and composition
     USING (customer_id)
WHERE customer_id = 1         -- restriction
```

##### Which operation do you think is the most challenging to implement in a distributed architecture?

Q: Are joins slow? <br /> A: It depends.

#### Join Algorithms

- Nested Loop Join
  - Pseudocode
  
  ```
  For each tuple r in R do
     For each tuple s in S do
        If r and s satisfy the join condition
           Then output the tuple <r,s>
           
  ```  
  - Time: `O(R*S)`

- Sort-Merge Join
  - If sorted: `O(R+S)`
  - If not sorted: `O(R*log(R)+S*log(S))`

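- Hash Join
  1. Build a hash table (_i.e._ an in-memory key-value store) of the smaller relation, keyed on the join attribute
  2. Scan the larger relation once, probing the hash table for matching rows (expected time `O(R+S)` if the hash table fits in memory)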
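To make the three strategies concrete, here is a minimal plain-Python sketch of each as an inner equi-join. It assumes relations are lists of dicts and that all rows fit in memory; the `customers`/`visits` rows are hypothetical, shaped after the SQL example above. The names `R` and `S` match the complexity notation used in the list.

```python
from collections import defaultdict

# Relations are lists of dicts; every join below is an inner equi-join on `key`.


def nested_loop_join(R, S, key):
    """Compare every pair of tuples: O(R*S) comparisons."""
    out = []
    for r in R:
        for s in S:
            if r[key] == s[key]:
                out.append({**r, **s})
    return out


def sort_merge_join(R, S, key):
    """Sort both inputs on the key, then merge them in one coordinated pass."""
    R = sorted(R, key=lambda t: t[key])
    S = sorted(S, key=lambda t: t[key])
    out, i, j = [], 0, 0
    while i < len(R) and j < len(S):
        rk, sk = R[i][key], S[j][key]
        if rk < sk:
            i += 1
        elif rk > sk:
            j += 1
        else:
            # Pair R[i] with the whole run of S tuples sharing its key; j stays at
            # the start of the run so the next R tuple with this key can reuse it.
            k = j
            while k < len(S) and S[k][key] == rk:
                out.append({**R[i], **S[k]})
                k += 1
            i += 1
    return out


def hash_join(R, S, key):
    """Build a hash table on the smaller relation, then probe it with the larger."""
    build, probe = (R, S) if len(R) <= len(S) else (S, R)
    table = defaultdict(list)
    for b in build:
        table[b[key]].append(b)
    return [{**b, **p} for p in probe for b in table.get(p[key], [])]


def canon(rows):
    """Order-insensitive view of a join result, for comparing the three methods."""
    return sorted(tuple(sorted(row.items())) for row in rows)


# Hypothetical data shaped like the customers/visits SQL example
customers = [{"customer_id": 1, "name": "Ada"}, {"customer_id": 2, "name": "Bob"}]
visits = [
    {"customer_id": 1, "created_at": "2015-01-01"},
    {"customer_id": 1, "created_at": "2015-01-02"},
    {"customer_id": 3, "created_at": "2015-01-03"},
]

for join in (nested_loop_join, sort_merge_join, hash_join):
    print(join.__name__, canon(join(visits, customers, "customer_id")))
```

All three produce the same two rows; they differ only in how much work they do to find them. Note that the hash join needs only the smaller side in memory, which is exactly the property a distributed broadcast join exploits.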

## NoSQL

NoSQL means NoREL  
NoREL means NoJOIN

[CQL (Cassandra Query Language) example](https://docs.datastax.com/en/cql/3.0/cql/cql_reference/select_r.html):
```sql
SELECT select_expression
FROM keyspace_name.table_name
WHERE relation AND relation ... 
ORDER BY ( clustering_column ( ASC | DESC )...)
LIMIT n
```

Q: What do you do if you can't join?  
A: Denormalize!  
Q: What's wrong with denormalization?

In [None]:
{u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Fri Jan 23 04:23:34 +0000 2015',
 u'entities': {u'hashtags': [{u'indices': [84, 95], u'text': u'500kLubaTV'}],
  u'media': [{u'display_url': u'pic.twitter.com/1zhgdIaxsl',
    u'expanded_url': u'http://twitter.com/LubaTV/status/558474724968529921/photo/1',
    u'id': 558474718849007616L,
    u'id_str': u'558474718849007616',
    u'indices': [96, 118],
    u'media_url': u'http://pbs.twimg.com/media/B8AZzEEIAAAFGY0.jpg',
    u'media_url_https': u'https://pbs.twimg.com/media/B8AZzEEIAAAFGY0.jpg',
    u'sizes': {u'large': {u'h': 270, u'resize': u'fit', u'w': 750},
     u'medium': {u'h': 216, u'resize': u'fit', u'w': 600},
     u'small': {u'h': 122, u'resize': u'fit', u'w': 340},
     u'thumb': {u'h': 150, u'resize': u'crop', u'w': 150}},
    u'source_status_id': 558474724968529921L,
    u'source_status_id_str': u'558474724968529921',
    u'source_user_id': 121517065,
    u'source_user_id_str': u'121517065',
    u'type': u'photo',
    u'url': u'http://t.co/1zhgdIaxsl'}],
  u'symbols': [],
  u'urls': [],
  u'user_mentions': [{u'id': 121517065,
    u'id_str': u'121517065',
    u'indices': [3, 10],
    u'name': u'Lucas Feuersch\xfctte',
    u'screen_name': u'LubaTV'}]},
 u'favorite_count': 0,
 u'favorited': False,
 u'geo': None,
 u'id': 558480131778691072L,
 u'id_str': u'558480131778691072',
 u'in_reply_to_screen_name': None,
 u'in_reply_to_status_id': None,
 u'in_reply_to_status_id_str': None,
 u'in_reply_to_user_id': None,
 u'in_reply_to_user_id_str': None,
 u'lang': u'pt',
 u'metadata': {u'iso_language_code': u'pt', u'result_type': u'recent'},
 u'place': None,
 u'possibly_sensitive': False,
 u'retweet_count': 395,
 u'retweeted': False,
 u'retweeted_status': {u'contributors': None,
  u'coordinates': None,
  u'created_at': u'Fri Jan 23 04:02:05 +0000 2015',
  u'entities': {u'hashtags': [{u'indices': [72, 83], u'text': u'500kLubaTV'}],
   u'media': [{u'display_url': u'pic.twitter.com/1zhgdIaxsl',
     u'expanded_url': u'http://twitter.com/LubaTV/status/558474724968529921/photo/1',
     u'id': 558474718849007616L,
     u'id_str': u'558474718849007616',
     u'indices': [84, 106],
     u'media_url': u'http://pbs.twimg.com/media/B8AZzEEIAAAFGY0.jpg',
     u'media_url_https': u'https://pbs.twimg.com/media/B8AZzEEIAAAFGY0.jpg',
     u'sizes': {u'large': {u'h': 270, u'resize': u'fit', u'w': 750},
      u'medium': {u'h': 216, u'resize': u'fit', u'w': 600},
      u'small': {u'h': 122, u'resize': u'fit', u'w': 340},
      u'thumb': {u'h': 150, u'resize': u'crop', u'w': 150}},
     u'type': u'photo',
     u'url': u'http://t.co/1zhgdIaxsl'}],
   u'symbols': [],
   u'urls': [],
   u'user_mentions': []},
  u'favorite_count': 901,
  u'favorited': False,
  u'geo': None,
  u'id': 558474724968529921L,
  u'id_str': u'558474724968529921',
  u'in_reply_to_screen_name': None,
  u'in_reply_to_status_id': None,
  u'in_reply_to_status_id_str': None,
  u'in_reply_to_user_id': None,
  u'in_reply_to_user_id_str': None,
  u'lang': u'pt',
  u'metadata': {u'iso_language_code': u'pt', u'result_type': u'recent'},
  u'place': None,
  u'possibly_sensitive': False,
  u'retweet_count': 395,
  u'retweeted': False,
  u'source': u'<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>',
  u'text': u"\xc9 oficial, turma. Somos 500 mil agora! Obrigado, obrigado por tudo. =') #500kLubaTV http://t.co/1zhgdIaxsl",
  u'truncated': False,
  u'user': {u'contributors_enabled': False,
   u'created_at': u'Tue Mar 09 19:04:06 +0000 2010',
   u'default_profile': False,
   u'default_profile_image': False,
   u'description': u'Sou da turma! \u270c\ufe0f Contato: lubatv3@gmail.com | Instagram & Facebook: LubaTV | Caixa Postal 174, 88701-970 Tubar\xe3o - SC | \xdaltimo v\xeddeo: \u2b07\ufe0f',
   u'entities': {u'description': {u'urls': []},
    u'url': {u'urls': [{u'display_url': u'youtu.be/ntMBRQYs-hI',
       u'expanded_url': u'http://youtu.be/ntMBRQYs-hI',
       u'indices': [0, 22],
       u'url': u'http://t.co/4n2e3pI9To'}]}},
   u'favourites_count': 722,
   u'follow_request_sent': False,
   u'followers_count': 118167,
   u'following': False,
   u'friends_count': 548,
   u'geo_enabled': True,
   u'id': 121517065,
   u'id_str': u'121517065',
   u'is_translation_enabled': False,
   u'is_translator': False,
   u'lang': u'pt',
   u'listed_count': 261,
   u'location': u'',
   u'name': u'Lucas Feuersch\xfctte',
   u'notifications': False,
   u'profile_background_color': u'FFF04D',
   u'profile_background_image_url': u'http://pbs.twimg.com/profile_background_images/520656124547067904/of-yDc4j.jpeg',
   u'profile_background_image_url_https': u'https://pbs.twimg.com/profile_background_images/520656124547067904/of-yDc4j.jpeg',
   u'profile_background_tile': True,
   u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/121517065/1419199713',
   u'profile_image_url': u'http://pbs.twimg.com/profile_images/553302094132690944/cEZfRo8r_normal.png',
   u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/553302094132690944/cEZfRo8r_normal.png',
   u'profile_link_color': u'0099CC',
   u'profile_location': None,
   u'profile_sidebar_border_color': u'FFFFFF',
   u'profile_sidebar_fill_color': u'DDEEF6',
   u'profile_text_color': u'333333',
   u'profile_use_background_image': True,
   u'protected': False,
   u'screen_name': u'LubaTV',
   u'statuses_count': 36586,
   u'time_zone': u'Brasilia',
   u'url': u'http://t.co/4n2e3pI9To',
   u'utc_offset': -7200,
   u'verified': False}},
 u'source': u'<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>',
 u'text': u"RT @LubaTV: \xc9 oficial, turma. Somos 500 mil agora! Obrigado, obrigado por tudo. =') #500kLubaTV http://t.co/1zhgdIaxsl",
 u'truncated': False,
 u'user': {u'contributors_enabled': False,
  u'created_at': u'Thu Dec 20 20:15:56 +0000 2012',
  u'default_profile': False,
  u'default_profile_image': False,
  u'description': u'F\xe3 de uns carinhas ai do YouTube \u2665 1/1/15 - Rafa respondeu *u*                       19/01/15 - Nay Respondeu *u* #Forfun\xe1tica',
  u'entities': {u'description': {u'urls': []}},
  u'favourites_count': 596,
  u'follow_request_sent': False,
  u'followers_count': 36,
  u'following': False,
  u'friends_count': 94,
  u'geo_enabled': False,
  u'id': 1024945802,
  u'id_str': u'1024945802',
  u'is_translation_enabled': False,
  u'is_translator': False,
  u'lang': u'pt',
  u'listed_count': 1,
  u'location': u'Rio de Janeiro',
  u'name': u'Biah #ADR ',
  u'notifications': False,
  u'profile_background_color': u'ACDED6',
  u'profile_background_image_url': u'http://abs.twimg.com/images/themes/theme18/bg.gif',
  u'profile_background_image_url_https': u'https://abs.twimg.com/images/themes/theme18/bg.gif',
  u'profile_background_tile': False,
  u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/1024945802/1421771885',
  u'profile_image_url': u'http://pbs.twimg.com/profile_images/556923388615090176/UiQb8z54_normal.jpeg',
  u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/556923388615090176/UiQb8z54_normal.jpeg',
  u'profile_link_color': u'9266CC',
  u'profile_location': None,
  u'profile_sidebar_border_color': u'FFFFFF',
  u'profile_sidebar_fill_color': u'F6F6F6',
  u'profile_text_color': u'333333',
  u'profile_use_background_image': True,
  u'protected': False,
  u'screen_name': u'_beatriz_27',
  u'statuses_count': 838,
  u'time_zone': u'Brasilia',
  u'url': None,
  u'utc_offset': -7200,
  u'verified': False}}

(to be fair, normalized data can be quite complex too)

Q: What's the *real* problem with denormalization?  
A: It limits the kinds of questions you can ask.
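The tweet above embeds a full copy of the retweeted status and a full profile for each user. Here is a minimal sketch of the normalized alternative, using a few fields copied from that tweet. The column names `user_id` and `retweet_of` and the two-table layout are illustrative assumptions, not Twitter's schema. Users and tweets are stored once each, and questions that span them are answered with joins.

```python
# Two normalized "tables" holding a few fields copied from the tweet above.
users = {
    121517065: {"screen_name": "LubaTV", "followers_count": 118167},
    1024945802: {"screen_name": "_beatriz_27", "followers_count": 36},
}
tweets = [
    # The original tweet (the "retweeted_status" embedded above)
    {"id": 558474724968529921, "user_id": 121517065, "retweet_of": None},
    # The retweet refers to the original by id instead of embedding a copy of it
    {"id": 558480131778691072, "user_id": 1024945802, "retweet_of": 558474724968529921},
]


def tweets_with_author(tweets, users):
    """Inner join of tweets to users on user_id."""
    return [dict(t, **users[t["user_id"]]) for t in tweets if t["user_id"] in users]


# Self-join tweets (retweet -> original), then join both sides to users:
# "who retweeted whom?"
tweets_by_id = {t["id"]: t for t in tweets}
retweet_pairs = [
    (users[t["user_id"]]["screen_name"],
     users[tweets_by_id[t["retweet_of"]]["user_id"]]["screen_name"])
    for t in tweets
    if t["retweet_of"] is not None
]
print([(t["id"], t["screen_name"]) for t in tweets_with_author(tweets, users)])
print(retweet_pairs)  # [('_beatriz_27', 'LubaTV')]
```

Each profile now lives in exactly one place, so an update is a single write, and new questions only require a new join rather than a new document layout.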

## Spark SQL to the rescue!

Spark SQL is not an RDBMS.

Rather, like Hive, it merely translates SQL queries into jobs that can be run against a distributed file system:
* Hive translates HQL into Hadoop MapReduce jobs
* Spark SQL translates SQL into a Spark DAG of transformations (including `map()` and `reduce()`)
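As a rough plain-Python analogy (not the plan Spark actually generates), the `WHERE` clause of a query becomes a `filter` (restriction), the `SELECT` list becomes a `map` (projection), and an aggregate such as `SUM` becomes a `reduce`. The rows are the six rows of `sales.csv` used later in this notebook; the threshold of 300 is chosen only so that the filter removes something.

```python
from functools import reduce

# The six rows of sales.csv used later: (id, date, store, state, product, amount)
sales = [
    (101, "11/13/2014", 100, "WA", 331, 300.0),
    (104, "11/18/2014", 700, "OR", 329, 450.0),
    (102, "11/15/2014", 203, "CA", 321, 200.0),
    (106, "11/19/2014", 202, "CA", 331, 330.0),
    (103, "11/17/2014", 101, "WA", 373, 750.0),
    (105, "11/19/2014", 202, "CA", 321, 200.0),
]

# SELECT state, amount FROM sales WHERE amount > 300
restricted = filter(lambda row: row[5] > 300, sales)       # WHERE  -> filter
projected = map(lambda row: (row[3], row[5]), restricted)  # SELECT -> map
# Like Spark transformations, filter and map are lazy; list() acts like collect()
result = list(projected)
print(result)  # [('OR', 450.0), ('CA', 330.0), ('WA', 750.0)]

# SELECT SUM(amount) FROM sales  -> map, then reduce
total = reduce(lambda a, b: a + b, map(lambda row: row[5], sales))
print(total)  # 2230.0
```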

Spark SQL originally used SchemaRDDs, which were renamed DataFrames in Spark 1.3.

What is a DataFrame?

- DataFrames are the primary abstraction in Spark SQL.

- Think of DataFrames as RDDs with a schema.

What is a schema?

- Schemas are metadata about your data.

- Schemas define table names, column names, and column types over your
  data.

- Schemas enable using SQL and DataFrame syntax to query your RDDs,
  instead of using column positions.
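A schema is just metadata, so the idea can be sketched without Spark. The snippet below is a plain-Python analogy, not Spark's `StructType`: it pairs column names with types (mirroring the `sales.csv` columns used below), casts raw string fields accordingly, and returns a record whose columns can be read by name as well as by position.

```python
from collections import namedtuple

# A schema as plain metadata: (column name, type) pairs
SCHEMA = [("id", int), ("date", str), ("store", int),
          ("state", str), ("product", int), ("amount", float)]

Sale = namedtuple("Sale", [name for name, _ in SCHEMA])


def apply_schema(fields):
    """Cast raw string fields using the schema and return a named record."""
    return Sale(*(typ(value) for (_, typ), value in zip(SCHEMA, fields)))


row = apply_schema("101,11/13/2014,100,WA,331,300.00".split(","))
print(row.state, row.amount)  # WA 300.0
```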

Spark SQL Using CSV
-------------------

To proceed with the demo, please go to [http://tinyurl.com/sparksql16](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6780843632664124/3482466428365889/2932358616588278/latest.html) and import the notebook into your own [Databricks](https://community.cloud.databricks.com) cloud.

How can I pull in my CSV data and use Spark SQL on it?

- Make sure the CSV exists.

In [2]:
%%writefile sales.csv
#ID,Date,Store,State,Product,Amount
101,11/13/2014,100,WA,331,300.00
104,11/18/2014,700,OR,329,450.00
102,11/15/2014,203,CA,321,200.00
106,11/19/2014,202,CA,331,330.00
103,11/17/2014,101,WA,373,750.00
105,11/19/2014,202,CA,321,200.00

Overwriting sales.csv


- Read the file and convert the columns to the right types.

In [3]:
import pyspark
sc = pyspark.SparkContext()
sqlContext = pyspark.SQLContext(sc)

In [4]:
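def parse(fields):
    # Cast each CSV field to its column type
    # (tuple-unpacking lambda parameters only work in Python 2)
    id_, date, store, state, product, amount = fields
    return (int(id_), date, int(store), state, int(product), float(amount))

rdd = sc.textFile('sales.csv')\
    .filter(lambda line: not line.startswith('#'))\
    .map(lambda line: line.split(','))\
    .map(parse)
rdd.collect()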

[(101, u'11/13/2014', 100, u'WA', 331, 300.0),
 (104, u'11/18/2014', 700, u'OR', 329, 450.0),
 (102, u'11/15/2014', 203, u'CA', 321, 200.0),
 (106, u'11/19/2014', 202, u'CA', 331, 330.0),
 (103, u'11/17/2014', 101, u'WA', 373, 750.0),
 (105, u'11/19/2014', 202, u'CA', 321, 200.0)]

- Import data types.

In [5]:
from pyspark.sql.types import *

- Define a schema.

In [6]:
schema = StructType( [
    StructField('id',IntegerType(),True),
    StructField('date',StringType(),True),
    StructField('store',IntegerType(),True),
    StructField('state',StringType(),True),
    StructField('product',IntegerType(),True),
    StructField('amount',FloatType(),True) ] )

- Define the DataFrame object.

In [7]:
df = sqlContext.createDataFrame(rdd, schema)
df.show()

+---+----------+-----+-----+-------+------+
| id|      date|store|state|product|amount|
+---+----------+-----+-----+-------+------+
|101|11/13/2014|  100|   WA|    331| 300.0|
|104|11/18/2014|  700|   OR|    329| 450.0|
|102|11/15/2014|  203|   CA|    321| 200.0|
|106|11/19/2014|  202|   CA|    331| 330.0|
|103|11/17/2014|  101|   WA|    373| 750.0|
|105|11/19/2014|  202|   CA|    321| 200.0|
+---+----------+-----+-----+-------+------+



<!--
Pop Quiz
--------

<details><summary>
What change do we have to make to the code above if we are
processing a TSV file instead of a CSV file?
</summary>
<br>
Replace `line.split(',')` with `line.split('\t')`
</details>
-->
Using SQL With DataFrames
-------------------------

How can I run SQL queries on DataFrames?

- Register the table with the `SQLContext` (since Spark 2.0, `registerTempTable` is deprecated in favor of `createOrReplaceTempView`).

In [8]:
df.registerTempTable('sales')

- Run queries on the registered tables.

In [9]:
sqlContext.sql('SELECT state, amount FROM sales WHERE amount > 100').show()

+-----+------+
|state|amount|
+-----+------+
|   WA| 300.0|
|   OR| 450.0|
|   CA| 200.0|
|   CA| 330.0|
|   WA| 750.0|
|   CA| 200.0|
+-----+------+



## The Trouble with Joins

Sometimes you need to influence how Spark performs a join. Two of the strategies it can use are `BroadcastHashJoin` and `ShuffledHashJoin`.
* Enabling `BroadcastHashJoin` can speed up joining a large table with a small table in Spark SQL.
* This notebook covers how to configure a `BroadcastHashJoin` and why to choose it over a `ShuffledHashJoin`.

The following is adapted from [this presentation](http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs).

### Join a Large Table with a Small Table

```sql
SELECT * 
  FROM people_in_the_us 
  JOIN states
  ON people_in_the_us.state = states.name
```  
<BR />  
  
* **`ShuffledHashJoin`?**
* **`BroadcastHashJoin`?**

![ShuffledHashJoin](http://image.slidesharecdn.com/stratasj-everydayimshuffling-tipsforwritingbettersparkprograms-150223113317-conversion-gate02/95/everyday-im-shuffling-tips-for-writing-better-spark-programs-strata-san-jose-2015-13-638.jpg?cb=1427111079)

![BroadcastHashJoin](http://image.slidesharecdn.com/stratasj-everydayimshuffling-tipsforwritingbettersparkprograms-150223113317-conversion-gate02/95/everyday-im-shuffling-tips-for-writing-better-spark-programs-strata-san-jose-2015-14-638.jpg?cb=1427111079)
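A back-of-the-envelope sketch of why the broadcast wins. It assumes rows start evenly spread across executors and that a shuffled row stays put only when it already sits on the executor that owns its key (about one time in `executors`); the table sizes are hypothetical (roughly the US population, 50 states, 100 executors).

```python
def rows_moved_by_shuffle(large_rows, small_rows, executors):
    """Shuffled hash join: both tables are hash-partitioned on the join key.

    Assumes a row stays on its current executor only when that executor
    already owns its key, i.e. about 1 time in `executors`."""
    return (large_rows + small_rows) * (executors - 1) // executors


def rows_moved_by_broadcast(small_rows, executors):
    """Broadcast hash join: only the small table moves, one copy per executor."""
    return small_rows * executors


# Hypothetical sizes: ~320 million people, 50 states, 100 executors
people, states, executors = 320_000_000, 50, 100
print(rows_moved_by_shuffle(people, states, executors))  # 316800049
print(rows_moved_by_broadcast(states, executors))        # 5000
```

Under these assumptions the shuffle moves almost the entire large table, while the broadcast moves a few thousand small rows no matter how large the other table grows.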

### Join a Medium Table with a Huge Table
```sql
SELECT *
  FROM people_in_california
  LEFT JOIN all_the_people_in_the_world
  ON people_in_california.id = all_the_people_in_the_world.id
```  
  
**The final output keys are the keys of `people_in_california`, so this doesn't need a huge Spark cluster, right?**

![Left Join - Shuffle Step](http://image.slidesharecdn.com/stratasj-everydayimshuffling-tipsforwritingbettersparkprograms-150223113317-conversion-gate02/95/everyday-im-shuffling-tips-for-writing-better-spark-programs-strata-san-jose-2015-17-638.jpg?cb=1427111079)

![What's a Better Solution?](http://image.slidesharecdn.com/stratasj-everydayimshuffling-tipsforwritingbettersparkprograms-150223113317-conversion-gate02/95/everyday-im-shuffling-tips-for-writing-better-spark-programs-strata-san-jose-2015-18-638.jpg?cb=1427111079)
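One common remedy is to shrink the huge table before the join. It is sketched here in plain Python as an assumption, not as a transcription of the slide. Collect the join keys of the medium table (small enough to broadcast as a set), filter the huge table down to rows with those keys, and join only what remains. Both tables below are hypothetical stand-ins.

```python
from collections import defaultdict


def left_join(left, right, key):
    """LEFT JOIN: each left row merged with every matching right row, or kept alone."""
    index = defaultdict(list)
    for r in right:
        index[r[key]].append(r)
    # .get() does not insert into the defaultdict; no match -> [{}] keeps the left row
    return [{**row, **match} for row in left for match in (index.get(row[key]) or [{}])]


# Hypothetical stand-ins for the two tables in the query above
people_in_california = [{"id": i, "state": "CA"} for i in (3, 7, 42, 1000)]
all_the_people_in_the_world = [{"id": i, "name": f"person{i}"} for i in range(100)]

# Step 1: collect the medium table's join keys (in Spark, this set would be broadcast)
california_ids = {row["id"] for row in people_in_california}

# Step 2: filter the huge table down to the rows that can possibly match
relevant = [row for row in all_the_people_in_the_world if row["id"] in california_ids]
print(len(all_the_people_in_the_world), "->", len(relevant))  # 100 -> 3

# Step 3: joining against the filtered table gives the same LEFT JOIN result
print(left_join(people_in_california, relevant, "id")
      == left_join(people_in_california, all_the_people_in_the_world, "id"))  # True
```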

### **Practice:** Create a large table that will be joined with a smaller table.

In [10]:
from pyspark.sql import Row

array = []
for i in range(0, 1000000):
  array.append(Row(num=i, bit = i % 2))
  
dataFrame = sqlContext.createDataFrame(sc.parallelize(array))
dataFrame.repartition(100).registerTempTable("my_large_table")

In [11]:
dataFrame.show()

+---+---+
|bit|num|
+---+---+
|  0|  0|
|  1|  1|
|  0|  2|
|  1|  3|
|  0|  4|
|  1|  5|
|  0|  6|
|  1|  7|
|  0|  8|
|  1|  9|
|  0| 10|
|  1| 11|
|  0| 12|
|  1| 13|
|  0| 14|
|  1| 15|
|  0| 16|
|  1| 17|
|  0| 18|
|  1| 19|
+---+---+
only showing top 20 rows



### By default, Spark will not use BroadcastHashJoin to join this table with a small table.

In [12]:
from pyspark.sql import Row

array = []
for i in range(0, 2):
  array.append(Row(bit=i))
  
dataFrame = sqlContext.createDataFrame(sc.parallelize(array))
dataFrame.registerTempTable("my_small_temp_table")

#### Tip: `EXPLAIN` can be used to print out the Spark execution plan for a Spark SQL query.

In [13]:
e = sqlContext.sql('''EXPLAIN SELECT * 
    FROM my_large_table 
    JOIN my_small_temp_table 
    ON my_large_table.bit = my_small_temp_table.bit''').collect()
print(e[0]['plan'])

== Physical Plan ==
*SortMergeJoin [bit#29L], [bit#40L], Inner
:- *Sort [bit#29L ASC], false, 0
:  +- Exchange hashpartitioning(bit#29L, 200)
:     +- Exchange RoundRobinPartitioning(100)
:        +- *Filter isnotnull(bit#29L)
:           +- Scan ExistingRDD[bit#29L,num#30L]
+- *Sort [bit#40L ASC], false, 0
   +- Exchange hashpartitioning(bit#40L, 200)
      +- *Filter isnotnull(bit#40L)
         +- Scan ExistingRDD[bit#40L]


### **ShuffledHashJoin** and **SortMergeJoin** are not good ways to join the two tables above.
* Both take all the rows in `my_large_table` and shuffle them by the `bit` key.
* **NOTE:** Because `bit` has only two distinct values, there will be only **2** non-empty partitions for the whole table, and adding more worker nodes to the job would not help.
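The two-partition bottleneck can be reproduced without Spark. This sketch hash-partitions 1,000,000 rows on `bit` into 200 partitions, matching `hashpartitioning(bit#29L, 200)` in the plan above (200 is the default of `spark.sql.shuffle.partitions`). Python's built-in `hash()` stands in for Spark's Murmur3 hash, so the partition numbers differ from Spark's `5` and `69`, but two distinct keys can never fill more than two partitions under any hash function.

```python
from collections import Counter

NUM_SHUFFLE_PARTITIONS = 200  # as in hashpartitioning(bit, 200) above

# Route each row of a 1,000,000-row table to hash(bit) % 200, where bit = i % 2.
# Python's hash() replaces Spark's Murmur3 hash here.
partition_sizes = Counter(
    hash(i % 2) % NUM_SHUFFLE_PARTITIONS for i in range(1_000_000)
)
print(sorted(partition_sizes.items()))  # [(0, 500000), (1, 500000)]
```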

In [14]:
def output_index_and_count(index, rows):
  # Emit one (partition index, row count) pair per partition
  count = 0
  for _ in rows:
    count += 1
  yield (index, count)

sqlContext.sql("""SELECT * 
    FROM my_large_table 
    JOIN my_small_temp_table 
    ON my_large_table.bit = my_small_temp_table.bit""")\
  .rdd.mapPartitionsWithIndex(output_index_and_count)\
      .filter(lambda p: p[1] > 0).collect()

[(5, 500000), (69, 500000)]

### **BroadcastHashJoin** is a much better way to join these two tables.
* Spark SQL detects that one table is small enough to broadcast.
* The small table is broadcast to each Spark worker and joined locally with each partition of the larger table, so the join itself does not shuffle the large table.
* There will be more than 2 non-empty partitions, so adding worker nodes beyond two can make the join run faster.
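The mechanism in the list above can be sketched in plain Python on a scaled-down `my_large_table` (10,000 rows instead of 1,000,000, round-robin split into 100 partitions). The function name `broadcast_hash_join` is a hypothetical helper for illustration, not a Spark API. The small table is hashed once, and each large partition probes that copy independently, so every partition produces output.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_table, key):
    """Join every partition of the large table against one hashed copy of the small table.

    No large-table rows move between partitions; each partition only probes
    the 'broadcast' hash table built from small_table."""
    # Build side: hash the small table once (Spark ships this to every worker)
    broadcast = defaultdict(list)
    for row in small_table:
        broadcast[row[key]].append(row)
    # Probe side: each partition is joined independently (in Spark, in parallel)
    return [
        [{**s, **row} for row in part for s in broadcast.get(row[key], [])]
        for part in large_partitions
    ]


# Round-robin 10,000 rows into 100 partitions (a scaled-down my_large_table)
NUM_PARTITIONS = 100
rows = [{"num": i, "bit": i % 2} for i in range(10_000)]
large_partitions = [rows[p::NUM_PARTITIONS] for p in range(NUM_PARTITIONS)]
small_table = [{"bit": 0}, {"bit": 1}]

joined = broadcast_hash_join(large_partitions, small_table, "bit")
print(sum(1 for part in joined if part))  # 100 non-empty partitions
print({len(part) for part in joined})     # {100}
```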

To get a `BroadcastHashJoin`, save the small table with the DataFrames API. Spark then has size statistics for the table and will broadcast it automatically when its estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default).

In [15]:
sqlContext.sql('DROP TABLE IF EXISTS my_small_table')

DataFrame[]

In [16]:
from pyspark.sql import Row

array = []
for i in range(0, 2):
  array.append(Row(bit=i))
  
dataFrame = sqlContext.createDataFrame(sc.parallelize(array))
dataFrame.write.saveAsTable("my_small_table")

### Use an `EXPLAIN` command to see the BroadcastHashJoin

In [17]:
e = sqlContext.sql('''EXPLAIN SELECT * 
    FROM my_large_table 
    JOIN my_small_table 
    ON my_large_table.bit = my_small_table.bit''').collect()
print(e[0]['plan'])

== Physical Plan ==
*BroadcastHashJoin [bit#29L], [bit#64L], Inner, BuildRight
:- Exchange RoundRobinPartitioning(100)
:  +- *Filter isnotnull(bit#29L)
:     +- Scan ExistingRDD[bit#29L,num#30L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
   +- *Project [bit#64L]
      +- *Filter isnotnull(bit#64L)
         +- *BatchedScan parquet default.my_small_table[bit#64L] Format: ParquetFormat, InputPaths: file:/Users/alessandro/Dropbox/MadDataScience.github.io/spark-warehouse/my_small_table, PushedFilters: [IsNotNull(bit)], ReadSchema: struct<bit:bigint>


If you didn't see a `BroadcastHashJoin`, uncomment and run the following cell:
* Prior to Spark 1.4, you must manually run `ANALYZE TABLE` on the table.
* With Spark 1.4 or greater, tables created with Hive DDL rather than the DataFrames API may also need an `ANALYZE TABLE` command.

In [19]:
# Uncomment the following command and run it if you did not see a BroadcastHashJoin.
# sqlContext.sql('ANALYZE TABLE my_small_table COMPUTE STATISTICS noscan').collect()

### Join of the tables with BroadcastHashJoin

Note that the join output is now split almost evenly across 100 partitions of the RDD.

In [20]:
broadcastHashJoinRdd = sqlContext.sql("""SELECT * 
    FROM my_large_table 
    JOIN my_small_table 
    ON my_large_table.bit = my_small_table.bit""").rdd

In [21]:
broadcastHashJoinRdd.mapPartitionsWithIndex(output_index_and_count).filter(lambda p: p[1] > 0).collect()

[(0, 10000),
 (1, 10000),
 (2, 10000),
 (3, 10000),
 (4, 10000),
 (5, 10000),
 (6, 10000),
 (7, 10000),
 (8, 10000),
 (9, 10001),
 (10, 10001),
 (11, 10001),
 (12, 10001),
 (13, 10001),
 (14, 10001),
 (15, 10001),
 (16, 10001),
 (17, 10000),
 (18, 10000),
 (19, 10000),
 (20, 10000),
 (21, 10000),
 (22, 10000),
 (23, 10000),
 (24, 10000),
 (25, 10000),
 (26, 10000),
 (27, 10000),
 (28, 10000),
 (29, 10000),
 (30, 10000),
 (31, 10000),
 (32, 10000),
 (33, 10000),
 (34, 10000),
 (35, 10001),
 (36, 10001),
 (37, 10001),
 (38, 10001),
 (39, 10001),
 (40, 10001),
 (41, 10001),
 (42, 10000),
 (43, 10000),
 (44, 10000),
 (45, 10000),
 (46, 10000),
 (47, 10000),
 (48, 10000),
 (49, 10000),
 (50, 10000),
 (51, 10000),
 (52, 10000),
 (53, 10000),
 (54, 10000),
 (55, 10000),
 (56, 10000),
 (57, 10000),
 (58, 10000),
 (59, 10000),
 (60, 10000),
 (61, 10001),
 (62, 10001),
 (63, 10001),
 (64, 10001),
 (65, 10000),
 (66, 10000),
 (67, 9999),
 (68, 9999),
 (69, 9999),
 (70, 9999),
 (71, 9999),
 (72, 9

**Cleanup: delete any tables that were created for this example.**

In [22]:
sqlContext.sql('DROP TABLE IF EXISTS my_small_table')

DataFrame[]