Skip to content

Commit

Permalink
Moved all listeners from Scheduler into separate listener classes
Browse files Browse the repository at this point in the history
Fix for bug: SchedulerEvent::EVENT_INTERNAL_KERNEL_START is triggered multiple times
Fix for bug: IpcServer does not read messages from workers
Fix for bug: Scheduler catches all exceptions and ignores them silently
  • Loading branch information
aargoth committed Apr 12, 2018
1 parent 4cac8e4 commit f875ecf
Show file tree
Hide file tree
Showing 26 changed files with 199 additions and 151 deletions.
1 change: 1 addition & 0 deletions src/Zeus/Controller/WorkerController.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private function triggerWorkerEvent(string $serviceName, array $startParams)
$worker->setThreadId(defined("ZEUS_THREAD_ID") ? ZEUS_THREAD_ID : 1);
$event->setWorker($worker);
$event->setTarget($worker);
$event->setScheduler($scheduler);
$event->setParams($startParams);
if (defined("ZEUS_THREAD_IPC_ADDRESS")) {
$event->setParam(ModuleWrapper::ZEUS_IPC_ADDRESS_PARAM, ZEUS_THREAD_IPC_ADDRESS);
Expand Down
15 changes: 9 additions & 6 deletions src/Zeus/Kernel/IpcServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ protected function attachDefaultListeners()

private function startIpc()
{
if ($this->ipcServer) {
return;
}
$server = new SocketServer();
$server->setTcpNoDelay(true);
$server->setSoTimeout(0);
Expand All @@ -165,7 +168,6 @@ private function checkInboundConnections()
$selectionKey->attach(new IpcSocketStream($ipcStream));
}
} catch (SocketTimeoutException $exception) {

}
}

Expand Down Expand Up @@ -233,7 +235,7 @@ private function registerIpc(int $ipcPort, int $uid)
];

$host = $this->ipcHost;
$socket = @stream_socket_client("$host:$ipcPort", $errno, $errstr, 5, STREAM_CLIENT_CONNECT, stream_context_create($opts));
$socket = @stream_socket_client("$host:$ipcPort", $errno, $errstr, 100, STREAM_CLIENT_CONNECT, stream_context_create($opts));

if (!$socket) {
throw new RuntimeException("IPC connection failed: $errstr [$errno]");
Expand All @@ -252,8 +254,11 @@ private function handleIpcMessages(AbstractStreamSelector $selector)
{
$messages = [];

$this->checkInboundConnections();

$keys = $selector->getSelectionKeys();
$failed = 0; $processed = 0;

foreach ($keys as $key) {
/** @var SocketStream $stream */;
$stream = $key->getStream();
Expand Down Expand Up @@ -423,15 +428,13 @@ private function distributeMessages(array $messages)
}
}


private function setSelector(Reactor $scheduler)
{
$scheduler->observe($this->ipcSelector, function(AbstractStreamSelector $selector) {
$this->handleIpcMessages($selector);
}, function() {


}, 1000);

}, 1000
);
}
}
118 changes: 19 additions & 99 deletions src/Zeus/Kernel/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

use Throwable;
use Zend\EventManager\EventManagerAwareTrait;
use Zend\Log\Logger;
use Zend\Log\LoggerAwareTrait;
use Zeus\IO\Stream\AbstractStreamSelector;
use Zeus\Kernel\IpcServer\IpcEvent;
use Zeus\Kernel\Scheduler\ConfigInterface;
use Zeus\Kernel\Scheduler\Exception\SchedulerException;
use Zeus\Kernel\Scheduler\Helper\PluginRegistry;
use Zeus\Kernel\Scheduler\Listener\KernelLoopGenerator;
use Zeus\Kernel\Scheduler\Listener\SchedulerExitListener;
use Zeus\Kernel\Scheduler\Listener\SchedulerInitListener;
use Zeus\Kernel\Scheduler\Listener\SchedulerLoopListener;
use Zeus\Kernel\Scheduler\Listener\SchedulerStartListener;
use Zeus\Kernel\Scheduler\Listener\SchedulerStopListener;
use Zeus\Kernel\Scheduler\Listener\SchedulerTerminateListener;
use Zeus\Kernel\Scheduler\Listener\WorkerExitListener;
use Zeus\Kernel\Scheduler\Listener\WorkerInitListener;
use Zeus\Kernel\Scheduler\Listener\WorkerStatusListener;
use Zeus\Kernel\Scheduler\Listener\WorkerStatusSender;
use Zeus\Kernel\Scheduler\MultiProcessingModule\MultiProcessingModuleInterface;
Expand All @@ -30,12 +31,8 @@
use Zeus\Kernel\Scheduler\WorkerLifeCycleFacade;
use Zeus\ServerService\Shared\Logger\ExceptionLoggerTrait;

use function microtime;
use function sprintf;
use function array_merge;
use function file_get_contents;
use function file_put_contents;
use function unlink;

/**
* Class Scheduler
Expand Down Expand Up @@ -181,31 +178,13 @@ protected function attachDefaultListeners()

$events[] = $eventManager->attach(WorkerEvent::EVENT_TERMINATED, new WorkerExitListener(), SchedulerEvent::PRIORITY_FINALIZE);
$events[] = $eventManager->attach(SchedulerEvent::EVENT_STOP, new SchedulerExitListener($this->workerLifeCycle), SchedulerEvent::PRIORITY_REGULAR);
$events[] = $eventManager->attach(SchedulerEvent::EVENT_TERMINATE, new SchedulerStopListener($this->workerLifeCycle), SchedulerEvent::PRIORITY_FINALIZE);
$events[] = $eventManager->attach(SchedulerEvent::EVENT_STOP, new SchedulerStopListener($this->workerLifeCycle), SchedulerEvent::PRIORITY_FINALIZE);
$events[] = $eventManager->attach(SchedulerEvent::EVENT_TERMINATE, new SchedulerTerminateListener($this->workerLifeCycle), SchedulerEvent::PRIORITY_INITIALIZE);
$events[] = $eventManager->attach(SchedulerEvent::EVENT_START, new SchedulerStartListener($this->workerLifeCycle), SchedulerEvent::PRIORITY_FINALIZE);
$events[] = $eventManager->attach(SchedulerEvent::EVENT_LOOP, new SchedulerLoopListener($this->workerLifeCycle, $this->discipline));

$events[] = $eventManager->attach(WorkerEvent::EVENT_CREATE,
// scheduler init
function (WorkerEvent $event) use ($eventManager) {
if (!$event->getParam(SchedulerInterface::WORKER_SERVER) || $event->getParam(SchedulerInterface::WORKER_INIT)) {
return;
}
$this->kernelLoop();
}, WorkerEvent::PRIORITY_FINALIZE
);

$events[] = $eventManager->attach(WorkerEvent::EVENT_CREATE, new KernelLoopGenerator(), WorkerEvent::PRIORITY_FINALIZE);
$events[] = $eventManager->attach(WorkerEvent::EVENT_INIT, new SchedulerInitListener($this->schedulerLifeCycle), WorkerEvent::PRIORITY_INITIALIZE);

$events[] = $eventManager->attach(WorkerEvent::EVENT_INIT, function(WorkerEvent $event) use ($eventManager) {
$statusSender = new WorkerStatusSender();
$events[] = $eventManager->attach(WorkerEvent::EVENT_RUNNING, $statusSender, SchedulerEvent::PRIORITY_FINALIZE + 1);
$events[] = $eventManager->attach(WorkerEvent::EVENT_WAITING, $statusSender, SchedulerEvent::PRIORITY_FINALIZE + 1);
$events[] = $eventManager->attach(WorkerEvent::EVENT_EXIT, $statusSender, SchedulerEvent::PRIORITY_FINALIZE + 2);
$this->eventHandles = array_merge($this->eventHandles, $events);

}, WorkerEvent::PRIORITY_FINALIZE + 1);

$events[] = $eventManager->attach(WorkerEvent::EVENT_INIT, new WorkerInitListener(), WorkerEvent::PRIORITY_FINALIZE + 1);
$this->eventHandles = array_merge($this->eventHandles, $events);
}

Expand Down Expand Up @@ -252,102 +231,43 @@ public function syncWorker(WorkerState $worker)
$this->workerLifeCycle->syncWorker($worker);
}

/**
* Stops the scheduler.
*/
public function stop()
{
$this->log(Logger::DEBUG, "Stopping scheduler");
$fileName = $this->getUidFile();

$uid = @file_get_contents($fileName);
if (!$uid) {
throw new SchedulerException("Scheduler not running", SchedulerException::SCHEDULER_NOT_RUNNING);
}

$this->setTerminating(true);

$this->log(Logger::INFO, "Terminating scheduler $uid");
$worker = new WorkerState($this->getConfig()->getServiceName(), WorkerState::RUNNING);
$worker->setUid($uid);
$this->schedulerLifeCycle->stop($worker, true);
$this->log(Logger::INFO, "Workers checked");
}

/**
* Creates the server instance.
*
* @param bool $launchAsDaemon Run this server as a daemon?
* @throws Throwable
*/
public function start(bool $launchAsDaemon = false)
{
$plugins = $this->getPluginRegistry()->count();
$this->triggerEvent(SchedulerEvent::INTERNAL_EVENT_KERNEL_START);
$this->log(Logger::INFO, sprintf("Starting Scheduler with %d plugin%s", $plugins, $plugins !== 1 ? 's' : ''));
$kernelStart = $this->getSchedulerEvent();
$kernelStart->setName(SchedulerEvent::INTERNAL_EVENT_KERNEL_START);
$this->getEventManager()->triggerEvent($kernelStart);
$this->getLogger()->info(sprintf("Starting Scheduler with %d plugin%s", $plugins, $plugins !== 1 ? 's' : ''));

try {
if (!$launchAsDaemon) {
$this->schedulerLifeCycle->start([]);
$this->kernelLoop();

return;
}
$this->workerLifeCycle->start([SchedulerInterface::WORKER_SERVER => true]);

} catch (Throwable $exception) {
$this->logException($exception, $this->getLogger());
throw $exception;
}
}

private function log(int $priority, string $message, array $extra = [])
{
if (!isset($extra['service_name'])) {
$extra['service_name'] = $this->getConfig()->getServiceName();
}

if (!isset($extra['logger'])) {
$extra['logger'] = __CLASS__;
}

$this->getLogger()->log($priority, $message, $extra);
}

private function getUidFile() : string
{
// @todo: make it more sophisticated
$fileName = sprintf("%s%s.pid", $this->getConfig()->getIpcDirectory(), $this->getConfig()->getServiceName());

return $fileName;
}

/**
* @param string $eventName
* @param mixed[] $extraData
* Stops the scheduler.
*/
private function triggerEvent(string $eventName, array $extraData = [])
{
$extraData = array_merge(['status' => $this->worker], $extraData, ['service_name' => $this->getConfig()->getServiceName()]);
$events = $this->getEventManager();
$event = $this->getSchedulerEvent();
$event->setParams($extraData);
$event->setName($eventName);
$events->triggerEvent($event);
}

private function kernelLoop()
public function stop()
{
$reactor = $this->getReactor();
$this->setTerminating(true);

$terminator = function() use ($reactor) {
$this->triggerEvent(SchedulerEvent::INTERNAL_EVENT_KERNEL_LOOP);
if ($this->isTerminating()) {
$reactor->setTerminating(true);
}
};
do {
$reactor->mainLoop(
$terminator
);
} while (!$this->isTerminating());
$worker = new WorkerState($this->getConfig()->getServiceName(), WorkerState::RUNNING);
$this->schedulerLifeCycle->stop($worker, false);
$this->getLogger()->info("Workers checked");
}
}
34 changes: 34 additions & 0 deletions src/Zeus/Kernel/Scheduler/Listener/KernelLoopGenerator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace Zeus\Kernel\Scheduler\Listener;

use Zeus\Kernel\Scheduler\SchedulerEvent;
use Zeus\Kernel\Scheduler\WorkerEvent;
use Zeus\Kernel\SchedulerInterface;

class KernelLoopGenerator
{
public function __invoke(WorkerEvent $event)
{
if (!$event->getParam(SchedulerInterface::WORKER_SERVER) || $event->getParam(SchedulerInterface::WORKER_INIT)) {
return;
}

$scheduler = $event->getScheduler();
$reactor = $scheduler->getReactor();

$terminator = function() use ($reactor, $scheduler) {
$event = $scheduler->getSchedulerEvent();
$event->setName(SchedulerEvent::INTERNAL_EVENT_KERNEL_LOOP);
$scheduler->getEventManager()->triggerEvent($event);
if ($scheduler->isTerminating()) {
$reactor->setTerminating(true);
}
};
do {
$reactor->mainLoop(
$terminator
);
} while (!$scheduler->isTerminating());
}
}
8 changes: 0 additions & 8 deletions src/Zeus/Kernel/Scheduler/Listener/SchedulerExitListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,4 @@ public function __invoke(SchedulerEvent $event)

@unlink($this->getUidFile($scheduler->getConfig()));
}

private function stopWorker(WorkerState $worker, bool $isSoftStop)
{



}

}
51 changes: 51 additions & 0 deletions src/Zeus/Kernel/Scheduler/Listener/SchedulerTerminateListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace Zeus\Kernel\Scheduler\Listener;

use Zeus\Kernel\Scheduler\Exception\SchedulerException;
use Zeus\Kernel\Scheduler\SchedulerEvent;
use Zeus\Kernel\Scheduler\Status\WorkerState;

class SchedulerTerminateListener extends AbstractWorkerLifeCycleListener
{
public function __invoke(SchedulerEvent $event)
{
$scheduler = $event->getScheduler();
$scheduler->getLogger()->debug("Stopping scheduler");
$fileName = $this->getUidFile($event->getScheduler()->getConfig());

$uid = @file_get_contents($fileName);
if (!$uid) {
throw new SchedulerException("Scheduler not running", SchedulerException::SCHEDULER_NOT_RUNNING);
}

$uid = intval($uid);

$scheduler->getLogger()->info("Terminating scheduler process #$uid");
/** @var WorkerState $worker */
$worker = $event->getTarget();
$worker->setUid($uid);
$worker->setProcessId($uid);

$globalCount = 0;
foreach ([true, false] as $isSoftStop) {
$this->workerLifeCycle->stop($worker, $isSoftStop);

$count = 0;
while (($uid = @file_get_contents($fileName)) && $count < 5) {
if ($globalCount > 3 && $globalCount % 2) {
$scheduler->getLogger()->debug("Waiting for scheduler to shutdown");
}
sleep(1);
$count++;
$globalCount++;
}

if (!$uid) {
return;
}
}

throw new SchedulerException("Scheduler not stopped", SchedulerException::LOCK_FILE_ERROR);
}
}
17 changes: 17 additions & 0 deletions src/Zeus/Kernel/Scheduler/Listener/WorkerInitListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Zeus\Kernel\Scheduler\Listener;

use Zeus\Kernel\Scheduler\WorkerEvent;

class WorkerInitListener
{
public function __invoke(WorkerEvent $event)
{
$eventManager = $event->getScheduler()->getEventManager();
$statusSender = new WorkerStatusSender();
$events[] = $eventManager->attach(WorkerEvent::EVENT_RUNNING, $statusSender, WorkerEvent::PRIORITY_FINALIZE + 1);
$events[] = $eventManager->attach(WorkerEvent::EVENT_WAITING, $statusSender, WorkerEvent::PRIORITY_FINALIZE + 1);
$events[] = $eventManager->attach(WorkerEvent::EVENT_EXIT, $statusSender, WorkerEvent::PRIORITY_FINALIZE + 2);
}
}

0 comments on commit f875ecf

Please sign in to comment.