<a href="https://colab.research.google.com/github/anjli01/PySpark-Notes/blob/main/08_DataFrame_Joins.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## I. DataFrame Joins

Joining DataFrames is a fundamental operation for combining data from different sources based on common columns.

### Types of Joins

Spark supports various SQL-like join types:

| Join Type     | Alias (if any)      | Description                                                                                                                                                                                                                                                                      | Common Use Case                                                                                                    |
| :------------ | :------------------ | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------- |
| **inner**     | (default)           | Returns rows when there is a match in *both* DataFrames.                                                                                                                                                                                                                         | Finding common records, combining related information where both sides must exist.                                 |
| **left**      | `left_outer`        | Returns *all* rows from the left DataFrame, and the matched rows from the right DataFrame. If no match, `nulls` are introduced for columns from the right.                                                                                                                         | Preserving all records from the "main" (left) table and adding supplemental info from the right.                   |
| **right**     | `right_outer`       | Returns *all* rows from the right DataFrame, and the matched rows from the left DataFrame. If no match, `nulls` are introduced for columns from the left.                                                                                                                          | Preserving all records from a secondary (right) table and adding supplemental info from the left.                  |
| **outer**     | `full_outer`        | Returns *all* rows when there is a match in *either* DataFrame. If no match, `nulls` are introduced for columns from the non-matching DataFrame.                                                                                                                                | Combining two datasets where you want to keep all records from both, regardless of a match.                        |
| **left_semi** |                     | Returns rows from the left DataFrame *where there is a match in the right DataFrame*. **Only returns columns from the left DataFrame.** It's like an INNER JOIN but only selecting columns from the left, effectively filtering the left DataFrame.                                 | Filtering a main table based on the existence of records in another table without bringing in the other table's data. |
| **left_anti** |                     | Returns rows from the left DataFrame *where there is NO match in the right DataFrame*. Useful for finding missing records or non-existent relationships. **Only returns columns from the left DataFrame.**                                                                           | Identifying records in one table that do *not* have a corresponding entry in another (e.g., missing configurations). |

### Code Examples (Python)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameJoins").getOrCreate()

# --- Create two DataFrames ---
employees_data = [
    ("Alice", 1, "HR"),
    ("Bob", 2, "Sales"),
    ("Charlie", 3, "IT"),
    ("David", 4, "Marketing")
]
employees_columns = ["Name", "EmpID", "Department"]
employees_df = spark.createDataFrame(employees_data, employees_columns)
print("Employees DataFrame:")
employees_df.show()

departments_data = [
    (1, "HR", "New York"),
    (2, "Sales", "London"),
    (5, "Finance", "Paris")  # EmpID 5 has no matching employee
]
departments_columns = ["DeptID", "DeptName", "Location"]
departments_df = spark.createDataFrame(departments_data, departments_columns)
print("Departments DataFrame:")
departments_df.show()

# --- Inner Join ---
print("\nInner Join:")
employees_df.join(
    departments_df,
    employees_df.Department == departments_df.DeptName,
    "inner"
).show()

# --- Left Join ---
print("\nLeft Join:")
employees_df.join(
    departments_df,
    employees_df.Department == departments_df.DeptName,
    "left"
).show()

# --- Right Join ---
print("\nRight Join:")
employees_df.join(
    departments_df,
    employees_df.Department == departments_df.DeptName,
    "right"
).show()

# --- Full Outer Join ---
print("\nFull Outer Join:")
employees_df.join(
    departments_df,
    employees_df.Department == departments_df.DeptName,
    "full_outer"
).show()

# --- Left Semi Join ---
# Only columns from left (employees_df), where a match exists in departments_df
print("\nLeft Semi Join (only columns from left, where match exists):")
employees_df.join(
    departments_df,
    employees_df.Department == departments_df.DeptName,
    "left_semi"
).show()

# --- Left Anti Join ---
# Rows in left (employees_df) that do NOT have a match in right (departments_df)
print("\nLeft Anti Join (rows in left NOT in right):")
employees_df.join(
    departments_df,
    employees_df.Department == departments_df.DeptName,
    "left_anti"
).show()

# Stop SparkSession
spark.stop()

Employees DataFrame:
+-------+-----+----------+
|   Name|EmpID|Department|
+-------+-----+----------+
|  Alice|    1|        HR|
|    Bob|    2|     Sales|
|Charlie|    3|        IT|
|  David|    4| Marketing|
+-------+-----+----------+

Departments DataFrame:
+------+--------+--------+
|DeptID|DeptName|Location|
+------+--------+--------+
|     1|      HR|New York|
|     2|   Sales|  London|
|     5| Finance|   Paris|
+------+--------+--------+


Inner Join:
+-----+-----+----------+------+--------+--------+
| Name|EmpID|Department|DeptID|DeptName|Location|
+-----+-----+----------+------+--------+--------+
|Alice|    1|        HR|     1|      HR|New York|
|  Bob|    2|     Sales|     2|   Sales|  London|
+-----+-----+----------+------+--------+--------+


Left Join:
+-------+-----+----------+------+--------+--------+
|   Name|EmpID|Department|DeptID|DeptName|Location|
+-------+-----+----------+------+--------+--------+
|    Bob|    2|     Sales|     2|   Sales|  London|
|  Alice|    1|

---

## II. Handling Duplicate Columns in Joins

When joining DataFrames with common column names, Spark can produce duplicate columns in the result, which needs careful handling.

### Problem

If you join on columns with the same name without explicitly managing them, you might end up with ambiguous column names like `ID` and `ID`.

### Solutions

1.  **Specify join condition explicitly (`df1.col == df2.col`)**:
    *   This keeps both columns (e.g., `df1.ID`, `df2.ID`).
    *   You can then explicitly `drop()` one if it's no longer needed, or select/alias them.
2.  **Use `on` parameter with a string or list of strings**:
    *   If you join on columns with the *same name* in both DataFrames using `df.join(other_df, "common_col_name")` or `df.join(other_df, ["col1", "col2"])`, Spark will automatically combine them into a single column in the result.
3.  **Alias columns before joining (`withColumnRenamed`)**:
    *   Rename one of the conflicting columns in one DataFrame *before* the join operation to avoid name collisions.

### Code Examples (Python)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder.appName("HandlingDuplicateColumns").getOrCreate()

# --- Create two DataFrames with common 'ID' column ---
df1_data = [("A", 10), ("B", 20)]
df1_cols = ["ID", "Value1"]
df1 = spark.createDataFrame(df1_data, df1_cols)
print("df1 DataFrame:")
df1.show()
df1.printSchema()

df2_data = [("A", 100), ("B", 200)]
df2_cols = ["ID", "Value2"]
df2 = spark.createDataFrame(df2_data, df2_cols)
print("df2 DataFrame:")
df2.show()
df2.printSchema()

# --- Case 1: Join on common column name using a string (Spark handles it) ---
print("\nJoin on 'ID' string (Spark handles duplicate 'ID'):")
df_joined_str = df1.join(df2, "ID") # or on=["ID"]
df_joined_str.show()
df_joined_str.printSchema() # Notice 'ID' appears only once

# --- Case 2: Join on common column name using a boolean expression ('ID' is duplicated) ---
print("\nJoin on 'ID' using boolean expression (duplicates 'ID'):")
df_joined_bool = df1.join(df2, df1.ID == df2.ID)
df_joined_bool.show()
df_joined_bool.printSchema() # Notice 'ID' appears twice (df1.ID, df2.ID)

# --- To resolve duplicate columns after boolean join: ---

# Option A: Drop one of the duplicate columns
print("\nAfter dropping one of the duplicate 'ID' columns:")
df_joined_bool.drop(df2.ID).show()

# Option B: Select specific columns and alias
print("\nSelecting and aliasing to resolve duplicates:")
df_resolved = df1.join(df2, df1.ID == df2.ID) \
    .select(df1.ID.alias("CustomerID"), df1.Value1, df2.Value2) # Alias one ID
df_resolved.show()
df_resolved.printSchema()

# --- Case 3: Alias column before joining ---
print("\nAlias 'ID' in df2 before joining:")
df2_renamed = df2.withColumnRenamed("ID", "ID_df2")
df_aliased_join = df1.join(df2_renamed, df1.ID == df2_renamed.ID_df2)
df_aliased_join.show()
df_aliased_join.printSchema() # Now ID and ID_df2 are distinct

# Stop SparkSession
spark.stop()

df1 DataFrame:
+---+------+
| ID|Value1|
+---+------+
|  A|    10|
|  B|    20|
+---+------+

root
 |-- ID: string (nullable = true)
 |-- Value1: long (nullable = true)

df2 DataFrame:
+---+------+
| ID|Value2|
+---+------+
|  A|   100|
|  B|   200|
+---+------+

root
 |-- ID: string (nullable = true)
 |-- Value2: long (nullable = true)


Join on 'ID' string (Spark handles duplicate 'ID'):
+---+------+------+
| ID|Value1|Value2|
+---+------+------+
|  A|    10|   100|
|  B|    20|   200|
+---+------+------+

root
 |-- ID: string (nullable = true)
 |-- Value1: long (nullable = true)
 |-- Value2: long (nullable = true)


Join on 'ID' using boolean expression (duplicates 'ID'):
+---+------+---+------+
| ID|Value1| ID|Value2|
+---+------+---+------+
|  A|    10|  A|   100|
|  B|    20|  B|   200|
+---+------+---+------+

root
 |-- ID: string (nullable = true)
 |-- Value1: long (nullable = true)
 |-- ID: string (nullable = true)
 |-- Value2: long (nullable = true)


After dropping one of th

---

## III. Broadcast Joins

A **broadcast join** (also known as a map-side join) is a crucial optimization technique in Spark for joins involving a *small DataFrame* and a *large DataFrame*.

### How it Works

1.  Spark "broadcasts" (sends) the *smaller* DataFrame to all executor nodes.
2.  Each executor then holds a copy of the smaller DataFrame in its memory.
3.  When the join operation occurs, each partition of the *larger* DataFrame can directly join with the broadcasted smaller DataFrame without requiring a costly shuffle of the large DataFrame.

### Benefits

*   **Significant performance improvement**: Eliminates the expensive shuffle phase for the larger DataFrame, which is typically the bottleneck in joins.
*   **Reduced network I/O**: Less data needs to be transferred across the network, as the large DataFrame doesn't move.

### When it Triggers

Spark's Catalyst Optimizer automatically decides whether to perform a broadcast join based on the size of the DataFrames.

1.  **Automatic Broadcasting**:
    *   **`spark.sql.autoBroadcastJoinThreshold`**: This configuration property determines the maximum size (in bytes) of a DataFrame that will be broadcast.
    *   The default value is typically `10MB` (10 * 1024 * 1024 bytes).
    *   If a DataFrame's size is **less than or equal to** this threshold, Spark *might* broadcast it.
2.  **Manually Forcing Broadcast**:
    *   You can explicitly tell Spark to broadcast a DataFrame using `pyspark.sql.functions.broadcast()`.
    *   This is useful if Spark's automatic threshold isn't ideal for your specific use case, or if you know a DataFrame is small but Spark's statistics haven't caught up. Use with caution for very large DataFrames, as it can lead to out-of-memory errors on executors.

### Key Takeaway

Always aim for broadcast joins when one side of the join is significantly smaller than the other. Monitor the Spark UI to confirm if broadcast joins are happening as expected (`BroadcastHashJoin` or `BroadcastNestedLoopJoin` in the plan).

### Code Examples (Python)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col

# Initialize SparkSession
spark = SparkSession.builder.appName("BroadcastJoin").getOrCreate()

# --- Configure the broadcast join threshold (e.g., to 1MB for demonstration) ---
# Reset to default 10MB later, or stop SparkSession
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1 * 1024 * 1024) # 1MB

# --- Create a small DataFrame (will be broadcasted) ---
small_df_data = [(1, "Apple"), (2, "Banana"), (3, "Orange")]
small_df_cols = ["ID", "Fruit"]
small_df = spark.createDataFrame(small_df_data, small_df_cols)
print("Small DataFrame:")
small_df.show()

# --- Create a large DataFrame (simulated by generating more rows) ---
large_df_data = [(i % 3 + 1, f"User_{i}") for i in range(100000)] # 100,000 rows
large_df_cols = ["FruitID", "UserName"]
large_df = spark.createDataFrame(large_df_data, large_df_cols)
print("Large DataFrame (first 5 rows):")
large_df.show(5)

# --- Perform the join - Spark will likely broadcast small_df automatically ---
print("\nAutomatic Broadcast Join (check Spark UI for details):")
joined_df_auto = large_df.join(small_df, large_df.FruitID == small_df.ID, "inner")
joined_df_auto.show(5)
joined_df_auto.explain() # Look for "BroadcastHashJoin" or "BroadcastNestedLoopJoin" in the plan

# --- Manually force broadcast (even if it's larger than threshold, use with caution) ---
print("\nManual Broadcast Join (forced):")
joined_df_forced = large_df.join(broadcast(small_df), large_df.FruitID == small_df.ID, "inner")
joined_df_forced.show(5)
joined_df_forced.explain() # Should explicitly show BroadcastHashJoin

# --- Reset the threshold to default or stop SparkSession ---
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # Reset to default 10MB
spark.stop()

Small DataFrame:
+---+------+
| ID| Fruit|
+---+------+
|  1| Apple|
|  2|Banana|
|  3|Orange|
+---+------+

Large DataFrame (first 5 rows):
+-------+--------+
|FruitID|UserName|
+-------+--------+
|      1|  User_0|
|      2|  User_1|
|      3|  User_2|
|      1|  User_3|
|      2|  User_4|
+-------+--------+
only showing top 5 rows


Automatic Broadcast Join (check Spark UI for details):
+-------+--------+---+-----+
|FruitID|UserName| ID|Fruit|
+-------+--------+---+-----+
|      1|  User_0|  1|Apple|
|      1|  User_3|  1|Apple|
|      1|  User_6|  1|Apple|
|      1|  User_9|  1|Apple|
|      1| User_12|  1|Apple|
+-------+--------+---+-----+
only showing top 5 rows

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [FruitID#373L], [ID#360L], Inner
   :- Sort [FruitID#373L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(FruitID#373L, 200), ENSURE_REQUIREMENTS, [plan_id=1436]
   :     +- Filter isnotnull(FruitID#373L)
   :        +- Scan ExistingR