This project implements a distributed MapReduce framework similar to Google's original MapReduce execution model.
The system is deployed across multiple AWS EC2 instances, using HDFS for shared storage, Protobuf for intermediate serialization, and custom TCP-based shuffle between Mappers and the Reducer.
The framework executes two example applications:
- Word Count (wordc)
- Grep Search (grep)
The goal was to explore:
- Distributed coordination
- Producer–consumer synchronization
- Shuffle scaling behavior
- The impact of buffer sizes, mapper parallelism, and scheduling barriers
```mermaid
flowchart LR
    subgraph Client
        Job[Submit Job]
    end
    subgraph Master
        Split[Split Input]
        Assign[Assign Shards]
        Monitor[Track Worker Status]
    end
    subgraph Mappers
        M1[Mapper 1]
        M2[Mapper 2]
        M3[Mapper 3]
        Emit[Emit KV Batches]
    end
    subgraph Network
        RPC[gRPC Streaming RPC]
    end
    subgraph Reducer
        Q1[(Active Buffer)]
        Q2[(Processing Buffer)]
        Reduce[Reduce Function Execution]
        Out[(Output)]
    end
    Job --> Master --> Mappers
    Emit --> RPC --> Q1
    Q1 <--> Q2 --> Reduce --> Out
```
| Component | Implementation Detail |
|---|---|
| Mapper Execution | Multi-threaded per Worker |
| Shuffle Phase | TCP streaming + length-prefixed Protobuf KV records |
| Synchronization | Pthread mutex + condvars for bounded buffers |
| Reducer Behavior | Single-threaded merge + application reduce() |
| HDFS Integration | Final output stored via Hadoop CLI |
| Barrier Mode | Optional barrierEnable forces Reduce to wait for all Mappers |
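As a rough illustration of the bounded-buffer synchronization listed above, here is a minimal pthread mutex + condvar sketch; the struct and its field names are illustrative, not the framework's actual types:

```cpp
// bounded_buffer_sketch.cpp -- illustrative only; names are assumptions, not the real API.
#include <pthread.h>
#include <cstddef>
#include <cstring>
#include <vector>

struct BoundedBuffer {
    std::vector<char> data;
    size_t used = 0;
    size_t capacity;
    pthread_mutex_t lock      = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  not_full  = PTHREAD_COND_INITIALIZER;
    pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;

    explicit BoundedBuffer(size_t cap) : data(cap), capacity(cap) {}

    // Producer (mapper RPC handler): block while the buffer cannot hold the batch.
    // Assumes len <= capacity.
    void put(const char* batch, size_t len) {
        pthread_mutex_lock(&lock);
        while (used + len > capacity)
            pthread_cond_wait(&not_full, &lock);
        std::memcpy(data.data() + used, batch, len);
        used += len;
        pthread_cond_signal(&not_empty);
        pthread_mutex_unlock(&lock);
    }

    // Consumer (reducer thread): block until data is available, then drain it all.
    size_t take(std::vector<char>& out) {
        pthread_mutex_lock(&lock);
        while (used == 0)
            pthread_cond_wait(&not_empty, &lock);
        out.assign(data.begin(), data.begin() + used);
        size_t n = used;
        used = 0;
        pthread_cond_broadcast(&not_full);
        pthread_mutex_unlock(&lock);
        return n;
    }
};
```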
Edit `cluster.conf`:

```
reducer_ip=<REDUCER_IP>:50051
mapper_buffer_size=<bytes>
threads_per_unit=<N>
mapper_ip=<WORKER_1_IP>
mapper_ip=<WORKER_2_IP>
...
```
The reducer and worker servers are started automatically by the driver scripts, but they can also be launched manually:
```bash
./grep_reducer_server <total_threads> <buffer_size> <output_path> <barrier> grep &
./grep_worker_server &
./grep_driver grep <threads_per_unit> <input_file> <output_hdfs_file> "<pattern>" <barrierEnable>
./wordc_driver wordc <threads_per_unit> <input> <output> " .,:;-?!()[]-+<>" <barrierEnable>
```
Experiments were run on AWS:
| Role | Instance Type |
|---|---|
| Master + HDFS NameNode | m5a.xlarge |
| Each Mapper Node | m5a.xlarge |
| Reducer Node | t3a.xlarge |
| Parameter | Values |
|---|---|
| Mapper Units | {1, 4} |
| Threads per Mapper Unit | {4, 16} |
| Buffer Size | {10 KB, 100 KB, 1 MB} |
| Barrier Enabled? | {Yes, No} |
| Runs per Config | 3 runs (mean + std dev) |
I initially assumed “more mappers and more threads ⇒ faster.” The data disproved that and exposed a core synchronization bug in my non-barrier path.
Expectation: With Barrier=0, the reducer should overlap with mappers and finish sooner.
Observed: For large inputs (e.g., grep Testcase5), Barrier=0 was 6–7× slower (≈38 s vs. ≈6 s).
Root cause: The reducer’s consumer thread held the same mutex used by the producer RPC handlers while:
- deserializing batches,
- and calling `reduce_fn()`.
That serialized the entire pipeline: while the reducer was “inside reduce,” all incoming RPCs waited on the lock. With Barrier=1, producers drain first and never contend with the reducer, so the job runs faster.
Fix (applied/needed):
- Hold the lock only to copy bytes out of the shared buffer into a local scratch buffer; unlock immediately, then parse + call `reduce_fn()` outside the lock (see the sketch after this list).
- (Even better) give each mapper a dedicated queue (separate mutex/condvars) and let the reducer merge without contending on a single global lock.
- Consider double-buffering or a ring buffer so producers never wait on long reducer work.
- If keeping `Barrier=0`, ensure the reducer is truly streaming: small atomic copies, minimal critical sections, backpressure signals via `not_full`/`not_empty`.
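A minimal sketch of what the de-contended consumer loop could look like, assuming a single shared byte buffer and placeholder names (`shared_buf`, `parse_kv_batch`, `reduce_fn`, `mappers_done`) rather than the real symbols in `reducer_thread_func`:

```cpp
// Sketch only: copy under the lock, do the heavy work outside it.
#include <pthread.h>
#include <string>
#include <vector>

struct KV { std::string key; std::string value; };

// Assumed to be provided elsewhere: Protobuf batch deserialization and the
// application-level reduce().
std::vector<KV> parse_kv_batch(const char* bytes, size_t len);
void reduce_fn(const std::string& key, const std::string& value);

static pthread_mutex_t buf_lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  not_full  = PTHREAD_COND_INITIALIZER;
static std::vector<char> shared_buf;   // bytes appended by the producer RPC handlers
static bool mappers_done = false;      // set true once every mapper has finished

void reducer_consume_loop() {
    std::vector<char> scratch;                       // thread-local copy target
    for (;;) {
        pthread_mutex_lock(&buf_lock);
        while (shared_buf.empty() && !mappers_done)
            pthread_cond_wait(&not_empty, &buf_lock);
        if (shared_buf.empty() && mappers_done) {    // fully drained, no producers left
            pthread_mutex_unlock(&buf_lock);
            break;
        }
        scratch.clear();
        scratch.swap(shared_buf);                    // O(1): just steal the pending bytes
        pthread_cond_broadcast(&not_full);           // producers may refill immediately
        pthread_mutex_unlock(&buf_lock);

        // Heavy work happens with the lock released, so incoming RPCs never
        // block behind deserialization or reduce().
        for (const KV& kv : parse_kv_batch(scratch.data(), scratch.size()))
            reduce_fn(kv.key, kv.value);
    }
}
```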
Expectation: 4 mapper units should be faster than 1. Observed: 4 units were consistently slower (e.g., for Testcase5 with barrier on: ~6–7 s at 1 unit vs. 13–20 s at 4 units).
Why: A single reducer (on t3a.xlarge) is the bottleneck. Adding more mappers increases:
- inbound shuffle pressure (more senders, more sockets),
- reducer-side lock contention,
- and network overhead.
Result: the reducer saturates, so extra mapper parallelism just amplifies queueing and jitter.
Implication: Throughput is reducer-bound; scale the reducer (faster instance, more reducer threads/partitions) before scaling mapper count.
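If the reducer were scaled out to N instances, the standard approach is to hash-partition keys so every mapper routes a given key to the same reducer. A minimal sketch of such a partitioner (an assumed extension; the current framework runs a single reducer):

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Route each intermediate key to one of num_reducers partitions.
// Every mapper must use the same function so a given key always
// lands on the same reducer.
size_t partition_for_key(const std::string& key, size_t num_reducers) {
    return std::hash<std::string>{}(key) % num_reducers;
}
```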
Expectation: 16 threads beats 4 threads. Observed: Any benefit appeared only for small inputs with a single mapper unit (≈10–12% improvement). With 4 units or larger files, 16 threads was often slower due to:
- CPU scheduling overhead on mapper hosts,
- increased shuffle burstiness,
- and more pressure on the single reducer.
Takeaway: Past a modest degree of mapper parallelism, the reducer dominates. Adding threads just pushes the bottleneck harder.
Expectation: Larger buffers reduce flush frequency → faster. Observed: Mixed results; in many cases no improvement, or even a slowdown (e.g., Testcase5, 1 mapper unit × 16 threads per unit, barrier on: 1 MB ≈ 15.6 s vs. 100 KB ≈ 7.8 s).
Why this happens:
- Large batches increase latency and make the pipeline bursty; the reducer experiences long stalls per flush.
- Bigger copies/allocations add CPU and memory pressure.
- When combined with the mutex bug (above), large critical sections multiply the damage.
Guideline: Prefer moderate buffer sizes (e.g., 100 KB) that keep steady flow; only increase if profiling shows under-utilized network without added latency.
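For reference, the flush policy this guideline targets can be sketched as a mapper-side emitter that sends a batch once the configured threshold is reached; `send_batch` and the struct name are hypothetical stand-ins for the real shuffle send path:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Assumed to exist elsewhere: the length-prefixed TCP send used by the shuffle.
void send_batch(const std::vector<char>& bytes);

// Accumulate serialized KV records and flush once the configured
// mapper_buffer_size is reached; smaller thresholds give the reducer a
// steadier stream, larger ones make the pipeline burstier.
struct BatchEmitter {
    size_t flush_threshold;          // e.g. the 100 KB setting that tested well here
    std::vector<char> pending;

    explicit BatchEmitter(size_t threshold) : flush_threshold(threshold) {}

    void emit(const std::string& serialized_kv) {
        pending.insert(pending.end(), serialized_kv.begin(), serialized_kv.end());
        if (pending.size() >= flush_threshold)
            flush();
    }

    void flush() {
        if (!pending.empty()) {
            send_batch(pending);
            pending.clear();
        }
    }
};
```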
- Fix the critical lock scope in `reducer_thread_func`: copy → unlock → parse + `reduce_fn()` outside the lock.
- Consider per-producer queues or lock sharding; avoid one global hot lock (see the sketch after this list).
- Use smaller, regular batches (100 KB range tested well) to reduce jitter.
- If you need to scale, scale the reducer (bigger instance, or N reducers with partitioned keys) before increasing mapper units/threads.
- Use `Barrier=1` as a safe default until the non-barrier streaming path is fully de-contended.
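One possible shape for the per-producer-queue recommendation above, assuming one lock-protected queue per mapper so a long `reduce()` on one batch never blocks another mapper's RPC handler (all names are hypothetical):

```cpp
#include <pthread.h>
#include <deque>
#include <utility>
#include <vector>

// One queue of serialized batches per mapper, each with its own lock, so the
// reducer draining (or processing) one mapper's batch does not stall the rest.
struct MapperQueue {
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    std::deque<std::vector<char>> batches;

    void push(std::vector<char> batch) {
        pthread_mutex_lock(&lock);
        batches.push_back(std::move(batch));
        pthread_mutex_unlock(&lock);
    }

    // Non-blocking pop; the reducer can round-robin over all MapperQueues.
    bool try_pop(std::vector<char>& out) {
        pthread_mutex_lock(&lock);
        bool ok = !batches.empty();
        if (ok) { out = std::move(batches.front()); batches.pop_front(); }
        pthread_mutex_unlock(&lock);
        return ok;
    }
};
```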
This is the key lesson from the measurements: parallelism helps only when the critical path can consume it. In this framework, that means ensuring the reducer is never forced to serialize the whole job via a shared lock or oversized, bursty batches.
| Configuration Trend | Result |
|---|---|
| Increasing threads per mapper beyond 4 | Causes CPU contention + reducer queue pressure. |
| Increasing mapper units from 1 → 4 | Adds network overhead → ~2× slowdown for grep. |
| Increasing buffer size | Improves performance only when reducer can keep up. |
| Barrier ON | Prevents reducer overload, improves stability at high parallelism. |
| Barrier OFF | Slightly faster for low parallelism, but unstable for high thread counts. |
The reducer becomes the system bottleneck when mapper parallelism increases. Performance improves most when concurrency is balanced — not maximized.
| Area | Choice | Reason |
|---|---|---|
| Shuffle Format | Protobuf messages | Compact, structured, language-independent |
| Intermediate Transfer | Streamed TCP sockets | Lower overhead than gRPC for bulk data |
| Reducer Concurrency | Single reducer thread | Simplifies ordering; avoids reducer-side contention |
| Buffering | Per-mapper bounded buffers | Prevents unbounded memory growth |
| Flow Control | Optional barrier | Controls backpressure during heavy mapper throughput |
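A sketch of the length-prefixed Protobuf framing referenced above; the message type `KVBatch`, the generated header `mr.pb.h`, and the helper names are assumptions, not the project's actual identifiers (in the repository, `mr_comm_helpers.cc` provides the Protobuf-based send/recv wrappers that play this role):

```cpp
#include <arpa/inet.h>    // htonl / ntohl
#include <sys/socket.h>
#include <cstddef>
#include <cstdint>
#include <string>
#include "mr.pb.h"        // assumed protoc-generated header defining KVBatch

// Read exactly n bytes, looping over short reads.
static bool recv_all(int fd, char* buf, size_t n) {
    size_t got = 0;
    while (got < n) {
        ssize_t r = recv(fd, buf + got, n - got, 0);
        if (r <= 0) return false;      // peer closed or error
        got += static_cast<size_t>(r);
    }
    return true;
}

// Write one batch: 4-byte network-order length, then the serialized message.
// (A full implementation would also loop over partial sends.)
bool send_kv_batch(int fd, const KVBatch& batch) {
    std::string payload;
    if (!batch.SerializeToString(&payload)) return false;
    uint32_t len_net = htonl(static_cast<uint32_t>(payload.size()));
    if (send(fd, &len_net, sizeof(len_net), 0) != (ssize_t)sizeof(len_net)) return false;
    return send(fd, payload.data(), payload.size(), 0) ==
           static_cast<ssize_t>(payload.size());
}

// Read one length-prefixed batch from the socket and parse it.
bool recv_kv_batch(int fd, KVBatch* batch) {
    uint32_t len_net = 0;
    if (!recv_all(fd, reinterpret_cast<char*>(&len_net), sizeof(len_net))) return false;
    uint32_t len = ntohl(len_net);
    std::string payload(len, '\0');
    if (!recv_all(fd, &payload[0], len)) return false;
    return batch->ParseFromArray(payload.data(), static_cast<int>(len));
}
```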
```
.
├── mr_framework.cpp       # Core init/destroy logic
├── mr_rpc_server.cc       # Master/Worker/Reducer RPC services
├── mr_comm_helpers.cc     # Protobuf-based send/recv wrappers
├── wordc_*                # Word Count app (driver, reducer, worker)
├── grep_*                 # Grep app (driver, reducer, worker)
├── cluster.conf           # Runtime cluster configuration
└── run_evaluation.sh      # Automated performance testing (72 runs/test)
```
Pratyush Kumar