Skip to content

Commit

Permalink
Merge 4890915 into 4c3c93e
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Nov 4, 2018
2 parents 4c3c93e + 4890915 commit 7fd2bfe
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 65 deletions.
46 changes: 38 additions & 8 deletions README.md
Expand Up @@ -9,11 +9,7 @@
<a href="https://github.com/amphp/parallel/blob/master/LICENSE"><img src="https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square" alt="License"/></a>
</p>

<p align="center"><strong>True parallel processing using multiple processes or native threads for concurrent PHP code execution, <em>without</em> blocking, no extensions required.</strong></p>

<hr/>

`amphp/parallel` is a component for [Amp](https://amphp.org) that provides native threading, multiprocessing, process synchronization, shared memory, and task workers for concurrently executing PHP code. Like other Amp components, this library uses [Coroutines](http://amphp.org/amp/coroutines/) built from [Promises](http://amphp.org/amp/promises/) and [Generators](http://www.php.net/manual/en/language.generators.overview.php) to make writing asynchronous code more like writing synchronous code.
`amphp/parallel` provides *true parallel processing* for PHP using multiple processes or native threads, *without blocking and no extensions required*.

To be as flexible as possible, this library comes with a collection of non-blocking concurrency tools that can be used independently as needed, as well as an "opinionated" worker API that allows you to assign units of work to a pool of worker threads or processes.

Expand All @@ -25,9 +21,41 @@ This package can be installed as a [Composer](https://getcomposer.org/) dependen
composer require amphp/parallel
```

## Requirements
## Usage

The basic usage of this library is to submit blocking tasks to be executed by a worker pool in order to avoid blocking the main event loop.

```php
<?php

require __DIR__ . '/../vendor/autoload.php';

use Amp\Parallel\Worker;
use Amp\Promise;

$urls = [
'https://secure.php.net',
'https://amphp.org',
'https://github.com',
];

$promises = [];
foreach ($urls as $url) {
$promises[$url] = Worker\enqueueCallable('file_get_contents', $url);
}

$responses = Promise\wait(Promise\all($promises));

foreach ($responses as $url => $response) {
\printf("Read %d bytes from %s\n", \strlen($response), $url);
}
```

[`file_get_contents`](https://secure.php.net/file_get_contents) is just used as an example for a blocking function here.
If you just want to fetch multiple HTTP resources concurrently, it's better to use [Artax](https://amphp.org/artax/), our non-blocking HTTP client.

- PHP 7.0+ (no extensions required)
The functions you call must be predefined or autoloadable by Composer so they also exist in the worker processes.
Instead of simple callables, you can also enqueue `Task` instances with `Amp\Parallel\Worker\enqueue()`.

## Documentation

Expand All @@ -51,6 +79,8 @@ Want to hack on the source? A [Vagrant](http://vagrantup.com) box is provided wi

Starting up and logging into the virtual machine is as simple as

vagrant up && vagrant ssh
```bash
vagrant up && vagrant ssh
```

Once inside the VM, you can install PHP extensions with [Pickle](https://github.com/FriendsOfPHP/pickle), switch versions with `newphp VERSION`, and test for memory leaks with [Valgrind](http://valgrind.org).
38 changes: 35 additions & 3 deletions docs/index.md
@@ -1,8 +1,8 @@
---
title: Parallel
title: Parallel processing for PHP
permalink: /
---
**True parallel processing using multiple processes or native threads for concurrent PHP code execution, *without* blocking, no extensions required.**
This package provides *true parallel processing* for PHP using multiple processes or native threads, *without blocking and no extensions required*.

## Installation

Expand All @@ -14,4 +14,36 @@ composer require amphp/parallel

## Usage

This package provides native threading, multiprocessing, process synchronization, shared memory, and task workers for concurrently executing PHP code. To be as flexible as possible, this package includes a collection of non-blocking concurrency tools that can be used independently as needed, as well as an "opinionated" worker API that allows you to assign units of work to a pool of worker threads or processes.
The basic usage of this library is to submit blocking tasks to be executed by a worker pool in order to avoid blocking the main event loop.

```php
<?php

require __DIR__ . '/../vendor/autoload.php';

use Amp\Parallel\Worker;
use Amp\Promise;

$urls = [
'https://secure.php.net',
'https://amphp.org',
'https://github.com',
];

$promises = [];
foreach ($urls as $url) {
$promises[$url] = Worker\enqueueCallable('file_get_contents', $url);
}

$responses = Promise\wait(Promise\all($promises));

foreach ($responses as $url => $response) {
\printf("Read %d bytes from %s\n", \strlen($response), $url);
}
```

[`file_get_contents`](https://secure.php.net/file_get_contents) is just used as an example for a blocking function here.
If you just want to fetch multiple HTTP resources concurrently, it's better to use [Artax](https://amphp.org/artax/), our non-blocking HTTP client.

The functions you call must be predefined or autoloadable by Composer so they also exist in the worker processes.
Instead of simple callables, you can also enqueue `Task` instances with `Amp\Parallel\Worker\enqueue()`.
42 changes: 0 additions & 42 deletions examples/BlockingTask.php

This file was deleted.

24 changes: 24 additions & 0 deletions examples/worker-pool-simple.php
@@ -0,0 +1,24 @@
#!/usr/bin/env php
<?php

require \dirname(__DIR__) . '/vendor/autoload.php';

use Amp\Parallel\Worker;
use Amp\Promise;

$urls = [
'https://secure.php.net',
'https://amphp.org',
'https://github.com',
];

$promises = [];
foreach ($urls as $url) {
$promises[$url] = Worker\enqueueCallable('file_get_contents', $url);
}

$responses = Promise\wait(Promise\all($promises));

foreach ($responses as $url => $response) {
\printf("Read %d bytes from %s\n", \strlen($response), $url);
}
11 changes: 6 additions & 5 deletions examples/worker-pool.php
@@ -1,19 +1,20 @@
#!/usr/bin/env php
<?php
require \dirname(__DIR__).'/vendor/autoload.php';

require \dirname(__DIR__) . '/vendor/autoload.php';

use Amp\Loop;
use Amp\Parallel\Example\BlockingTask;
use Amp\Parallel\Worker\CallableTask;
use Amp\Parallel\Worker\DefaultPool;

// A variable to store our fetched results
$results = [];

// We can first define tasks and then run them
$tasks = [
new BlockingTask('file_get_contents', 'http://php.net'),
new BlockingTask('file_get_contents', 'https://amphp.org'),
new BlockingTask('file_get_contents', 'https://github.com'),
new CallableTask('file_get_contents', 'http://php.net'),
new CallableTask('file_get_contents', 'https://amphp.org'),
new CallableTask('file_get_contents', 'https://github.com'),
];

// Event loop for parallel tasks
Expand Down
6 changes: 3 additions & 3 deletions examples/worker.php
@@ -1,16 +1,16 @@
#!/usr/bin/env php
<?php
require \dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__) . '/vendor/autoload.php';

use Amp\Parallel\Example\BlockingTask;
use Amp\Parallel\Worker\CallableTask;
use Amp\Parallel\Worker\DefaultWorkerFactory;

Amp\Loop::run(function () {
$factory = new DefaultWorkerFactory();

$worker = $factory->create();

$result = yield $worker->enqueue(new BlockingTask('file_get_contents', 'https://google.com'));
$result = yield $worker->enqueue(new CallableTask('file_get_contents', 'https://google.com'));
\printf("Read %d bytes\n", \strlen($result));

$code = yield $worker->shutdown();
Expand Down
38 changes: 38 additions & 0 deletions lib/Worker/CallableTask.php
@@ -0,0 +1,38 @@
<?php

namespace Amp\Parallel\Worker;

/**
* Task implementation dispatching a simple callable.
*/
final class CallableTask implements Task
{
/** @var string */
private $callable;

/** @var mixed[] */
private $args;

/**
* @param callable $callable Callable will be serialized.
* @param mixed $args Arguments to pass to the function. Must be serializable.
*/
public function __construct(callable $callable, array $args)
{
$this->callable = $callable;
$this->args = $args;
}

public function run(Environment $environment)
{
if ($this->callable instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
}

if (\is_array($this->callable) && ($this->callable[0] ?? null) instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance method as a callable, the class must be autoloadable');
}

return ($this->callable)(...$this->args);
}
}
17 changes: 15 additions & 2 deletions lib/Worker/functions.php
Expand Up @@ -33,15 +33,28 @@ function pool(Pool $pool = null): Pool
/**
* Enqueues a task to be executed by the global worker pool.
*
* @param \Amp\Parallel\Worker\Task $task The task to enqueue.
* @param Task $task The task to enqueue.
*
* @return \Amp\Promise<mixed>
* @return Promise<mixed>
*/
function enqueue(Task $task): Promise
{
return pool()->enqueue($task);
}

/**
* Enqueues a callable to be executed by the global worker pool.
*
* @param callable $callable Callable needs to be serializable.
* @param mixed ...$args Arguments have to be serializable.
*
* @return Promise<mixed>
*/
function enqueueCallable(callable $callable, ...$args)
{
return enqueue(new CallableTask($callable, $args));
}

/**
* Gets a worker from the global worker pool.
*
Expand Down
37 changes: 35 additions & 2 deletions test/Worker/FunctionsTest.php
Expand Up @@ -44,6 +44,40 @@ public function testEnqueue()
$this->assertSame($value, Promise\wait($awaitable));
}

/**
* @depends testPool
*/
public function testEnqueueCallable()
{
$pool = $this->createMock(Pool::class);
$pool->method('enqueue')
->will($this->returnCallback(function (Task $task): Promise {
return new Success($task->run($this->createMock(Environment::class)));
}));

Worker\pool($pool);

$value = 42;

$promise = Worker\enqueueCallable('strval', $value);

$this->assertSame('42', Promise\wait($promise));
}

/**
* @depends testEnqueueCallable
*/
public function testEnqueueCallableIntegration()
{
Worker\pool(new Worker\DefaultPool());

$value = 42;

$promise = Worker\enqueueCallable('strval', $value);

$this->assertSame('42', Promise\wait($promise));
}

/**
* @depends testPool
*/
Expand Down Expand Up @@ -79,7 +113,6 @@ public function testCreate()
->will($this->returnValue($this->createMock(Worker\Worker::class)));

Worker\factory($factory);

$worker = Worker\create();
Worker\create(); // shouldn't throw
}
}

0 comments on commit 7fd2bfe

Please sign in to comment.