# Table of Contents 
- [pyspark .join()](#pyspark-dataframe-join--quick-guide)



# PySpark DataFrame `.join()` — quick guide

**Purpose:** Combine two DataFrames row-wise based on matching key(s).

**Signature**

    dataframe1.join(dataframe2, on=None, how=None)

**Arguments**
- `dataframe2` → The right/second DataFrame (the parameter is named `other` in the PySpark API).
- `on` → Join condition. Any one of:
  - **String** (column name present in both DataFrames)

        df1.join(df2, "id", "inner")

  - **List of strings** (multi-column key)

        df1.join(df2, ["id", "date"], "left")

  - **Column expression** (boolean)

        from pyspark.sql.functions import col
        df1.join(df2, col("df1_id") == col("df2_id"), "inner")

    If you prefer SQL syntax, wrap it with `expr(...)` to get a Column. Qualified names inside the string refer to DataFrame **aliases**, not to Python variable names, so alias both sides first:

        from pyspark.sql.functions import expr
        df1.alias("a").join(df2.alias("b"), expr("a.id = b.emp_id"), "inner")
- `how` → Join type (default `"inner"`). Options:
  - `"inner"` — only matches
  - `"left"` / `"left_outer"` — all left + matches from right
  - `"right"` / `"right_outer"` — all right + matches from left
  - `"outer"` / `"full"` / `"full_outer"` — all rows from both
  - `"left_semi"` — rows from left **that have a match** (returns only left columns)
  - `"left_anti"` — rows from left **that do not have a match**
  - `"cross"` — Cartesian product (no `on`)

**Examples**

    # 1) Inner join on same column name
    df1.join(df2, "id", "inner")

    # 2) Left join on multiple keys
    df1.join(df2, ["id", "dt"], "left")

    # 3) Join with boolean expression
    df1.join(df2, df1["id"] == df2["emp_id"], "outer")

    # 4) Using expr() to write a SQL-like condition (qualifiers are DataFrame aliases)
    from pyspark.sql.functions import expr
    df1.alias("a").join(df2.alias("b"), expr("a.id = b.emp_id AND a.dt = b.dt"), "left_outer")

    # 5) Semi / Anti joins (filtering left by existence in right)
    df_left_semi = orders.join(customers, "cust_id", "left_semi")
    df_left_anti = orders.join(customers, "cust_id", "left_anti")
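
To make the `how` options concrete, here is a small plain-Python model of four of them over lists of dicts. It only illustrates which rows come back, not how Spark executes a join; the helper `join_rows` and the sample `orders` / `customers` rows are hypothetical.

```python
def join_rows(left, right, key, how="inner"):
    """Toy model of join semantics on lists of dicts (equi-join on one key)."""
    if how not in {"inner", "left", "left_semi", "left_anti"}:
        raise ValueError(f"unsupported join type in this sketch: {how}")

    # Index the right side by key so each left row can find its matches.
    right_by_key = {}
    for r in right:
        right_by_key.setdefault(r[key], []).append(r)
    # Non-key columns of the right side, used to null-fill unmatched left rows.
    right_cols = sorted({c for r in right for c in r if c != key})

    out = []
    for l in left:
        matches = right_by_key.get(l[key], [])
        if how == "left_semi":
            if matches:
                out.append(dict(l))      # left columns only, one row per left row
        elif how == "left_anti":
            if not matches:
                out.append(dict(l))      # left rows with no match
        elif matches:
            for m in matches:
                out.append({**l, **m})   # inner / left: one row per match
        elif how == "left":
            out.append({**l, **{c: None for c in right_cols}})
    return out


orders = [
    {"cust_id": 1, "order": "A"},
    {"cust_id": 2, "order": "B"},
    {"cust_id": 9, "order": "C"},   # no matching customer
]
customers = [{"cust_id": 1, "name": "Ann"}, {"cust_id": 2, "name": "Bob"}]

inner = join_rows(orders, customers, "cust_id", "inner")
left = join_rows(orders, customers, "cust_id", "left")
semi = join_rows(orders, customers, "cust_id", "left_semi")
anti = join_rows(orders, customers, "cust_id", "left_anti")
print(len(inner), len(left), len(semi), len(anti))  # prints: 2 3 2 1
```

Note how `left_semi` and `left_anti` act as filters on the left side: they never add columns from the right.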

**Notes & tips**
- When `on` is a **string/list**, Spark deduplicates the join key(s) (they appear once).  
  When `on` is a **Column expression**, both key columns remain in the result; use `select()`/`drop()` or alias the inputs as needed.
- Prefer **equi-joins** (conditions built from `==`) for performance, and broadcast the small side when one input is small:

      from pyspark.sql.functions import broadcast
      big.join(broadcast(small), "id", "inner")

- Avoid column name collisions for non-key columns by renaming before the join or selecting explicit columns after it.

# Demystifying Spark **Joins**, **Exchanges**, and the **Shuffle** (with the `data_1` / `data_2` demo)

> Big picture: most “slow joins” in Spark are really “slow **shuffles**.” A join over two DataFrames runs two *parent* (map) stages that **write** shuffle buckets, followed by one *child* (reduce) stage that **reads** those buckets and executes the join (typically **Sort-Merge Join**). The **Exchange** nodes you see in the plan are the points where data is **repartitioned and moved**.

---

## 1) Demo Setup (the exact scenario)
- Two folders: **`data_1/`** and **`data_2/`**
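- Each folder contains **3 JSON files** → in this demo Spark creates **3 input partitions** per DataFrame (one per file; with other file sizes Spark may pack or split files differently)
- Local session: **`master("local[3]")`** → driver and executor share one JVM with **3 worker threads**, i.e. **3 task slots** (max 3 tasks at once)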
- Shuffle partitions: **`spark.sql.shuffle.partitions = 3`** → the post-shuffle (reduce) side will have **3 partitions**
- Operation: **inner join on `id`** + a terminal **action** (`count()`, `show()`, or your `foreach`)

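Assuming Spark plans a shuffle join (the small inputs are not auto-broadcast and AQE does not coalesce the shuffle partitions), this configuration gives: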
- The scan of `data_1` uses **3 tasks**, scan of `data_2` uses **3 tasks**
- The reduce side of the join runs **3 tasks** (partitions 0, 1, 2)
- Because of `local[3]`, Spark can execute **at most 3 tasks concurrently** across all stages
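
The setup above could be scripted roughly as follows. This is a sketch, not the original demo code: the app name `join-demo` and the helper names are hypothetical, and the two settings that disable AQE and auto-broadcast are assumptions added so that tiny demo files still produce a plain shuffle join.

```python
# Settings for the demo session. The last two entries are assumptions added here
# so the plan stays a plain shuffle join; they are not part of the original setup.
DEMO_CONF = {
    "spark.sql.shuffle.partitions": "3",           # 3 reduce-side partitions
    "spark.sql.adaptive.enabled": "false",         # assumption: stop AQE from coalescing
    "spark.sql.autoBroadcastJoinThreshold": "-1",  # assumption: disable auto-broadcast
}


def build_demo_session(app_name="join-demo"):
    """Create the local[3] session used in the walkthrough."""
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master("local[3]").appName(app_name)
    for key, value in DEMO_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def run_demo(spark, left_path="data_1/", right_path="data_2/"):
    """Read both folders, join on id, and trigger the job with an action."""
    df1 = spark.read.json(left_path)
    df2 = spark.read.json(right_path)
    joined = df1.join(df2, "id", "inner")
    joined.explain()        # print the physical plan
    return joined.count()   # the action that actually runs the join
```

Calling `run_demo(build_demo_session())` from a directory that contains both folders should then produce the jobs and stages discussed below.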

---

## 2) What an **Exchange** is (and why it matters)
In Spark’s physical plan, an **Exchange** marks a boundary where data is **redistributed** (repartitioned) across the cluster (or across threads in local mode).

- **Map-side Exchange (Shuffle *Write*)**  
  For each input DataFrame, Spark reads rows from its current partitions and picks the destination reduce partition with a partitioner (commonly **`hash(joinKey) % numShufflePartitions`**). Each map task writes its output to local disk as one **shuffle bucket** per destination partition.
- **Reduce-side Exchange (Shuffle *Read*)**  
  The reduce tasks **fetch** bucket *k* from **all map tasks of both inputs**, merge them, and materialize reduce partition *k*. Now “all rows for a given key” are co-located, enabling the actual join.

Mental model: **Scatter → Gather**  
Map stages **scatter** rows into N buckets; reduce stages **gather** bucket *k* from everywhere and process it.
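
The scatter/gather idea can be sketched in plain Python. This is a toy model under stated assumptions: Python's built-in `hash` stands in for Spark's own hash function (so real bucket assignments will differ), and the function names and sample rows are hypothetical.

```python
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 3


def target_partition(key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    # Stand-in partitioner. Spark uses its own hash function, so real
    # bucket assignments will differ; only the idea is the same.
    return hash(key) % num_partitions


def shuffle_write(map_partition):
    """Map side: scatter one input partition's rows into per-target buckets."""
    buckets = defaultdict(list)
    for row in map_partition:
        buckets[target_partition(row["id"])].append(row)
    return buckets


def shuffle_read(all_map_outputs, k):
    """Reduce side: gather bucket k from every map task's output."""
    gathered = []
    for buckets in all_map_outputs:
        gathered.extend(buckets.get(k, []))
    return gathered


# Three input partitions of a hypothetical DataFrame keyed by integer id.
input_partitions = [
    [{"id": 1}, {"id": 4}],
    [{"id": 2}, {"id": 1}],
    [{"id": 3}, {"id": 5}],
]
map_outputs = [shuffle_write(p) for p in input_partitions]
reduce_partitions = {
    k: shuffle_read(map_outputs, k) for k in range(NUM_SHUFFLE_PARTITIONS)
}
for k, rows in reduce_partitions.items():
    print(k, [r["id"] for r in rows])
```

After the gather step, both rows with `id == 1` sit in the same reduce partition even though they started in different input partitions, which is exactly what the join needs.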

---

## 3) The three stages of an inner join (what you see in the UI)
With `data_1` (3 partitions) joined to `data_2` (3 partitions) and `spark.sql.shuffle.partitions = 3`, the join job breaks into **three stages**:

1) **Stage A — Map side for `data_1` (Shuffle Write)**  
   - Tasks: **3** (one per input partition)  
   - For each row: compute `target = hash(id) % 3` and append to shuffle bucket `target`  
   - Output: 3 buckets per task → later read by the reduce stage

2) **Stage B — Map side for `data_2` (Shuffle Write)**  
   - Tasks: **3** (one per input partition)  
   - Same process; produces buckets keyed by the same partitioner

3) **Stage C — Reduce side (Shuffle Read + Sort-Merge Join)**  
   - Tasks: **3** (for reduce partitions **0, 1, 2**)  
   - Each task **fetches** bucket *k* from **all 3 map tasks of `data_1`** and **all 3 map tasks of `data_2`**, merges, sorts by `id` if needed, and performs the **join** to produce the final partition *k*

Totals you’ll notice:
- **9 tasks** in the join job: **3 + 3 + 3**
- Because you’re on **`local[3]`**, they run in **waves of 3** (never more than 3 concurrent tasks)
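
The arithmetic behind those totals is simple enough to write down. The helper below is a hypothetical sketch: it only counts tasks and shuffle blocks for a two-input shuffle join and does not inspect a real Spark plan.

```python
import math


def shuffle_join_plan(left_partitions, right_partitions, shuffle_partitions, slots):
    """Count tasks per stage for a two-input shuffle join."""
    stages = {
        "A (map, left)": left_partitions,
        "B (map, right)": right_partitions,
        "C (reduce + join)": shuffle_partitions,
    }
    total_tasks = sum(stages.values())
    return {
        "stages": stages,
        "total_tasks": total_tasks,
        # Each reduce task may fetch one block from every map task.
        "max_shuffle_blocks": (left_partitions + right_partitions) * shuffle_partitions,
        # Lower bound on scheduling waves when all tasks take equally long.
        "min_waves": math.ceil(total_tasks / slots),
    }


plan = shuffle_join_plan(left_partitions=3, right_partitions=3, shuffle_partitions=3, slots=3)
print(plan["total_tasks"], plan["max_shuffle_blocks"], plan["min_waves"])  # prints: 9 18 3
```

The block count grows as (map tasks) × (reduce tasks), which is one reason very high partition counts make shuffles expensive.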

---

## 4) “Parallel” with only 3 cores? (concurrency vs. eligibility)
Both parent stages (A for `data_1`, B for `data_2`) are **eligible at the same time**, but you only have **3 slots**. Spark interleaves them to keep slots busy:

Slots (3 total) timeline illustration:
- Slot 1: A1 → A3 → C1
- Slot 2: A2 → B2 → C2
- Slot 3: B1 → B3 → C3

Key point: at any instant you see **≤ 3** running tasks, but those 3 can come from **both** map stages. Once all parent tasks finish, the reduce stage (C) runs its 3 tasks.
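
A tiny scheduler simulation makes the same point. It assumes every task takes exactly one time step and that parent tasks are queued alternately (A1, B1, A2, ...); Spark's real task ordering is up to its scheduler, so treat the exact interleaving as illustrative only.

```python
from collections import deque


def simulate(slots=3, tasks_per_stage=3):
    """Greedy slot scheduler where every task takes one time step."""
    # Both parent stages are eligible from the start: A1, B1, A2, B2, ...
    ready = deque()
    for i in range(1, tasks_per_stage + 1):
        ready.append(f"A{i}")
        ready.append(f"B{i}")
    reduce_tasks = deque(f"C{i}" for i in range(1, tasks_per_stage + 1))

    timeline = []
    while ready or reduce_tasks:
        if not ready:
            # All parent tasks have finished, so the child stage becomes eligible.
            ready, reduce_tasks = reduce_tasks, deque()
        wave = [ready.popleft() for _ in range(min(slots, len(ready)))]
        timeline.append(wave)
    return timeline


timeline = simulate()
for step, wave in enumerate(timeline):
    print(step, wave)
```

Each printed wave holds at most three tasks, the first waves mix A and B tasks, and the C tasks appear only after every parent task is done.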

---

## 5) Why shuffle dominates cost
- **Network I/O**: every reduce task pulls its bucket from **all** map tasks of both inputs (many small remote reads)
- **Disk I/O**: map tasks **write** shuffle files to local disk (and may spill while sorting); reduce tasks read and **merge** them
- **Serialization / deserialization** and **sorting**
- **Skew**: a hot key can make one reduce partition dramatically slower than others

Rule of thumb: **“slow join” = “expensive shuffle”** far more often than not.

---

## 6) Reading vs. Shuffling vs. Joining — how to *visualize* it
1) **Input scans (Jobs 0 & 1 in your UI)**  
   - Each DataFrame read appears as a simple job with **1 stage / 3 tasks**. Reads are lazy, so a job at read time typically comes from JSON schema inference (supplying an explicit schema avoids it)
2) **Join job (Job 2)**  
   - **Stage A (3 tasks)**: map/exchange for `data_1` (shuffle write)  
   - **Stage B (3 tasks)**: map/exchange for `data_2` (shuffle write)  
   - **Stage C (3 tasks)**: reduce/exchange + sort-merge join (shuffle read)  
   - UI often shows arrows between A/B and C (the **shuffle** edges)

If `spark.sql.adaptive.enabled=true` (the default since Spark 3.2), the UI may show **skipped** or **coalesced** stages/partitions as AQE optimizes at runtime.

---

## 7) Broadcast Join (the alternative that skips the big shuffle)
If one side is **small enough** (its estimated size is under `spark.sql.autoBroadcastJoinThreshold`, 10 MB by default), Spark can **broadcast** that DataFrame to all executors. Then the big side **doesn’t shuffle**:
- Plan shows a **BroadcastExchange** for the small side
- Join becomes **BroadcastHashJoin**
- You can force it with a hint: `df_small.hint("broadcast")`

Broadcasting avoids the “scatter–gather” shuffle for the big side and is often the fastest option when applicable.
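
As a mental model, a broadcast hash join can be sketched in plain Python: build one lookup table from the small side, then join each big-side partition where it already lives. The function name and sample data are hypothetical, and this ignores everything Spark does around serialization and memory management.

```python
def broadcast_hash_join(big_partitions, small_rows, key):
    """Toy broadcast hash join: the small side is copied, the big side stays put."""
    # Build one hash table from the small side; in Spark this table is what
    # gets shipped to every executor.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined_partitions = []
    for partition in big_partitions:
        # Each big-side partition is joined in place: no repartitioning by key.
        out = []
        for row in partition:
            for match in lookup.get(row[key], []):
                out.append({**row, **match})
        joined_partitions.append(out)
    return joined_partitions


big = [
    [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}],
    [{"id": 1, "amount": 5}],
    [{"id": 3, "amount": 7}],   # id 3 has no match on the small side
]
small = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
result = broadcast_hash_join(big, small, "id")
print([len(p) for p in result])  # prints: [2, 1, 0]
```

The output keeps the big side's three partitions: rows were never moved between them, which is the saving compared with the shuffle join.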

---

## 8) Practical tuning knobs (for shuffle joins)
- **`spark.sql.shuffle.partitions`**:  
  Too **high** → many tiny tasks (scheduler overhead). Too **low** → fat partitions (spill/stragglers). A starting heuristic: **2–4× total cores** available to the job, then adjust with metrics.
- **Adaptive Query Execution (AQE)**:  
  Keep **on** unless you need deterministic, tutorial-style plans. It can **coalesce** post-shuffle partitions and **handle skew**.
- **Skew mitigation**:  
  Salt hot keys (add a random suffix before join), or use AQE’s skew join handling when available.
- **File sizing**:  
  Avoid thousands of tiny files; aim for sensible partition/file sizes to reduce overhead.
- **Filter early, select only needed columns** to reduce shuffle volume.
- **Prefer Broadcast Join** whenever one side is small.

---

## 9) Quick mental checklist (cheat sheet)
- Partition → **task**; **1 core ≈ 1 concurrent task** slot
- **Exchange** = repartition + data movement
- **Shuffle write** (parents) → **Shuffle read** (children)
- Inner join with two inputs ⇒ usually **two map stages + one reduce stage**
- Total tasks you saw: **3 (map A) + 3 (map B) + 3 (reduce C) = 9**
- On `local[3]`, tasks run in **waves of 3**
- Optimize joins by **reducing shuffled bytes**, **handling skew**, and **choosing broadcast** when possible

---

## 10) One-screen summary diagram
<pre>
Inputs (3 partitions each)
  data_1: P1, P2, P3          data_2: Q1, Q2, Q3
         │   │   │                    │   │   │
         ├───┴───┤  Map/Shuffle-Write ├───┴───┤
         │ hash(id) % 3               │ hash(id) % 3
         ▼   ▼   ▼                    ▼   ▼   ▼
       buckets 0/1/2                buckets 0/1/2
             \                           /
              \_________________________/
                       ▼  Shuffle-Read
                Reduce partitions 0,1,2
                  (Sort-Merge Join)
                       ▼
                 Joined Output
                 P0, P1, P2
</pre>

That’s the full story behind your Spark UI showing **3 stages** and **9/9 tasks** for the join job, and why **Exchange** and **shuffle** are the heart of join performance.

# Solving Spark Join Performance Problems — A Simple Playbook

Joins are often the #1 reason Spark jobs slow down, because they trigger **shuffles** (lots of network I/O and disk I/O). Here’s a clear, copy-paste guide to make your joins faster and safer.

---

## 1) Choose the Right Join Strategy
- **Broadcast Join (large ↔ small):** If one table is small enough to fit in executor memory, broadcast it so the large table doesn’t shuffle.
  - When to use: small side is tens/hundreds of MBs (rough guide; depends on your cluster).
  - How: force with a broadcast hint if Spark doesn’t auto-choose it.
- **Shuffle (Sort-Merge) Join (large ↔ large):** Unavoidable when both sides are big. Focus on shrinking data and balancing work (see below).

Tip: Always check the physical plan (`explain`) for **BroadcastHashJoin** vs **SortMergeJoin**.

---

## 2) Shrink Data *Before* the Join
- **Filter early:** Remove rows that cannot match (e.g., keep only “US” rows if the other table is US-only).
- **Project early:** Select only columns you actually need.
- **Aggregate early:** Pre-aggregate facts to the grain required for the join/output.
- **Deduplicate:** Drop duplicates on join keys if appropriate to avoid cartesian explosions.

Small inputs → smaller shuffle → faster join.

---

## 3) Tune Parallelism Without Overhead
- **Shuffle partitions (`spark.sql.shuffle.partitions`):** Controls reduce-side parallelism for shuffle joins.
  - Start near **2–4× total task slots** (executors × cores) if AQE is **off**.
  - If **AQE is ON** (recommended), it can **coalesce** partitions: set this higher and let AQE merge small ones.
- **Target partition size:** Aim for ~**128–256 MB** per task to avoid tiny tasks (overhead) and giant tasks (spill/stragglers).
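
One way to combine the two rules above into a starting value is sketched below. The function and its defaults are hypothetical, and taking the larger of the two estimates is an assumption of this sketch, not a Spark rule; the result still needs checking against real task metrics.

```python
import math

MB = 1024 * 1024


def suggest_shuffle_partitions(shuffle_bytes, total_slots, target_mb=128, slots_factor=2):
    """Rough starting point for spark.sql.shuffle.partitions; tune with metrics."""
    by_size = math.ceil(shuffle_bytes / (target_mb * MB))  # ~target_mb per task
    by_slots = slots_factor * total_slots                  # 2-4x task slots
    return max(by_size, by_slots)


# Hypothetical job: 100 GB shuffled on 10 executors x 4 cores (40 slots).
print(suggest_shuffle_partitions(100 * 1024 * MB, total_slots=40))  # prints: 800
# Hypothetical small job: 1 GB shuffled on the same cluster.
print(suggest_shuffle_partitions(1024 * MB, total_slots=40))        # prints: 80
```

For the small job the slot-based rule wins and partitions end up well below the size target, which is the situation where AQE coalescing is most useful.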

---

## 4) Handle Key Skew (Hot Keys) — The Silent Job Killer
When a few keys have most of the rows, their reduce tasks run much longer.

Fixes:
- **AQE Skew Join:** Enable adaptive query execution so Spark **splits** oversized shuffle partitions at runtime.
- **Salting hot keys:** For the biggest keys, add a small **salt** bucket (e.g., 0..7) and join on `(key, salt)` so multiple tasks work the same key in parallel. Aggregate back after the join if needed.
- **Pre-aggregate by key:** Reduce rows per hot key *before* joining.
- **Bucketing:** If you repeatedly join on the same key, bucket both tables on that key (and optionally sort). This can avoid or reduce shuffle on recurring pipelines.
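
The effect of salting can be checked on toy data in plain Python. The row counts, key names, and the round-robin salt assignment are all assumptions made for this sketch (Spark code would typically use a random salt); the point is that the largest group shrinks while the join result stays the same size.

```python
from collections import Counter

S = 8          # salt buckets for hot keys
HOT = {"K1"}   # hot keys, e.g. discovered from key counts

# Hypothetical data: key K1 is hot (800 rows), K2 and K3 are small.
big = [{"key": "K1", "metric": 1} for _ in range(800)]
big += [{"key": "K2", "metric": 1} for _ in range(50)]
big += [{"key": "K3", "metric": 1} for _ in range(50)]
small = [{"key": k, "dim": k.lower()} for k in ("K1", "K2", "K3")]

# Big side: spread hot-key rows round-robin over S salts; other rows keep salt 0.
big_salted = [
    {**row, "salt": i % S if row["key"] in HOT else 0} for i, row in enumerate(big)
]
# Small side: replicate hot-key rows once per salt so every salted row still matches.
small_expanded = [
    {**row, "salt": s}
    for row in small
    for s in (range(S) if row["key"] in HOT else [0])
]

# Size of the largest group of rows that must be processed together.
largest_before = max(Counter(r["key"] for r in big).values())
largest_after = max(Counter((r["key"], r["salt"]) for r in big_salted).values())

# The salted join returns the same number of rows as the plain join.
small_keys = Counter(r["key"] for r in small)
plain_rows = sum(small_keys[r["key"]] for r in big)
salted_keys = Counter((r["key"], r["salt"]) for r in small_expanded)
salted_rows = sum(salted_keys[(r["key"], r["salt"])] for r in big_salted)

print(largest_before, largest_after, plain_rows, salted_rows)  # prints: 800 100 900 900
```

The cost is on the small side: hot-key rows are replicated `S` times, so salting only pays off when that side is small relative to the skewed one.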

---

## 5) Reduce Network I/O (Move Fewer Bytes)
- **Filter/Project/Aggregate early** (repeating because it’s that important).
- **Columnar formats** (Parquet/ORC) with predicate pushdown to trim reads.
- **Partition pruning** on the read side (e.g., date/state folders) so you don’t even load irrelevant data.

---

## 6) Practical Patterns (minimal examples)

Broadcast the small side:

    # assume df_big (large), df_small (fits in memory)
    from pyspark.sql.functions import broadcast
    joined = df_big.join(broadcast(df_small), "id", "inner")

Pre-aggregate before join:

    from pyspark.sql import functions as F
    # collapse df_big to one row per id before joining
    pre = df_big.groupBy("id").agg(F.sum("amount").alias("amt"))
    joined = pre.join(df_small, "id", "left")

Salt a hot key (fan-out only for the hottest keys):

    from pyspark.sql import functions as F

    hot = ["K1", "K2"]              # discovered from key counts
    S = 8                           # split factor for hot keys

    # Big side: hot-key rows get a random salt in 0..S-1, all other rows get 0.
    big_salted = df_big.withColumn(
        "salt",
        F.when(F.col("key").isin(hot), (F.rand() * S).cast("int")).otherwise(F.lit(0)),
    )
    # Small side: replicate hot-key rows once per salt value. explode() cannot be
    # nested inside when(), so when/otherwise picks the array and explode() wraps it.
    salts = F.when(
        F.col("key").isin(hot), F.array(*[F.lit(i) for i in range(S)])
    ).otherwise(F.array(F.lit(0)))
    small_expanded = df_small.withColumn("salt", F.explode(salts))

    joined = big_salted.join(small_expanded, ["key", "salt"], "inner")
    result = joined.groupBy("key").agg(F.sum("metric").alias("metric_sum"))  # optional “unsalt”

Enable AQE (recommended defaults):

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

---

## 7) Quick Checklist (use every time)
- Did I **filter**, **project**, and **aggregate** before joining?
- Is **broadcast** viable (and forced if needed)?
- Is **AQE** on, with skew join enabled?
- Are **shuffle partitions** tuned (or high with AQE to coalesce)?
- Any **skewed keys**? (Check Spark UI: long tasks, big shuffle-read size)
- Do I need **salting** or **bucketing** for recurring joins?
- Are partition/file sizes sane (avoid too many tiny files)?

---

## 8) Simple Rules of Thumb
- Prefer **broadcast** when one side is small → avoids big shuffle.
- For shuffle joins, keep partitions around **128–256 MB**.
- If you have **more cores than distinct keys**, you’ll underutilize the cluster unless you **increase effective cardinality** (salting or extra bucketing dimension).
- **Measure**: Spark UI “Shuffle Read/Write” and task timelines will tell you where the pain is.

---

**Bottom line:** Make the data smaller first, pick the right join (broadcast when possible), give Spark enough (but not absurd) parallelism, and neutralize skew with AQE or salting. Do those four, and most join performance problems disappear.