In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('joins').master('local').getOrCreate()

In [25]:
df1 = spark.range(10)
df2= spark.range(10000)

# Broadcast Hash Join
In this join strategy, when one of the dataframe to be joined is smaller, then instead of shuffling we will broadcast the smaller dataframe to all the worker nodes and it will be joined internally with the other dataframe inside workers.

There is a threshold which specifies the maximum size of the dataframe that can be broadcasted, it is 10MB by default but we can update it to maximum  of 8GB.

In [26]:
# we can see that default is set to 10MB
spark.conf.get('spark.sql.autoBroadcastJoinThreshold')

'10485760b'

In [28]:
df1.join(df2,df1.id==df2.id,'inner').explain()

#we can see that spark automatically performing broadcast join as the size of df1 is less than threshold of 10MB


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [id#179L], [id#181L], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=594]
   :  +- Range (0, 10, step=1, splits=1)
   +- Range (0, 10000, step=1, splits=1)




In [29]:
# If we want spark not to apply broadcast join, we can do that by setting the threshold to -1

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df1.join(df2,df1.id==df2.id,'inner').explain()

#now we can see that sort merge join is happening

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#179L], [id#181L], Inner
   :- Range (0, 10, step=1, splits=1)
   +- Range (0, 10000, step=1, splits=1)




In [30]:
# even after setting threshold to -1 to disable automatic broadcast join, we can override it by explicitly
#by using broadcast function to broadcast the smaller dataframe
from pyspark.sql.functions import broadcast

df2.join(broadcast(df1),df1.id==df2.id,'inner').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [id#181L], [id#179L], Inner, BuildRight, false
   :- Range (0, 10000, step=1, splits=1)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=621]
      +- Range (0, 10, step=1, splits=1)




Only supported for equi-joins, while the join keys do not need to be sortable.

Supported for all join types except full outer joins.

In case of left outer and right outer joins, the small table should not be present in left incase of left join and on right in case of right join, else the small table will not be broadcasted.

BHJ usually performs faster than the other join algorithms when the broadcast side is small. However, broadcasting tables is a network-intensive operation and it could cause OOM or perform badly in some cases, especially when the build/broadcast side is big.

Even when the two dataframes size is less than threshold, then the smaller dataframe among the two will be broadcasted.

# Shuffle Hash Join
Shuffles data across the executors based on join key to gather data having same join keys into the same executor. Then performs hash join on the data inside the partition.

For spark to prefer this join, we need to disable both broadcast hash join and sort merge join.

In [47]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)

In [51]:
a= spark.range(100)
b=spark.range(90)
a.join(b,a.id==b.id,'inner').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#324L], [id#326L], Inner
   :- Range (0, 100, step=1, splits=1)
   +- Range (0, 90, step=1, splits=1)




Even though when we set sortmerge to be false, spark can still force it based on certain factors.
For shuffle hashed join to be preferred, along with sortmerge join set to false, we need to set other configurations accordingly.
Spark will prefer shuffle hash join only when hashing a certain partition is less costlier than sorting it. If our dataframes are both of nearly same sizes, then shuffle hashed join will not be prefer as sorting is much cheaper than hashing in this case.
However, we have a parameter called shuffledHashJoinFactor which is default 3, which tells us that size of one dataframe should be 3 times more than size of other.

https://stackoverflow.com/questions/57987613/spark-sortmergejoin-is-not-changing-to-shufflehashjoin

Another factor is size of a partition to be hashed should be lesser than broadcastthreshold * shufflepartitions, in our case if we set broadcast threshold to -1 or 0 then it will always be false, hence shuffle hash joined will not be preferred. Hence we need to set the broadcastThreshold to atleast 2 or 3.


In [50]:
# we can see that default value is 3
spark.conf.get("spark.sql.shuffledHashJoinFactor")

'3'

In [52]:
#now lets set our configurations for shuffle hashed joined to be preferred.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)

a1= spark.range(100)
b1=spark.range(9)
a1.join(b1,a1.id==b1.id,'inner').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ShuffledHashJoin [id#332L], [id#334L], Inner, BuildRight
   :- Range (0, 100, step=1, splits=1)
   +- Range (0, 9, step=1, splits=1)




we can see that shuffle hash join is preferred here as one dataframe is three time less size than other and also other configurations are set accordingly.

The join keys need NOT be sortable
This join supports only equi joins. Also all types of joins excluding full outer.
This is a costly operations as it involves both shuffling and hashing and hash tables takes both memory and computation.

# Shuffle Sort Merge Join
Involved shuffling the data to get the same join keys in the same executors and then sort-merge join on them.
Join keys should be sortable in this unlike the above two.

In [53]:
# Although this join is default, lets us change the configuration back to True as we had changed it to false.
spark.conf.set("spark.sql.join.preferSortMergeJoin", True)

In [55]:
dataf = spark.createDataFrame([(1,"Shreyas"),(2,"Jayaram")],["Id","Name"])
datag = spark.createDataFrame([(1,25),(2,60)],["Id","Age"])

dataf.join(datag,['id'],'inner').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Id#348L, Name#349, Age#353L]
   +- SortMergeJoin [Id#348L], [Id#352L], Inner
      :- Sort [Id#348L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(Id#348L, 200), ENSURE_REQUIREMENTS, [plan_id=956]
      :     +- Filter isnotnull(Id#348L)
      :        +- Scan ExistingRDD[Id#348L,Name#349]
      +- Sort [Id#352L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(Id#352L, 200), ENSURE_REQUIREMENTS, [plan_id=957]
            +- Filter isnotnull(Id#352L)
               +- Scan ExistingRDD[Id#352L,Age#353L]




We can see that data is shuffled first and then sorted and sortmerge is applied.

Works only for equijoins.

Supports all types of joins including full outer.

# Cartesian Join
Cartesian product join ( Shuffle and Replication Nested loop Join) works very similar to broadcast nested loop join except the dataframe is not brodcasted.

Shuffle and replication does not mean a true shuffle as in records with same keys are sent to the same partitions.
Instead an entire partition of the dataset is sent over or replicated to all the partitions for a full cross or nested loop join.

In [62]:
c1 = spark.createDataFrame([[1],[2],[3]],"ID Long")
c2 = spark.createDataFrame([[4],[2],[10],[20]],"ID Long")

c1.join(c2, c1.ID >= c2.ID).explain()

== Physical Plan ==
CartesianProduct (ID#366L >= ID#368L)
:- *(1) Filter isnotnull(ID#366L)
:  +- *(1) Scan ExistingRDD[ID#366L]
+- *(2) Filter isnotnull(ID#368L)
   +- *(2) Scan ExistingRDD[ID#368L]




We can see that as we have used non equi join condition, cartesian join is getting performed as the above three joins does not work on non equi, spark will automatically choose this.

Very expensive algorithm, high possibility of OOM errors.
Works only with inner types of joins

# Broadcast Nested Loop Join

Broadcast Nested Loop join works by broadcasting one of the entire datasets and performing a nested loop to join the data. So essentially every record from dataset 1 is attempted to join with every record from dataset 2.

As you could guess, Broadcast Nested Loop is not preferred and could be quite slow.

Smallest dataset is broadcasted to all executors or tasks processing the bigger dataset Left side will be broadcasted in a right outer join. Right side in a left outer, left semi, left anti or existence join will be broadcasted. Either side can be broadcasted in an inner-like join.


In [63]:
# lets set broadcast threshold back to 10MB for this join to be selected by spark.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)

In [69]:
b1 = spark.range(10)
b2 = spark.range(100)

b1.join(b2, b1.id >= b2.id).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildLeft, Inner, (id#432L >= id#434L)
   :- BroadcastExchange IdentityBroadcastMode, [plan_id=1140]
   :  +- Range (0, 10, step=1, splits=1)
   +- Range (0, 100, step=1, splits=1)




This join is slow and this join will not work when either sides are big enough for broadcasting and you could see Out Of Memory exceptions.

Works for both equi and non-equi joins Works for all join types

# Hints
we can also give hint to spark to choose a certain type of join. 4 types of hints are:

BROADCAST

MERGE 

SHUFFLE_HASH and 

SHUFFLE_REPLICATE_NL


In [70]:
#lets try to use the same data joined to demonstrate sort merge join

dataf = spark.createDataFrame([(1,"Shreyas"),(2,"Jayaram")],["Id","Name"])
datag = spark.createDataFrame([(1,25),(2,60)],["Id","Age"])

dataf.join(datag.hint("shuffle_hash"),['id'],'inner').explain()

# we can see that although this was doing sort merge join previously, as we have given hint, it is performing shuffle hash

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Id#440L, Name#441, Age#445L]
   +- ShuffledHashJoin [Id#440L], [Id#444L], Inner, BuildRight
      :- Exchange hashpartitioning(Id#440L, 200), ENSURE_REQUIREMENTS, [plan_id=1167]
      :  +- Filter isnotnull(Id#440L)
      :     +- Scan ExistingRDD[Id#440L,Name#441]
      +- Exchange hashpartitioning(Id#444L, 200), ENSURE_REQUIREMENTS, [plan_id=1168]
         +- Filter isnotnull(Id#444L)
            +- Scan ExistingRDD[Id#444L,Age#445L]




Spark algorithm to choose join strategy

If it is an equi-join, we first look at the join hints w.r.t. the following order:
      //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
      //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
      //   2. sort merge hint: pick sort merge join if join keys are sortable.
      //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
      //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
      //      build side.
      //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
      //
      
      
If there is no hint or the hints are not applicable, we follow these rules one by one:
      //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
      //      is supported. If both sides are small, choose the smaller side (based on stats)
      //      to broadcast.
      //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
      //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
      //   3. Pick sort merge join if the join keys are sortable.
      //   4. Pick cartesian product if join type is inner like.
      //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
      //      other choice.