Skip to content

Commit

Permalink
Merge pull request #3 from Jaumo/shutdown-rc
Browse files Browse the repository at this point in the history
Fix race condition during server shutdown + server restart
  • Loading branch information
bwoebi committed Jun 18, 2019
2 parents b479826 + c6b5c0c commit 907adb1
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 3 deletions.
4 changes: 4 additions & 0 deletions lib/CommandClient.php
Expand Up @@ -98,6 +98,10 @@ public function stop(): Promise {
return $this->send(["action" => "stop"]);
}

public function started(): Promise {
return $this->send(["action" => "started"]);
}

public function importServerSockets($addrCtxMap): Promise {
return call(function () use ($addrCtxMap) {
$reply = yield $this->send(["action" => "import-sockets", "addrCtxMap" => array_map(function ($context) { return $context["socket"]; }, $addrCtxMap)]);
Expand Down
6 changes: 6 additions & 0 deletions lib/Server.php
Expand Up @@ -417,6 +417,12 @@ private function doStop(): \Generator {
assert($this->logDebug("stopping"));
$this->state = self::STOPPING;

// Unbind server sockets, otherwise connections are still sent to bound sockets but never accepted
// In restart situation that can lead to unnecessary request errors
foreach ($this->boundServers as $server) {
fclose($server);
}

foreach ($this->acceptWatcherIds as $watcherId) {
Loop::cancel($watcherId);
}
Expand Down
11 changes: 8 additions & 3 deletions lib/WatcherProcess.php
Expand Up @@ -207,6 +207,13 @@ private function readCommand(Socket $client): \Generator {
case "import-sockets":
yield from $this->parseWorkerAddrCtx($client, $message["addrCtxMap"]);
break;

case "started":
$this->logger->info("Worker started");
yield $this->replyCommand($client, []);
assert(!empty($this->spawnDeferreds));
array_shift($this->spawnDeferreds)->resolve();
break;
}
}
}
Expand Down Expand Up @@ -396,9 +403,6 @@ private function accept(): \Generator {

$this->ipcClients[\spl_object_hash($client)] = $client;

assert(!empty($this->spawnDeferreds));
array_shift($this->spawnDeferreds)->resolve();

Promise\rethrow(new Coroutine($this->read($client)));
}
}
Expand Down Expand Up @@ -496,6 +500,7 @@ public function restart() {
for ($i = 0; $i < $this->workerCount; $i++) {
$spawnPromise = $this->spawn();
$spawnPromise->onResolve(function () {
$this->logger->info("Spawned new worker, stop old worker");
$client = current($this->ipcClients);
next($this->ipcClients);
$client->end(self::STOP_SEQUENCE);
Expand Down
2 changes: 2 additions & 0 deletions lib/WorkerProcess.php
Expand Up @@ -62,9 +62,11 @@ protected function doStart(Console $console): \Generator {
}
$this->server = $server;
Loop::unreference(Loop::onReadable($this->ipcSock, function ($watcherId) {
$this->logger->info("Received stop command");
Loop::cancel($watcherId);
yield from $this->stop();
}));
yield (new CommandClient($console->getArg("config")))->started();
}

protected function doStop(): \Generator {
Expand Down

0 comments on commit 907adb1

Please sign in to comment.