Skip to content

Commit

Permalink
Merge pull request #17 from Ekstazi/dispatcher-hangs-up
Browse files Browse the repository at this point in the history
#17 fix infinite loop in dispatcher and enh error handling
  • Loading branch information
Ekstazi committed Jun 24, 2019
2 parents c217138 + 8d2435f commit 47ee4c3
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 44 deletions.
45 changes: 30 additions & 15 deletions src/Channel/Channel.php
Expand Up @@ -79,28 +79,35 @@ public function getRequestEmitter(): Emitter {

protected function dispatch() {
asyncCall(function () {
while (yield $this->channelMessage->advance()) {
$message = $this->channelMessage->getCurrent();
try {
while (yield $this->channelMessage->advance()) {
$message = $this->channelMessage->getCurrent();

if ($message instanceof ChannelData) {
$this->dataEmitter->emit($message);
}
if ($message instanceof ChannelData) {
$this->dataEmitter->emit($message);
}

if ($message instanceof ChannelExtendedData) {
$this->dataExtendedEmitter->emit($message);
}
if ($message instanceof ChannelExtendedData) {
$this->dataExtendedEmitter->emit($message);
}

if ($message instanceof ChannelRequest) {
$this->requestEmitter->emit($message);
}
if ($message instanceof ChannelRequest) {
$this->requestEmitter->emit($message);
}

if ($message instanceof ChannelSuccess || $message instanceof ChannelFailure) {
$this->requestResultEmitter->emit($message);
}
if ($message instanceof ChannelSuccess || $message instanceof ChannelFailure) {
$this->requestResultEmitter->emit($message);
}

if ($message instanceof ChannelClose) {
if ($message instanceof ChannelClose) {
$this->doClose();
}
}
if ($this->open) {
$this->doClose();
}
} catch (\Exception $exception) {
$this->doFail($exception);
}
});
}
Expand Down Expand Up @@ -175,6 +182,14 @@ private function doClose() {
$this->dataExtendedEmitter->complete();
}

private function doFail(\Exception $reason) {
$this->open = false;
$this->requestResultEmitter->fail($reason);
$this->requestEmitter->fail($reason);
$this->dataEmitter->complete();
$this->dataExtendedEmitter->complete();
}

protected function doRequest(ChannelRequest $request, $needAck = true): Promise {
return call(function () use ($request, $needAck) {
yield $this->writer->write($request);
Expand Down
13 changes: 13 additions & 0 deletions src/Channel/Dispatcher.php
Expand Up @@ -37,6 +37,10 @@ public function start() {
while ($this->running) {
$message = yield $this->handler->read();

if ($message === null) {
$this->doFail(new ChannelException('SSH connection was closed by remote server'));
}

if (!$message instanceof Message) {
continue;
}
Expand Down Expand Up @@ -68,6 +72,15 @@ public function start() {
});
}

private function doFail(\Throwable $reason) {
$this->stop();
foreach ($this->channelsEmitter as $channelId => $emitter) {
$emitter->fail($reason);

unset($this->channelsEmitter[$channelId]);
}
}

public function stop() {
$this->running = false;
}
Expand Down
45 changes: 32 additions & 13 deletions src/Process.php
Expand Up @@ -66,17 +66,22 @@ public function start(): Promise {
$this->resolved = new Deferred();

return call(function () {
if (!$this->open) {
yield $this->session->open();
try {
if (!$this->open) {
yield $this->session->open();

$this->open = true;
}
$this->open = true;
}

foreach ($this->env as $key => $value) {
yield $this->session->env($key, $value);
}
foreach ($this->env as $key => $value) {
yield $this->session->env($key, $value);
}

yield $this->session->exec($this->command);
yield $this->session->exec($this->command);
} catch (\Exception $exception) {
$this->resolved = null;
throw $exception;
}
});
}

Expand Down Expand Up @@ -124,14 +129,28 @@ private function handleRequests() {
asyncCall(function () {
$requestIterator = $this->session->getRequestEmitter()->iterate();

while (yield $requestIterator->advance()) {
$message = $requestIterator->getCurrent();
try {
while (yield $requestIterator->advance()) {
$message = $requestIterator->getCurrent();

if ($message instanceof ChannelRequestExitStatus) {
if ($message instanceof ChannelRequestExitStatus) {
$resolved = $this->resolved;
$this->resolved = null;
$this->exitCode = $message->code;
$resolved->resolve($message->code);
}
}
// some servers does not send exit status
if ($this->resolved) {
$this->resolved->resolve(false);
$this->exitCode = false;
$this->resolved = null;
}
} catch (\Exception $exception) {
if ($this->resolved) {
$resolved = $this->resolved;
$this->resolved = null;
$this->exitCode = $message->code;
$resolved->resolve($message->code);
$resolved->fail($exception);
}
}
});
Expand Down
48 changes: 33 additions & 15 deletions src/Shell.php
Expand Up @@ -72,14 +72,19 @@ public function start(int $columns = 80, int $rows = 24, int $width = 800, int $
$this->exitCode = null;

return call(function () use ($columns, $rows, $width, $height) {
yield $this->session->open();
try {
yield $this->session->open();

foreach ($this->env as $key => $value) {
yield $this->session->env($key, $value, true);
}
foreach ($this->env as $key => $value) {
yield $this->session->env($key, $value, true);
}

yield $this->session->pty($columns, $rows, $width, $height);
yield $this->session->shell();
yield $this->session->pty($columns, $rows, $width, $height);
yield $this->session->shell();
} catch (\Exception $exception) {
$this->resolved = null;
throw $exception;
}
});
}

Expand Down Expand Up @@ -126,17 +131,30 @@ public function getStderr(): InputStream {
private function handleRequests() {
asyncCall(function () {
$requestIterator = $this->session->getRequestEmitter()->iterate();

while (yield $requestIterator->advance()) {
$message = $requestIterator->getCurrent();

if ($message instanceof ChannelRequestExitStatus) {
try {
while (yield $requestIterator->advance()) {
$message = $requestIterator->getCurrent();

if ($message instanceof ChannelRequestExitStatus) {
$resolved = $this->resolved;
$this->resolved = null;
$this->exitCode = $message->code;
$resolved->resolve($message->code);

break;
}
}
// some servers does not send exit status
if ($this->resolved) {
$this->resolved->resolve(false);
$this->exitCode = false;
$this->resolved = null;
}
} catch (\Exception $exception) {
if ($this->resolved) {
$resolved = $this->resolved;
$this->resolved = null;
$this->exitCode = $message->code;
$resolved->resolve($message->code);

break;
$resolved->fail($exception);
}
}
});
Expand Down
73 changes: 73 additions & 0 deletions tests/Channel/SessionTest.php
@@ -0,0 +1,73 @@
<?php

namespace Amp\Ssh\Tests\Channel;

use Amp\Loop;
use Amp\Ssh\Authentication\UsernamePassword;
use Amp\Ssh\Channel\ChannelException;
use Amp\Ssh\Channel\Session;
use function Amp\Ssh\connect;
use Amp\Ssh\SshResource;
use Amp\Ssh\Tests\NetworkHelper;
use PHPUnit\Framework\TestCase;

class SessionTest extends TestCase {
protected function getSsh() {
return connect('127.0.0.1:2222', new UsernamePassword('root', 'root'));
}

/**
* if connection closed by server then fail dispatcher and all opened request emitters.
*/
public function testRequestEmitterFailedAfterDisconnect() {
$this->expectException(ChannelException::class);
Loop::run(function () {
$connection = yield $this->getSsh();
/** @var Session $session */
$session = $connection->createSession();
yield $session->open();
NetworkHelper::disconnect($connection);
$iterator = $session->getRequestEmitter()->iterate();
yield $iterator->advance();
});
}

/**
* if connection closed by server then fail dispatcher and close all opened data emitters.
*/
public function testDataEmitterClosedAfterDisconnect() {
Loop::run(function () {
$connection = yield $this->getSsh();
/** @var Session $session */
$session = $connection->createSession();
yield $session->open();
NetworkHelper::disconnect($connection);

$iterator = $session->getDataEmitter()->iterate();
$hasNext = yield $iterator->advance();
$this->assertFalse($hasNext);

$iterator = $session->getDataExtendedEmitter()->iterate();
$hasNext = yield $iterator->advance();
$this->assertFalse($hasNext);
});
}

/**
* if dispatcher closed then all opened channels from client must stop.
*/
public function testSessionClosedAfterConnectionClose() {
Loop::run(function () {
/** @var SshResource $connection */
$connection = yield $this->getSsh();
$session = $connection->createSession();
yield $session->open();
$connection->close();

$iterator = $session->getRequestEmitter()->iterate();

$hasNext = yield $iterator->advance();
$this->assertFalse($hasNext);
});
}
}
19 changes: 19 additions & 0 deletions tests/NetworkHelper.php
@@ -0,0 +1,19 @@
<?php

namespace Amp\Ssh\Tests;

use Amp\Ssh\SshResource;

class NetworkHelper {
/**
* Simulate disconnect from server.
* @param SshResource $sshResource
*/
public static function disconnect(SshResource $sshResource) {
$reflection = new \ReflectionObject($sshResource);
$property = $reflection->getProperty('handler');
$property->setAccessible(true);
$handler = $property->getValue($sshResource);
$handler->close();
}
}
44 changes: 44 additions & 0 deletions tests/ProcessTest.php
Expand Up @@ -5,9 +5,11 @@
use function Amp\call;
use Amp\Loop;
use Amp\Ssh\Authentication\UsernamePassword;
use Amp\Ssh\Channel\ChannelException;
use Amp\Ssh\Channel\SessionEnvException;
use function Amp\Ssh\connect;
use Amp\Ssh\Process;
use Amp\Ssh\SshResource;
use Amp\Ssh\StatusError;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -185,4 +187,46 @@ public function testStderr() {
yield $ssh->close();
});
}

/**
* If connection closed by server and process started then fail with channel error.
*/
public function testProcessFailOnDisconnect() {
$this->expectException(ChannelException::class);
Loop::run(function () {
/** @var SshResource $ssh */
$ssh = yield $this->getSsh();

$process = new Process($ssh, 'sleep 10; echo test;');

yield $process->start();
self::assertTrue($process->isRunning());
Loop::defer(function () use ($ssh) {
NetworkHelper::disconnect($ssh);
});
yield $process->join();
});
}

/**
* If channel closed then join must resolve with false exitCode
* Some implementations doesn't send exit code.
* In that cases false must be used.
*/
public function testProcessFinishWithFalseOnChannelClose() {
Loop::run(function () {
/** @var SshResource $ssh */
$ssh = yield $this->getSsh();

$process = new Process($ssh, 'sleep 10; echo test;');

yield $process->start();
self::assertTrue($process->isRunning());
Loop::defer(function () use ($ssh) {
$ssh->close();
});
$exitCode = yield $process->join();
self::assertFalse($exitCode, false);
});
}
}

0 comments on commit 47ee4c3

Please sign in to comment.