Permalink
Browse files

Merge pull request #527 from shimonchayim/master

Cache operation if memcached connection is busy
  • Loading branch information...
kiai
kiai committed Jul 11, 2012
2 parents ab0bb43 + 4579da3 commit a25cb6e4cb3cc4b081a2b410f0880296b9b6550b
View
@@ -1,3 +1,7 @@
+## July 11, 2012
+
+* Changes to mem-cache-local to queue operations if connections are not available.
+
## July 5, 2012
* Update support.
@@ -44,6 +44,10 @@ var Cache = module.exports = function (opts) {
var self = this;
+ var requestBackup = [];
+
+ var opsInProgress = 0;
+
function doHeartbeat() {
memcached.stats(function (err, result) {
if (err && err.error && (err.error != {} && err.error != [])) {
@@ -57,11 +61,12 @@ var Cache = module.exports = function (opts) {
if (memcached) {
return;
}
+ var Memcached = require('memcached');
if (_.isObject(opts) && opts.config) {
- memcached = new (require('memcached'))(opts.config, opts.options);
+ memcached = new Memcached(opts.config, opts.options);
}
else {
- memcached = new (require('memcached'))(opts);
+ memcached = new Memcached(opts);
}
memcached.on('issue', function(details){
@@ -100,22 +105,27 @@ var Cache = module.exports = function (opts) {
}
function saveSliceBySlice(cacheKey, count, key, dataStr, numSlices, dataSliceSize, duration, cb) {
- memcached.set(cacheKey + count, dataStr.substr(count * dataSliceSize, dataSliceSize), duration,
- function (err, result) {
- if (err || !result) {
- self.emit(cacheEvents.ERROR, {key:key, error:err || "unknown"});
- return cb({message:'failed', data:{key:key, data:dataStr, duration:duration}, error:err
- || "unknown"});
- }
- count++;
- if (count == numSlices) {
- self.emit(cacheEvents.NEW, {key:key, duration:duration});
- return cb(null, {message:'success', data:result});
- }
- else {
- return saveSliceBySlice(cacheKey, count, key, dataStr, numSlices, dataSliceSize, duration, cb);
- }
- });
+ var operation = {op: 'set', args: []};
+ operation.args.push(cacheKey + count);
+ operation.args.push(dataStr.substr(count * dataSliceSize, dataSliceSize));
+ operation.args.push(duration);
+ operation.args.push(function (err, result) {
+ if (err || !result) {
+ self.emit(cacheEvents.ERROR, {key:key, error:err || "unknown"});
+ return cb({message:'failed', data:{key:key, data:dataStr, duration:duration}, error:err
+ || "unknown"});
+ }
+ count++;
+ if (count == numSlices) {
+ self.emit(cacheEvents.NEW, {key:key, duration:duration});
+ return cb(null, {message:'success', data:result});
+ }
+ else {
+ return saveSliceBySlice(cacheKey, count, key, dataStr, numSlices, dataSliceSize, duration, cb);
+ }
+ });
+
+ blockIfBusyDoOp(operation);
}
function fetchSliceBySlice(cacheKey, count, key, slices, numSlices, cb) {
@@ -132,7 +142,9 @@ var Cache = module.exports = function (opts) {
}
}
else {
- memcached.get(cacheKey + count, function (err, slice) {
+ var operation = {op: 'get', args: []};
+ operation.args.push(cacheKey + count);
+ operation.args.push(function (err, slice) {
if (err) {
self.emit(cacheEvents.MISS, {key:key, error:err});
return cb({message:'failed', data:{key:key}, error:err});
@@ -146,6 +158,7 @@ var Cache = module.exports = function (opts) {
return cb({message:'failed', data:{key:key}, error:'unexpected result', result:slice});
}
});
+ blockIfBusyDoOp(operation);
}
}
@@ -182,8 +195,11 @@ var Cache = module.exports = function (opts) {
if(dataStr.length > MAX_SLICE_SIZE){
var dataSliceSize = MAX_SLICE_SIZE - key.length - SLICE_OVERHEAD;
var numSlices = Math.ceil(dataStr.length/dataSliceSize);
-
- memcached.set(cacheKey, {key:key, slices:numSlices}, duration, function(err, result){
+ var operation = {op: 'set', args: []};
+ operation.args.push(cacheKey);
+ operation.args.push({key:key, slices:numSlices});
+ operation.args.push(duration);
+ operation.args.push(function(err, result){
if(err || !result){
self.emit(cacheEvents.ERROR, {key:key, error: err || "unknown"});
@@ -192,13 +208,16 @@ var Cache = module.exports = function (opts) {
data:{key:key, data:data, duration:duration}, error: err
|| "unknown"});
}
-
return saveSliceBySlice(cacheKey, 0, key, dataStr, numSlices, dataSliceSize, duration, cb);
});
-
+ blockIfBusyDoOp(operation);
}
else {
- memcached.set(cacheKey, {key:key, data:dataStr}, duration, function (err, result) {
+ var operation = {op: 'set', args: []};
+ operation.args.push(cacheKey);
+ operation.args.push({key:key, data:dataStr});
+ operation.args.push(duration);
+ operation.args.push(function (err, result) {
if (err || !result) {
self.emit(cacheEvents.ERROR, {key:key, error: err || "unknown"});
@@ -209,6 +228,7 @@ var Cache = module.exports = function (opts) {
return cb(null, {message:'success', data:result});
});
+ blockIfBusyDoOp(operation);
}
}
@@ -231,7 +251,9 @@ var Cache = module.exports = function (opts) {
cacheKey = crypto.createHash('md5').update(key).digest('hex');
- memcached.get(cacheKey, function (err, cacheValue) {
+ var operation = {op: 'get', args: []};
+ operation.args.push(cacheKey);
+ operation.args.push(function (err, cacheValue) {
if (err) {
self.emit(cacheEvents.MISS, {key:key, error:err});
@@ -269,6 +291,32 @@ var Cache = module.exports = function (opts) {
}
});
+
+ blockIfBusyDoOp(operation);
+ }
+
+ function blockIfBusyDoOp(op){
+ var userFunction = op.args[op.args.length-1];
+ op.args[op.args.length-1] = function(err,val) {
+ opsInProgress--;
+ var backedOp = requestBackup.shift();
+ if(backedOp){
+ doOpt(backedOp);
+ }
+ return userFunction(err,val);
+ };
+
+ if(opsInProgress < memcached.poolSize){
+ doOpt(op);
+ }
+ else {
+ requestBackup.push(op);
+ }
+ }
+
+ function doOpt(op){
+ opsInProgress++;
+ return memcached[op.op].apply(memcached,op.args);
}
}
@@ -1,7 +1,7 @@
{
"author": "ql.io",
"name": "ql.io-mem-cache-local",
- "version": "0.7.0",
+ "version": "0.7.1",
"repository": {
"type": "git",
"url": "https://github.com/ql-io/ql.io"
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2012 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+var Cache = require('../lib/cache'),
+ util = require('util'),
+ _ = require('underscore'),
+ fs = require('fs');
+
+exports['concurrent-flooding'] = function (test) {
+ var cache = new Cache('127.0.0.1:8026');
+ var events = [];
+ var count = {
+ start: 0,
+ new: 0,
+ hit: 0,
+ end: 0,
+ miss: 0
+ };
+
+ var concurrency = 25;
+
+ _.chain(Cache.Events)
+ .values()
+ .each(function (name) {
+ cache.on(name, function (event) {
+ count[name]++;
+ if (name == Cache.Events.END) {
+ test.deepEqual(count, {start:1, new:concurrency, hit:concurrency, end:1, miss:0});
+ test.done();
+ }
+ })
+ })
+ .value();
+ cache.start();
+
+ var data = fs.readFileSync(__dirname + '/1mb.txt', 'utf8');
+
+ var done = 0;
+ for (var i = 0; i < concurrency; i++) {
+ cache.put('1mb-test'+i, data, 100, function (err, result) {
+ if (err) {
+ test.ok(false, util.inspect(err, false, null));
+ }
+ else {
+ test.deepEqual(result, { message:'success', data:true });
+ }
+ done++;
+ if(done === concurrency){
+ done = 0;
+ for(var j=0; j<concurrency; j++){
+ cache.get('1mb-test'+j, function (err, result) {
+ if (err) {
+ test.ok(false, util.inspect(err, false, null));
+ }
+ else {
+ test.deepEqual(result, { message:'success', data:data });
+ }
+ done++;
+ if(done == concurrency) {
+ cache.end();
+ }
+ });
+ }
+ }
+ });
+ }
+}

0 comments on commit a25cb6e

Please sign in to comment.