In [1]:
# Import SparkSession from pyspark.sql to start the session
from pyspark.sql import SparkSession

In [3]:
# Create the session
spark = SparkSession.builder.appName("Joins Challenges").getOrCreate()

22/10/18 16:54:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
# When performing joins, there are some specific challenges and some common questions that
# arise. The rest of the chapter will provide answers to these common questions and then explain
# how, at a high level, Spark performs joins. This will hint at some of the optimizations that we are
# going to cover in later parts of this book.

In [50]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
    .toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
    .toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
    .toDF("id", "status")

In [51]:
type(person)

pyspark.sql.dataframe.DataFrame

In [8]:
# Let’s register these as tables so that we can use them throughout the chapter:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [9]:
# Joins on Complex Types
# in SQL
# SELECT * FROM
# (select id as personId, name, graduate_program, spark_status FROM person)
# INNER JOIN sparkStatus ON array_contains(spark_status, id)

# Even though this might seem like a challenge, it’s actually not. Any expression is a valid join
# expression, assuming that it returns a Boolean:
    
from pyspark.sql.functions import expr
person.withColumnRenamed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()



+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+
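To make the "any Boolean expression is a valid join expression" point concrete, here is a minimal plain-Python sketch of the same join. It is only a model of the semantics, not how Spark executes the query; `predicate_join`, `person_rows`, and `status_rows` are hypothetical names introduced for illustration.

```python
# Plain-Python model of an inner join whose condition is an arbitrary Boolean predicate.
# The rows mirror the person and sparkStatus DataFrames defined earlier.
person_rows = [
    {"personId": 0, "name": "Bill Chambers", "spark_status": [100]},
    {"personId": 1, "name": "Matei Zaharia", "spark_status": [500, 250, 100]},
    {"personId": 2, "name": "Michael Armbrust", "spark_status": [250, 100]},
]
status_rows = [
    {"id": 500, "status": "Vice President"},
    {"id": 250, "status": "PMC Member"},
    {"id": 100, "status": "Contributor"},
]


def predicate_join(left, right, predicate):
    """Keep every (left, right) pair for which the predicate returns True."""
    return [{**l, **r} for l in left for r in right if predicate(l, r)]


# The lambda plays the role of array_contains(spark_status, id).
joined = predicate_join(
    person_rows,
    status_rows,
    lambda p, s: s["id"] in p["spark_status"],
)

for row in joined:
    print(row["personId"], row["name"], row["id"], row["status"])
```

Each person is paired with one status row per element of their spark_status array, which is why the joined result has more rows than the person table.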



                                                                                

In [48]:
from pyspark.sql.functions import col
col

<function pyspark.sql.functions.col(col: str) -> pyspark.sql.column.Column>

In [57]:
# Handling Duplicate Column Names
# One of the tricky things that come up in joins is dealing with duplicate column names in your
# results DataFrame. In a DataFrame, each column has a unique ID within Spark’s SQL Engine,
# Catalyst. This unique ID is purely internal and not something that you can directly reference.
# This makes it quite difficult to refer to a specific column when you have a DataFrame with duplicate column names.
# This can occur in two distinct situations:
#     The join expression that you specify does not remove one key from one of the input DataFrames and the
#         keys have the same column name
#     Two columns on which you are not performing the join have the same name
    
# Let’s create a problem dataset that we can use to illustrate these problems:

gradProgramDuplicate = graduateProgram.withColumnRenamed("id", "graduate_program")

# PySpark DataFrames have no .col method; index the DataFrame to get one of its columns
joinExpression = person["graduate_program"] == gradProgramDuplicate["graduate_program"]

# Note that there are now two graduate_program columns, even though we joined on that key:
person.join(gradProgramDuplicate, joinExpression).show()

# The challenge arises when we refer to one of these columns. The next line raises an
# AnalysisException because the reference "graduate_program" is ambiguous:
person.join(gradProgramDuplicate, joinExpression).select("graduate_program").show()
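The ambiguity described above can be illustrated with a toy model in plain Python. This is only a sketch under the assumption that a resolved column is a visible name plus an internal ID; `Attribute`, `resolve_by_name`, and the ID values are hypothetical and do not correspond to Catalyst's actual classes.

```python
from collections import namedtuple

# Hypothetical stand-in for a resolved column: a visible name plus an internal ID.
Attribute = namedtuple("Attribute", ["name", "attr_id"])

person_cols = [Attribute("id", 1), Attribute("name", 2), Attribute("graduate_program", 3)]
grad_cols = [Attribute("graduate_program", 4), Attribute("degree", 5)]

# The output of a join with a Boolean expression keeps the columns from both sides.
joined_cols = person_cols + grad_cols


def resolve_by_name(columns, name):
    """Return the single column with this name; fail if the name is missing or ambiguous."""
    matches = [c for c in columns if c.name == name]
    if len(matches) != 1:
        raise ValueError(f"Reference '{name}' matches {len(matches)} columns")
    return matches[0]


try:
    resolve_by_name(joined_cols, "graduate_program")
    ambiguous = False
except ValueError:
    ambiguous = True

# Looking a column up by its internal ID is never ambiguous.
by_id = [c for c in joined_cols if c.attr_id == 3]
print(ambiguous, by_id)
```

The three approaches that follow either avoid producing two columns with the same name or refer to the column through its source DataFrame, which corresponds to the ID lookup in this model.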

In [23]:
# Approach 1: Different join expression
# When you have two keys that have the same name, probably the easiest fix is to change the join
# expression from a Boolean expression to a string or sequence. This automatically removes one of
# the columns for you during the join:
    
person.join(gradProgramDuplicate,"graduate_program").select("graduate_program").show()




+----------------+
|graduate_program|
+----------------+
|               0|
|               1|
|               1|
+----------------+



                                                                                

In [49]:
# Approach 2: Dropping the column after the join
# Another approach is to drop the offending column after the join. When doing this, we need to
# refer to the column via the original source DataFrame. We can do this if the join uses the same
# key names or if the source DataFrames have columns that simply have the same name:
# PySpark has no DataFrame.col method, so index the source DataFrame to get a column
# that is bound to that DataFrame:
gradProgramDuplicate = graduateProgram.withColumnRenamed("id", "graduate_program")
dupeJoinExpression = person["graduate_program"] == gradProgramDuplicate["graduate_program"]
person.join(gradProgramDuplicate, dupeJoinExpression).drop(person["graduate_program"]).select("graduate_program").show()

joinExpression = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram, joinExpression).drop(graduateProgram["id"]).show()

# This is an artifact of Spark’s SQL analysis process, in which an explicitly referenced column
# passes analysis because Spark has no need to resolve the column by name. Notice that the column
# comes from indexing the source DataFrame (person["graduate_program"]) rather than from the col
# function. That lets us implicitly specify the column by its internal ID.

In [47]:
# Approach 3: Renaming a column before the join
# We can avoid this issue altogether if we rename one of our columns before the join:
    
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
# Index the DataFrames to reference their columns (PySpark has no DataFrame.col method)
joinExpression = person["graduate_program"] == gradProgram3["grad_id"]
person.join(gradProgram3, joinExpression).show()

In [None]:
# How Spark Performs Joins
# To understand how Spark performs joins, you need to understand the two core resources at play:
# the node-to-node communication strategy and the per-node computation strategy. These internals are
# likely irrelevant to your business problem. However, comprehending how Spark performs joins
# can mean the difference between a job that completes quickly and one that never completes at
# all.

In [None]:
# Communication Strategies
# Spark approaches cluster communication in two different ways during joins. It either incurs a
# shuffle join, which results in all-to-all communication, or a broadcast join. Keep in mind that
# there is a lot more detail than we’re letting on at this point, and that’s intentional. Some of these
# internal optimizations are likely to change over time with new improvements to the cost-based
# optimizer and improved communication strategies. For this reason, we’re going to focus on the
# high-level examples to help you understand exactly what’s going on in some of the more
# common scenarios, and let you take advantage of some of the low-hanging fruit that you can use
# right away to try to speed up some of your workloads.
# The core foundation of our simplified view of joins is that in Spark you will have either a big
# table or a small table. Although this is obviously a spectrum (and things do happen differently if
# you have a “medium-sized table”), it can help to be binary about the distinction for the sake of
# this explanation.

#     Big table–to–big table
#     When you join a big table to another big table, you end up with a shuffle join.
#     In a shuffle join, every node talks to every other node and they share data according to which
#     node has a certain key or set of keys (on which you are joining). These joins are expensive
#     because the network can become congested with traffic, especially if your data is not partitioned
#     well.
#     This join describes taking a big table of data and joining it to another big table of data. An
#     example of this might be a company that receives billions of messages every day from the
#     Internet of Things, and needs to identify the day-over-day changes that have occurred. The way
#     to do this is by joining on deviceId, messageType, and date in one column, and date - 1 day
#     in the other column.
#     In Figure 8-1, DataFrame 1 and DataFrame 2 are both large DataFrames. This means that all
#     worker nodes (and potentially every partition) will need to communicate with one another during
#     the entire join process (with no intelligent partitioning of data).
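The shuffle described above can be sketched in plain Python. This is a simplified model, not Spark's implementation: it only shows why hashing on the join key lets each partition be joined independently once the rows have been moved. The per-partition step here is a hash lookup; Spark's actual per-partition strategy may differ (for example, a sort-merge). The names `hash_partition` and `shuffle_join` are hypothetical.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Send each row to the partition selected by hashing its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def shuffle_join(left, right, left_key, right_key, num_partitions=4):
    """Inner join: shuffle both sides by key, then join partition by partition."""
    left_parts = hash_partition(left, left_key, num_partitions)
    right_parts = hash_partition(right, right_key, num_partitions)
    joined = []
    # Rows with equal keys always share a partition index, so each pair of
    # partitions can be joined without looking at any other partition.
    for left_part, right_part in zip(left_parts, right_parts):
        lookup = defaultdict(list)
        for r in right_part:
            lookup[r[right_key]].append(r)
        for l in left_part:
            for r in lookup.get(l[left_key], []):
                joined.append((l, r))
    return joined


people = [
    {"id": 0, "name": "Bill Chambers", "graduate_program": 0},
    {"id": 1, "name": "Matei Zaharia", "graduate_program": 1},
    {"id": 2, "name": "Michael Armbrust", "graduate_program": 1},
]
programs = [
    {"id": 0, "degree": "Masters"},
    {"id": 2, "degree": "Masters"},
    {"id": 1, "degree": "Ph.D."},
]

pairs = shuffle_join(people, programs, "graduate_program", "id")
for person_row, program_row in pairs:
    print(person_row["name"], program_row["degree"])
```

In a real cluster the expensive part is the `hash_partition` step, because it moves rows of both tables across the network.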
    
#     Big table–to–small table
#     When the table is small enough to fit into the memory of a single worker node, with some
#     breathing room of course, we can optimize our join. Although we can use a big table–to–big
#     table communication strategy, it can often be more efficient to use a broadcast join. What this
#     means is that we will replicate our small DataFrame onto every worker node in the cluster (be it
#     located on one machine or many). Now this sounds expensive. However, what this does is
#     prevent us from performing the all-to-all communication during the entire join process. Instead,
#     we perform it only once at the beginning and then let each individual worker node perform the
#     work without having to wait or communicate with any other worker node.
    
#     At the beginning of this join will be a large communication, just like in the previous type of join.
#     However, immediately after that first communication, there will be no further communication between nodes.
#     This means that joins will be performed on every single node individually, making CPU the
#     biggest bottleneck. For our current set of data, we can see that Spark has automatically set this up
#     as a broadcast join by looking at the explain plan:
#     joinExpr = person["graduate_program"] == graduateProgram["id"]
#     person.join(graduateProgram, joinExpr).explain()
#     == Physical Plan ==
#     *BroadcastHashJoin [graduate_program#40], [id#5....
#     :- LocalTableScan [id#38, name#39, graduate_progr...
#     +- BroadcastExchange HashedRelationBroadcastMode(....
#     +- LocalTableScan [id#56, degree#57, departmen....
#     With the DataFrame API, we can also explicitly give the optimizer a hint that we would like to
#     use a broadcast join by using the correct function around the small DataFrame in question. In this
#     example, these result in the same plan we just saw; however, this is not always the case:
#     from pyspark.sql.functions import broadcast
#     joinExpr = person["graduate_program"] == graduateProgram["id"]
#     person.join(broadcast(graduateProgram), joinExpr).explain()
#     The SQL interface also includes the ability to provide hints to perform joins. These are not
#     enforced, however, so the optimizer might choose to ignore them. You can set one of these hints
#     by using a special comment syntax. MAPJOIN, BROADCAST, and BROADCASTJOIN all do the same
#     thing and are all supported:
#     -- in SQL
#     SELECT /*+ MAPJOIN(graduateProgram) */ * FROM person JOIN graduateProgram
#     ON person.graduate_program = graduateProgram.id
#     This doesn’t come for free either: if you try to broadcast something too large, you can crash your
#     driver node (because that collect is expensive). This is likely an area for optimization in the
#     future.
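For comparison with the shuffle case, here is a minimal plain-Python sketch of the broadcast idea: the small table is turned into a lookup table once, a copy goes to every partition of the big table, and each partition then joins locally. It is a model under simplifying assumptions, not Spark's implementation; `broadcast_join` and the sample partitions are hypothetical.

```python
from collections import defaultdict


def broadcast_join(big_partitions, small_rows, big_key, small_key):
    """Inner join in which the small side is copied to every partition of the big side."""
    # "Broadcast": build one hash table from the small table.
    lookup = defaultdict(list)
    for s in small_rows:
        lookup[s[small_key]].append(s)

    results = []
    for partition in big_partitions:  # each partition would run on its own worker
        local_copy = dict(lookup)     # stands in for the worker's copy of the small table
        for b in partition:
            # Probe the local copy; no rows of the big table ever move.
            for s in local_copy.get(b[big_key], []):
                results.append((b, s))
    return results


# The big table is already split into partitions and stays where it is.
people_partitions = [
    [{"id": 0, "name": "Bill Chambers", "graduate_program": 0}],
    [
        {"id": 1, "name": "Matei Zaharia", "graduate_program": 1},
        {"id": 2, "name": "Michael Armbrust", "graduate_program": 1},
    ],
]
programs = [
    {"id": 0, "degree": "Masters"},
    {"id": 2, "degree": "Masters"},
    {"id": 1, "degree": "Ph.D."},
]

pairs = broadcast_join(people_partitions, programs, "graduate_program", "id")
for person_row, program_row in pairs:
    print(person_row["name"], program_row["degree"])
```

The cost of this approach grows with the size of the small table times the number of workers, which is why it only pays off when that table comfortably fits in memory.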
                       
#     Little table–to–little table
#     When performing joins with small tables, it’s usually best to let Spark decide how to join them.
#     You can always force a broadcast join if you’re noticing strange behavior.