In [1]:
# PySpark initialization.
# findspark.init() is called here, before any `pyspark` import in this notebook
# (presumably so the local Spark installation becomes importable — confirm for your environment).
import findspark
findspark.init()

In [2]:
# Build (or reuse) the SparkSession used by every cell below; master is "local[*]".
import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# A bare last expression makes the notebook display the session.
spark

# Pointers

In [3]:
# Spark chooses a join strategy based on the size of the data. 
# To avoid costly shuffle and sort operations, it favors hash-based join strategies, especially when data can be broadcasted.
# Spark supports both equi-joins (using "=") and non-equi joins (using "<", ">", "<=", ">=").
# Cluster: multiple computers (desktops/laptops) connected through a network. With 20 cores and 100 GB of RAM per machine, a cluster of 10 machines provides 200 cores and 1,000 GB (1 TB) of RAM.
# Clusters are working in master-slave architecture; slaves are the worker nodes.
# With YARN (Yet Another Resource Negotiator), the master node runs the Resource Manager, whereas each worker node runs a Node Manager.
# Spark Architecture: https://youtu.be/xJVonk4yxJY?si=jiXGgEfHymD_sy0Y

In [4]:
# dataframe_1: clients table (client_id, name).
# The raw rows get their own name so that `df_clients` only ever refers to a
# Spark DataFrame, never to a plain Python list.
client_rows = [
    (0, "client1"),
    (1, "client2"),
    (2, "client3"),
]
columns = ("client_id", "name")
df_clients = spark.createDataFrame(client_rows, columns)
df_clients.show()

+---------+-------+
|client_id|   name|
+---------+-------+
|        0|client1|
|        1|client2|
|        2|client3|
+---------+-------+



In [5]:
# dataframe_2: orders table (client_id, order_id, order_amount).
# The raw rows get their own name so that `df_orders` only ever refers to a
# Spark DataFrame, never to a plain Python list.
order_rows = [
    (0, "order1", 100),
    (1, "order2", 200),
    (2, "order3", 150),
]
columns = ("client_id", "order_id", "order_amount")
df_orders = spark.createDataFrame(order_rows, columns)
df_orders.show()

+---------+--------+------------+
|client_id|order_id|order_amount|
+---------+--------+------------+
|        0|  order1|         100|
|        1|  order2|         200|
|        2|  order3|         150|
+---------+--------+------------+



# Broadcast Hash Join

In [6]:
# It’s ideal when one DataFrame is small enough to fit in the memory of each executor.
# Spark broadcasts the smaller DataFrame to all workers.
# This minimizes data shuffling and accelerates the join operation, as the join occurs within the same node, resulting in a decrease in network overhead.

In [7]:
# Inner join on client_id with a BROADCAST hint placed on the clients side.
# To inspect the physical plan, run: broadcast_df.explain(mode="formatted")
broadcast_df = (
    df_clients
    .hint("BROADCAST")
    .join(df_orders, on="client_id", how="inner")
)
broadcast_df.show()

+---------+-------+--------+------------+
|client_id|   name|order_id|order_amount|
+---------+-------+--------+------------+
|        0|client1|  order1|         100|
|        1|client2|  order2|         200|
|        2|client3|  order3|         150|
+---------+-------+--------+------------+



# Shuffle Hash Join

In [8]:
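# It’s suitable when neither of the joined tables is small enough to be broadcast, but the partitions of the smaller table still fit in memory for building hash tables.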
# It involves a shuffle phase, where data is redistributed across partitions based on the join key.
# Be careful when using this strategy, as it may incur higher network and disk I/O costs; the full shuffle can significantly degrade performance.

In [9]:
# Inner join on client_id with a SHUFFLE_HASH hint placed on the clients side.
shuffle_hash_df = (
    df_clients
    .hint("SHUFFLE_HASH")
    .join(df_orders, on="client_id", how="inner")
)
shuffle_hash_df.show()

+---------+-------+--------+------------+
|client_id|   name|order_id|order_amount|
+---------+-------+--------+------------+
|        0|client1|  order1|         100|
|        1|client2|  order2|         200|
|        2|client3|  order3|         150|
+---------+-------+--------+------------+



# Sort Merge Join

In [10]:
# It’s appropriate when both tables are large and cannot fit in memory.
# It involves sorting both tables based on the join key and then merging them.
# It provides good performance for certain types of queries but requires sorting, which can be computationally expensive.
# Shuffle: The data from both tables is partitioned based on the join key. This partitioning ensures that records with the same join key are directed to the same partition.
# Sort: Within each partition, the data is then sorted based on the join key.
# Merge: The sorted partitions of the two tables are then merged, partition by partition, to execute the join operation.

In [11]:
# Inner join on client_id with a MERGE (sort-merge) hint placed on the clients side.
sort_merge_join_df = (
    df_clients
    .hint("MERGE")
    .join(df_orders, on="client_id", how="inner")
)
sort_merge_join_df.show()

+---------+-------+--------+------------+
|client_id|   name|order_id|order_amount|
+---------+-------+--------+------------+
|        0|client1|  order1|         100|
|        1|client2|  order2|         200|
|        2|client3|  order3|         150|
+---------+-------+--------+------------+



# Cartesian Product Join

In [12]:
# It involves joining every row from the first table with every row from the second table, making it highly resource-intensive. 
# This strategy should be avoided for large datasets due to a significant increase in the number of records.

In [13]:
# Inner join on client_id with a SHUFFLE_REPLICATE_NL hint placed on the clients side.
cartesian_product_join_df = (
    df_clients
    .hint("SHUFFLE_REPLICATE_NL")
    .join(df_orders, on="client_id", how="inner")
)
cartesian_product_join_df.show()

+---------+-------+--------+------------+
|client_id|   name|order_id|order_amount|
+---------+-------+--------+------------+
|        0|client1|  order1|         100|
|        1|client2|  order2|         200|
|        2|client3|  order3|         150|
+---------+-------+--------+------------+



# Broadcast Nested Loop Join

In [14]:
# It’s useful when a large table is joined with a small table and there is no equality condition to hash or sort on (a non-equi condition, or no join condition at all).
# The smaller table, which must fit in memory, is broadcast, and a nested loop is used to join matching records.

In [15]:
# Pair every client with every order, with a BROADCAST hint on the clients side.
# `crossJoin` states the Cartesian product explicitly, so the session-wide
# `spark.sql.crossJoin.enabled` setting no longer has to be flipped for this cell.
# NOTE: both inputs carry a `client_id` column, so the result contains two
# columns with that name (visible in the output header below).
broadcast_nested_loop_join_df = (
    df_clients
    .hint("BROADCAST")
    .crossJoin(df_orders)
)
broadcast_nested_loop_join_df.show()

+---------+-------+---------+--------+------------+
|client_id|   name|client_id|order_id|order_amount|
+---------+-------+---------+--------+------------+
|        0|client1|        0|  order1|         100|
|        1|client2|        0|  order1|         100|
|        2|client3|        0|  order1|         100|
|        0|client1|        1|  order2|         200|
|        1|client2|        1|  order2|         200|
|        2|client3|        1|  order2|         200|
|        0|client1|        2|  order3|         150|
|        1|client2|        2|  order3|         150|
|        2|client3|        2|  order3|         150|
+---------+-------+---------+--------+------------+

