Skip to content

Commit

Permalink
Move IPC hub to public API
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Feb 15, 2020
1 parent 8b74bb3 commit f8a0790
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 191 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"amphp/byte-stream": "^1.6.1",
"amphp/parser": "^1",
"amphp/process": "^1",
"amphp/socket": "^1",
"amphp/sync": "^1.0.1"
},
"require-dev": {
Expand Down
37 changes: 19 additions & 18 deletions lib/Context/Internal/ParallelHub.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,63 @@
namespace Amp\Parallel\Context\Internal;

use Amp\Loop;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Context\IpcHub;
use Amp\Socket\Socket;
use parallel\Events;
use parallel\Future;

class ParallelHub extends ProcessHub
class ParallelHub extends IpcHub
{
const EXIT_CHECK_FREQUENCY = 250;

/** @var ChannelledSocket[] */
private $channels;
/** @var Socket[] */
private $sockets;

/** @var string */
private $watcher;

/** @var Events */
private $events;

public function __construct()
public function __construct(int $keyLength = 32)
{
parent::__construct();
parent::__construct($keyLength);

$events = $this->events = new Events;
$this->events->setBlocking(false);

$channels = &$this->channels;
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () use (&$channels, $events): void {
$sockets = &$this->sockets;
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () use (&$sockets, $events): void {
while ($event = $events->poll()) {
$id = (int) $event->source;
\assert(isset($channels[$id]), 'Channel for context ID not found');
$channel = $channels[$id];
unset($channels[$id]);
$channel->close();
\assert(isset($sockets[$id]), 'Channel for context ID not found');
$socket = $sockets[$id];
unset($sockets[$id]);
$socket->close();
}
});
Loop::disable($this->watcher);
Loop::unreference($this->watcher);
}

public function add(int $id, ChannelledSocket $channel, Future $future): void
final public function add(int $id, Socket $socket, Future $future): void
{
$this->channels[$id] = $channel;
$this->sockets[$id] = $socket;
$this->events->addFuture((string) $id, $future);

Loop::enable($this->watcher);
}

public function remove(int $id): void
final public function remove(int $id): void
{
if (!isset($this->channels[$id])) {
if (!isset($this->sockets[$id])) {
return;
}

unset($this->channels[$id]);
unset($this->sockets[$id]);
$this->events->remove((string) $id);

if (empty($this->channels)) {
if (empty($this->sockets)) {
Loop::disable($this->watcher);
}
}
Expand Down
137 changes: 0 additions & 137 deletions lib/Context/Internal/ProcessHub.php

This file was deleted.

12 changes: 4 additions & 8 deletions lib/Context/Internal/process-runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Parallel\Context\Internal;

use Amp\Parallel\Context\IpcHub;
use Amp\Parallel\Context\Process;
use Amp\Parallel\Sync;
use Amp\Promise;
Expand Down Expand Up @@ -61,20 +62,15 @@
$key .= $chunk;
} while (\strlen($key) < Process::KEY_LENGTH);

if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
\trigger_error("Could not connect to IPC socket", E_USER_ERROR);
exit(1);
}

$channel = new Sync\ChannelledSocket($socket, $socket);

try {
Promise\wait($channel->send($key));
$socket = Promise\wait(IpcHub::connect($uri, $key));
} catch (\Throwable $exception) {
\trigger_error("Could not send key to parent", E_USER_ERROR);
exit(1);
}

$channel = new Sync\ChannelledStream($socket, $socket);

try {
if (!isset($argv[0])) {
throw new \Error("No script path given");
Expand Down

0 comments on commit f8a0790

Please sign in to comment.