# Module 7: Join Algorithms
## Goal: Understanding how the database "zips" data together.

In module 6, we saw how the Optimizer estimates costs. Now, we see those estimates in action.

The most expensive operation in any database is usually the **JOIN**. Combining two separate tables requires comparing rows to find matches. How the database physically does this depends heavily on the "Physics" of the data (Sorted? Small? Large?).

There are three main algorithms you must master:
1.  **Nested Loop Join:** The "For Loop" approach. Simple, but dangerous for large data.
2.  **Hash Join:** The "HashMap" approach. The gold standard for unsorted data engineering.
3.  **Merge Join:** The "Zipper" approach. Extremely fast, *if* the data is already sorted.

---

## 1. Setup and Connection

We will load two tables:
1.  `users_join`: 100,000 rows (The "Dimension" table).
2.  `orders_join`: 500,000 rows (The "Fact" table).

**Note:** We are creating a foreign key relationship where `orders_join.user_id` matches `users_join.user_id`.

In [None]:
import pandas as pd
import psycopg2
import matplotlib.pyplot as plt
import seaborn as sns
import time
from io import StringIO

# Database Connection
DB_PARAMS = {
    "host": "db_int_opt",
    "port": 5432,
    "user": "admin",
    "password": "password",
    "dbname": "db_int_opt"
}

def get_db_connection():
    conn = psycopg2.connect(**DB_PARAMS)
    conn.autocommit = True
    return conn

conn = get_db_connection()
cur = conn.cursor()
print("✅ Connected to Postgres.")

#### 1.1 Load the Data
We need to ensure the `orders` table has a `user_id` to join on. The `orders_sorted.csv` file typically contains `order_id`, `user_id`, `order_date`, amount.

In [None]:
import numpy as np
import pandas as pd
from io import StringIO

# 1. Create Tables
cur.execute("DROP TABLE IF EXISTS users_join CASCADE;")
cur.execute("DROP TABLE IF EXISTS orders_join CASCADE;")

cur.execute("""
    CREATE TABLE users_join (
        user_id INTEGER PRIMARY KEY, 
        name TEXT, 
        email TEXT, 
        account_status TEXT, 
        signup_date TIMESTAMP, 
        is_email_verified BOOLEAN
    );
""")

cur.execute("""
    CREATE TABLE orders_join (
        order_id INTEGER, 
        user_id INTEGER, 
        order_date DATE, 
        amount FLOAT
    );
""")

# 2. Load Users
print("⏳ Loading users...")
df_users = pd.read_csv('../data/users.csv')
buffer = StringIO()
df_users.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cur.copy_expert("COPY users_join FROM STDIN WITH CSV", buffer)

# 3. Load Orders (With Auto-Repair)
print("⏳ Loading orders...")
df_orders = pd.read_csv('../data/orders_sorted.csv')
print(f"   Original Columns: {list(df_orders.columns)}")

# PATCH: Check for missing columns and generate data if needed
if 'user_id' not in df_orders.columns:
    print("   ⚠️ Injecting 'user_id' (Random 1-100k)...")
    df_orders['user_id'] = np.random.randint(1, 100000, df_orders.shape[0])

if 'amount' not in df_orders.columns:
    print("   ⚠️ Injecting 'amount' (Random $10-$500)...")
    df_orders['amount'] = np.random.uniform(10.0, 500.0, df_orders.shape[0]).round(2)

# Ensure order matches database schema for COPY
df_orders = df_orders[['order_id', 'user_id', 'order_date', 'amount']]

buffer = StringIO()
df_orders.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cur.copy_expert("COPY orders_join FROM STDIN WITH CSV", buffer)

# 4. Analyze to update stats
cur.execute("ANALYZE users_join;")
cur.execute("ANALYZE orders_join;")
print("✅ Data Loaded: 100k Users, 500k Orders.")

---

## 2. Experiment 7.1: The Nested Loop Join (The "For Loop")

The Nested Loop is the simplest algorithm. It works exactly like writing two for loops in Python:

```python
for user in users:          # Outer Loop
    for order in orders:    # Inner Loop
        if user.id == order.user_id:
            yield (user, order)
```

**The Physics**: If you have 100k users and 500k orders, that is $100,000 \times 500,000 = 50,000,000,000$ comparisons. This is disastrous unless there is an index on the inner loop.

We will force a Nested Loop Join. To be fair, we will add an index on `orders(user_id)` so we don't wait until the heat death of the universe.

In [None]:
# Create Index to make Nested Loop viable
cur.execute("CREATE INDEX IF NOT EXISTS idx_orders_user ON orders_join(user_id);")

query = """
    SELECT count(*) 
    FROM users_join u 
    JOIN orders_join o ON u.user_id = o.user_id
"""

# Force Nested Loop
cur.execute("SET enable_nestloop = ON;")
cur.execute("SET enable_hashjoin = OFF;")
cur.execute("SET enable_mergejoin = OFF;")

print("⏳ Running Nested Loop Join (This might take a few seconds)...")
start_nl = time.time()
cur.execute(f"EXPLAIN (ANALYZE, FORMAT JSON) {query}")
result_nl = cur.fetchone()[0][0]
end_nl = time.time()

time_nl = (end_nl - start_nl) * 1000
cost_nl = result_nl['Plan']['Total Cost']
print(f"✅ Nested Loop Done: {time_nl:.2f} ms")

----

## 3. Experiment 7.2: The Hash Join (The "Engineering Standard")
The Hash Join is the default for most Data Engineering workloads.
1. **Build Phase**: It takes the smaller table (Users) and builds a Hash Map in memory (RAM).
    - `Key: user_id -> Value: Row Location`
2. **Probe Phase**: It scans the larger table (Orders) once. For every row, it looks up the `user_id` in the Hash Map.

**The Physics**: Complexity is $O(N + M)$. We touch every row exactly once. No looping back and forth.

In [None]:
# Force Hash Join
cur.execute("SET enable_nestloop = OFF;")
cur.execute("SET enable_hashjoin = ON;")
cur.execute("SET enable_mergejoin = OFF;")

print("⏳ Running Hash Join...")
start_hj = time.time()
cur.execute(f"EXPLAIN (ANALYZE, FORMAT JSON) {query}")
result_hj = cur.fetchone()[0][0]
end_hj = time.time()

time_hj = (end_hj - start_hj) * 1000
cost_hj = result_hj['Plan']['Total Cost']
print(f"✅ Hash Join Done: {time_hj:.2f} ms")

----

## 4. Experiment 7.3: The Sort-Merge Join (The "Zipper")
The Merge Join is beautiful but needy. It requires both tables to be physically sorted by the join key (`user_id`).
1. It puts a pointer at the top of Table A and Table B.
2. It moves them down together, "zipping" matches.

**The Physics**: If the data is not sorted, the database must sort it first ($O(N \log N)$), which is expensive.We will try this join without pre-sorting the data to see the cost of that "Sort" step.

In [None]:
# Force Merge Join
cur.execute("SET enable_nestloop = OFF;")
cur.execute("SET enable_hashjoin = OFF;")
cur.execute("SET enable_mergejoin = ON;")

print("⏳ Running Merge Join...")
start_mj = time.time()
cur.execute(f"EXPLAIN (ANALYZE, FORMAT JSON) {query}")
result_mj = cur.fetchone()[0][0]
end_mj = time.time()

time_mj = (end_mj - start_mj) * 1000
cost_mj = result_mj['Plan']['Total Cost']
print(f"✅ Merge Join Done: {time_mj:.2f} ms")

---

## 5. Visualization and Analysis
Let's compare the three gladiators.

In [None]:
methods = ['Nested Loop', 'Hash Join', 'Merge Join']
times = [time_nl, time_hj, time_mj]
costs = [cost_nl, cost_hj, cost_mj]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

# Chart 1: Execution Time
sns.barplot(x=methods, y=times, ax=ax1, palette="viridis")
ax1.set_title("Actual Execution Time (ms)")
ax1.set_ylabel("Milliseconds (Lower is Better)")
for i, v in enumerate(times):
    ax1.text(i, v + 10, f"{v:.0f}", ha='center', fontweight='bold')

# Chart 2: Optimizer Cost
sns.barplot(x=methods, y=costs, ax=ax2, palette="magma")
ax2.set_title("Optimizer Cost Estimate")
ax2.set_ylabel("Cost Units")

plt.tight_layout()
plt.show()

# Restore defaults
cur.execute("SET enable_nestloop = ON;")
cur.execute("SET enable_hashjoin = ON;")
cur.execute("SET enable_mergejoin = ON;")

## 5.1 The "Why" behind the results
1. **Nested Loop (The Villain):**
    - **Expected**: It was the slowest (74ms) and had the highest predicted cost.
    - **Why**: Even with the index, the CPU had to jump back and forth 500,000 times. The Optimizer correctly identified this as the worst option (look at that massive purple bar!).
2. **Hash Join (The Optimizer's Favorite):**
    - **Expected**: The Optimizer gave this the lowest cost (the shortest purple bar). This means if you hadn't forced anything, the database would have chosen this path.
    - **Why**: Hash Joins are the "safe bet" for large, unsorted data.
3. **Merge Join (The Surprise Winner)**:
    - **Unexpected but Logical**: It was actually the fastest (47ms), beating the Hash Join (60ms), even though the Optimizer thought it would be more expensive.
    - **The Physics**: The "Sort" step (required for Merge Join) turned out to be cheaper than the Optimizer feared. Modern CPUs are incredibly fast at sorting simple integers (like `user_id`) in L2/L3 Cache. The Optimizer was conservative, expecting the sort to be slower.