Skip to content

Commit

Permalink
initial working commit
Browse files Browse the repository at this point in the history
  • Loading branch information
bobrik committed Jun 10, 2012
1 parent 5516993 commit 025bf8b
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 0 deletions.
3 changes: 3 additions & 0 deletions index.js
@@ -0,0 +1,3 @@
(function(module) {
module.exports = require("./lib/Locker");
})(module);
98 changes: 98 additions & 0 deletions lib/Lock.js
@@ -0,0 +1,98 @@
(function(module) {
var util = require("util"),
events = require("events");

function Lock(name, sequence, acquirer, releaser) {
this.name = name;
this.sequence = sequence;
this.acquirer = acquirer;
this.releaser = releaser;
this.acquired = false;
this.waitTimer = undefined;
this.releaseTimer = undefined;
};

util.inherits(Lock, events.EventEmitter);

Lock.prototype.acquire = function(wait, timeout, callback) {
var self = this;

self.waitTimer = setTimeout(function() {
var error;

if (!self.acquired) {
error = new Error("Timeout waiting for lock");

callback(error);
self.emit("error", error);
}

clearTimeout(self.waitTimer);
self.waitTimer = undefined;
}, wait);

self.acquirer(self.name, self.sequence, wait, timeout, function(error) {
if (self.waitTimer) {
clearTimeout(self.waitTimer);
self.waitTimer = undefined;
} else {
// already finished
return;
}

if (error) {
return callback(error);
}

self.acquired = true;

self.releaseTimer = setTimeout(function() {
if (self.acquired) {
self.emit("error", new Error("Lock timeout (" + timeout + ") exceed: " + self.name));
}

clearTimeout(self.releaseTimer);
self.releaseTimer = undefined;
}, timeout);

callback();
});
};

Lock.prototype.release = function(callback) {
var self = this,
error = undefined;

if (!callback) {
callback = function() {};
}

if (self.releaseTimer) {
clearTimeout(self.releaseTimer);
self.releaseTimer = undefined;
}

if (!self.acquired) {
error = new Error("Releasing lock that was not acquired or reset: " + self.name);

self.emit("error", error);
return;
}

self.acquired = false;

self.releaser(self.sequence, function(error) {
if (error) {
self.emit(error);
}
});

callback();
};

Lock.prototype.reset = function() {
this.acquired = false;
};

module.exports = Lock;
})(module);
6 changes: 6 additions & 0 deletions lib/LockAction.js
@@ -0,0 +1,6 @@
(function(module) {
module.exports = {
ACTION_LOCK : 1,
ACTION_UNLOCK : 0
};
})(module);
144 changes: 144 additions & 0 deletions lib/Locker.js
@@ -0,0 +1,144 @@
(function(module) {
var net = require("net"),
util = require("util"),
events = require("events"),
Lock = require("./Lock"),
LockAction = require("./LockAction");

function Locker(port, host) {
var self = this;

self.port = port;
self.host = host;
self.registry = {};
self.sequence = 0;

self.connect = function(callback) {
var data = new Buffer(0),
temp = new Buffer(0);

self.connection = net.createConnection(port, host);

self.connection.on("error", self.reset.bind(self, 1));
self.connection.on("close", self.reset.bind(self, 2));
self.connection.on("timeout", self.reset.bind(self, 3));

self.connection.on("data", function(part) {
var sequence,
action,
result,
lock;

temp = new Buffer(data.length + part.length);
data.copy(temp, 0);
part.copy(temp, data.length);

data = new Buffer(temp);

while (data.length >= 6) {
sequence = data.readUInt32LE(0);
action = data[4];
result = data[5];

lock = self.registry[sequence];

if (lock) {
if (action == LockAction.ACTION_LOCK) {
if (lock.acquire) {
lock.acquire(result ? undefined : new Error("Failed to acquire lock"));
}
} else if (action == LockAction.ACTION_UNLOCK) {
if (lock.release) {
lock.release(result ? undefined : new Error("Failed to release lock"));
}
}
}

data = data.slice(6);
}
});

self.connection.on("connect", callback);
};

self.reset = function(error) {
self.connection = undefined;
Object.keys(self.registry).forEach(function(key) {
self.registry[key].lock.reset();
});
self.registry = {};

self.emit("reset", error);
};

self.createRequest = function(name, sequence, wait, timeout, type) {
var name = new Buffer(name),
request = new Buffer(1 + 4 + 4 + 4 + 1 + name.length);

request[0] = name.length;
request.writeUInt32LE(sequence, 1);
request.writeUInt32LE(wait, 5);
request.writeUInt32LE(timeout, 9);
request[13] = type;

name.copy(request, 14);

return request;
};

self.requireConnection = function(callback) {
if (!self.connection) {
return self.connect(callback);
}

callback();
};

self.requestAction = function(name, sequence, wait, timeout, action) {
self.requireConnection(function() {
self.connection.write(self.createRequest(name, sequence, wait, timeout, action));
});
};

self.acquire = function(name, sequence, wait, timeout, callback) {
self.requestAction(name, sequence, wait, timeout, LockAction.ACTION_LOCK);
self.registry[sequence].acquire = callback;
};

self.release = function(sequence, callback) {
self.requestAction("", sequence, 0, 0, LockAction.ACTION_UNLOCK);
self.registry[sequence].release = callback;
};

self.createLock = function(name) {
var sequence = ++this.sequence,
acquire = this.acquire,
release = this.release,
registry = this.registry,
lock = new Lock(name, sequence, acquire, release);

lock.on("error", function(error) {
self.emit("error", error, lock);
});

registry[sequence] = {
lock: lock
};

return lock;
};
};

util.inherits(Locker, events.EventEmitter);

Locker.prototype.locked = function(name, wait, timeout, callback) {
var self = this,
lock = self.createLock(name);

lock.acquire(wait, timeout, function(error) {
callback(error, lock.release.bind(lock));
});
};

module.exports = Locker;
})(module);

0 comments on commit 025bf8b

Please sign in to comment.