-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Dispatcher.php
93 lines (68 loc) · 2.4 KB
/
Dispatcher.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<?php
namespace Amp\Ssh\Channel;
use Amp\Emitter;
use Amp\Ssh\Message\ChannelClose;
use Amp\Ssh\Message\ChannelOpen;
use Amp\Ssh\Message\Message;
use Amp\Ssh\Transport\BinaryPacketHandler;
use function Amp\asyncCall;
/**
 * Reads messages from the packet handler and routes channel messages to the
 * emitter of the channel they belong to.
 *
 * @internal
 */
class Dispatcher {
    /** @var Emitter[] Emitters of the registered channels, keyed by channel id. */
    private $channelsEmitter = [];

    /** @var BinaryPacketHandler Source of incoming messages; also passed to every session. */
    private $handler;

    /** @var int Channel id that the next created session will receive. */
    private $channelSequence = 0;

    /** @var bool Cleared by stop(); checked by the read loop before each read. */
    private $running = true;

    /** @var bool Set by close(); makes start() refuse to run. */
    private $closed = false;

    /**
     * @param BinaryPacketHandler $handler Handler the dispatcher reads messages from.
     */
    public function __construct(BinaryPacketHandler $handler) {
        $this->handler = $handler;
    }

    /**
     * Starts the background loop that forwards channel messages to their emitters.
     *
     * @throws \RuntimeException If the dispatcher has already been closed.
     */
    public function start() {
        if ($this->closed) {
            throw new \RuntimeException('SSH Connection is closed');
        }

        asyncCall(function () {
            while ($this->running) {
                $message = yield $this->handler->read();

                if (!$message instanceof Message) {
                    continue;
                }

                $type = $message::getNumber();

                // Only messages in the channel range are dispatched; everything else is skipped.
                if ($type < Message::SSH_MSG_CHANNEL_OPEN || $type > Message::SSH_MSG_CHANNEL_FAILURE) {
                    continue;
                }

                // ChannelOpen carries the id in senderChannel, every other channel message in recipientChannel.
                $channelId = $message instanceof ChannelOpen ? $message->senderChannel : $message->recipientChannel;

                if (!\array_key_exists($channelId, $this->channelsEmitter)) {
                    continue;
                }

                yield $this->channelsEmitter[$channelId]->emit($message);

                // The emitter may have been removed (e.g. by close()) while the emit was pending.
                if (!\array_key_exists($channelId, $this->channelsEmitter)) {
                    continue;
                }

                // A close message is the last one delivered on a channel: finish and forget its emitter.
                if ($message instanceof ChannelClose) {
                    $this->channelsEmitter[$channelId]->complete();
                    unset($this->channelsEmitter[$channelId]);
                }
            }
        });
    }

    /**
     * Asks the read loop to exit; the flag is only checked before the next read.
     */
    public function stop() {
        $this->running = false;
    }

    /**
     * Stops the read loop, completes every registered emitter and marks the
     * dispatcher as closed so that a later start() throws.
     */
    public function close() {
        // Without this flag the guard in start() could never trigger.
        $this->closed = true;
        $this->stop();

        foreach ($this->channelsEmitter as $channelId => $emitter) {
            $emitter->complete();
            unset($this->channelsEmitter[$channelId]);
        }
    }

    /**
     * Creates a session on the next free channel id and registers its emitter
     * so the read loop can deliver messages for that channel.
     *
     * @return Session
     */
    public function createSession(): Session {
        $channelId = $this->channelSequence;
        $emitter = new Emitter();
        $session = new Session($this->handler, $emitter->iterate(), $channelId);

        // Register and advance only after the session was constructed successfully.
        $this->channelsEmitter[$channelId] = $emitter;
        ++$this->channelSequence;

        return $session;
    }
}