Skip to content

Commit

Permalink
Replace shutdown function and typing improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
TheLevti committed Feb 2, 2020
1 parent 9d9e482 commit 4cc83f1
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 55 deletions.
14 changes: 8 additions & 6 deletions src/Spork/Factory.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
<?php

/*
* This file is part of Spork, an OpenSky project.
* This file is part of the thelevti/spork package.
*
* (c) OpenSky Project Inc
* (c) Petr Levtonov <petr@levtonov.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Spork;

use Spork\Batch\BatchJob;
Expand All @@ -33,12 +35,12 @@ public function createBatchJob(ProcessManager $manager, $data = null, StrategyIn
/**
* Creates a new shared memory instance.
*
* @param integer $pid The child process id or null if this is the child
* @param integer $signal The signal to send after writing to shared memory
* @param int|null $pid The child process id or null if this is the child.
* @param int|null $signal The signal to send after writing to shared memory.
*
* @return SharedMemory A new shared memory instance
* @return \Spork\SharedMemory A new shared memory instance.
*/
public function createSharedMemory($pid = null, $signal = null)
public function createSharedMemory(?int $pid = null, ?int $signal = null): SharedMemory
{
return new SharedMemory($pid, $signal);
}
Expand Down
16 changes: 8 additions & 8 deletions src/Spork/Fork.php
Original file line number Diff line number Diff line change
Expand Up @@ -183,42 +183,42 @@ public function getState()
return $this->defer->getState();
}

public function progress($progress)
public function progress($progress): Fork
{
$this->defer->progress($progress);

return $this;
}

public function always($always)
public function always($always): Fork
{
$this->defer->always($always);

return $this;
}

public function done($done)
public function done($done): Fork
{
$this->defer->done($done);

return $this;
}

public function fail($fail)
public function fail($fail): Fork
{
$this->defer->fail($fail);

return $this;
}

public function then($done, $fail = null)
public function then($done, $fail = null): Fork
{
$this->defer->then($done, $fail);

return $this;
}

public function notify()
public function notify(): Fork
{
$args = func_get_args();
array_unshift($args, $this);
Expand All @@ -228,7 +228,7 @@ public function notify()
return $this;
}

public function resolve()
public function resolve(): Fork
{
$args = func_get_args();
array_unshift($args, $this);
Expand All @@ -238,7 +238,7 @@ public function resolve()
return $this;
}

public function reject()
public function reject(): Fork
{
$args = func_get_args();
array_unshift($args, $this);
Expand Down
72 changes: 31 additions & 41 deletions src/Spork/ProcessManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

namespace Spork;

use Exception;
use InvalidArgumentException;
use Spork\Batch\Strategy\StrategyInterface;
use Spork\EventDispatcher\Events;
use Spork\EventDispatcher\SignalEventDispatcher;
use Spork\EventDispatcher\SignalEventDispatcherInterface;
use Spork\Exception\ProcessControlException;
use Spork\Exception\UnexpectedTypeException;
use Spork\Util\Error;
use Spork\Util\ExitMessage;
use Symfony\Contracts\EventDispatcher\Event;
Expand All @@ -33,7 +33,7 @@ class ProcessManager
private $zombieOkay;
private $signal;

/** @var Fork[] */
/** @var array<int,\Spork\Fork> $forks */
private $forks;

public function __construct(
Expand Down Expand Up @@ -87,77 +87,67 @@ public function process($data, $callable, StrategyInterface $strategy = null)

/**
* Forks something into another process and returns a deferred object.
*
* @param callable $callable Code to execute in a fork.
* @return \Spork\Fork Newly created fork.
*/
public function fork($callable)
public function fork(callable $callable): Fork
{
if (!is_callable($callable)) {
throw new UnexpectedTypeException($callable, 'callable');
}

// allow the system to cleanup before forking
call_user_func([$this->dispatcher, 'dispatch'], new Event(), Events::PRE_FORK);

if (-1 === $pid = pcntl_fork()) {
throw new ProcessControlException('Unable to fork a new process');
if (-1 === ($pid = pcntl_fork())) {
throw new ProcessControlException('Unable to fork a new process.');
}

if (0 === $pid) {
// reset the list of child processes
$this->forks = [];

// setup the shared memory
$shm = $this->factory->createSharedMemory(null, $this->signal);
$message = new ExitMessage();

// phone home on shutdown
$currPid = posix_getpid();
register_shutdown_function(function () use ($currPid, $shm, $message): void {
// Do not execute this function in child processes.
if ($currPid !== posix_getpid()) {
return;
}

try {
$shm->send($message, false);
} catch (\Exception $e) {
// probably an error serializing the result
$message->setResult(null);
$message->setError(Error::fromException($e));

$shm->send($message, false);

exit(2);
}
});

// dispatch an event so the system knows it's in a new process
call_user_func([$this->dispatcher, 'dispatch'], new Event(), Events::POST_FORK);

// setup the shared memory and exit message.
$shm = $this->factory->createSharedMemory(null, $this->signal);
$message = new ExitMessage();

if (!$this->debug) {
ob_start();
}

try {
$result = call_user_func($callable, $shm);

$message->setResult($result);
$status = is_integer($result) ? $result : 0;
} catch (\Exception $e) {
$message->setError(Error::fromException($e));
} catch (Exception $exception) {
$message->setError(Error::fromException($exception));
$status = 1;
}

if (!$this->debug) {
$message->setOutput(ob_get_clean());
}

try {
$shm->send($message, false);
} catch (Exception $exception) {
// probably an error serializing the result
$message->setResult(null);
$message->setError(Error::fromException($exception));

$shm->send($message, false);

$status = 2;
}

exit($status);
}

// connect to shared memory
$shm = $this->factory->createSharedMemory($pid);

return $this->forks[$pid] = $this->factory->createFork($pid, $shm, $this->debug);
return $this->forks[$pid] = $this->factory->createFork(
$pid,
$this->factory->createSharedMemory($pid),
$this->debug
);
}

public function monitor($signal = SIGUSR1)
Expand Down

0 comments on commit 4cc83f1

Please sign in to comment.