Skip to content

Commit

Permalink
Refactor sending task to worker context
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 26, 2016
1 parent fed6226 commit 61c8bdd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 24 deletions.
12 changes: 11 additions & 1 deletion src/Exception/WorkerException.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
<?php
namespace Icicle\Concurrent\Exception;

class WorkerException extends \Exception implements Exception {}
class WorkerException extends \Exception implements Exception
{
/**
* @param string $message
* @param \Exception|null $previous
*/
public function __construct($message, \Exception $previous = null)
{
parent::__construct($message, 0, $previous);
}
}
59 changes: 36 additions & 23 deletions src/Worker/AbstractWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Icicle\Concurrent\Exception\StatusError;
use Icicle\Concurrent\Exception\WorkerException;
use Icicle\Concurrent\Worker\Internal\TaskFailure;
use Icicle\Coroutine\Coroutine;

/**
* Base class for most common types of task workers.
Expand All @@ -17,27 +18,21 @@ abstract class AbstractWorker implements Worker
*/
private $context;

/**
* @var bool
*/
private $idle = true;

/**
* @var bool
*/
private $shutdown = false;

/**
* @var \Icicle\Awaitable\Delayed
* @var \Icicle\Coroutine\Coroutine
*/
private $activeDelayed;
private $active;

/**
* @var \SplQueue
*/
private $busyQueue;


/**
* @param \Icicle\Concurrent\Strand $strand
*/
Expand All @@ -60,7 +55,7 @@ public function isRunning()
*/
public function isIdle()
{
return $this->idle;
return null === $this->active;
}

/**
Expand All @@ -85,27 +80,26 @@ public function enqueue(Task $task)
}

// If the worker is currently busy, store the task in a busy queue.
if (!$this->idle) {
if (null !== $this->active) {
$delayed = new Delayed();
$this->busyQueue->enqueue($delayed);
yield $delayed;
}

$this->idle = false;
$this->activeDelayed = new Delayed();
$this->active = new Coroutine($this->send($task));

try {
yield $this->context->send($task);

$result = (yield $this->context->receive());
$result = (yield $this->active);
} catch (\Exception $exception) {
$this->kill();
throw new WorkerException('Sending the task to the worker failed.', $exception);
} finally {
$this->idle = true;
$this->activeDelayed->resolve();
$this->active = null;
}

// We're no longer busy at the moment, so dequeue a waiting task.
if (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->resolve();
}
// We're no longer busy at the moment, so dequeue a waiting task.
if (!$this->busyQueue->isEmpty()) {
$this->busyQueue->dequeue()->resolve();
}

if ($result instanceof TaskFailure) {
Expand All @@ -115,6 +109,21 @@ public function enqueue(Task $task)
yield $result;
}

/**
* @coroutine
*
* @param \Icicle\Concurrent\Worker\Task $task
*
* @return \Generator
*
* @resolve mixed
*/
private function send(Task $task)
{
yield $this->context->send($task);
yield $this->context->receive();
}

/**
* {@inheritdoc}
*/
Expand All @@ -130,8 +139,12 @@ public function shutdown()
$this->cancelPending();

// If a task is currently running, wait for it to finish.
if (!$this->idle) {
yield $this->activeDelayed;
if (null !== $this->active) {
try {
yield $this->active;
} catch (\Exception $exception) {
// Ignore failure in this context.
}
}

yield $this->context->send(0);
Expand Down

0 comments on commit 61c8bdd

Please sign in to comment.