Skip to content

Commit

Permalink
coverage->100%, refactor priorities, add more tests to prioritypool (#19
Browse files Browse the repository at this point in the history
)

* coverage->100%, refactor priorities, add more tests to prioritypool

* fix some scruitinzer stuff
  • Loading branch information
Harry Bragg committed Sep 25, 2018
1 parent b053045 commit 84ccec5
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 92 deletions.
10 changes: 5 additions & 5 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 4 additions & 22 deletions src/CallbackRun.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

use Exception;
use Graze\ParallelProcess\Event\EventDispatcherTrait;
use Graze\ParallelProcess\Event\PriorityChangedEvent;
use Graze\ParallelProcess\Event\RunEvent;
use Throwable;

class CallbackRun implements RunInterface, OutputterInterface
class CallbackRun implements RunInterface, OutputterInterface, PrioritisedInterface
{
use EventDispatcherTrait;
use RunningStateTrait;
use PrioritisedTrait;

/** @var callable */
private $callback;
Expand All @@ -33,8 +35,6 @@ class CallbackRun implements RunInterface, OutputterInterface
private $exception = null;
/** @var string */
private $last;
/** @var float */
private $priority;

/**
* Run constructor.
Expand All @@ -51,17 +51,6 @@ public function __construct(callable $callback, array $tags = [], $priority = 1.
$this->priority = $priority;
}

/**
* @param float $priority
*
* @return CallbackRun
*/
public function setPriority($priority)
{
$this->priority = $priority;
return $this;
}

/**
* @return string[]
*/
Expand All @@ -73,6 +62,7 @@ protected function getEventNames()
RunEvent::SUCCESSFUL,
RunEvent::FAILED,
RunEvent::UPDATED,
PriorityChangedEvent::CHANGED,
];
}

Expand Down Expand Up @@ -216,12 +206,4 @@ public function getLastMessageType()
{
return '';
}

/**
* @return float The priority for this run, where the larger the number the higher the priority
*/
public function getPriority()
{
return $this->priority;
}
}
12 changes: 5 additions & 7 deletions src/Display/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,14 @@ function (RunEvent $event) use ($index, &$bar, &$spinner) {
*/
private function getSummary()
{
if (!$this->pool instanceof RunInterface) {
return '';
}

if ($this->pool->hasStarted()) {
if ($this->pool->isRunning()) {
$running = count($this->pool->getRunning());
$finished = count($this->pool->getFinished());
if ($running > 0 || $finished > 0) {
if ($running > 0) {
return sprintf(
'<comment>Total</comment>: %2d, <comment>Running</comment>: %2d, <comment>Waiting</comment>: %2d',
$this->pool->count(),
count($this->pool->getRunning()),
$running,
count($this->pool->getWaiting())
);
} else {
Expand Down
51 changes: 51 additions & 0 deletions src/Event/PriorityChangedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php
/**
* This file is part of graze/parallel-process.
*
* Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
* @link https://github.com/graze/parallel-process
*/

namespace Graze\ParallelProcess\Event;

use Graze\ParallelProcess\PrioritisedInterface;
use Symfony\Component\EventDispatcher\Event;

class PriorityChangedEvent extends Event
{
const CHANGED = 'priority.changed';

/** @var PrioritisedInterface */
private $item;
/** @var float */
private $priority;
/** @var float|null */
private $oldPriority;

/**
* RunEvent constructor.
*
* @param PrioritisedInterface $item
* @param float $priority
* @param float|null $oldPriority
*/
public function __construct(PrioritisedInterface $item, $priority, $oldPriority = null)
{
$this->item = $item;
$this->priority = $priority;
$this->oldPriority = $oldPriority;
}

/**
* @return PrioritisedInterface
*/
public function getItem()
{
return $this->item;
}
}
16 changes: 11 additions & 5 deletions src/Monitor/PoolLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ public function onRunStarted(RunEvent $event)
public function onRunSuccessful(RunEvent $event)
{
$this->logger->debug(
sprintf('run [%s:%s]: successfully finished', get_class($event->getRun()), spl_object_hash($event->getRun())),
sprintf(
'run [%s:%s]: successfully finished',
get_class($event->getRun()),
spl_object_hash($event->getRun())
),
$this->getTags($event->getRun())
);
}
Expand Down Expand Up @@ -137,7 +141,11 @@ function (Exception $e) {
public function onRunCompleted(RunEvent $event)
{
$this->logger->debug(
sprintf('run [%s:%s]: has finished running', get_class($event->getRun()), spl_object_hash($event->getRun())),
sprintf(
'run [%s:%s]: has finished running',
get_class($event->getRun()),
spl_object_hash($event->getRun())
),
$this->getTags($event->getRun())
);
}
Expand All @@ -151,10 +159,8 @@ private function getTags($item)
{
if ($item instanceof PoolInterface) {
return $this->getPoolTags($item);
} elseif ($item instanceof RunInterface) {
return $this->getRunTags($item);
}
return [];
return $this->getRunTags($item);
}

/**
Expand Down
47 changes: 19 additions & 28 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Graze\DataStructure\Collection\Collection;
use Graze\ParallelProcess\Event\EventDispatcherTrait;
use Graze\ParallelProcess\Event\PoolRunEvent;
use Graze\ParallelProcess\Event\PriorityChangedEvent;
use Graze\ParallelProcess\Event\RunEvent;
use InvalidArgumentException;
use Symfony\Component\Process\Process;
Expand All @@ -28,12 +29,20 @@
* A Pool is a arbitrary collection of runs that can be used to group runs together when displaying with a
* Table
*
* A Pool can transition from `not_running` back to `running` again. But it cannot transition back to `not_started`.
* This means that multiple `COMPLETED` and `STARTED` events can be sent out for a single pool.
*
* ```
* not_started -> running <-> not_running
* ```
*
* @package Graze\ParallelProcess
*/
class Pool extends Collection implements RunInterface, PoolInterface
class Pool extends Collection implements RunInterface, PoolInterface, PrioritisedInterface
{
use EventDispatcherTrait;
use RunningStateTrait;
use PrioritisedTrait;

/** @var RunInterface[] */
protected $items = [];
Expand All @@ -47,8 +56,6 @@ class Pool extends Collection implements RunInterface, PoolInterface
private $exceptions = [];
/** @var array */
private $tags;
/** @var float */
private $priority;

/**
* RunCollection constructor.
Expand Down Expand Up @@ -89,7 +96,7 @@ public function add($item, array $tags = [])
$this->running[] = $item;
} elseif ($item->hasStarted()) {
$status = 'finished';
$this->finished[] = $item;
$this->complete[] = $item;
} else {
$this->waiting[] = $item;
}
Expand All @@ -100,11 +107,13 @@ public function add($item, array $tags = [])

$this->dispatch(PoolRunEvent::POOL_RUN_ADDED, new PoolRunEvent($this, $item));

if ($status == 'running' || $status == 'finished') {
if ($this->state == static::STATE_NOT_STARTED) {
$this->setStarted();
$this->dispatch(RunEvent::STARTED, new RunEvent($this));
}
if ($status != 'waiting' && $this->state != static::STATE_RUNNING) {
$this->setStarted();
$this->dispatch(RunEvent::STARTED, new RunEvent($this));
}
if ($status == 'finished' && $this->state != static::STATE_NOT_RUNNING) {
$this->setFinished();
$this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
}

return $this;
Expand Down Expand Up @@ -259,25 +268,6 @@ public function getProgress()
return [count($this->complete), count($this->items), count($this->complete) / count($this->items)];
}

/**
* @return float
*/
public function getPriority()
{
return $this->priority;
}

/**
* @param float $priority
*
* @return $this
*/
public function setPriority($priority)
{
$this->priority = $priority;
return $this;
}

/**
* @return string[]
*/
Expand All @@ -290,6 +280,7 @@ protected function getEventNames()
RunEvent::FAILED,
RunEvent::UPDATED,
PoolRunEvent::POOL_RUN_ADDED,
PriorityChangedEvent::CHANGED,
];
}

Expand Down
22 changes: 22 additions & 0 deletions src/PrioritisedInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Graze\ParallelProcess;

interface PrioritisedInterface
{
/**
* Get the priority for this item. The higher the number the higher the priority
*
* @return float
*/
public function getPriority();

/**
* Fluent call for setting the priority.
*
* @param float $priority The higher the number the higher the priority
*
* @return $this
*/
public function setPriority($priority);
}
37 changes: 37 additions & 0 deletions src/PrioritisedTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Graze\ParallelProcess;

use Graze\ParallelProcess\Event\DispatcherInterface;
use Graze\ParallelProcess\Event\PriorityChangedEvent;

trait PrioritisedTrait
{
/** @var float */
protected $priority;

/**
* @param float $priority
*
* @return $this
*/
public function setPriority($priority)
{
$oldPriority = $this->priority;
$this->priority = $priority;
if (!($this instanceof RunInterface && $this->hasStarted())
&& method_exists($this, 'dispatch')
&& $this instanceof PrioritisedInterface) {
$this->dispatch(PriorityChangedEvent::CHANGED, new PriorityChangedEvent($this, $priority, $oldPriority));
}
return $this;
}

/**
* @return float
*/
public function getPriority()
{
return $this->priority;
}
}

0 comments on commit 84ccec5

Please sign in to comment.