A concurrent MapReduce framework implementation with a custom thread pool library for efficient parallel processing of large distributed file systems.
- Library Descriptions
- Design Choices
- Synchronization Primitives
- Job Queue Implementation
- Partition Implementation
- Testing
- Thread Scaling Analysis
The ThreadPool library provides a generic, reusable thread pool implementation for executing concurrent tasks. It manages a fixed number of worker threads that process jobs from a priority queue. The library handles thread lifecycle management and provides a simple API for job submission and completion checking.
Key Features:
- Fixed-size thread pool with configurable worker count
- Priority-based job queue (shortest job first)
- Thread-safe job submission and execution
- Barrier synchronization to wait for job completion
- Clean shutdown and resource cleanup
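The sketch below shows typical usage. Only `ThreadPool_add_job()`, `ThreadPool_check()`, and the size-based priority are described in this README; the header name, `ThreadPool_create()`/`ThreadPool_destroy()`, and the exact `ThreadPool_add_job()` signature are assumptions for illustration.

```c
// Hedged usage sketch: constructor/destructor names, the header name, and
// the ThreadPool_add_job() signature are assumed, not taken from the library.
#include <stdio.h>
#include "threadpool.h"                  // hypothetical header name

static void process_file(void *arg) {
    printf("processing %s\n", (const char *)arg);
}

int main(void) {
    ThreadPool_t *tp = ThreadPool_create(4);                 // 4 fixed workers
    ThreadPool_add_job(tp, process_file, "big.txt", 4096);   // size = priority key
    ThreadPool_add_job(tp, process_file, "small.txt", 64);   // smaller size runs first
    ThreadPool_check(tp);                // barrier: block until the queue drains
    ThreadPool_destroy(tp);              // clean shutdown and resource cleanup
    return 0;
}
```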
The MapReduce library implements the MapReduce programming model for distributed data processing. It provides a framework where users define mapper and reducer functions, and the library handles parallel execution, intermediate data partitioning, and coordination between phases.
Key Features:
- Parallel map phase across multiple input files
- Hash-based partitioning of intermediate key-value pairs
- Parallel reduce phase across partitions
- Automatic thread pool management
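For concreteness, here is a hedged word-count sketch of the user-facing side of the model. `MR_Emit()` and `MR_GetNext()` are named in this README; the callback signatures and the header name are assumptions modeled on the classic MapReduce assignment interface.

```c
// Word-count sketch. MR_Emit()/MR_GetNext() appear in this README; their
// exact signatures and the header name are assumptions.
#define _POSIX_C_SOURCE 200809L          // for getline() and strtok_r()
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mapreduce.h"                   // hypothetical header name

// Mapper: emit ("word", "1") for every whitespace-separated token in the file.
void Map(char *file_name) {
    FILE *fp = fopen(file_name, "r");
    if (fp == NULL) return;
    char *line = NULL;
    size_t len = 0;
    while (getline(&line, &len, fp) != -1) {
        char *save = NULL;
        for (char *tok = strtok_r(line, " \t\n", &save); tok != NULL;
             tok = strtok_r(NULL, " \t\n", &save))
            MR_Emit(tok, "1");           // one intermediate pair per word
    }
    free(line);
    fclose(fp);
}

// Reducer: all values for `key` are consecutive in the sorted partition.
void Reduce(char *key, unsigned int partition_idx) {
    int count = 0;
    char *value;
    while ((value = MR_GetNext(key, partition_idx)) != NULL) {
        count++;
        free(value);                     // ownership transferred to the caller
    }
    printf("%s: %d\n", key, count);
}
```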
Choice: Use a fixed number of worker threads created at initialization.
Rationale:
- Avoids the overhead of creating/destroying threads for each job
- Provides predictable resource usage
- Simplifies synchronization (no dynamic thread management)
- Matches typical MapReduce workloads where parallelism is known upfront
Choice: Order jobs by size with smaller jobs having higher priority.
Rationale:
- Smaller jobs complete faster, improving overall throughput
- Reduces average completion time across all jobs
- Prevents small jobs from waiting behind large ones
- Works well with file-based MapReduce where file sizes vary
Choice: Each partition has its own mutex rather than using a single global lock.
Rationale:
- Reduces lock contention during the map phase
- Allows true parallel writes to different partitions
- Scales better with increasing partition count
Choice: Maintain key-value pairs in sorted order within each partition.
Rationale:
- Groups identical keys together naturally
- Simplifies the reduce phase (all values for a key are consecutive)
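Both choices can be seen together in a short sketch of the emit path. The struct layout matches the `Partition`/`Kv_pair` definitions shown later in this document; the helper name and exact logic are illustrative, not the library's verbatim `MR_Emit()` code.

```c
// Illustrative emit path: per-partition lock plus sorted insert. Struct
// layout follows this README; the function itself is a sketch.
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

typedef struct Kv_pair {
    char *key, *value;
    struct Kv_pair *next;
} Kv_pair;

typedef struct Partition {
    Kv_pair *head_kv_pair;
    long size;
    pthread_mutex_t lock;
} Partition;

void partition_insert(Partition *p, const char *key, const char *value) {
    Kv_pair *kv = malloc(sizeof *kv);
    kv->key = strdup(key);               // the partition owns its own copies
    kv->value = strdup(value);

    pthread_mutex_lock(&p->lock);        // blocks writers to THIS partition only
    Kv_pair **cur = &p->head_kv_pair;
    while (*cur != NULL && strcmp((*cur)->key, kv->key) < 0)
        cur = &(*cur)->next;             // find position in ascending key order
    kv->next = *cur;                     // identical keys end up adjacent
    *cur = kv;
    p->size++;
    pthread_mutex_unlock(&p->lock);
}
```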
Choice: Use `ThreadPool_check()` to wait for all mappers before starting reducers.
Rationale:
- Ensures all intermediate data is produced before reduction begins
- Eliminates need for complex synchronization between phases
- Matches the MapReduce programming model (map must complete before reduce)
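A condensed sketch of this two-phase driver follows; everything except `ThreadPool_add_job()` and `ThreadPool_check()` (the worker functions, headers, and variables) is a placeholder, not the library's actual code.

```c
// Illustrative two-phase driver: ThreadPool_check() acts as the barrier
// between the map and reduce phases. All other names are placeholders.
#include "threadpool.h"            // hypothetical header name
#include "mapreduce_internal.h"    // hypothetical: Partition, worker functions

static void run_both_phases(ThreadPool_t *tp,
                            char **file_names, long *file_sizes, int num_files,
                            Partition *partitions, int num_partitions) {
    for (int i = 0; i < num_files; i++)        // map phase: one job per file
        ThreadPool_add_job(tp, map_worker, file_names[i], file_sizes[i]);
    ThreadPool_check(tp);    // barrier: every mapper done, all pairs emitted

    for (int p = 0; p < num_partitions; p++)   // reduce phase: one job per partition
        ThreadPool_add_job(tp, reduce_worker, &partitions[p], partitions[p].size);
    ThreadPool_check(tp);    // barrier: every partition reduced
}
```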
- Purpose: Protects the job queue and thread pool state
- Protects:
  - Job queue modifications (push/pop operations)
  - Worker thread status variables (`waiting`, `shutdown`)
  - Condition variable wait/signal operations
- `ThreadPool_t->notEmpty`
  - Purpose: Signals when jobs are available in the queue
  - Signaled by: `ThreadPool_add_job()` after adding a job
  - Waited on by: Worker threads when queue is empty
- `ThreadPool_t->isEmpty`
  - Purpose: Signals when the job queue becomes empty
  - Signaled by: `ThreadPool_get_job()` when dequeuing the last job
  - Waited on by: `ThreadPool_check()` to detect job completion
- `ThreadPool_t->idle`
  - Purpose: Signals when a worker thread becomes idle
  - Signaled by: Worker threads before waiting for work
  - Waited on by: `ThreadPool_check()` to detect all threads idle
- Location: `Partition->lock` (one per partition)
- Purpose: Protects partition data structure during concurrent access
- Protects:
- Linked list of key-value pairs
- Partition size counter
- Insertion operations during map phase
Note: No mutex is needed during the reduce phase because each partition is processed by exactly one reducer thread, guaranteeing exclusive access.
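Taken together, these primitives imply a worker loop roughly like the sketch below. The `mutex` field name, the queue test, and the job struct are assumptions; `notEmpty`, `idle`, `waiting`, `shutdown`, and `ThreadPool_get_job()` follow this section.

```c
// Condensed worker loop implied by the primitives above; field and helper
// names are partly assumed, and error handling is omitted for brevity.
static void *worker_loop(void *arg) {
    ThreadPool_t *tp = (ThreadPool_t *)arg;
    for (;;) {
        pthread_mutex_lock(&tp->mutex);
        while (tp->queue_head == NULL && !tp->shutdown) {
            tp->waiting++;                         // protected by tp->mutex
            pthread_cond_signal(&tp->idle);        // tell ThreadPool_check()
            pthread_cond_wait(&tp->notEmpty, &tp->mutex);
            tp->waiting--;
        }
        if (tp->shutdown) {
            pthread_mutex_unlock(&tp->mutex);
            break;
        }
        ThreadPool_job_t *job = ThreadPool_get_job(tp);  // signals isEmpty on last job
        pthread_mutex_unlock(&tp->mutex);
        job->func(job->arg);                       // run the job outside the lock
        free(job);
    }
    return NULL;
}
```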
The job queue is implemented as a priority queue using a sorted linked list.
Algorithm:
- If queue is empty or new job has smallest size, insert at head
- Otherwise, traverse list to find insertion point
- Insert job maintaining sorted order (ascending order by job size)
- Workers always pick the job at the head of the queue
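A sketch of this insert/remove logic over an illustrative job struct (the library's actual job type is not shown in this README):

```c
// Sorted-insert priority queue: ascending by size, pop from the head.
// The Job struct here is illustrative, not the library's actual type.
#include <stddef.h>

typedef struct Job {
    void (*func)(void *);
    void *arg;
    long size;                            // priority key: smaller runs first
    struct Job *next;
} Job;

static void queue_push(Job **head, Job *job) {
    Job **cur = head;
    while (*cur != NULL && (*cur)->size <= job->size)
        cur = &(*cur)->next;              // walk past smaller/equal jobs
    job->next = *cur;                     // splice in, keeping ascending order
    *cur = job;                           // handles empty-queue and head cases
}

static Job *queue_pop(Job **head) {       // workers always take the head
    Job *job = *head;
    if (job != NULL)
        *head = job->next;
    return job;
}
```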
The thread pool implements a Shortest Job First scheduling policy where jobs with smaller size values are executed before larger ones.
Job Priority:
- Jobs are ordered by their `size` parameter
In MapReduce Context:
- **Mapper jobs**: Prioritized by file size (bytes)
- **Reducer jobs**: Prioritized by partition size (number of key-value pairs)
Partitions are implemented as sorted linked lists of key-value pairs:
```c
typedef struct Kv_pair Kv_pair;   // forward declaration so Partition can refer to it

typedef struct Partition {
    Kv_pair *head_kv_pair;        // Head of linked list
    long size;                    // Number of key-value pairs
    pthread_mutex_t lock;         // Per-partition mutex
} Partition;

struct Kv_pair {
    char *key;                    // Key string (owned by partition)
    char *value;                  // Value string (owned by partition)
    struct Kv_pair *next;         // Next pair in list
};
```

Sorting: Keys are maintained in ascending alphabetical order using `strcmp()`.
Ownership Rules:
- Keys and Values: Owned by partition, allocated with `strdup()`
- Returned Values: Ownership transferred to caller (reducer must free)
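A hedged sketch of the hand-off these rules describe, reusing the structs above (the library's actual `MR_GetNext()` may differ; no lock is taken because the reduce phase has exclusive access, per the note in the synchronization section):

```c
// Illustrative value hand-off: pop the head pair if it matches `key` and
// return its value string, whose ownership transfers to the caller.
#include <stdlib.h>
#include <string.h>

char *partition_get_next(Partition *p, const char *key) {
    Kv_pair *kv = p->head_kv_pair;
    if (kv == NULL || strcmp(kv->key, key) != 0)
        return NULL;                      // no further values for this key
    char *value = kv->value;              // caller now owns this string
    p->head_kv_pair = kv->next;
    p->size--;
    free(kv->key);                        // key copy stays owned by the partition
    free(kv);                             // list node is internal bookkeeping
    return value;
}
```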
A comprehensive test suite (`test_suite.c`) was developed to validate both the ThreadPool and MapReduce libraries. The suite contains 10 major test cases covering functionality, concurrency, edge cases, and performance characteristics.
Purpose: Verify basic initialization and cleanup
Method:
- Create a thread pool with 4 threads
- Check that structure is properly initialized
- Verify thread count matches request
- Destroy pool and ensure clean shutdown
Validates:
- Memory allocation
- Thread creation
- Resource cleanup
- No memory leaks
Purpose: Verify jobs execute correctly
Method:
- Create pool with 4 threads
- Submit 10 jobs that increment a shared counter
- Wait for completion using `ThreadPool_check()`
- Verify counter equals 10
Validates:
- Job submission works
- Jobs execute exactly once
- Thread pool processes all jobs
- Barrier synchronization
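The job body for this test might look like the following; whether `test_suite.c` uses C11 atomics or a mutex-protected counter is not shown here, so atomics stand in:

```c
// Illustrative shared-counter job for this test.
#include <stdatomic.h>

static atomic_int counter;                // zero-initialized at program start

static void increment_job(void *arg) {
    (void)arg;                            // no per-job payload needed
    atomic_fetch_add(&counter, 1);        // thread-safe increment
}
// After submitting 10 jobs and calling ThreadPool_check(),
// atomic_load(&counter) should equal 10.
```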
Purpose: Verify parallel execution (not sequential)
Method:
- Create pool with 4 threads
- Submit 8 jobs that sleep for 100ms each
- Measure total elapsed time
- Verify time < 400ms (proves parallelism)
Validates:
- Jobs run in parallel, not sequentially
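A sketch of the timing logic behind this check (illustrative; the test's exact code is not reproduced here):

```c
// 8 x 100ms jobs on 4 threads should take ~200ms if parallel and ~800ms if
// serial, hence the 400ms bound used by the test.
#include <time.h>

static double elapsed_ms(struct timespec start, struct timespec end) {
    return (end.tv_sec - start.tv_sec) * 1000.0
         + (end.tv_nsec - start.tv_nsec) / 1.0e6;
}
// usage:
//   struct timespec t0, t1;
//   clock_gettime(CLOCK_MONOTONIC, &t0);
//   /* submit 8 sleep-100ms jobs, then ThreadPool_check(tp) */
//   clock_gettime(CLOCK_MONOTONIC, &t1);
//   assert(elapsed_ms(t0, t1) < 400.0);
```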
Purpose: Verify job ordering by size
Method:
- Create pool with 1 thread (to enforce ordering)
- Submit 5 jobs with sizes 5, 4, 3, 2, 1
- Track execution order
- Verify smallest job (size 1) executes first
Validates:
- Priority queue maintains correct order
- Scheduling policy implementation
Purpose: Verify stability under load
Method:
- Create pool with 8 threads
- Submit 1000 jobs incrementing a counter
- Wait for completion
- Verify counter equals 1000
Validates:
- No race conditions with many jobs
- No memory leaks under load
- Queue handles many insertions
Purpose: Verify hash function correctness
Method:
- Hash the same key multiple times
- Verify consistent partition assignment
- Hash 100 different keys
- Verify all partitions are in valid range [0, num_partitions)
Validates:
- Hash function is deterministic
- Partition indices are always valid
- No out-of-bounds access
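A self-contained sketch of these checks, with a DJB2-style hash standing in for the library's hash function (which this README does not specify):

```c
// Determinism and range checks from this test; the DJB2-style hash below is
// a stand-in, not necessarily the library's actual hash function.
#include <assert.h>
#include <stdio.h>

static unsigned long partition_hash(const char *key, int num_partitions) {
    unsigned long h = 5381;               // DJB2 stand-in
    while (*key != '\0')
        h = h * 33 + (unsigned char)*key++;
    return h % num_partitions;            // always in [0, num_partitions)
}

int main(void) {
    const int n = 10;
    // Same key must always land in the same partition.
    assert(partition_hash("hello", n) == partition_hash("hello", n));
    // Every index must be in range for many distinct keys.
    char key[16];
    for (int i = 0; i < 100; i++) {
        snprintf(key, sizeof key, "key%d", i);
        assert(partition_hash(key, n) < (unsigned long)n);
    }
    puts("hash tests passed");
    return 0;
}
```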
Purpose: End-to-end MapReduce functionality
Method:
- Create two test files:
- Run MapReduce with 2 workers and 10 partitions
- Verify output counts:
- "hello": 2
- "world": 2
- "mapreduce": 3
Validates:
- Mapper correctly processes files
- `MR_Emit()` works correctly
- Partitioning distributes keys
- Reducer aggregates values
- `MR_GetNext()` iterates correctly
- Output is accurate
Purpose: Different use case (character-level processing)
Method:
- Create file: "aaa bbb ccc"
- Map each character (excluding spaces/newlines)
- Count occurrences
- Verify: 'a'=3, 'b'=3, 'c'=3
Validates:
- Framework handles different data types
- Multiple values per key handled correctly
Purpose: Verify scaling across many input files
Method:
- Create 10 files, each containing "test word test"
- Run MapReduce with 4 workers and 10 partitions
- Verify counts:
- "test": 20 (2 per file × 10 files)
- "word": 10 (1 per file × 10 files)
Validates:
- Multiple mappers run concurrently
- Results aggregate across files correctly
- Thread pool distributes work efficiently
- No data loss or duplication
Purpose: Edge case handling
Method:
- Create empty input file
- Run MapReduce
- Verify no crashes
- Verify no output files created
Validates:
- Graceful handling of empty inputs
- No segmentation faults
- No incorrect output generation
A benchmark was conducted to measure how the MapReduce framework scales with the number of worker threads. The benchmark processed 20 input files of 5,000 words each (100,000 words total) across 10 partitions.
| Threads | Time (ms) | Speedup | Efficiency (%) |
|---|---|---|---|
| 1 | 6165.20 | 1.00x | 100.0% |
| 2 | 3996.71 | 1.54x | 77.1% |
| 4 | 3221.86 | 1.91x | 47.8% |
| 6 | 6642.24 | 0.93x | 15.5% |
| 8 | 7789.14 | 0.79x | 9.9% |
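For reference, the derived columns follow the usual definitions (e.g., 6165.20 / 3221.86 ≈ 1.91× and 1.91 / 4 ≈ 47.8% for the 4-thread row):

```math
\text{Speedup}(n) = \frac{T_1}{T_n}, \qquad
\text{Efficiency}(n) = \frac{\text{Speedup}(n)}{n} \times 100\%
```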
The graph shows the MapReduce thread scaling performance measured on the lab machine, illustrating that execution time falls to a minimum at 4 threads and rises again beyond it.
Several factors limit speedup beyond a certain number of threads:
- **Amdahl's law**
  - Serial portions of code (initialization, coordination) create a theoretical limit on parallelization benefits
  - Speedup = 1 / (s + p/n), where s = serial fraction, p = parallel fraction, n = number of threads (see the worked example after this list)
- **Synchronization overhead**
  - Lock contention increases with more threads
  - `ThreadPool_check()` must wait for all threads to complete
  - Condition variable signaling becomes more expensive
- **Memory bandwidth saturation**
  - All threads compete for memory access
  - Hash table/partition writes cause cache invalidation
  - Memory bandwidth becomes the bottleneck
- **CPU core limits**
  - Beyond physical core count, threads share execution units
  - Context switching overhead increases
  - Hyperthreading provides diminishing returns
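As a rough worked example of the Amdahl's law point: treating the measured 1.91× speedup at 4 threads as exact and back-solving for the serial fraction gives

```math
1.91 = \frac{1}{s + (1 - s)/4} \;\Rightarrow\; s \approx 0.36, \qquad
\text{Speedup}_{\max} = \lim_{n \to \infty} \frac{1}{s + (1 - s)/n} = \frac{1}{s} \approx 2.7\times
```

which is consistent with the plateau near 4 threads; the outright slowdown beyond that point comes from the overhead factors above, which Amdahl's simple model does not capture.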