Skip to content

Commit

Permalink
Simplify worker implementation
Browse files Browse the repository at this point in the history
Most of the complexity in AbstractWorker was to support out-of-order task execution.
  • Loading branch information
trowski committed Dec 24, 2017
1 parent 37d97e4 commit 77e5583
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 87 deletions.
126 changes: 43 additions & 83 deletions lib/Worker/AbstractWorker.php
Expand Up @@ -2,11 +2,9 @@

namespace Amp\Parallel\Worker;

use Amp\Deferred;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Worker\Internal\TaskResult;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
Expand All @@ -21,14 +19,8 @@ abstract class AbstractWorker implements Worker {
/** @var bool */
private $shutdown = false;

/** @var \Amp\Deferred[] */
private $jobQueue = [];

/** @var callable */
private $onResolve;

/** @var callable */
private $cancel;
/** @var \Amp\Promise|null */
private $pending;

/**
* @param \Amp\Parallel\Context\Context $context
Expand All @@ -39,57 +31,6 @@ public function __construct(Context $context) {
}

$this->context = $context;

$jobQueue = &$this->jobQueue;

$this->cancel = static function (\Throwable $exception = null) use (&$jobQueue, &$context) {
if (!empty($jobQueue)) {
$exception = new WorkerException('Worker was shut down', $exception);

foreach ($jobQueue as $job) {
$job->fail($exception);
}

$jobQueue = [];
}

if ($context->isRunning()) {
$context->kill();
}
};

$cancel = &$this->cancel;

$this->onResolve = static function ($exception, $data) use (&$jobQueue, &$cancel, &$context, &$onResolve) {
if ($exception) {
$cancel($exception);
return;
}

if (!$data instanceof Internal\TaskResult) {
$cancel(new ContextException("Context did not return a task result"));
return;
}

$id = $data->getId();

if (!isset($jobQueue[$id])) {
$cancel(new ContextException("Job ID returned by context does not exist"));
return;
}

$deferred = $jobQueue[$id];
unset($jobQueue[$id]);
$empty = empty($jobQueue);

$deferred->resolve($data->promise());

if (!$empty) {
$context->receive()->onResolve($onResolve);
}
};

$onResolve = $this->onResolve;
}

/**
Expand All @@ -103,7 +44,7 @@ public function isRunning(): bool {
* {@inheritdoc}
*/
public function isIdle(): bool {
return empty($this->jobQueue);
return $this->pending === null;
}

/**
Expand All @@ -118,26 +59,43 @@ public function enqueue(Task $task): Promise {
$this->context->start();
}

return call(function () use ($task) {
$empty = empty($this->jobQueue);

$job = new Internal\Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;
$job = new Internal\Job($task);
$id = $job->getId();

try {
yield $this->context->send($job);
if ($empty) {
$this->context->receive()->onResolve($this->onResolve);
$promise = $this->pending = call(function () use ($task, $job, $id) {
if ($this->pending) {
try {
yield $this->pending;
} catch (\Throwable $exception) {
// Ignore error from prior job.
}
} catch (SerializationException $exception) {
unset($this->jobQueue[$job->getId()]);
$deferred->fail($exception);
} catch (\Throwable $exception) {
$this->cancel($exception);
}

return $deferred->promise();
if (!$this->context->isRunning()) {
throw new WorkerException("The worker was shutdown");
}

yield $this->context->send($job);
$result = yield $this->context->receive();

if (!$result instanceof TaskResult) {
$this->cancel(new WorkerException("Context did not return a task result"));
}

if ($result->getId() !== $id) {
$this->cancel(new WorkerException("Task results returned out of order"));
}

return $result->promise();
});

$promise->onResolve(function () use ($promise) {
if ($this->pending === $promise) {
$this->pending = null;
}
});

return $promise;
}

/**
Expand All @@ -155,11 +113,9 @@ public function shutdown(): Promise {
}

return call(function () {
if (!empty($this->jobQueue)) {
if ($this->pending) {
// If a task is currently running, wait for it to finish.
yield Promise\any(\array_map(function (Deferred $deferred): Promise {
return $deferred->promise();
}, $this->jobQueue));
yield Promise\any([$this->pending]);
}

yield $this->context->send(0);
Expand All @@ -177,9 +133,13 @@ public function kill() {
/**
* Cancels all pending tasks and kills the context.
*
* @TODO Parameter kept for BC, remove in future version.
*
* @param \Throwable|null $exception Optional exception to be used as the previous exception.
*/
protected function cancel(\Throwable $exception = null) {
($this->cancel)($exception);
if ($this->context->isRunning()) {
$this->context->kill();
}
}
}
8 changes: 4 additions & 4 deletions lib/Worker/DefaultPool.php
Expand Up @@ -61,11 +61,11 @@ public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory
$this->idleWorkers = new \SplQueue;
$this->busyQueue = new \SplQueue;

$workers = &$this->workers;
$idleWorkers = &$this->idleWorkers;
$busyQueue = &$this->busyQueue;
$workers = $this->workers;
$idleWorkers = $this->idleWorkers;
$busyQueue = $this->busyQueue;

$this->push = static function (Worker $worker) use (&$workers, &$idleWorkers, &$busyQueue) {
$this->push = static function (Worker $worker) use ($workers, $idleWorkers, $busyQueue) {

This comment has been minimized.

Copy link
@kelunik

kelunik Dec 26, 2017

Member

Why did you remove the references here?

This comment has been minimized.

Copy link
@trowski

trowski Dec 26, 2017

Author Member

Because those properties are objects and references are unnecessary.

\assert($workers->contains($worker), "The provided worker was not part of this queue");

if (($workers[$worker] -= 1) === 0) {
Expand Down

6 comments on commit 77e5583

@kelunik
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is not backwards compatible, as technically this code supported concurrent tasks?

@trowski
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskRunner didn't execute tasks concurrently, so it doesn't matter as the tasks were always run one at a time, in order.

@kelunik
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the Worker API could be used on its own, no? Why is it not in an internal namespace?

@trowski
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskRunner is not in an internal namespace in case someone wanted to roll there own context for a worker. To depend on the tasks running concurrently someone would have had to then use internal classes (Job in particular`), so I do not believe this is a BC break.

@kelunik
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then let's release it.

@kelunik
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as the tests on Travis work.

Please sign in to comment.