Skip to content

Commit

Permalink
Rework concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed May 6, 2020
1 parent c61b435 commit 4dd1c5b
Showing 1 changed file with 33 additions and 40 deletions.
73 changes: 33 additions & 40 deletions src/functions.php
Expand Up @@ -48,17 +48,44 @@ function synchronized(Mutex $mutex, callable $callback, ...$args): Promise
function concurrent(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator
{
return new Producer(static function (callable $emit) use ($iterator, $semaphore, $processor) {
$processor = coroutine($processor);

/** @var \Throwable|null $error */
$error = null;

// one dummy item, because we can't start the barrier with a count of zero
$barrier = new CountingBarrier(1);

/** @var \Throwable|null $error */
$error = null;
$locks = [];
$gc = false;

$processor = coroutine($processor);
$processor = static function (Lock $lock, $currentElement) use (
$processor,
$emit,
$barrier,
&$locks,
&$error,
&$gc
) {
$done = false;

try {
yield $processor($currentElement, $emit);

$done = true;
} catch (\Throwable $e) {
$error = $error ?? $e;
$done = true;
} finally {
if (!$done) {
$gc = true;
}

unset($locks[$lock->getId()]);

$lock->release();
$barrier->decrease();
}
};

while (yield $iterator->advance()) {
if ($error) {
break;
Expand All @@ -71,43 +98,9 @@ function concurrent(Iterator $iterator, Semaphore $semaphore, callable $processo
}

$locks[$lock->getId()] = true;

$currentElement = $iterator->getCurrent();
$barrier->increase();

asyncCall(static function () use (
$lock,
$currentElement,
$processor,
$emit,
$barrier,
&$locks,
&$error,
&$gc
) {
$done = false;

try {
yield $processor($currentElement, $emit);

$done = true;
} catch (\Throwable $e) {
if ($error === null) {
$error = $e;
}

$done = true;
} finally {
unset($locks[$lock->getId()]);

if (!$done) {
$gc = true;
}

$lock->release();
$barrier->decrease();
}
});
asyncCall($processor, $lock, $iterator->getCurrent());
}

$barrier->decrease(); // remove dummy item
Expand Down

0 comments on commit 4dd1c5b

Please sign in to comment.