Skip to content

Commit

Permalink
Added support for injectable cache to stream wrapper.
Browse files Browse the repository at this point in the history
- You can now inject a cache into the stream wrapper in a context or in
  the default context options.
- Added a CacheInterface for simple get/set/remove caching.
- Added a single cache implementation LruArrayCache that implements an
  LRU cache using PHP's ordered associative arrays.
- Added support for creating a stream wrapper using a custom protocol.

Closes aws#531. Closes aws#536.
  • Loading branch information
mtdowling committed Apr 22, 2015
1 parent d2bb35d commit 6c4fdf4
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 107 deletions.
34 changes: 34 additions & 0 deletions src/CacheInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php
namespace Aws;

/**
* Represents a simple key/value cache interface offering get, set and remove.
*
* get() reports a missing key by returning null, so a stored null value
* cannot be distinguished from an absent key through this interface.
*/
interface CacheInterface
{
/**
* Get a cache item by key.
*
* @param string $key Key to retrieve.
*
* @return mixed|null Returns the cached value, or null if the key is not
*                    found.
*/
public function get($key);

/**
* Set a cache key value.
*
* @param string $key   Key to set.
* @param mixed  $value Value to store under the key.
* @param int    $ttl   Number of seconds the item is allowed to live. Set
*                      to 0 (the default) to allow an unlimited lifetime.
*/
public function set($key, $value, $ttl = 0);

/**
* Remove a cache key.
*
* @param string $key Key to remove.
*/
public function remove($key);
}
74 changes: 74 additions & 0 deletions src/LruArrayCache.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php
namespace Aws;

/**
 * Simple in-memory LRU cache that limits the number of cached entries.
 *
 * The LRU cache is implemented using PHP's ordered associative array. Each
 * entry is stored as a two element list: [value, expiry], where expiry is a
 * Unix timestamp or 0 when the entry never expires.
 *
 * Whenever an entry is read or written it is removed from the array and
 * re-added, so recently used items are always at the end of the array while
 * the least recently used items are at the beginning. When a write pushes the
 * number of cached items past the allowed maximum, the surplus items are
 * dropped from the beginning of the array.
 */
class LruArrayCache implements CacheInterface
{
    /** @var int Maximum number of entries kept in the cache. */
    private $maxItems;

    /** @var array Map of key => [value, expiry], ordered oldest to newest. */
    private $items = [];

    /**
     * @param int $maxItems Maximum number of allowed cache items.
     */
    public function __construct($maxItems = 1000)
    {
        $this->maxItems = $maxItems;
    }

    /**
     * Get a cache item by key and mark it as most recently used.
     *
     * An entry whose TTL has elapsed is removed from the cache and reported
     * as a miss.
     *
     * @param string $key Key to retrieve.
     *
     * @return mixed|null Returns the value or null if not found or expired.
     */
    public function get($key)
    {
        if (!isset($this->items[$key])) {
            return null;
        }

        $entry = $this->items[$key];

        // The entry leaves its current position in both outcomes: expired
        // entries are dropped, live entries are re-added at the end below.
        unset($this->items[$key]);

        // An expiry of 0 means the entry never expires.
        if ($entry[1] && time() >= $entry[1]) {
            return null;
        }

        // LRU: push the entry to the end of the array.
        $this->items[$key] = $entry;

        return $entry[0];
    }

    /**
     * Set a cache key value and mark it as most recently used.
     *
     * @param string $key   Key to set.
     * @param mixed  $value Value to set.
     * @param int    $ttl   Number of seconds the item is allowed to live. Set
     *                      to 0 to allow an unlimited lifetime.
     */
    public function set($key, $value, $ttl = 0)
    {
        // Only call time() if the TTL is not 0/false/null.
        $expires = $ttl ? time() + $ttl : 0;

        // Assigning to an existing key keeps its original position in a PHP
        // array, so remove it first; otherwise a freshly written entry would
        // still be treated as the least recently used one.
        unset($this->items[$key]);
        $this->items[$key] = [$value, $expires];

        // Determine if there are more items in the cache than allowed.
        $excess = count($this->items) - $this->maxItems;

        // Drop the least recently used items from the front of the array.
        // array_slice() with preserve_keys keeps integer-like keys intact and
        // avoids relying on how unset() moves the array's internal pointer.
        if ($excess > 0) {
            $this->items = array_slice($this->items, $excess, null, true);
        }
    }

    /**
     * Remove a cache key. Removing a key that is not cached is a no-op.
     *
     * @param string $key Key to remove.
     */
    public function remove($key)
    {
        unset($this->items[$key]);
    }
}
148 changes: 64 additions & 84 deletions src/S3/StreamWrapper.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<?php
namespace Aws\S3;

use Aws\CacheInterface;
use Aws\LruArrayCache;
use Aws\Result;
use Aws\S3\Exception\S3Exception;
use GuzzleHttp\Psr7;
Expand Down Expand Up @@ -84,38 +86,43 @@ class StreamWrapper
/** @var string Opened bucket path */
private $openedPath;

/** @var array LRU cache containing a hash of "time" and "value" items */
private static $statCache = [];
/** @var CacheInterface Cache for object and dir lookups */
private $cache;

/**
* Register the 's3://' stream wrapper
*
* @param S3Client $client Client to use with the stream wrapper
* @param S3Client $client Client to use with the stream wrapper
* @param string $protocol Protocol to register as.
* @param CacheInterface $cache Default cache for the protocol.
*/
public static function register(S3Client $client)
{
if (in_array('s3', stream_get_wrappers())) {
stream_wrapper_unregister('s3');
public static function register(
S3Client $client,
$protocol = 's3',
CacheInterface $cache = null
) {
if (in_array($protocol, stream_get_wrappers())) {
stream_wrapper_unregister($protocol);
}

// Set the client passed in as the default stream context client
stream_wrapper_register('s3', get_called_class(), STREAM_IS_URL);
stream_wrapper_register($protocol, get_called_class(), STREAM_IS_URL);
$default = stream_context_get_options(stream_context_get_default());
$default['s3']['client'] = $client;
stream_context_set_default($default);
}
$default[$protocol]['client'] = $client;

/**
* Clears the internal LRU cache of the stream wrapper.
*/
public static function clearCache()
{
static::$statCache = [];
if ($cache) {
$default[$protocol]['cache'] = $cache;
} elseif (!isset($default[$protocol]['cache'])) {
// Set a default cache adapter.
$default[$protocol]['cache'] = new LruArrayCache();
}

stream_context_set_default($default);
}

public function stream_close()
{
$this->body = null;
$this->body = $this->cache = null;
}

public function stream_open($path, $mode, $options, &$opened_path)
Expand Down Expand Up @@ -194,7 +201,7 @@ public function stream_write($data)
public function unlink($path)
{
return $this->boolCall(function () use ($path) {
self::clearCacheKey($path);
$this->clearCacheKey($path);
$this->getClient()->deleteObject($this->withPath($path));
return true;
});
Expand All @@ -217,14 +224,27 @@ public function stream_stat()
public function url_stat($path, $flags)
{
// Some paths come through as S3:// for some reason.
$path = trim(str_replace('S3://', 's3://', $path));
$split = explode('://', $path);
$path = strtolower($split[0]) . '://' . $split[1];

// Check if this path is in the url_stat cache
if ($value = self::getCache($path)) {
if ($value = $this->getCacheStorage()->get($path)) {
return $value;
}

$stat = $this->createStat($path, $flags);

if (is_array($stat)) {
$this->getCacheStorage()->set($path, $stat);
}

return $stat;
}

private function createStat($path, $flags)
{
$parts = $this->withPath($path);

if (!$parts['Key']) {
return $this->statDirectory($parts, $path, $flags);
}
Expand Down Expand Up @@ -287,7 +307,7 @@ private function statDirectory($parts, $path, $flags)
public function mkdir($path, $mode, $options)
{
$params = $this->withPath($path);
self::clearCacheKey($path);
$this->clearCacheKey($path);
if (!$params['Bucket']) {
return false;
}
Expand All @@ -303,7 +323,7 @@ public function mkdir($path, $mode, $options)

public function rmdir($path, $options)
{
self::clearCacheKey($path);
$this->clearCacheKey($path);
$params = $this->withPath($path);
$client = $this->getClient();

Expand Down Expand Up @@ -439,15 +459,16 @@ public function dir_readdir()

// Cache the object data for quick url_stat lookups used with
// RecursiveDirectoryIterator.
self::setCache($key, $stat);
$this->getCacheStorage()->set($key, $stat);
$this->objectIterator->next();

return $result;
}

private function formatKey($key)
{
return "s3://{$this->openedBucket}/{$key}";
$protocol = explode('://', $this->openedPath)[0];
return "{$protocol}://{$this->openedBucket}/{$key}";
}

/**
Expand All @@ -464,8 +485,8 @@ public function rename($path_from, $path_to)
{
$partsFrom = $this->withPath($path_from);
$partsTo = $this->withPath($path_to);
self::clearCacheKey($path_from);
self::clearCacheKey($path_to);
$this->clearCacheKey($path_from);
$this->clearCacheKey($path_to);

if (!$partsFrom['Key'] || !$partsTo['Key']) {
return $this->triggerError('The Amazon S3 stream wrapper only '
Expand Down Expand Up @@ -581,7 +602,10 @@ private function getClient()

private function getBucketKey($path)
{
$parts = explode('/', substr($path, 5), 2);
// Remove the protocol
$parts = explode('://', $path);
// Get the bucket, key
$parts = explode('/', $parts[1], 2);

return [
'Bucket' => $parts[0],
Expand Down Expand Up @@ -609,14 +633,9 @@ private function openReadStream()
$client = $this->getClient();
$command = $client->getCommand('GetObject', $this->getOptions());
$command['@http']['stream'] = true;

$result = $client->execute($command);
$this->body = $result['Body'];

if ($result['ContentLength']) {
$this->body->setSize($result['ContentLength']);
}

// Wrap the body in a caching entity body if seeking is allowed
if ($this->getOption('seekable') && !$this->body->isSeekable()) {
$this->body = new CachingStream($this->body);
Expand Down Expand Up @@ -693,8 +712,6 @@ private function formatUrlStat($result = null)
// Pluck the content-length if available.
if (isset($result['ContentLength'])) {
$stat['size'] = $stat[7] = $result['ContentLength'];
} elseif (isset($stat['Size'])) {
$stat['size'] = $stat[7] = $result['Size'];
}
if (isset($result['LastModified'])) {
// ListObjects or HeadObject result
Expand Down Expand Up @@ -722,7 +739,7 @@ private function createBucket($path, array $params)

return $this->boolCall(function () use ($params, $path) {
$this->getClient()->createBucket($params);
self::clearCacheKey($path);
$this->clearCacheKey($path);
return true;
});
}
Expand Down Expand Up @@ -751,7 +768,7 @@ private function createSubfolder($path, array $params)

return $this->boolCall(function () use ($params, $path) {
$this->getClient()->putObject($params);
self::clearCacheKey($path);
$this->clearCacheKey($path);
return true;
});
}
Expand Down Expand Up @@ -846,62 +863,25 @@ private function boolCall(callable $fn, $flags = null)
}

/**
* Clears a specific stat cache value from the stat cache.
*
* @param string $key S3 path (s3://bucket/key).
* @return LruArrayCache
*/
private static function clearCacheKey($key)
private function getCacheStorage()
{
clearstatcache(true, $key);
unset(self::$statCache[$key]);
}

/**
* Gets a value from the stat cache.
*
* @param string $key S3 path (s3://bucket/key).
*
* @return array|null
*/
private static function getCache($key)
{
if (isset(self::$statCache[$key])) {
self::$statCache[$key]['time'] = microtime(true);
return self::$statCache[$key]['value'];
if (!$this->cache) {
$this->cache = $this->getOption('cache') ?: new LruArrayCache();
}

return null;
return $this->cache;
}

/**
* Adds a value to the stat cache, marking the time it was added.
* Clears a specific stat cache value from the stat cache and LRU cache.
*
* @param string $key S3 path (e.g., s3://bucket/key).
* @param array $value stat() array.
*/
private static function setCache($key, $value)
{
if (count(self::$statCache) === 1001) {
self::purgeLru();
}

self::$statCache[$key] = [
'value' => $value,
'time' => microtime(true)
];
}

/**
* Purges half of the items in the cache, removing the least recently used.
* @param string $key S3 path (s3://bucket/key).
*/
private static function purgeLru()
private function clearCacheKey($key)
{
usort(self::$statCache, function ($a, $b) {
return $a['time'] > $b['time']
? 1
: ($a['time'] < $b['time'] ? -1 : 0);
});

self::$statCache = array_slice(self::$statCache, 0, 500);
clearstatcache(true, $key);
$this->getCacheStorage()->remove($key);
}
}
Loading

0 comments on commit 6c4fdf4

Please sign in to comment.