diff --git a/library/Requests.php b/library/Requests.php index d452bd418..4f3d07b9c 100644 --- a/library/Requests.php +++ b/library/Requests.php @@ -385,6 +385,79 @@ public static function request($url, $headers = array(), $data = array(), $type return self::parse_response($response, $url, $headers, $data, $options); } + public static function request_pool($requests, $options = array(), $pool_size = 2) { + $options = array_merge(self::get_default_options(true), $options); + + if (!empty($options['hooks'])) { + $options['hooks']->register('transport.internal.parse_response', array('Requests', 'parse_multiple')); + if (!empty($options['complete'])) { + $options['hooks']->register('multiple.request.complete', $options['complete']); + } + } + + foreach ($requests as $id => &$request) { + if (!isset($request['headers'])) { + $request['headers'] = array(); + } + if (!isset($request['data'])) { + $request['data'] = array(); + } + if (!isset($request['type'])) { + $request['type'] = self::GET; + } + if (!isset($request['options'])) { + $request['options'] = $options; + $request['options']['type'] = $request['type']; + } + else { + if (empty($request['options']['type'])) { + $request['options']['type'] = $request['type']; + } + $request['options'] = array_merge($options, $request['options']); + } + + self::set_defaults($request['url'], $request['headers'], $request['data'], $request['type'], $request['options']); + + // Ensure we only hook in once + if ($request['options']['hooks'] !== $options['hooks']) { + $request['options']['hooks']->register('transport.internal.parse_response', array('Requests', 'parse_multiple')); + if (!empty($request['options']['complete'])) { + $request['options']['hooks']->register('multiple.request.complete', $request['options']['complete']); + } + } + } + unset($request); + + if (!empty($options['transport'])) { + $transport = $options['transport']; + + if (is_string($options['transport'])) { + $transport = new $transport(); + } + } + else { + $transport = self::get_transport(); + } + + if (get_class($transport) !== 'Requests_Transport_cURL') { + return array(); + } + + $responses = $transport->request_pool($requests, $options, $pool_size); + + foreach ($responses as $id => &$response) { + // If our hook got messed with somehow, ensure we end up with the + // correct response + if (is_string($response)) { + $request = $requests[$id]; + self::parse_multiple($response, $request); + $request['options']['hooks']->dispatch('multiple.request.complete', array(&$response, $id)); + } + } + + return $responses; + } + /** * Send multiple HTTP requests simultaneously * diff --git a/library/Requests/Transport/cURL.php b/library/Requests/Transport/cURL.php index 4cd10866a..9be6e7cad 100644 --- a/library/Requests/Transport/cURL.php +++ b/library/Requests/Transport/cURL.php @@ -147,20 +147,6 @@ public function request($url, $headers = array(), $data = array(), $options = ar $this->response_byte_limit = $options['max_bytes']; } - if (isset($options['verify'])) { - if ($options['verify'] === false) { - curl_setopt($this->handle, CURLOPT_SSL_VERIFYHOST, 0); - curl_setopt($this->handle, CURLOPT_SSL_VERIFYPEER, 0); - } - elseif (is_string($options['verify'])) { - curl_setopt($this->handle, CURLOPT_CAINFO, $options['verify']); - } - } - - if (isset($options['verifyname']) && $options['verifyname'] === false) { - curl_setopt($this->handle, CURLOPT_SSL_VERIFYHOST, 0); - } - curl_exec($this->handle); $response = $this->response_data; @@ -186,6 +172,134 @@ public function request($url, $headers = array(), $data = array(), $options = ar return $this->headers; } + + /** + * Send multiple requests simultaneously with a maximal concurrent conncetion (pool) + * + * @param array $requests Request data + * @param array $options Global options + * @param int $pool_size Maximal simultaneously connections + * @return array Array of Requests_Response objects (may contain Requests_Exception or string responses as well) + */ + public function request_pool($requests, $options, $pool_size = 2) { + if (empty($requests)) { + return array(); + } + + $pool = array(); + $queue = array(); + $responses = array(); + + $main_curl_executor_pool = curl_multi_init(); + + foreach ($requests as $id => $request) { + $queue[] = array( + $id, + $request, + ); + } + + $queue = array_reverse($queue); + + $request['options']['hooks']->dispatch('curl.before_multi_exec', array(&$multihandle)); + + while (!empty($pool) || !empty($queue)) { + curl_multi_exec($main_curl_executor_pool, $active); + $done = curl_multi_info_read($main_curl_executor_pool); + + if ($done !== false) { + $pool_key = (int) $done['handle']; + $pool_element = $pool[$pool_key]; + unset($pool[$pool_key]); + + $response = $this->handleCurlResponse($done, $pool_element); + $responses[$pool_element['id']] = $response; + curl_multi_remove_handle($main_curl_executor_pool, $done['handle']); + curl_close($done['handle']); + + $pool_element['request']['options']['hooks']->dispatch( + 'multiple.request.complete', + array( + &$responses[$pool_element['id']], + $pool_element['id'], + ) + ); + } + + if (count($pool) >= $pool_size || empty($queue)) { + sleep(0.5); + + continue; + } + + list($id, $request) = array_pop($queue); + list($subhandle, $subrequest) = $this->addNewSubrequestHandle($request); + + $pool[(int) $subhandle] = array( + 'id' => $id, + 'request' => $request, + 'subhandle' => $subhandle, + 'subrequest' => $subrequest, + ); + + curl_multi_add_handle($main_curl_executor_pool, $subhandle); + } + + $request['options']['hooks']->dispatch('curl.after_multi_exec', array(&$multihandle)); + curl_multi_close($main_curl_executor_pool); + + return $responses; + } + + private function addNewSubrequestHandle($request) { + $class = get_class($this); + $subrequest = new $class(); + $subhandle = $subrequest->get_subrequest_handle( + $request['url'], + $request['headers'], + $request['data'], + $request['options'] + ); + $request['options']['hooks']->dispatch('curl.before_multi_add', array(&$subhandle)); + + return array($subhandle, $subrequest); + } + + private function handleCurlResponse(array $done, $pool_element) { + + if ($done['result'] === CURLE_OK) { + $parsed_response = $pool_element['subrequest']->process_response( + $pool_element['subrequest']->response_data, + $pool_element['request']['options'] + ); + + $pool_element['request']['options']['hooks']->dispatch( + 'transport.internal.parse_response', + array( + &$parsed_response, + $pool_element['request'], + ) + ); + + return $parsed_response; + } + + $reason = curl_error($done['handle']); + $exception = new Requests_Exception_Transport_cURL( + $reason, + Requests_Exception_Transport_cURL::EASY, + $done['handle'], + $done['result'] + ); + + $pool_element['request']['options']['hooks']->dispatch( + 'transport.internal.parse_error', + array(&$exception, $pool_element['request']) + ); + + return $exception; + } + /** * Send multiple requests simultaneously * @@ -410,6 +524,20 @@ protected function setup_handle($url, $headers, $data, $options) { curl_setopt($this->handle, CURLOPT_WRITEFUNCTION, array($this, 'stream_body')); curl_setopt($this->handle, CURLOPT_BUFFERSIZE, Requests::BUFFER_SIZE); } + + if (isset($options['verify'])) { + if ($options['verify'] === false) { + curl_setopt($this->handle, CURLOPT_SSL_VERIFYHOST, 0); + curl_setopt($this->handle, CURLOPT_SSL_VERIFYPEER, 0); + } + elseif (is_string($options['verify'])) { + curl_setopt($this->handle, CURLOPT_CAINFO, $options['verify']); + } + } + + if (isset($options['verifyname']) && $options['verifyname'] === false) { + curl_setopt($this->handle, CURLOPT_SSL_VERIFYHOST, 0); + } } /** diff --git a/tests/Transport/cURL.php b/tests/Transport/cURL.php index 7f7c41774..78288382f 100644 --- a/tests/Transport/cURL.php +++ b/tests/Transport/cURL.php @@ -15,6 +15,135 @@ public function testExpiredHTTPS() { parent::testExpiredHTTPS(); } + public function testPoolMultiple() { + $requests = array( + 'test1' => array( + 'url' => httpbin('/get'), + ), + 'test2' => array( + 'url' => httpbin('/get'), + ), + ); + $responses = Requests::request_pool($requests, $this->getOptions()); + + // test1 + $this->assertNotEmpty($responses['test1']); + $this->assertInstanceOf('Requests_Response', $responses['test1']); + $this->assertSame(200, $responses['test1']->status_code); + + $result = json_decode($responses['test1']->body, true); + $this->assertSame(httpbin('/get'), $result['url']); + $this->assertEmpty($result['args']); + + // test2 + $this->assertNotEmpty($responses['test2']); + $this->assertInstanceOf('Requests_Response', $responses['test2']); + $this->assertSame(200, $responses['test2']->status_code); + + $result = json_decode($responses['test2']->body, true); + $this->assertSame(httpbin('/get'), $result['url']); + $this->assertEmpty($result['args']); + } + + public function testPoolWithDifferingMethods() { + $requests = array( + 'get' => array( + 'url' => httpbin('/get'), + ), + 'post' => array( + 'url' => httpbin('/post'), + 'type' => Requests::POST, + 'data' => 'test', + ), + ); + $responses = Requests::request_pool($requests, $this->getOptions()); + + // get + $this->assertSame(200, $responses['get']->status_code); + + // post + $this->assertSame(200, $responses['post']->status_code); + $result = json_decode($responses['post']->body, true); + $this->assertSame('test', $result['data']); + } + + public function testPoolUsingCallbackAndFailure() { + $requests = array( + 'success' => array( + 'url' => httpbin('/get'), + ), + 'timeout' => array( + 'url' => httpbin('/delay/10'), + 'options' => array( + 'timeout' => 1, + ), + ), + ); + $this->completed = array(); + $options = array( + 'complete' => array($this, 'completeCallback'), + ); + $responses = Requests::request_pool($requests, $this->getOptions($options)); + + $this->assertSame($this->completed, $responses); + $this->completed = array(); + } + + public function testPoolUsingCallback() { + $requests = array( + 'get' => array( + 'url' => httpbin('/get'), + ), + 'post' => array( + 'url' => httpbin('/post'), + 'type' => Requests::POST, + 'data' => 'test', + ), + ); + $this->completed = array(); + $options = array( + 'complete' => array($this, 'completeCallback'), + ); + $responses = Requests::request_pool($requests, $this->getOptions($options)); + + $this->assertSame($this->completed, $responses); + $this->completed = array(); + } + + public function testPoolToFile() { + $requests = array( + 'get' => array( + 'url' => httpbin('/get'), + 'options' => array( + 'filename' => tempnam(sys_get_temp_dir(), 'RLT'), // RequestsLibraryTest + ), + ), + 'post' => array( + 'url' => httpbin('/post'), + 'type' => Requests::POST, + 'data' => 'test', + 'options' => array( + 'filename' => tempnam(sys_get_temp_dir(), 'RLT'), // RequestsLibraryTest + ), + ), + ); + Requests::request_pool($requests, $this->getOptions()); + + // GET request + $contents = file_get_contents($requests['get']['options']['filename']); + $result = json_decode($contents, true); + $this->assertSame(httpbin('/get'), $result['url']); + $this->assertEmpty($result['args']); + unlink($requests['get']['options']['filename']); + + // POST request + $contents = file_get_contents($requests['post']['options']['filename']); + $result = json_decode($contents, true); + $this->assertSame(httpbin('/post'), $result['url']); + $this->assertSame('test', $result['data']); + unlink($requests['post']['options']['filename']); + } + public function testRevokedHTTPS() { $this->expectException('Requests_Exception'); $this->expectExceptionMessage('certificate subject name'); diff --git a/tests/Transport/fsockopen.php b/tests/Transport/fsockopen.php index f77c154a1..7248a52dd 100644 --- a/tests/Transport/fsockopen.php +++ b/tests/Transport/fsockopen.php @@ -29,6 +29,19 @@ public function testBadDomain() { parent::testBadDomain(); } + public function testPoolNotImplementedInFsock() { + $requests = array( + 'test1' => array( + 'url' => httpbin('/get'), + ), + 'test2' => array( + 'url' => httpbin('/get'), + ), + ); + $responses = Requests::request_pool($requests, $this->getOptions()); + $this->assertSame(array(), $responses); + } + /** * Issue #248. */