A modern, elegant PHP library for parallel execution using process forking. Run code in separate processes with shared memory communication, graceful shutdown, and comprehensive error handling.
- 🚀 Simple API - Start threads with a single line of code
- 🔄 Shared Memory - Communicate between parent and child processes
- ⏱️ Timeouts - Automatic killing of long-running threads
- 🛑 Graceful Shutdown - Signal threads to stop cleanly
- 📡 Signal Handling - Auto-handles SIGTERM, SIGINT, SIGHUP in child processes
- 📊 Progress Tracking - Monitor thread progress in real-time
- 🏊 Thread Pools - Batch operations with concurrency limits
- đź”’ Thread-Safe - Mutex-protected shared memory access
- ⚡ Parallel Map - Process arrays in parallel with ease
- 🔌 Pluggable Serializers - Use PHP serialize, JSON, or custom formats
- đź§ą Memory Cleanup - Automatic cleanup with leak detection utilities
- PHP 8.3+
- Linux/Unix (process forking via
pcntl) - PHP Extensions:
pcntl,shmop,sysvsem
composer require h4or/threadablesuse H4or\Threadables\Thread;
// Run code in a separate process (non-blocking)
$handle = Thread::run(function ($ctx) {
$ctx->set('result', 'Hello from thread!');
});
// Do other work while thread runs...
$handle->join(); // Wait for completion
echo $handle->get('result'); // "Hello from thread!"$handle = Thread::runBlocking(function ($ctx) {
$ctx->set('answer', 42);
});
echo $handle->get('answer'); // 42// Process items in parallel
$results = Thread::map([1, 2, 3, 4, 5], function ($item, $ctx) {
$ctx->set('result', $item * 2);
});
// $results = [2, 4, 6, 8, 10]When you start a thread, you get a ThreadHandle that lets you:
$handle = Thread::run(fn($ctx) => sleep(5));
$handle->isAlive(); // Check if running
$handle->join(); // Wait for completion
$handle->stop(); // Request graceful shutdown
$handle->kill(); // Force kill immediately
$handle->getExitCode(); // Get exit code (0 = success)
$handle->getFailure(); // Get exception details if failed
$handle->getRuntime(); // Get execution time in secondsThreads communicate via shared memory:
$handle = Thread::run(function ($ctx) {
// Write individual fields
$ctx->set('status', 'processing');
$ctx->set('progress', 50);
// Write entire data structure
$ctx->write(['items' => [1, 2, 3], 'count' => 3]);
});
// Parent can read while thread runs
while ($handle->isAlive()) {
echo "Progress: " . $handle->get('progress', 0) . "%\n";
usleep(100000);
}
$handle->join();
$data = $handle->read(); // Get all dataThreads can check for shutdown requests:
$handle = Thread::run(function ($ctx) {
while (!$ctx->shouldStop()) {
// Do work...
usleep(100000);
}
$ctx->set('status', 'stopped gracefully');
});
sleep(1);
$handle->stop(); // Request shutdown
$handle->join();Exceptions in threads are captured:
$handle = Thread::runBlocking(function ($ctx) {
throw new RuntimeException('Something went wrong');
});
if ($handle->failed()) {
$failure = $handle->getFailure();
echo $failure->message; // "Something went wrong"
echo $failure->class; // "RuntimeException"
echo $failure->file; // "/path/to/file.php"
echo $failure->line; // 42
echo $failure->trace; // Full stack trace
}Automatically kill threads that run too long:
$handle = Thread::runWithTimeout(function ($ctx) {
sleep(60); // Long operation
}, timeoutSeconds: 5.0);
if ($handle->wasTimedOut()) {
echo "Thread was killed after 5 seconds";
}For batch operations with concurrency control:
use H4or\Threadables\Support\ThreadPool;
// Map with concurrency limit (max 3 threads at once)
$results = ThreadPool::map(
items: $largeArray,
callback: fn($item, $ctx) => $ctx->set('result', process($item)),
concurrency: 3
);
// Run multiple tasks
$handles = ThreadPool::run([
fn($ctx) => $ctx->set('result', 'Task 1'),
fn($ctx) => $ctx->set('result', 'Task 2'),
fn($ctx) => $ctx->set('result', 'Task 3'),
], concurrency: 2);
// Batch operations
ThreadPool::joinAll($handles);
ThreadPool::killAll($handles);
ThreadPool::requestShutdownAll($handles);
// Query status
$running = ThreadPool::getRunning($handles);
$completed = ThreadPool::getCompleted($handles);
$succeeded = ThreadPool::getSucceeded($handles);
$failed = ThreadPool::getFailed($handles);For reusable thread logic, extend Threadable:
use H4or\Threadables\Threadable;
use H4or\Threadables\Core\ThreadContext;
class DataProcessor extends Threadable
{
public function __construct(
private readonly array $data
) {}
public function execute(ThreadContext $context): void
{
$results = [];
foreach ($this->data as $i => $item) {
if ($context->shouldStop()) {
break;
}
$results[] = $this->processItem($item);
$context->set('progress', ($i + 1) / count($this->data) * 100);
}
$context->set('result', $results);
}
private function processItem(mixed $item): mixed
{
// Your processing logic
return $item;
}
}
// Usage
$processor = new DataProcessor([1, 2, 3, 4, 5]);
$handle = $processor->run();
$handle->join();
$results = $handle->get('result');Child processes automatically handle POSIX signals for graceful shutdown:
// Signals (SIGTERM, SIGINT, SIGHUP) are auto-handled in child processes.
// When a signal is received, $context->shouldStop() returns true.
$handle = Thread::run(function ($ctx) {
while (!$ctx->shouldStop()) {
// This loop will exit gracefully on Ctrl+C or kill signal
doWork();
usleep(100000);
}
$ctx->set('result', 'Gracefully stopped');
});
// In parent: handle Ctrl+C to stop threads
pcntl_async_signals(true);
pcntl_signal(SIGINT, function () use ($handle) {
$handle->requestShutdown();
});By default, PHP's serialize() is used for shared memory data. You can use JSON for portable, human-readable data:
use H4or\Threadables\Support\SynchronizedMemory;
use H4or\Threadables\Serializers\JsonSerializer;
// Create memory with JSON serializer
$memory = new SynchronizedMemory(
serializer: new JsonSerializer()
);
// Or with custom JSON options
$memory = new SynchronizedMemory(
serializer: new JsonSerializer(
encodeFlags: JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR,
decodeFlags: JSON_THROW_ON_ERROR,
depth: 128
)
);Available serializers:
- PhpSerializer (default): Full PHP type support, validates non-serializable types
- JsonSerializer: Portable format, human-readable, loses class information
You can create custom serializers by implementing SerializerInterface:
use H4or\Threadables\Contracts\SerializerInterface;
class MsgpackSerializer implements SerializerInterface
{
public function serialize(mixed $value): string
{
return msgpack_pack($value);
}
public function unserialize(string $data): mixed
{
return msgpack_unpack($data);
}
}The library automatically cleans up shared memory resources. For debugging or manual cleanup:
use H4or\Threadables\Thread;
// Check active resources in current process
echo Thread::getActiveResourceCount(); // e.g., 3
// List all system IPC resources (runs `ipcs -m`)
echo Thread::listSystemResources();
// Clean up orphaned resources by known keys (e.g., after a crash)
Thread::cleanupOrphaned($memoryKey, $mutexKey);| Method | Description |
|---|---|
run(Closure, ?int $memorySize) |
Start non-blocking thread |
runBlocking(Closure, ?int $memorySize) |
Start and wait for completion |
runWithTimeout(Closure, float $timeout, ?int $memorySize) |
Start with auto-kill timeout |
map(array, Closure, ?int $memorySize) |
Process array in parallel |
| Method | Description |
|---|---|
isAlive() / running() |
Check if thread is running |
isFinished() |
Check if thread has completed |
join() / await() |
Wait for thread to complete |
stop(bool $force = false) |
Request shutdown or force kill |
kill() |
Force kill immediately |
pause() / resume() |
Pause/resume thread execution |
get(string $key, $default) |
Read shared data field |
set(string $key, $value) |
Write shared data field |
read() |
Read all shared data |
write(mixed $data) |
Write all shared data |
getResult($default) |
Shorthand for get('result', $default) |
getFailure() |
Get ThreadFailure if failed |
succeeded() / failed() |
Check success/failure status |
getExitCode() |
Get process exit code |
wasTimedOut() |
Check if killed by timeout |
getRuntime() |
Get execution time in seconds |
getState() |
Get current state string |
setName(string) / getName() |
Set/get thread name |
toArray() |
Get complete thread info as array |
| Method | Description |
|---|---|
shouldStop() |
Check if shutdown requested |
get(string $key, $default) |
Read shared data field |
set(string $key, $value) |
Write shared data field |
read() |
Read all shared data |
write(mixed $data) |
Write all shared data |
increment(string $key, int $by) |
Atomically increment counter |
decrement(string $key, int $by) |
Atomically decrement counter |
has(string $key) |
Check if key exists |
forget(string $key) |
Remove a key |
clear() |
Clear all shared data |
| Method | Description |
|---|---|
map(array, Closure, int $concurrency, ?int $memorySize) |
Parallel map with limit |
run(array $tasks, int $concurrency, ?int $memorySize) |
Run tasks with limit |
joinAll(array $handles) |
Wait for all to complete |
killAll(array $handles) |
Kill all threads |
requestShutdownAll(array $handles) |
Request graceful shutdown |
waitAll(array $handles, float $timeout) |
Wait with timeout |
waitAny(array $handles, ?float $timeout) |
Wait for first completion |
getRunning(array $handles) |
Get running threads |
getCompleted(array $handles) |
Get completed threads |
getSucceeded(array $handles) |
Get successful threads |
getFailed(array $handles) |
Get failed threads |
countRunning(array $handles) |
Count running threads |
allCompleted(array $handles) |
Check if all done |
anyRunning(array $handles) |
Check if any running |
allSucceeded(array $handles) |
Check if all succeeded |
anyFailed(array $handles) |
Check if any failed |
The examples/ directory contains 22 runnable examples:
php examples/01_basic_thread.php # Basic thread execution
php examples/02_blocking_execution.php # Blocking execution
php examples/03_shared_data.php # Shared data communication
php examples/04_graceful_shutdown.php # Graceful shutdown handling
php examples/05_error_handling.php # Exception handling
php examples/06_class_based_threadable.php # Class-based threads
php examples/07_parallel_map.php # Parallel array processing
php examples/08_concurrency_limit.php # Concurrency limiting
php examples/09_thread_pool_operations.php # Thread pool batch operations
php examples/10_timeouts.php # Timeout handling
php examples/11_progress_tracking.php # Progress bar tracking
php examples/12_debugging.php # Debug info and toArray()
php examples/13_custom_memory_size.php # Custom shared memory size
php examples/14_multiple_progress_bars.php # Multiple thread monitoring
php examples/15_signal_handling.php # POSIX signal handling (Ctrl+C)
php examples/16_custom_serializers.php # Custom serialization formats
php examples/17_parallel_http.php # Real-world HTTP fetching
php examples/18_parallel_files.php # Parallel file processing
php examples/19_atomic_counters.php # Atomic increment/decrement
php examples/20_producer_consumer.php # Producer-consumer pattern
php examples/21_retry_with_backoff.php # Retry with exponential backoff
php examples/22_memory_cleanup.php # Memory cleanup & debuggingShared memory has a default size of 64KB. For larger data:
// Set memory size per thread
$handle = Thread::run(fn($ctx) => $ctx->write($largeData), memorySize: 1024 * 1024);
// For class-based threads
$threadable = new MyThreadable();
$threadable->setSharedMemorySize(1024 * 1024); // 1MB
$handle = $threadable->run();
// Check if data will fit
if ($handle->wouldFit($data)) {
$handle->write($data);
}# Run tests
composer test
# Run with coverage
composer test:coverage
# Static analysis
composer analyse
# Format code
composer formatMIT License. See LICENSE.md for details.
- Built on GPhpThread for process forking
- Uses Symfony UID for unique identifiers