Skip to content

Commit

Permalink
Merge pull request aws#525 from aws/v3-async-mups
Browse files Browse the repository at this point in the history
Async multipart uploads
  • Loading branch information
jeremeamia committed Apr 7, 2015
2 parents 886d656 + 0e10884 commit adebccb
Show file tree
Hide file tree
Showing 30 changed files with 1,691 additions and 2,004 deletions.
46 changes: 28 additions & 18 deletions src/Exception/MultipartUploadException.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,43 @@

class MultipartUploadException extends \RuntimeException
{
const MSG_TEMPLATE = 'An exception occurred while %s a multipart upload.';

/** @var UploadState State of the erroneous transfer */
private $state;

/**
* @param UploadState $state Upload state at time of the exception.
* @param string $action Action being taken.
* @param \Exception $prev Exception being thrown.
* @param UploadState $state Upload state at time of the exception.
* @param \Exception|array $prev Exception being thrown.
*/
public function __construct(
UploadState $state,
$action,
\Exception $prev = null
) {
if (is_array($action)) {
$message = sprintf(self::MSG_TEMPLATE, 'uploading parts to');
$message .= " The following parts had errors:\n";
foreach ($action as $part => $error) {
$message .= "- Part {$part}: {$error}\n";
public function __construct(UploadState $state, $prev = null) {
$msg = 'An exception occurred while performing a multipart upload.';

if (is_array($prev)) {
$msg = strtr($msg, ['performing' => 'uploading parts to']);
$msg .= " The following parts had errors:\n";
/** @var $error AwsException */
foreach ($prev as $part => $error) {
$msg .= "- Part {$part}: " . $error->getMessage(). "\n";
}
} elseif ($prev instanceof AwsException) {
switch ($prev->getCommand()->getName()) {
case 'CreateMultipartUpload':
case 'InitiateMultipartUpload':
$action = 'initiating';
break;
case 'CompleteMultipartUpload':
$action = 'completing';
break;
}
if (isset($action)) {
$msg = strtr($msg, ['performing' => $action]);
}
} else {
$message = sprintf(self::MSG_TEMPLATE, $action);
}

parent::__construct($message, 0, $prev);
if (!$prev instanceof \Exception) {
$prev = null;
}

parent::__construct($msg, 0, $prev);
$this->state = $state;
}

Expand Down
281 changes: 281 additions & 0 deletions src/Glacier/MultipartUploader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
<?php
namespace Aws\Glacier;

use Aws\CommandInterface;
use Aws\HashingStream;
use Aws\Multipart\AbstractUploader;
use Aws\Multipart\UploadState;
use Aws\PhpHash;
use Aws\ResultInterface;
use GuzzleHttp\Psr7;
use Psr\Http\Message\StreamableInterface as Stream;

/**
* Encapsulates the execution of a multipart upload to Glacier.
*/
class MultipartUploader extends AbstractUploader
{
const PART_MIN_SIZE = 1048576;

private static $validPartSizes = [
1048576, // 1 MB
2097152, // 2 MB
4194304, // 4 MB
8388608, // 8 MB
16777216, // 16 MB
33554432, // 32 MB
67108864, // 64 MB
134217728, // 128 MB
268435456, // 256 MB
536870912, // 512 MB
1073741824, // 1 GB
2147483648, // 2 GB
4294967296, // 4 GB
];

/**
* Creates an UploadState object for a multipart upload by querying the
* service for the specified upload's information.
*
* @param GlacierClient $client GlacierClient object to use.
* @param string $vaultName Vault name for the multipart upload.
* @param string $uploadId Upload ID for the multipart upload.
* @param string $accountId Account ID for the multipart upload.
*
* @return UploadState
*/
public static function getStateFromService(
GlacierClient $client,
$vaultName,
$uploadId,
$accountId = '-'
) {
$state = new UploadState([
'accountId' => $accountId,
'vaultName' => $vaultName,
'uploadId' => $uploadId,
]);

foreach ($client->getPaginator('ListParts', $state->getId()) as $result) {
// Get the part size from the first part in the first result.
if (!$state->getPartSize()) {
$state->setPartSize($result['PartSizeInBytes']);
}
// Mark all the parts returned by ListParts as uploaded.
foreach ($result['Parts'] as $part) {
list($rangeIndex, $rangeSize) = self::parseRange(
$part['RangeInBytes'],
$state->getPartSize()
);
$state->markPartAsUploaded($rangeIndex, [
'size' => $rangeSize,
'checksum' => $part['SHA256TreeHash'],
]);
}
}

$state->setStatus(UploadState::INITIATED);

return $state;
}

/**
* Creates a multipart upload for a Glacier archive.
*
* The valid configuration options are as follows:
*
* - account_id: (string, default=string('-')) Account ID for the archive
* being uploaded, if different from the account making the request.
* - archive_description: (string) Description of the archive.
* - before_complete: (callable) Callback to invoke before the
* `CompleteMultipartUpload` operation. The callback should have a
* function signature like `function (Aws\Command $command) {...}`.
* - before_initiate: (callable) Callback to invoke before the
* `InitiateMultipartUpload` operation. The callback should have a
* function signature like `function (Aws\Command $command) {...}`.
* - before_upload: (callable) Callback to invoke before any
* `UploadMultipartPart` operations. The callback should have a function
* signature like `function (Aws\Command $command) {...}`.
* - concurrency: (int, default=int(3)) Maximum number of concurrent
* `UploadMultipartPart` operations allowed during the multipart upload.
* - part_size: (int, default=int(1048576)) Part size, in bytes, to use when
* doing a multipart upload. This must between 1 MB and 4 GB, and must be
* a power of 2 (in megabytes).
* - state: (Aws\Multipart\UploadState) An object that represents the state
* of the multipart upload and that is used to resume a previous upload.
* When this options is provided, the `account_id`, `key`, and `part_size`
* options are ignored.
* - vault_name: (string, required) Vault name to use for the archive being
* uploaded.
*
* @param GlacierClient $client Client used for the upload.
* @param mixed $source Source of the data to upload.
* @param array $config Configuration used to perform the upload.
*/
public function __construct(GlacierClient $client, $source, array $config = [])
{
parent::__construct($client, $source, $config + [
'account_id' => '-',
'vault_name' => null,
]);
}

protected function loadUploadWorkflowInfo()
{
return [
'command' => [
'initiate' => 'InitiateMultipartUpload',
'upload' => 'UploadMultipartPart',
'complete' => 'CompleteMultipartUpload',
],
'id' => [
'account_id' => 'accountId',
'vault_name' => 'vaultName',
'upload_id' => 'uploadId',
],
'part_num' => 'range',
];
}

protected function determinePartSize()
{
// Make sure the part size is set.
$partSize = $this->config['part_size'] ?: self::PART_MIN_SIZE;

// Ensure that the part size is valid.
if (!in_array($partSize, self::$validPartSizes)) {
throw new \InvalidArgumentException('The part_size must be a power '
. 'of 2, in megabytes, such that 1 MB <= PART_SIZE <= 4 GB.');
}

return $partSize;
}

protected function createPart($seekable, $number)
{
$data = [];
$firstByte = $this->source->tell();

// Read from the source to create the body stream. This also
// calculates the linear and tree hashes as the data is read.
if ($seekable) {
// Case 1: Stream is seekable, can make stream from new handle.
$body = Psr7\try_fopen($this->source->getMetadata('uri'), 'r');
$body = $this->limitPartStream(Psr7\stream_for($body));
// Create another stream decorated with hashing streams and read
// through it, so we can get the hash values for the part.
$decoratedBody = $this->decorateWithHashes($body, $data);
while (!$decoratedBody->eof()) $decoratedBody->read(1048576);
// Seek the original source forward to the end of the range.
$this->source->seek($this->source->tell() + $body->getSize());
} else {
// Case 2: Stream is not seekable, must store part in temp stream.
$source = $this->limitPartStream($this->source);
$source = $this->decorateWithHashes($source, $data);
$body = Psr7\stream_for();
Psr7\copy_to_stream($source, $body);
}

// Do not create a part if the body size is zero.
if ($body->getSize() === 0) {
return false;
}

$body->seek(0);
$data['body'] = $body;
$lastByte = $this->source->tell() - 1;
$data['range'] = "bytes {$firstByte}-{$lastByte}/*";

return $data;
}

protected function handleResult(CommandInterface $command, ResultInterface $result)
{
list($rangeIndex, $rangeSize) = $this->parseRange(
$command['range'],
$this->state->getPartSize()
);

$this->state->markPartAsUploaded($rangeIndex, [
'size' => $rangeSize,
'checksum' => $command['checksum']
]);
}

protected function getInitiateParams()
{
$params = ['partSize' => $this->state->getPartSize()];
if (isset($this->config['archive_description'])) {
$params['archiveDescription'] = $this->config['archive_description'];
}

return $params;
}

protected function getCompleteParams()
{
$treeHash = new TreeHash();
$archiveSize = 0;
foreach ($this->state->getUploadedParts() as $part) {
$archiveSize += $part['size'];
$treeHash->addChecksum($part['checksum']);
}

return [
'archiveSize' => $archiveSize,
'checksum' => bin2hex($treeHash->complete()),
];
}

/**
* Decorates a stream with a tree AND linear sha256 hashing stream.
*
* @param Stream $stream Stream to decorate.
* @param array $data Data bag that results are injected into.
*
* @return Stream
*/
private function decorateWithHashes(Stream $stream, array &$data)
{
// Make sure that a tree hash is calculated.
$stream = new HashingStream($stream, new TreeHash(),
function ($result) use (&$data) {
$data['checksum'] = bin2hex($result);
}
);

// Make sure that a linear SHA256 hash is calculated.
$stream = new HashingStream($stream, new PhpHash('sha256'),
function ($result) use (&$data) {
$data['ContentSHA256'] = bin2hex($result);
}
);

return $stream;
}

/**
* Parses a Glacier range string into a size and part number.
*
* @param string $range Glacier range string (e.g., "bytes 5-5000/*")
* @param int $partSize The chosen part size
*
* @return array
*/
private static function parseRange($range, $partSize)
{
// Strip away the prefix and suffix.
if (strpos($range, 'bytes') !== false) {
$range = substr($range, 6, -2);
}

// Split that range into it's parts.
list($firstByte, $lastByte) = explode('-', $range);

// Calculate and return range index and range size
return [
intval($firstByte / $partSize) + 1,
$lastByte - $firstByte + 1,
];
}
}
Loading

0 comments on commit adebccb

Please sign in to comment.