diff --git a/src/Handler/GuzzleV5/GuzzleHandler.php b/src/Handler/GuzzleV5/GuzzleHandler.php index b278a68d74..b8af8eba4e 100644 --- a/src/Handler/GuzzleV5/GuzzleHandler.php +++ b/src/Handler/GuzzleV5/GuzzleHandler.php @@ -10,6 +10,7 @@ use GuzzleHttp\Promise; use GuzzleHttp\Psr7\Response as Psr7Response; use Psr\Http\Message\RequestInterface as Psr7Request; +use Psr\Http\Message\StreamInterface as Psr7StreamInterface; /** * A request handler that sends PSR-7-compatible requests with Guzzle 5. @@ -28,6 +29,7 @@ class GuzzleHandler 'connect_timeout' => true, 'stream' => true, 'delay' => true, + 'sink' => true, ]; /** @var ClientInterface */ @@ -82,6 +84,8 @@ function (Exception $exception) { private function createGuzzleRequest(Psr7Request $psrRequest, array $options) { + $ringConfig = []; + // Remove unsupported options. foreach (array_keys($options) as $key) { if (isset(self::$validOptions[$key])) { @@ -90,12 +94,19 @@ private function createGuzzleRequest(Psr7Request $psrRequest, array $options) } // Handle delay option. - $delay = null; if (isset($options['delay'])) { - $delay = $options['delay']; + $ringConfig['delay'] = $options['delay']; unset($options['delay']); } + // Prepare sink option. + if (isset($options['sink'])) { + $ringConfig['save_to'] = ($options['sink'] instanceof Psr7StreamInterface) + ? new GuzzleStream($options['sink']) + : $options['sink']; + unset($options['sink']); + } + // Ensure that all requests are async and lazy like Guzzle 6. $options['future'] = 'lazy'; @@ -118,12 +129,14 @@ private function createGuzzleRequest(Psr7Request $psrRequest, array $options) $request->setHeaders($psrRequest->getHeaders()); $request->setHeader( 'user-agent', - $request->getHeaderLine('user-agent') . ' ' . Client::getDefaultUserAgent() + $request->getHeader('user-agent') . ' ' . Client::getDefaultUserAgent() ); // Make sure the delay is configured, if provided. - if ($delay) { - $request->getConfig()->set('delay', $delay); + if ($ringConfig) { + foreach ($ringConfig as $k => $v) { + $request->getConfig()->set($k, $v); + } } return $request; diff --git a/src/Handler/GuzzleV6/GuzzleHandler.php b/src/Handler/GuzzleV6/GuzzleHandler.php index 381df7541a..eadfb856b8 100644 --- a/src/Handler/GuzzleV6/GuzzleHandler.php +++ b/src/Handler/GuzzleV6/GuzzleHandler.php @@ -36,8 +36,7 @@ public function __invoke(Psr7Request $request, array $options = []) { $request = $this->prepareRequest($request, $options); - return $this->client->sendAsync($request, $options)->then( - null, + return $this->client->sendAsync($request, $options)->otherwise( static function (\Exception $e) { $error = [ 'exception' => $e, diff --git a/src/Multipart/AbstractUploader.php b/src/Multipart/AbstractUploader.php index 6123f6e5be..152ec013ef 100644 --- a/src/Multipart/AbstractUploader.php +++ b/src/Multipart/AbstractUploader.php @@ -292,12 +292,6 @@ private function execCommand($operation, array $params) $params + $this->state->getId() ); - // Wait for the next tick. - if (!$command['@http']) { - $command['@http'] = []; - } - $command['@http']['delay'] = true; - // Execute the before callback. if (is_callable($this->config["before_{$operation}"])) { $this->config["before_{$operation}"]($command); diff --git a/src/S3/Transfer.php b/src/S3/Transfer.php index 8a928bd7e0..80072e88a7 100644 --- a/src/S3/Transfer.php +++ b/src/S3/Transfer.php @@ -43,15 +43,15 @@ class Transfer implements PromisorInterface * The options array can contain the following key value pairs: * * - base_dir: (string) Base dir of the source, if $source is an iterator. - * - before: A callable that accepts the following positional arguments: + * - before: (callable) Accepts the following positional arguments: * source, dest, command; where command is an instance of a Command * object. The provided command will be either a GetObject, PutObject, * InitiateMultipartUpload, or UploadPart command. - * - mup_threshold: Size in bytes in which a multipart upload should be - * used instead of PutObject. Defaults to 20971520 (20 MB). - * - concurrency: Number of files to upload concurrently. Defaults to 5. - * - debug: Set to true to print out debug information for transfers. Set - * to an fopen() resource to write to a specific stream. + * - mup_threshold: (int) Size in bytes in which a multipart upload should + * be used instead of PutObject. Defaults to 20971520 (20 MB). + * - concurrency: (int, default=5) Number of files to upload concurrently. + * - debug: (bool) Set to true to print out debug information for transfers. + * Set to an fopen() resource to write to a specific stream. * * @param S3Client $client Client used for transfers. * @param string|\Iterator $source Where the files are transferred from. @@ -248,10 +248,7 @@ private function createDownloadPromise() $commands[] = $this->client->getCommand('GetObject', [ 'Bucket' => $listArgs['Bucket'], 'Key' => $key, - '@http' => [ - 'sink' => $sink, - 'delay' => true - ], + '@http' => ['sink' => $sink], ]); } @@ -292,7 +289,6 @@ private function upload($filename) $args = $this->s3Args; $args['SourceFile'] = $filename; $args['Key'] = $this->createS3Key($filename); - $args['@http'] = ['delay' => true]; $command = $this->client->getCommand('PutObject', $args); $this->before and call_user_func($this->before, $command); diff --git a/tests/Handler/GuzzleV5/HandlerTest.php b/tests/Handler/GuzzleV5/HandlerTest.php index 9554a73a61..bdb605641e 100644 --- a/tests/Handler/GuzzleV5/HandlerTest.php +++ b/tests/Handler/GuzzleV5/HandlerTest.php @@ -7,6 +7,7 @@ use GuzzleHttp\Message\Request as GuzzleRequest; use GuzzleHttp\Message\Response as GuzzleResponse; use GuzzleHttp\Promise\RejectionException; +use GuzzleHttp\Psr7; use GuzzleHttp\Psr7\Request as PsrRequest; use GuzzleHttp\Psr7\Response as PsrResponse; use GuzzleHttp\Stream\Stream; @@ -19,7 +20,7 @@ class HandlerTest extends \PHPUnit_Framework_TestCase { public function setUp() { - if (class_exists('GuzzleHttp\Promise\Promise')) { + if (!class_exists('GuzzleHttp\Ring\Core')) { $this->markTestSkipped(); } } @@ -29,8 +30,9 @@ public function testHandlerWorksWithSuccessfulRequest() $deferred = new Deferred(); $handler = $this->getHandler($deferred); $request = new PsrRequest('PUT', 'http://example.com', [], '{}'); + $sink = Psr7\stream_for(); - $promise = $handler($request, ['delay' => 500]); + $promise = $handler($request, ['delay' => 500, 'sink' => $sink]); $this->assertInstanceOf('GuzzleHttp\\Promise\\PromiseInterface', $promise); $deferred->resolve(new GuzzleResponse(200, [], Stream::factory('foo'))); @@ -38,7 +40,8 @@ public function testHandlerWorksWithSuccessfulRequest() $response = $promise->wait(); $this->assertInstanceOf(PsrResponse::class, $response); $this->assertEquals(200, $response->getStatusCode()); - $this->assertEquals(200, $response->getStatusCode()); + $this->assertEquals('foo', $response->getBody()->getContents()); + $this->assertEquals('foo', (string) $sink); } public function testHandlerWorksWithFailedRequest() @@ -87,7 +90,12 @@ private function getHandler(Deferred $deferred) $future = new FutureResponse($deferred->promise()); $client->method('send')->willReturn($future); - /** @var $client \GuzzleHttp\Client */ - return new GuzzleHandler($client); + return function ($request, $options = []) use ($client) { + /** @var $client \GuzzleHttp\Client */ + if (isset($options['sink'])) { + $options['sink']->write('foo'); + } + return call_user_func(new GuzzleHandler($client), $request, $options); + }; } } diff --git a/tests/Handler/GuzzleV5/StreamTest.php b/tests/Handler/GuzzleV5/StreamTest.php index ec7a06112e..8d65af2903 100644 --- a/tests/Handler/GuzzleV5/StreamTest.php +++ b/tests/Handler/GuzzleV5/StreamTest.php @@ -14,7 +14,7 @@ class StreamTest extends \PHPUnit_Framework_TestCase { public function setUp() { - if (class_exists('GuzzleHttp\Promise\Promise')) { + if (!class_exists('GuzzleHttp\Ring\Core')) { $this->markTestSkipped(); } } diff --git a/tests/Integ/GuzzleV5HandlerTest.php b/tests/Integ/GuzzleV5HandlerTest.php index 526ff08d5e..83e9c61c01 100644 --- a/tests/Integ/GuzzleV5HandlerTest.php +++ b/tests/Integ/GuzzleV5HandlerTest.php @@ -11,7 +11,7 @@ class GuzzleV5HandlerTest extends \PHPUnit_Framework_TestCase { public function setUp() { - if (class_exists('GuzzleHttp\Promise\Promise')) { + if (!class_exists('GuzzleHttp\Ring\Core')) { $this->markTestSkipped(); } } @@ -25,9 +25,10 @@ public function testSendRequest() ['c' => '3', 'd' => '4', 'user-agent' => 'AWS/3'], Psr7\stream_for('{"f":6,"g":7}') ); + $sink = Psr7\stream_for(); /** @var \GuzzleHttp\Promise\Promise $responsePromise */ - $responsePromise = $handler($request); + $responsePromise = $handler($request, ['sink' => $sink]); $responsePromise = $responsePromise->then( function (Response $resp) { return $resp->withHeader('e', '5'); @@ -39,9 +40,10 @@ function (array $error) { /** @var Response $response */ $response = $responsePromise->wait(); - $data = json_decode($response->getBody()->getContents(), true); + $body = $response->getBody()->getContents(); + $data = json_decode($body, true); - // Check request data. + // Check response data. $this->assertArrayHasKey('C', $data['headers']); $this->assertArrayHasKey('D', $data['headers']); $this->assertStringStartsWith('AWS/3 Guzzle/5', $data['headers']['User-Agent']); @@ -52,6 +54,10 @@ function (array $error) { // Check response data. $this->assertTrue($response->hasHeader('E')); + + // Check the sink. + $sink->seek(0); + $this->assertEquals($body, $sink->getContents()); } public function testProduceErrorData()