|
| 1 | +<?php |
| 2 | + |
| 3 | +abstract class PhabricatorBotBaseStreamingProtocolAdapter |
| 4 | + extends PhabricatorBaseProtocolAdapter { |
| 5 | + |
| 6 | + private $readBuffers; |
| 7 | + private $authtoken; |
| 8 | + private $server; |
| 9 | + private $readHandles; |
| 10 | + private $multiHandle; |
| 11 | + private $active; |
| 12 | + private $inRooms = array(); |
| 13 | + |
| 14 | + public function connect() { |
| 15 | + $this->server = $this->getConfig('server'); |
| 16 | + $this->authtoken = $this->getConfig('authtoken'); |
| 17 | + $rooms = $this->getConfig('join'); |
| 18 | + |
| 19 | + // First, join the room |
| 20 | + if (!$rooms) { |
| 21 | + throw new Exception("Not configured to join any rooms!"); |
| 22 | + } |
| 23 | + |
| 24 | + $this->readBuffers = array(); |
| 25 | + |
| 26 | + // Set up our long poll in a curl multi request so we can |
| 27 | + // continue running while it executes in the background |
| 28 | + $this->multiHandle = curl_multi_init(); |
| 29 | + $this->readHandles = array(); |
| 30 | + |
| 31 | + foreach ($rooms as $room_id) { |
| 32 | + $this->joinRoom($room_id); |
| 33 | + |
| 34 | + // Set up the curl stream for reading |
| 35 | + $url = $this->buildStreamingUrl($room_id); |
| 36 | + $this->readHandle[$url] = curl_init(); |
| 37 | + curl_setopt($this->readHandle[$url], CURLOPT_URL, $url); |
| 38 | + curl_setopt($this->readHandle[$url], CURLOPT_RETURNTRANSFER, true); |
| 39 | + curl_setopt($this->readHandle[$url], CURLOPT_FOLLOWLOCATION, 1); |
| 40 | + curl_setopt( |
| 41 | + $this->readHandle[$url], |
| 42 | + CURLOPT_USERPWD, |
| 43 | + $this->authtoken.':x'); |
| 44 | + curl_setopt( |
| 45 | + $this->readHandle[$url], |
| 46 | + CURLOPT_HTTPHEADER, |
| 47 | + array("Content-type: application/json")); |
| 48 | + curl_setopt( |
| 49 | + $this->readHandle[$url], |
| 50 | + CURLOPT_WRITEFUNCTION, |
| 51 | + array($this, 'read')); |
| 52 | + curl_setopt($this->readHandle[$url], CURLOPT_BUFFERSIZE, 128); |
| 53 | + curl_setopt($this->readHandle[$url], CURLOPT_TIMEOUT, 0); |
| 54 | + |
| 55 | + curl_multi_add_handle($this->multiHandle, $this->readHandle[$url]); |
| 56 | + |
| 57 | + // Initialize read buffer |
| 58 | + $this->readBuffers[$url] = ''; |
| 59 | + } |
| 60 | + |
| 61 | + $this->active = null; |
| 62 | + $this->blockingMultiExec(); |
| 63 | + } |
| 64 | + |
| 65 | + protected function joinRoom($room_id) { |
| 66 | + // Optional hook, by default, do nothing |
| 67 | + } |
| 68 | + |
| 69 | + // This is our callback for the background curl multi-request. |
| 70 | + // Puts the data read in on the readBuffer for processing. |
| 71 | + private function read($ch, $data) { |
| 72 | + $info = curl_getinfo($ch); |
| 73 | + $length = strlen($data); |
| 74 | + $this->readBuffers[$info['url']] .= $data; |
| 75 | + return $length; |
| 76 | + } |
| 77 | + |
| 78 | + private function blockingMultiExec() { |
| 79 | + do { |
| 80 | + $status = curl_multi_exec($this->multiHandle, $this->active); |
| 81 | + } while ($status == CURLM_CALL_MULTI_PERFORM); |
| 82 | + |
| 83 | + // Check for errors |
| 84 | + if ($status != CURLM_OK) { |
| 85 | + throw new Exception( |
| 86 | + "Phabricator Bot had a problem reading from stream."); |
| 87 | + } |
| 88 | + } |
| 89 | + |
| 90 | + public function getNextMessages($poll_frequency) { |
| 91 | + $messages = array(); |
| 92 | + |
| 93 | + if (!$this->active) { |
| 94 | + throw new Exception("Phabricator Bot stopped reading from stream."); |
| 95 | + } |
| 96 | + |
| 97 | + // Prod our http request |
| 98 | + curl_multi_select($this->multiHandle, $poll_frequency); |
| 99 | + $this->blockingMultiExec(); |
| 100 | + |
| 101 | + // Process anything waiting on the read buffer |
| 102 | + while ($m = $this->processReadBuffer()) { |
| 103 | + $messages[] = $m; |
| 104 | + } |
| 105 | + |
| 106 | + return $messages; |
| 107 | + } |
| 108 | + |
| 109 | + private function processReadBuffer() { |
| 110 | + foreach ($this->readBuffers as $url => &$buffer) { |
| 111 | + $until = strpos($buffer, "}\r"); |
| 112 | + if ($until == false) { |
| 113 | + continue; |
| 114 | + } |
| 115 | + |
| 116 | + $message = substr($buffer, 0, $until + 1); |
| 117 | + $buffer = substr($buffer, $until + 2); |
| 118 | + |
| 119 | + $m_obj = json_decode($message, true); |
| 120 | + if ($message = $this->processMessage($m_obj)) { |
| 121 | + return $message; |
| 122 | + } |
| 123 | + } |
| 124 | + |
| 125 | + // If we're here, there's nothing to process |
| 126 | + return false; |
| 127 | + } |
| 128 | + |
| 129 | + protected function performPost($endpoint, $data = Null) { |
| 130 | + $uri = new PhutilURI($this->server); |
| 131 | + $uri->setPath($endpoint); |
| 132 | + |
| 133 | + $payload = json_encode($data); |
| 134 | + |
| 135 | + list($output) = id(new HTTPSFuture($uri)) |
| 136 | + ->setMethod('POST') |
| 137 | + ->addHeader('Content-Type', 'application/json') |
| 138 | + ->addHeader('Authorization', $this->getAuthorizationHeader()) |
| 139 | + ->setData($payload) |
| 140 | + ->resolvex(); |
| 141 | + |
| 142 | + $output = trim($output); |
| 143 | + if (strlen($output)) { |
| 144 | + return json_decode($output, true); |
| 145 | + } |
| 146 | + |
| 147 | + return true; |
| 148 | + } |
| 149 | + |
| 150 | + protected function getAuthorizationHeader() { |
| 151 | + return 'Basic '.base64_encode($this->authtoken.':x'); |
| 152 | + } |
| 153 | + |
| 154 | + abstract protected function buildStreamingUrl($channel); |
| 155 | + |
| 156 | + abstract protected function processMessage($raw_object); |
| 157 | +} |
| 158 | + |
0 commit comments