Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add modula distribution #365

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 26 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ Memcached server uses the same properties:
* `maxExpiration`: *2592000*, the maximum expiration time of keys (in seconds).
* `maxValue`: *1048576*, the maximum size of a value.
* `poolSize`: *10*, the maximum size of the connection pool.
* `algorithm`: *md5*, the hashing algorithm used to generate the `hashRing` values.
* `algorithm`: *md5*, the hashing algorithm used to generate the `hashRing` values. Can be a custom
hash function if `modula` is set to true (see compatibility section).
* `reconnect`: *18000000*, the time between reconnection attempts (in milliseconds).
* `timeout`: *5000*, the time after which Memcached sends a connection timeout (in milliseconds).
* `retries`: *5*, the number of socket allocation retries per request.
Expand All @@ -93,7 +94,8 @@ Memcached server uses the same properties:
that are removed from the consistent hashing scheme.
* `keyCompression`: *true*, whether to use `md5` as hashing scheme when keys exceed `maxKeySize`.
* `idle`: *5000*, the idle timeout for the connections.
* `encoding`: *utf8*, encoding of data when socket as a readable stream
* `encoding`: *utf8*, encoding of data when socket as a readable stream.
* `modula`: *false*, if *true*, disables hashring and uses modula distribution.

Example usage:

Expand Down Expand Up @@ -458,6 +460,28 @@ For compatibility with other [libmemcached](http://libmemcached.org/Clients.html

Due to client dependent type flags it is unlikely that any types other than `string` will work.

If you wish to be compatible with PHP's legacy client configured with default settings you can do it by switching to modula distribution and by defining a custom algorithm:
```js
// jenkins-one-at-a-time hash is an old and the default algorithm used by PHP's memcached client
// you can also use any other supported node's crypto hash string to fit your PHP's memcached
// client config (md5, sha1, ...)
// credit: https://stackoverflow.com/questions/70177888/jenkins-one-at-a-time-hash-trying-to-make-python-code-reproduce-javascript-cod
function jenkinsOneAtATimeHash(key: string) {
let hash = 0;
for (let charIndex = 0; charIndex < key.length; ++charIndex) {
hash += key.charCodeAt(charIndex);
hash += hash << 10;
hash ^= hash >>> 6;
}
hash += hash << 3;
hash ^= hash >>> 11;
//4,294,967,295 is FFFFFFFF, the maximum 32 bit unsigned integer value, used here as a mask.
return ((hash + (hash << 15)) & 4294967295) >>> 0;
}

var memcached = new Memcached([ '192.168.0.102:11211', '192.168.0.103:11211' ], { modula: true, algorithm: jenkinsOneAtATimeHash });
```

# Test
You may encounter several problems when run the test. Be sure you already made these preparations:
1. Start the `memcached` service. (If in Mac env, you can install it via homebrew, and `brew services start memcached`)
Expand Down
63 changes: 44 additions & 19 deletions lib/memcached.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* Node's native modules
*/
var Stream = require('net').Stream
, Socket = require('net').Socket;
, Socket = require('net').Socket
, crypto = require('crypto');

/**
* External or custom modules
Expand Down Expand Up @@ -64,11 +65,13 @@ function Client (args, options) {
Utils.merge(this, options);

this.servers = servers;
var compatibility = this.compatibility || this.compatiblity;
this.HashRing = new HashRing(args, this.algorithm, {
'compatibility': compatibility,
'default port': compatibility === 'ketama' ? 11211 : null
});
if (!this.modula) {
var compatibility = this.compatibility || this.compatiblity;
this.HashRing = new HashRing(args, this.algorithm, {
'compatibility': compatibility,
'default port': compatibility === 'ketama' ? 11211 : null
});
}
this.connections = {};
this.issues = [];
}
Expand All @@ -82,6 +85,7 @@ Client.config = {
, maxQueueSize: -1
, algorithm: 'md5' // hashing algorithm that is used for key mapping
, compatibility: 'ketama' // hashring compatibility
, modula: false // use modula distribution instead of hashring
, encoding: 'utf8' // data encoding, if you use another encoding such as 'binary'

, poolSize: 10 // maximal parallel connections
Expand Down Expand Up @@ -240,9 +244,12 @@ Client.config = {
// or just gives all servers if we don't have keys
if (keys) {
keys.forEach(function fetchMultipleServers(key) {
var server = memcached.servers.length === 1
? memcached.servers[0]
: memcached.HashRing.get(key);
var server = memcached.servers[0];

if (memcached.servers.length > 1) {
server = memcached.modula ?
memcached.getServerByModulaDistribution(key) : memcached.HashRing.get(key);
}

if (map[server]){
map[server].push(key);
Expand Down Expand Up @@ -296,11 +303,15 @@ Client.config = {
if (this.servers.length === 1) {
server = this.servers[0];
} else {
if (redundancy && queryRedundancy) {
redundancy = this.HashRing.range(query.key, (this.redundancy + 1), true);
server = redundancy.shift();
if (this.modula) {
server = this.getServerByModulaDistribution(query.key);
} else {
server = this.HashRing.get(query.key);
if (redundancy && queryRedundancy) {
redundancy = this.HashRing.range(query.key, (this.redundancy + 1), true);
server = redundancy.shift();
} else {
server = this.HashRing.get(query.key);
}
}
}
}
Expand Down Expand Up @@ -413,13 +424,15 @@ Client.config = {
memcached.emit('remove', details);
memcached.connections[server].end();

if (this.failOverServers && this.failOverServers.length) {
memcached.HashRing.swap(server, this.failOverServers.shift());
} else {
memcached.HashRing.remove(server);
memcached.emit('failure', details);
if (!this.modula) {
if (this.failOverServers && this.failOverServers.length) {
memcached.HashRing.swap(server, this.failOverServers.shift());
} else {
memcached.HashRing.remove(server);
memcached.emit('failure', details);
}
}
}
}
});

// bumpt the event listener limit
Expand Down Expand Up @@ -1149,6 +1162,18 @@ Client.config = {
};
}, server);
};

memcached.getServerByModulaDistribution = function modulaDistribution(key) {
var hashValue;

if ('function' === typeof this.algorithm) {
hashValue = this.algorithm(key);
} else {
hashValue = crypto.createHash(this.algorithm).update(key).digest().readUint32BE();
}

return this.servers[hashValue % this.servers.length];
}
})(Client);

module.exports = Client;