Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue get requests #79

Merged
merged 2 commits into from Sep 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Expand Up @@ -78,15 +78,16 @@ The object is constructed using `new Policy(options, [cache, segment])` where:
together with `expiresAt`.
- `expiresAt` - time of day expressed in 24h notation using the 'HH:MM' format, at which point all cache records for the route
expire. Uses local time. Cannot be used together with `expiresIn`.
- `staleIn` - number of milliseconds to mark an item stored in cache as stale and reload it. Must be less than `expiresIn`.
- `staleTimeout` - number of milliseconds to wait before checking if an item is stale.
- `generateFunc` - a function used to generate a new cache item if one is not found in the cache when calling `get()`. The method's
signature is `function(id, next)` where:
- `id` - the `id` string or object provided to the `get()` method.
- `next` - the method called when the new item is returned with the signature `function(err, value, ttl)` where:
- `err` - an error condition.
- `value` - the new value generated.
- `ttl` - the cache ttl value in milliseconds. Set to `0` to skip storing in the cache. Defaults to the cache global policy.
- `staleIn` - number of milliseconds to mark an item stored in cache as stale and attempt to regenerate it when `generateFunc` is
provided. Must be less than `expiresIn`.
- `staleTimeout` - number of milliseconds to wait before checking if an item is stale.
- `generateTimeout` - number of milliseconds to wait before returning a timeout error when the `generateFunc` function
takes too long to return a value. When the value is eventually returned, it is stored in the cache for future requests.
- `cache` - a `Client` instance (which has already been started).
Expand All @@ -98,7 +99,7 @@ The object is constructed using `new Policy(options, [cache, segment])` where:
The `Policy` object provides the following methods:

- `get(id, callback)` - retrieve an item from the cache. If the item is not found and the `generateFunc` method was provided, a new value
is generated, stored in the cache, and returned. the method arguments are:
is generated, stored in the cache, and returned. Multiple concurrent requests are queued and processed once. The method arguments are:
- `id` - the unique item identifier (within the policy segment). Can be a string or an object with the required 'id' key.
- `callback` - the return function. The function signature is based on the `generateFunc` settings. If the `generateFunc` is not set,
the signature is `function(err, cached)`. Otherwise, the signature is `function(err, value, cached, report)` where:
Expand Down
134 changes: 73 additions & 61 deletions lib/policy.js
Expand Up @@ -16,6 +16,7 @@ exports = module.exports = internals.Policy = function (options, cache, segment)
Hoek.assert(this.constructor === internals.Policy, 'Cache Policy must be instantiated using new');

this.rule = internals.Policy.compile(options, !!cache);
this._pendings = {}; // id -> [callbacks]

if (cache) {
var nameErr = cache.validateSegmentName(segment);
Expand All @@ -27,25 +28,39 @@ exports = module.exports = internals.Policy = function (options, cache, segment)
};


internals.Policy.prototype.get = function (key, callback) { // key: string or { id: 'id' }
internals.Policy.prototype.get = function (key, callback) { // key: string or { id: 'id' }

var self = this;

var id = (key && typeof key === 'object') ? key.id : key;
// Check if request is already pending

if (!this.rule.generateFunc) {
return this._get(id, callback);
var id = (key && typeof key === 'object') ? key.id : key;
if (this._pendings[id]) {
this._pendings[id].push(callback);
return;
}

this._pendings[id] = [callback];

// Lookup in cache

var timer = new Hoek.Timer();
this._get(id, function (err, cached) {

if (cached) {
cached.isStale = (self.rule.staleIn ? (Date.now() - cached.stored) >= self.rule.staleIn : false);
}

// No generate method

if (!self.rule.generateFunc) {
return self._finalize(id, err, cached); // Pass 'cached' as 'value' and omit other arguments for backwards compatibility
}

// Error / Not found

if (err || !cached) {
return self._validate(id, key, null, { msec: timer.elapsed(), error: err }, callback);
return self._generate(id, key, null, { msec: timer.elapsed(), error: err }, callback);
}

// Found
Expand All @@ -57,64 +72,31 @@ internals.Policy.prototype.get = function (key, callback) { // key: string
isStale: cached.isStale
};

return self._validate(id, key, cached, report, callback);
});
};


internals.Policy.prototype.getOrGenerate = function (id, generateFunc, callback) { // For backwards compatibility

var self = this;

this.rule.generateFunc = function (id, next) {
// Check if found and fresh

self.rule.generateFunc = null;
return generateFunc(next);
};
if (!cached.isStale) {
return self._finalize(id, null, cached.item, cached, report);
}

return this.get(id, callback);
return self._generate(id, key, cached, report, callback);
});
};


internals.Policy.prototype._get = function (id, callback) {

if (!this._cache) {
return callback(null, null);
return Hoek.nextTick(callback)(null, null);
}

var rule = this.rule;

this._cache.get({ segment: this._segment, id: id }, function (err, cached) {

if (err) {
return callback(err);
}

if (cached) {
cached.isStale = (rule.staleIn ? (Date.now() - cached.stored) >= rule.staleIn : false);
}

return callback(null, cached);
});
this._cache.get({ segment: this._segment, id: id }, callback);
};


internals.Policy.prototype._validate = function (id, key, cached, report, callback) {
internals.Policy.prototype._generate = function (id, key, cached, report, callback) {

var self = this;

// Check if found and fresh

if (cached &&
!cached.isStale) {

return callback(null, cached.item, cached, report);
}

// Not in cache, or cache stale

callback = Hoek.once(callback); // Return only the first callback between stale timeout and generated fresh

if (cached &&
cached.isStale) {

Expand All @@ -124,7 +106,7 @@ internals.Policy.prototype._validate = function (id, key, cached, report, callba
if (cached.ttl > 0) {
setTimeout(function () {

return callback(null, cached.item, cached, report);
return self._finalize(id, null, cached.item, cached, report);
}, this.rule.staleTimeout);
}
}
Expand All @@ -134,7 +116,7 @@ internals.Policy.prototype._validate = function (id, key, cached, report, callba

setTimeout(function () {

return callback(Boom.serverTimeout(), null, null, report);
return self._finalize(id, Boom.serverTimeout(), null, null, report);
}, this.rule.generateTimeout);
}

Expand All @@ -153,11 +135,40 @@ internals.Policy.prototype._validate = function (id, key, cached, report, callba
self.set(id, value, ttl); // Lazy save (replaces stale cache copy with late-coming fresh copy)
}

return callback(err, value, null, report); // Ignored if stale value already returned
return self._finalize(id, err, value, null, report); // Ignored if stale value already returned
});
};


internals.Policy.prototype._finalize = function (id, err, value, cached, report) {

var pendings = this._pendings[id];
if (!pendings) {
return;
}

delete this._pendings[id]; // Return only the first callback between stale timeout and generated fresh

for (var i = 0, il = pendings.length; i < il; ++i) {
pendings[i](err, value, cached, report);
}
};


internals.Policy.prototype.getOrGenerate = function (id, generateFunc, callback) { // For backwards compatibility

var self = this;

this.rule.generateFunc = function (id, next) {

self.rule.generateFunc = null;
return generateFunc(next);
};

return this.get(id, callback);
};


internals.Policy.prototype.set = function (key, value, ttl, callback) {

callback = callback || Hoek.ignore;
Expand Down Expand Up @@ -195,10 +206,11 @@ internals.Policy.compile = function (options, serverSide) {
{
expiresIn: 30000,
expiresAt: '13:00',
staleIn: 20000,
staleTimeout: 500,

generateFunc: function (id, next) { next(err, result, ttl); }
generateTimeout: 500
generateTimeout: 500,
staleIn: 20000,
staleTimeout: 500
}
*/

Expand Down Expand Up @@ -242,14 +254,6 @@ internals.Policy.compile = function (options, serverSide) {
rule.expiresIn = options.expiresIn;
}

// Stale

if (options.staleIn) {
Hoek.assert(serverSide, 'Cannot use stale options without server-side caching');
rule.staleIn = options.staleIn;
rule.staleTimeout = options.staleTimeout;
}

// generateTimeout

if (options.generateFunc) {
Expand All @@ -260,6 +264,14 @@ internals.Policy.compile = function (options, serverSide) {
rule.generateTimeout = options.generateTimeout;
}

// Stale

if (options.staleIn) { // Keep outside options.generateFunc condition for backwards compatibility
Hoek.assert(serverSide, 'Cannot use stale options without server-side caching');
rule.staleIn = options.staleIn;
rule.staleTimeout = options.staleTimeout;
}

return rule;
};

Expand Down
33 changes: 33 additions & 0 deletions test/policy.js
Expand Up @@ -726,6 +726,39 @@ describe('Policy', function () {
});
});
});

it('queues requests while pending', function (done) {

var gen = 0;
var rule = {
expiresIn: 100,
generateFunc: function (id, next) {

return next(null, { gen: ++gen });
}
};

var client = new Catbox.Client(Import, { partition: 'test-partition' });
var policy = new Catbox.Policy(rule, client, 'test-segment');

client.start(function () {

var result = null;
var compare = function (err, value, cached) {

if (!result) {
result = value;
return;
}

expect(result).to.equal(value);
done();
};

policy.get('test', compare);
policy.get('test', compare);
});
});
});
});

Expand Down