diff --git a/lib/IpcLogger.php b/lib/IpcLogger.php index 8da6dc2..54f6da6 100644 --- a/lib/IpcLogger.php +++ b/lib/IpcLogger.php @@ -13,7 +13,6 @@ class IpcLogger extends Logger { private $writeWatcherId; private $writeQueue = []; private $writeBuffer = ""; - private $stopPromisor; private $isDead; public function __construct(Console $console, $ipcSock) { @@ -26,6 +25,7 @@ public function __construct(Console $console, $ipcSock) { $onWritable = $this->makePrivateCallable("onWritable"); $this->ipcSock = $ipcSock; + stream_set_blocking($ipcSock, false); $this->writeWatcherId = \Amp\onWritable($ipcSock, $onWritable, [ "enable" => false, ]); @@ -72,14 +72,7 @@ private function onWritable() { $this->writeBuffer = ""; - if ($this->stopPromisor) { - \Amp\cancel($this->writeWatcherId); - $promisor = $this->stopPromisor; - $this->stopPromisor = null; - $promisor->succeed(); - } else { - \Amp\disable($this->writeWatcherId); - } + \Amp\disable($this->writeWatcherId); } private function onDeadIpcSock() { @@ -87,19 +80,15 @@ private function onDeadIpcSock() { $this->writeBuffer = ""; $this->writeQueue = []; \Amp\cancel($this->writeWatcherId); - if ($this->stopPromisor) { - $promisor = $this->stopPromisor; - $this->stopPromisor = null; - $promisor->succeed(); - } } - public function stop(): Promise { - if ($this->isDead || $this->writeBuffer === "") { - return new Success; - } else { - $this->stopPromisor = new Deferred; - return $this->stopPromisor->promise(); + public function flush() { // BLOCKING + if ($this->isDead || ($this->writeBuffer === "" && empty($this->writeQueue))) { + return; } + + stream_set_blocking($this->ipcSock, true); + $this->onWritable(); + stream_set_blocking($this->ipcSock, false); } } diff --git a/lib/Process.php b/lib/Process.php index a987a75..da71c75 100644 --- a/lib/Process.php +++ b/lib/Process.php @@ -52,8 +52,8 @@ public function start(Console $console): \Generator { \Amp\onError([$this->logger, "critical"]); } catch (\Throwable $uncaught) { $this->exitCode = 1; - yield $this->logger->critical($uncaught); - $this->exit(); + $this->logger->critical($uncaught); + static::exit(); } } @@ -75,9 +75,9 @@ public function stop(): \Generator { yield from $this->doStop(); } catch (\Throwable $uncaught) { $this->exitCode = 1; - yield $this->logger->critical($uncaught); + $this->logger->critical($uncaught); } finally { - $this->exit(); + static::exit(); } } @@ -121,12 +121,11 @@ private function registerShutdownHandler() { $this->exitCode = 1; $msg = "{$err["message"]} in {$err["file"]} on line {$err["line"]}"; - $gen = function($msg) { - yield $this->logger->critical($msg); + // FIXME: Fatal error: Uncaught LogicException: Cannot run() recursively; event reactor already active + \Amp\run(function() use ($msg) { + $this->logger->critical($msg); yield from $this->stop(); - }; - $promise = resolve($gen($msg)); - \Amp\wait($promise); + }); }); } @@ -145,23 +144,23 @@ private function registerErrorHandler() { case E_CORE_ERROR: case E_COMPILE_ERROR: case E_RECOVERABLE_ERROR: - yield $this->logger->error($msg); + $this->logger->error($msg); break; case E_CORE_WARNING: case E_COMPILE_WARNING: case E_WARNING: case E_USER_WARNING: - yield $this->logger->warning($msg); + $this->logger->warning($msg); break; case E_NOTICE: case E_USER_NOTICE: case E_DEPRECATED: case E_USER_DEPRECATED: case E_STRICT: - yield $this->logger->notice($msg); + $this->logger->notice($msg); break; default: - yield $this->logger->warning($msg); + $this->logger->warning($msg); break; } })); diff --git a/lib/WorkerProcess.php b/lib/WorkerProcess.php index fa48876..82093d2 100644 --- a/lib/WorkerProcess.php +++ b/lib/WorkerProcess.php @@ -28,7 +28,12 @@ protected function doStart(Console $console): \Generator { protected function doStop(): \Generator { if ($this->server) { yield $this->server->stop(); - yield $this->logger->stop(); } + $this->logger->flush(); + } + + protected function exit() { + $this->logger->flush(); + parent::exit(); } }