Skip to content

Commit

Permalink
Use function instead of class
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed May 4, 2020
1 parent 5c2031e commit 03dc508
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 73 deletions.
2 changes: 1 addition & 1 deletion composer.json
Expand Up @@ -22,7 +22,7 @@
],
"require": {
"php": ">=7.1",
"amphp/amp": "^2.2"
"amphp/amp": "dev-discard as 2.5.0"
},
"require-dev": {
"phpunit/phpunit": "^8 || ^7",
Expand Down
10 changes: 5 additions & 5 deletions examples/queue.php
Expand Up @@ -4,7 +4,9 @@
use Amp\Sync\LocalSemaphore;
use Amp\Sync\Queue;
use function Amp\delay;
use function Amp\Iterator\discard;
use function Amp\Promise\wait;
use function Amp\Sync\concurrentMap;

require __DIR__ . '/../vendor/autoload.php';

Expand All @@ -17,7 +19,8 @@
$emitter->emit($jobId++);
}

wait(Queue::fromIterator($emitter->iterate())->process(
wait(discard(concurrentMap(
$emitter->iterate(),
new LocalSemaphore(3),
function ($job) use ($emitter, &$jobId) {
print 'starting ' . $job . \PHP_EOL;
Expand All @@ -34,8 +37,5 @@ function ($job) use ($emitter, &$jobId) {
}

print 'finished ' . $job . \PHP_EOL;
},
function (\Throwable $error, $job) {
throw $error;
}
));
)));
67 changes: 0 additions & 67 deletions src/Queue.php

This file was deleted.

52 changes: 52 additions & 0 deletions src/functions.php
Expand Up @@ -2,6 +2,8 @@

namespace Amp\Sync;

use Amp\Iterator;
use Amp\Producer;
use Amp\Promise;
use function Amp\call;

Expand Down Expand Up @@ -29,3 +31,53 @@ function synchronized(Mutex $mutex, callable $callback, ...$args): Promise
}
});
}

/**
* @param Iterator $iterator
* @param Semaphore $semaphore
* @param callable $processor
*
* @return Iterator
*/
function concurrentMap(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator
{
return new Producer(function () use ($iterator, $semaphore, $processor) {
/** @var \Throwable|null $error */
$error = null;
$pending = [];

while (yield $iterator->advance()) {
if ($error) {
throw $error;
}

/** @var Lock $lock */
$lock = yield $semaphore->acquire();

$currentElement = $iterator->getCurrent();

$promise = call(static function () use ($lock, $currentElement, $processor, &$error) {
try {
yield call($processor, $currentElement);
} catch (\Throwable $e) {
$error = $e;
} finally {
$lock->release();
}
});

$promiseId = \spl_object_id($promise);

$pending[$promiseId] = $promise;
$promise->onResolve(static function () use (&$pending, $promiseId) {
unset($pending[$promiseId]);
});
}

yield Promise\any($pending);

if ($error) {
throw $error;
}
});
}

0 comments on commit 03dc508

Please sign in to comment.