Skip to content

Commit

Permalink
Removed "delay" options when not needed, and added support for the "s…
Browse files Browse the repository at this point in the history
…ink" option to the GuzzleV5 adapter.
  • Loading branch information
jeremeamia committed Apr 24, 2015
1 parent e07afd3 commit 729f3a6
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 34 deletions.
23 changes: 18 additions & 5 deletions src/Handler/GuzzleV5/GuzzleHandler.php
Expand Up @@ -10,6 +10,7 @@
use GuzzleHttp\Promise;
use GuzzleHttp\Psr7\Response as Psr7Response;
use Psr\Http\Message\RequestInterface as Psr7Request;
use Psr\Http\Message\StreamInterface as Psr7StreamInterface;

/**
* A request handler that sends PSR-7-compatible requests with Guzzle 5.
Expand All @@ -28,6 +29,7 @@ class GuzzleHandler
'connect_timeout' => true,
'stream' => true,
'delay' => true,
'sink' => true,
];

/** @var ClientInterface */
Expand Down Expand Up @@ -82,6 +84,8 @@ function (Exception $exception) {

private function createGuzzleRequest(Psr7Request $psrRequest, array $options)
{
$ringConfig = [];

// Remove unsupported options.
foreach (array_keys($options) as $key) {
if (isset(self::$validOptions[$key])) {
Expand All @@ -90,12 +94,19 @@ private function createGuzzleRequest(Psr7Request $psrRequest, array $options)
}

// Handle delay option.
$delay = null;
if (isset($options['delay'])) {
$delay = $options['delay'];
$ringConfig['delay'] = $options['delay'];
unset($options['delay']);
}

// Prepare sink option.
if (isset($options['sink'])) {
$ringConfig['save_to'] = ($options['sink'] instanceof Psr7StreamInterface)
? new GuzzleStream($options['sink'])
: $options['sink'];
unset($options['sink']);
}

// Ensure that all requests are async and lazy like Guzzle 6.
$options['future'] = 'lazy';

Expand All @@ -118,12 +129,14 @@ private function createGuzzleRequest(Psr7Request $psrRequest, array $options)
$request->setHeaders($psrRequest->getHeaders());
$request->setHeader(
'user-agent',
$request->getHeaderLine('user-agent') . ' ' . Client::getDefaultUserAgent()
$request->getHeader('user-agent') . ' ' . Client::getDefaultUserAgent()
);

// Make sure the delay is configured, if provided.
if ($delay) {
$request->getConfig()->set('delay', $delay);
if ($ringConfig) {
foreach ($ringConfig as $k => $v) {
$request->getConfig()->set($k, $v);
}
}

return $request;
Expand Down
3 changes: 1 addition & 2 deletions src/Handler/GuzzleV6/GuzzleHandler.php
Expand Up @@ -36,8 +36,7 @@ public function __invoke(Psr7Request $request, array $options = [])
{
$request = $this->prepareRequest($request, $options);

return $this->client->sendAsync($request, $options)->then(
null,
return $this->client->sendAsync($request, $options)->otherwise(
static function (\Exception $e) {
$error = [
'exception' => $e,
Expand Down
6 changes: 0 additions & 6 deletions src/Multipart/AbstractUploader.php
Expand Up @@ -292,12 +292,6 @@ private function execCommand($operation, array $params)
$params + $this->state->getId()
);

// Wait for the next tick.
if (!$command['@http']) {
$command['@http'] = [];
}
$command['@http']['delay'] = true;

// Execute the before callback.
if (is_callable($this->config["before_{$operation}"])) {
$this->config["before_{$operation}"]($command);
Expand Down
18 changes: 7 additions & 11 deletions src/S3/Transfer.php
Expand Up @@ -43,15 +43,15 @@ class Transfer implements PromisorInterface
* The options array can contain the following key value pairs:
*
* - base_dir: (string) Base dir of the source, if $source is an iterator.
* - before: A callable that accepts the following positional arguments:
* - before: (callable) Accepts the following positional arguments:
* source, dest, command; where command is an instance of a Command
* object. The provided command will be either a GetObject, PutObject,
* InitiateMultipartUpload, or UploadPart command.
* - mup_threshold: Size in bytes in which a multipart upload should be
* used instead of PutObject. Defaults to 20971520 (20 MB).
* - concurrency: Number of files to upload concurrently. Defaults to 5.
* - debug: Set to true to print out debug information for transfers. Set
* to an fopen() resource to write to a specific stream.
* - mup_threshold: (int) Size in bytes in which a multipart upload should
* be used instead of PutObject. Defaults to 20971520 (20 MB).
* - concurrency: (int, default=5) Number of files to upload concurrently.
* - debug: (bool) Set to true to print out debug information for transfers.
* Set to an fopen() resource to write to a specific stream.
*
* @param S3Client $client Client used for transfers.
* @param string|\Iterator $source Where the files are transferred from.
Expand Down Expand Up @@ -248,10 +248,7 @@ private function createDownloadPromise()
$commands[] = $this->client->getCommand('GetObject', [
'Bucket' => $listArgs['Bucket'],
'Key' => $key,
'@http' => [
'sink' => $sink,
'delay' => true
],
'@http' => ['sink' => $sink],
]);
}

Expand Down Expand Up @@ -292,7 +289,6 @@ private function upload($filename)
$args = $this->s3Args;
$args['SourceFile'] = $filename;
$args['Key'] = $this->createS3Key($filename);
$args['@http'] = ['delay' => true];

$command = $this->client->getCommand('PutObject', $args);
$this->before and call_user_func($this->before, $command);
Expand Down
18 changes: 13 additions & 5 deletions tests/Handler/GuzzleV5/HandlerTest.php
Expand Up @@ -7,6 +7,7 @@
use GuzzleHttp\Message\Request as GuzzleRequest;
use GuzzleHttp\Message\Response as GuzzleResponse;
use GuzzleHttp\Promise\RejectionException;
use GuzzleHttp\Psr7;
use GuzzleHttp\Psr7\Request as PsrRequest;
use GuzzleHttp\Psr7\Response as PsrResponse;
use GuzzleHttp\Stream\Stream;
Expand All @@ -19,7 +20,7 @@ class HandlerTest extends \PHPUnit_Framework_TestCase
{
public function setUp()
{
if (class_exists('GuzzleHttp\Promise\Promise')) {
if (!class_exists('GuzzleHttp\Ring\Core')) {
$this->markTestSkipped();
}
}
Expand All @@ -29,16 +30,18 @@ public function testHandlerWorksWithSuccessfulRequest()
$deferred = new Deferred();
$handler = $this->getHandler($deferred);
$request = new PsrRequest('PUT', 'http://example.com', [], '{}');
$sink = Psr7\stream_for();

$promise = $handler($request, ['delay' => 500]);
$promise = $handler($request, ['delay' => 500, 'sink' => $sink]);
$this->assertInstanceOf('GuzzleHttp\\Promise\\PromiseInterface', $promise);
$deferred->resolve(new GuzzleResponse(200, [], Stream::factory('foo')));

/** @var $response PsrResponse */
$response = $promise->wait();
$this->assertInstanceOf(PsrResponse::class, $response);
$this->assertEquals(200, $response->getStatusCode());
$this->assertEquals(200, $response->getStatusCode());
$this->assertEquals('foo', $response->getBody()->getContents());
$this->assertEquals('foo', (string) $sink);
}

public function testHandlerWorksWithFailedRequest()
Expand Down Expand Up @@ -87,7 +90,12 @@ private function getHandler(Deferred $deferred)
$future = new FutureResponse($deferred->promise());
$client->method('send')->willReturn($future);

/** @var $client \GuzzleHttp\Client */
return new GuzzleHandler($client);
return function ($request, $options = []) use ($client) {
/** @var $client \GuzzleHttp\Client */
if (isset($options['sink'])) {
$options['sink']->write('foo');
}
return call_user_func(new GuzzleHandler($client), $request, $options);
};
}
}
2 changes: 1 addition & 1 deletion tests/Handler/GuzzleV5/StreamTest.php
Expand Up @@ -14,7 +14,7 @@ class StreamTest extends \PHPUnit_Framework_TestCase
{
public function setUp()
{
if (class_exists('GuzzleHttp\Promise\Promise')) {
if (!class_exists('GuzzleHttp\Ring\Core')) {
$this->markTestSkipped();
}
}
Expand Down
14 changes: 10 additions & 4 deletions tests/Integ/GuzzleV5HandlerTest.php
Expand Up @@ -11,7 +11,7 @@ class GuzzleV5HandlerTest extends \PHPUnit_Framework_TestCase
{
public function setUp()
{
if (class_exists('GuzzleHttp\Promise\Promise')) {
if (!class_exists('GuzzleHttp\Ring\Core')) {
$this->markTestSkipped();
}
}
Expand All @@ -25,9 +25,10 @@ public function testSendRequest()
['c' => '3', 'd' => '4', 'user-agent' => 'AWS/3'],
Psr7\stream_for('{"f":6,"g":7}')
);
$sink = Psr7\stream_for();

/** @var \GuzzleHttp\Promise\Promise $responsePromise */
$responsePromise = $handler($request);
$responsePromise = $handler($request, ['sink' => $sink]);
$responsePromise = $responsePromise->then(
function (Response $resp) {
return $resp->withHeader('e', '5');
Expand All @@ -39,9 +40,10 @@ function (array $error) {

/** @var Response $response */
$response = $responsePromise->wait();
$data = json_decode($response->getBody()->getContents(), true);
$body = $response->getBody()->getContents();
$data = json_decode($body, true);

// Check request data.
// Check response data.
$this->assertArrayHasKey('C', $data['headers']);
$this->assertArrayHasKey('D', $data['headers']);
$this->assertStringStartsWith('AWS/3 Guzzle/5', $data['headers']['User-Agent']);
Expand All @@ -52,6 +54,10 @@ function (array $error) {

// Check response data.
$this->assertTrue($response->hasHeader('E'));

// Check the sink.
$sink->seek(0);
$this->assertEquals($body, $sink->getContents());
}

public function testProduceErrorData()
Expand Down

0 comments on commit 729f3a6

Please sign in to comment.