# Module 11: Distributing the Load
**Goal**: Shatter the illusion of "Infinite Scalability."

**The Lesson**: When you add more computers, you don't just get more power; you also pay a "Network Tax." We will demonstrate why Shuffles kill performance and how Skew creates "straggler" nodes that slow down the entire cluster.

## 1. Setup and "Cluster" Initialization
First, we load the heavy data (`clickstream.parquet`) and the lookup data (`users.parquet`). We will then define a helper function to simulate a 4-Node Cluster.

In [None]:
import pandas as pd
import numpy as np
import duckdb
import matplotlib.pyplot as plt
import time
import hashlib
import sys

# Configure Layout
plt.style.use('seaborn-v0_8')
pd.set_option('display.max_columns', None)

print("--- 1. CONNECTING TO DATA ---")
# Load the "Justin Bieber" Skew dataset (2M rows)
# If this file doesn't exist from previous chapters, we generate a mock version here for safety.
try:
    df_clicks = pd.read_parquet('../data/clickstream.parquet')
    print(f"Loaded Clickstream: {len(df_clicks):,} rows (The 'Big' Table)")
except FileNotFoundError:
    print("Generating Clickstream Data...")
    # 50% of rows are User ID 1 (The Skew), the rest are random
    ids = np.concatenate([np.ones(1_000_000), np.random.randint(2, 100000, 1_000_000)])
    np.random.shuffle(ids)
    df_clicks = pd.DataFrame({'user_id': ids.astype(int), 'event_time': range(2_000_000)})

# Load Users (100k rows)
try:
    df_users = pd.read_parquet('../data/users.parquet')
except FileNotFoundError:
    df_users = pd.DataFrame({'user_id': range(1, 100001), 'country': 'USA'})

print(f"Loaded Users: {len(df_users):,} rows (The 'Small' Table)")

# SIMULATION CONFIG
NUM_NODES = 4

def get_node_id(key, num_nodes=NUM_NODES):
    """Simulates a hash-based sharding function."""
    # Simple modulo hashing
    return key % num_nodes

print("\n--- CLUSTER SIMULATION READY ---")
print(f"Simulating a {NUM_NODES}-Node Cluster.")

----

## 2. Experiment 1: The "Hot Spot" (Data Skew)
**The Concept**: In a distributed system, we split data across nodes using a Sharding Key. A common strategy is `Hash(ID) % NodeCount`. However, if one user has 50% of the data (e.g., Justin Bieber on Twitter), that one node will fill up while the others sit idle. This is Data Skew. The query is only as fast as the slowest node.
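
As a rough illustration of `Hash(ID) % NodeCount`, here is a minimal sketch that assumes a SHA-256-based hash so that string keys work too. The helper name `shard_for` and the toy key list are made up for this example and are not part of the dataset. It suggests that a better hash function does not fix skew: every row for a hot key still lands on the same node.

```python
import hashlib
from collections import Counter

def shard_for(key, num_nodes=4):
    """Map a key to a node using a stable hash (hypothetical helper)."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    # Interpret the first 8 bytes as an unsigned integer, then take the modulo.
    return int.from_bytes(digest[:8], "big") % num_nodes

# Toy workload (assumed): one hot key with 50 rows plus 50 distinct cold keys.
keys = ["bieber"] * 50 + [f"user_{i}" for i in range(50)]
load = Counter(shard_for(k) for k in keys)

hot_node = shard_for("bieber")
for node in range(4):
    marker = " <- hot key lives here" if node == hot_node else ""
    print(f"Node {node}: {load[node]} rows{marker}")
```

The cold keys spread out across nodes, but the hot key's 50 rows cannot be split by any function of the key alone.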

**The Hypothesis**: "If we shard `clickstream` by `user_id`, will the work be distributed evenly across our 4 nodes?"

**The Experiment**: We will simulate distributing the rows to 4 Nodes based on the `user_id` and count how many rows each Node receives.

In [None]:
# --- STEP 1: HYPOTHESIS ---
# We have 2M rows. Ideally, each node gets 500k rows.
# Let's see what actually happens due to the content of the data.

# --- STEP 2: RUN EXPERIMENT ---
start_time = time.time()

# Simulate the sharding: compute the destination node for every row.
# get_node_id is applied to the whole pandas Series at once (vectorized) for speed,
# but the logic is the same as a physical sharding step done row by row.
destinations = get_node_id(df_clicks['user_id'])
distribution = destinations.value_counts().sort_index()

sharding_time = time.time() - start_time

print(f"Sharding calculation took: {sharding_time:.4f} seconds")
print("\nRow Counts per Node:")
for node_id, count in distribution.items():
    print(f"Node {node_id}: {count:,} rows")

# --- STEP 3: VISUALIZE ---
plt.figure(figsize=(10, 5))
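# Color the most heavily loaded node red, whichever node that turns out to be
colors = ['red' if node == distribution.idxmax() else 'green' for node in distribution.index]
bars = plt.bar(distribution.index, distribution.values, color=colors)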
plt.title('The "Hot Spot": Row Distribution per Node')
plt.xlabel('Node ID')
plt.ylabel('Row Count')
plt.xticks(range(NUM_NODES))
plt.grid(axis='y')

# Annotate the skewed node
skewed_node_idx = distribution.idxmax()
plt.annotate('The Justin Bieber Node\n(Straggler)', 
             xy=(skewed_node_idx, distribution[skewed_node_idx]), 
             xytext=(skewed_node_idx + 0.5, distribution[skewed_node_idx]),
             arrowprops=dict(facecolor='black', shrink=0.05))

plt.show()

**The Physics**: You should see one bar significantly higher than the others.
- **The Straggler Problem**: If you run a query like `SELECT COUNT(*) ... GROUP BY user_id`, the lightly loaded nodes finish quickly and then sit idle while the hot node is still working. With the generated data, the hot node is Node 1, because user ID 1 (50% of the rows) maps to `1 % 4 = 1`.
- **Implication**: The query finishes only when the slowest node does, so an expensive 100-node cluster can end up little faster than a single node when the Sharding Key is a poor match for skewed data.
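
To put a number on the straggler effect, here is a small sketch that assumes runtime is proportional to the number of rows a node holds. The helper `skew_report` and the row counts are illustrative: they model 2M rows on 4 nodes, with the 1M hot rows on one node and the remaining 1M rows spread evenly.

```python
def skew_report(rows_per_node):
    """Summarize how unevenly work is spread (illustrative helper)."""
    total = sum(rows_per_node)
    slowest = max(rows_per_node)            # the straggler sets the finish time
    ideal = total / len(rows_per_node)      # rows per node if perfectly balanced
    return {
        "skew_factor": slowest / ideal,         # 1.0 means perfectly balanced
        "effective_speedup": total / slowest,   # versus one node doing all the work
    }

# Assumed layouts: a perfectly balanced cluster vs. one hot node.
balanced = skew_report([500_000, 500_000, 500_000, 500_000])
skewed = skew_report([250_000, 1_250_000, 250_000, 250_000])

for name, report in [("balanced", balanced), ("skewed", skewed)]:
    print(f"{name}: skew factor {report['skew_factor']:.1f}, "
          f"effective speedup {report['effective_speedup']:.1f}x")
```

Under these assumptions the balanced cluster gets the full 4.0x speedup, while the skewed one gets only 1.6x from the same four machines.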

----

## 3. Experiment 2: The Shuffle (The Network Tax)
**The Concept**: The most expensive operation in a distributed system is the Shuffle. If Node A needs to join data that lives on Node B, we must send that data over the network.
- **Scenario A (Co-Located)**: Data is pre-sorted/partitioned. Node A has all the data it needs. Zero network traffic.
- **Scenario B (Shuffle Required)**: Data is random. We must serialize it and send it across the wire to group it.

**The Hypothesis**: "Is it faster to process data that is already locally aligned (No Shuffle) vs. data that needs to be moved (Shuffle)?"

**The Experiment**: We will simulate the "Physics" of a shuffle by forcing a memory copy and re-organization of the data, versus a simple pass-through.

In [None]:
# --- STEP 1: PREPARE DATA ---
# create a dataset that requires grouping
data_size = 1_000_000
df_shuffle = pd.DataFrame({
    'key': np.random.randint(0, 1000, data_size),
    'value': np.random.randn(data_size)
})

# --- STEP 2: RUN EXPERIMENT ---

# Case 1: LOCAL (No Shuffle)
# Simulates data already partitioned by Key. 
# We just groupby locally on the existing chunk.
start_local = time.time()
# In a sorted/partitioned scenario, the DB just scans and aggregates linearly
local_result = df_shuffle.groupby('key')['value'].sum()
time_local = time.time() - start_local

# Case 2: NETWORK SHUFFLE
# Simulates having to redistribute data.
# 1. Hash Partition the data (CPU Cost)
# 2. "Move" data (Memory Copy / Serialization Cost simulated by copy)
# 3. GroupBy on new partitions
start_shuffle = time.time()

# A. Partitioning (The CPU cost of deciding where data goes)
df_shuffle['partition'] = df_shuffle['key'] % 4

# B. The Exchange (The I/O cost of moving data)
# We simulate the network cost by explicitly copying data into new buffers
node_buffers = []
for i in range(4):
    # This copy simulates serialization + network transmission
    part = df_shuffle[df_shuffle['partition'] == i].copy()
    node_buffers.append(part)

# C. Final Aggregation (The Receive side)
final_results = []
for part in node_buffers:
    final_results.append(part.groupby('key')['value'].sum())

time_shuffle = time.time() - start_shuffle

# --- STEP 3: VISUALIZE ---
times = [time_local, time_shuffle]
labels = ['Local Compute\n(No Shuffle)', 'Distributed Shuffle\n(Network Tax)']

plt.figure(figsize=(10, 5))
plt.bar(labels, times, color=['blue', 'orange'])
plt.title('Cost of The Shuffle: Local vs. Networked')
plt.ylabel('Execution Time (s)')
for i, v in enumerate(times):
    plt.text(i, v, f"{v:.4f}s", ha='center', va='bottom')

plt.show()

**The Physics**:
- **CPU is fast, I/O is slow**. Even though we only simulated memory copies (not actual network latency), the "Shuffle" path should come out measurably slower than the local aggregation.
- **Serialization**: In a real cluster, the Shuffle involves serializing the data (pickling, in Python terms), splitting it into TCP packets, sending it over the wire, and deserializing it on the receiving node.
- **Takeaway**: A "Broadcast Join" or good "Data Modeling" tries to eliminate this orange bar.
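
The serialization step alone can be sketched with Python's standard `pickle` module. This is only a stand-in: real engines use their own wire formats, and the toy `partition` below is made up for the example. No network is involved, so the measured time is a lower bound on what a real exchange would cost.

```python
import pickle
import time

# Toy partition (assumed): 200k (key, value) rows as plain Python tuples.
partition = [(i % 1000, float(i)) for i in range(200_000)]

start = time.perf_counter()
# Sender side: turn in-memory objects into bytes.
payload = pickle.dumps(partition, protocol=pickle.HIGHEST_PROTOCOL)
# Receiver side: rebuild the objects from the bytes.
received = pickle.loads(payload)
elapsed = time.perf_counter() - start

print(f"Payload size: {len(payload) / 1e6:.2f} MB")
print(f"Serialize + deserialize: {elapsed:.4f}s (before any network transfer)")
```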

----

## 4. Experiment 3: Broadcast vs. Shuffle Join
**The Concept**: When joining a Large Table (Clickstream) and a Small Table (Users):
- **Shuffle Join**: We cut both tables into pieces and shuffle both across the network so keys align. (Moving the Mountain).
- **Broadcast Join**: We send a copy of the entire small table to every node. The large table stays put. (Moving the Climber).
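
The two strategies can be sketched on toy data by counting how many rows change node. Everything here is an assumption made for illustration: the table sizes, the starting layout (clicks on arbitrary nodes, users range-partitioned), and the helper names. It is a pure-Python simulation, not how any particular engine implements its joins.

```python
import random
from collections import Counter

NUM_NODES = 4
random.seed(0)

# Toy tables (made up for this sketch): 20 users, 1,000 click rows.
users = {uid: f"country_{uid % 3}" for uid in range(20)}
clicks = [random.randrange(20) for _ in range(1000)]

# Starting layout: clicks sit on arbitrary nodes, users are range-partitioned.
click_nodes = [[] for _ in range(NUM_NODES)]
for uid in clicks:
    click_nodes[random.randrange(NUM_NODES)].append(uid)
user_nodes = [{uid: users[uid] for uid in range(n * 5, n * 5 + 5)}
              for n in range(NUM_NODES)]

def local_join(click_rows, user_rows):
    """Join click rows to the user rows available on the same node."""
    return Counter((uid, user_rows[uid]) for uid in click_rows)

def shuffle_join():
    """Re-partition BOTH tables by user_id % NUM_NODES, then join locally."""
    moved = 0
    new_clicks = [[] for _ in range(NUM_NODES)]
    new_users = [{} for _ in range(NUM_NODES)]
    for src in range(NUM_NODES):
        for uid in click_nodes[src]:
            dst = uid % NUM_NODES
            moved += int(dst != src)  # a row costs traffic only if it changes node
            new_clicks[dst].append(uid)
        for uid, country in user_nodes[src].items():
            dst = uid % NUM_NODES
            moved += int(dst != src)
            new_users[dst][uid] = country
    result = Counter()
    for n in range(NUM_NODES):
        result += local_join(new_clicks[n], new_users[n])
    return result, moved

def broadcast_join():
    """Copy the whole users table to every node; click rows never move."""
    moved = 0
    result = Counter()
    for n in range(NUM_NODES):
        moved += len(users) - len(user_nodes[n])  # user rows this node was missing
        result += local_join(click_nodes[n], users)
    return result, moved

shuffle_result, shuffle_moved = shuffle_join()
broadcast_result, broadcast_moved = broadcast_join()
print(f"Same join output: {shuffle_result == broadcast_result}")
print(f"Rows moved, shuffle join:   {shuffle_moved}")
print(f"Rows moved, broadcast join: {broadcast_moved}")
```

Both strategies produce the same join output; they differ only in which rows had to travel to get there.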

**The Hypothesis**: "If the small table fits in RAM, Broadcast will crush Shuffle."

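**The Experiment**: We will compare the two strategies with a simple cost model that estimates how much data each one moves across the network, then run the actual join in DuckDB as a sanity check.

**Note**: Since DuckDB is single-node, it cannot perform a real Broadcast or Shuffle; we simulate the 'physics' by comparing the data volume moved.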

In [None]:
# --- STEP 1: SETUP ---
con = duckdb.connect()
con.register('clicks', df_clicks) # 2M Rows
con.register('users', df_users)   # 100k Rows

# --- STEP 2: MEASURE DATA MOVEMENT ---
# In a physical distributed system, the cost is proportional to bytes transmitted.

# COST MODEL (a simplification: it ignores rows that already sit on the right node):
# Shuffle Join Cost   = Size(Table A) + Size(Table B)  (Both tables move)
# Broadcast Join Cost = Size(Table B) * N_Nodes        (Only the small table moves)

size_clicks_mb = df_clicks.memory_usage(deep=True).sum() / (1024 * 1024)
size_users_mb  = df_users.memory_usage(deep=True).sum() / (1024 * 1024)
num_nodes = 10  # Let's imagine a 10-node cluster

shuffle_cost = size_clicks_mb + size_users_mb
broadcast_cost = size_users_mb * num_nodes

print("Dataset Sizes:")
print(f"  Big Table:   {size_clicks_mb:.2f} MB")
print(f"  Small Table: {size_users_mb:.2f} MB")
print(f"\nSimulated Network Traffic ({num_nodes} Nodes):")
print(f"  Shuffle Join:   {shuffle_cost:.2f} MB (Moving the Big Table)")
print(f"  Broadcast Join: {broadcast_cost:.2f} MB (Replicating Small Table)")

# --- STEP 3: VISUALIZE ---
costs = [shuffle_cost, broadcast_cost]
labels = ['Shuffle Join\n(Move Big Table)', 'Broadcast Join\n(Copy Small Table)']

plt.figure(figsize=(10, 5))
bars = plt.bar(labels, costs, color=['red', 'green'])
plt.title('Network Traffic: Shuffle vs. Broadcast')
plt.ylabel('Data Moved (MB)')

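# Add Ratio (the label adapts in case Broadcast turns out to move more data)
improvement = shuffle_cost / broadcast_cost
ratio_label = (f"{improvement:.1f}x Less Traffic!" if improvement >= 1
               else f"{1 / improvement:.1f}x More Traffic")
plt.annotate(ratio_label,
             xy=(1, broadcast_cost), 
             xytext=(1, shuffle_cost/2),
             arrowprops=dict(facecolor='black', arrowstyle='->'),
             ha='center')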

plt.show()

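# --- STEP 4: SANITY CHECK IN DUCKDB ---
# DuckDB isn't distributed, so nothing is broadcast or shuffled here. This only shows
# how fast a single-node hash join runs once a hash table has been built on one side
# of the join (typically the smaller table).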

print("Running Actual Join in DuckDB...")
start_join = time.time()
con.execute("SELECT count(*) FROM clicks JOIN users ON clicks.user_id = users.user_id").fetchall()
print(f"Join Time: {time.time() - start_join:.4f}s")

**The Physics**:

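- **The Math**: Under the cost model above, Broadcast moves less data when SmallTable * NodeCount < BigTable + SmallTable, i.e. when BigTable > SmallTable * (NodeCount - 1).
- **The Bottleneck**: A typical datacenter link is on the order of 10 Gbps (about 1.25 GB/s), while memory bandwidth is closer to 50 GB/s. Moving 30 MB (Broadcast) takes about 24 ms at that link speed; moving 100 GB (Shuffle) takes at least 80 seconds, and usually longer once serialization and contention are included.
- **Spark/Snowflake Tip**: If you see a "Shuffle Hash Join" in your query plan for a small lookup table, you are probably paying for network traffic you don't need. Check whether the engine lets you request a Broadcast (Spark, for example, supports a broadcast join hint).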
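
The arithmetic behind these bullets can be checked with a short sketch. The helper names are made up for this example, the transfer time is a best case that ignores latency, protocol overhead, and contention, and the break-even test simply applies the notebook's simplified cost model.

```python
def transfer_seconds(size_bytes, link_gbps=10):
    """Best-case time to push size_bytes through a link rated in gigabits/s."""
    bytes_per_second = link_gbps * 1e9 / 8  # 10 Gbps is 1.25 GB/s
    return size_bytes / bytes_per_second

def broadcast_moves_less(big_mb, small_mb, num_nodes):
    """Cost model from this notebook: shuffle = big + small, broadcast = small * nodes."""
    return small_mb * num_nodes < big_mb + small_mb

print(f"30 MB broadcast: {transfer_seconds(30e6):.3f} s")   # 0.024 s
print(f"100 GB shuffle:  {transfer_seconds(100e9):.0f} s")  # 80 s

# A 100 GB fact table vs. a 30 MB lookup table on 10 nodes: broadcast wins.
print(broadcast_moves_less(big_mb=100_000, small_mb=30, num_nodes=10))  # True
# A 100 MB "big" table vs. the same lookup table: broadcast would move more.
print(broadcast_moves_less(big_mb=100, small_mb=30, num_nodes=10))      # False
```

The second case is a reminder that Broadcast is not automatically cheaper: once the small table times the node count approaches the size of the big table, the advantage disappears.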