Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

* Added event with reply callbacks

  • Loading branch information...
commit 9d7f9f168d254a034e0c6a5da5dbc5cb653d3bf4 1 parent 262e322
@digitalwm authored
Showing with 223 additions and 3 deletions.
  1. +101 −1 cloudjs.js
  2. +12 −0 example.js
  3. +108 −0 modules/callbacks.js
  4. +2 −2 package.json
View
102 cloudjs.js
@@ -28,6 +28,7 @@ var util = require('util'),
crypto = require('cryptojs').Crypto,
dgram = require('dgram'),
serializer = require("JASON"),
+ callbacks = require("./modules/callbacks.js"),
totalOps = 0, totalTime = 0;
Array.prototype.removeElement = function (o) {
@@ -302,6 +303,15 @@ function guidGenerator() {
return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
}
+function midGenerator() {
+ var S4, now;
+ S4 = function() {
+ return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
+ };
+ now = new Date().getTime();
+ return now + "-" + S4();
+}
+
function obidGenerator() {
var S4, now;
S4 = function() {
@@ -436,7 +446,8 @@ Clouder.prototype.connect = function () {
//noinspection JSUnresolvedFunction
this.socket.on('message', function (buf) {
var msg, bodyParser,
- mode, dataBytes, dataDecripted;
+ mode, dataBytes, dataDecripted,
+ cbList, i, reply;
try {
if(self._hasEncription === true) {
mode = new crypto.mode.ECB(crypto.pad.pkcs7);
@@ -474,6 +485,37 @@ Clouder.prototype.connect = function () {
}
}
}
+ else if(msg.type === 3) {
+ if(msg.sid.toString() !== self.id.toString() && (typeof(msg.needReply) !== 'undefined' || typeof(msg.hasReply) !== 'undefined')) {
+ //we have a start event with need for reply
+ if(typeof(msg.needReply) !== 'undefined') {
+ //this should not happen, BUT if it is we have a solution
+ if(msg.needReply === false) {
+ if(msg.bounce === true) {
+ //noinspection JSUnresolvedFunction
+ self.emit(msg.title, [msg.body, msg.sid]);
+ }
+ else {
+ if(msg.sid.toString() !== self.id.toString()) {
+ self.emit(msg.title, [msg.body, msg.sid]);
+ }
+ }
+ }
+ else {
+ cbList = callbacks.getOnCallbacks(msg.title);
+ for(i = 0 ; i < cbList.length ; i++) {
+ reply = cbList[i](msg.body, msg.sid);
+ self.sendReply(msg, reply);
+ }
+ }
+ }
+
+ //we have a reply
+ if(typeof(msg.hasReply) !== 'undefined' && typeof(msg.mid) !== 'undefined') {
+ callbacks.parse(msg.mid, msg.body, msg.sid);
+ }
+ }
+ }
}
catch(Exception) {
console.log(Exception);
@@ -539,6 +581,64 @@ Clouder.prototype.connect = function () {
this.socket.send(messageBuffer, 0, messageBuffer.length, this.port, this.group);
};
+ this.sendWithCallback = function(title, message, callback, timeout) {
+ var mode, dataBytes, dataEncripted;
+
+ var messageBuffer, msg = {
+ mid : midGenerator(),
+ sid : this.id,
+ bounce : false,
+ type : 3,
+ title : title,
+ body : message,
+ needReply : true
+ };
+ try {
+ if(this._hasEncription === true) {
+ mode = new crypto.mode.ECB(crypto.pad.pkcs7);
+ dataBytes = crypto.charenc.UTF8.stringToBytes(serializer.stringify(msg));
+ dataEncripted = crypto.DES.encrypt(dataBytes, this._encryptionKey, {asBytes: true, mode: mode});
+ messageBuffer = new Buffer(crypto.util.bytesToHex(dataEncripted));
+ }
+ else {
+ messageBuffer = new Buffer(serializer.stringify(msg));
+ }
+ }
+ catch(Exception) {
+ throw "Message to complex to be sent";
+ }
+ this.socket.send(messageBuffer, 0, messageBuffer.length, this.port, this.group);
+ callbacks.add(msg.mid, callback, timeout);
+ };
+
+ this.sendReply = function(msg, reply) {
+ var mode, dataBytes, dataEncripted, messageBuffer;
+
+ delete msg.needReply;
+ msg.hasReply = true;
+ msg.body = reply;
+ msg.sid = self.id;
+ try {
+ if(this._hasEncription === true) {
+ mode = new crypto.mode.ECB(crypto.pad.pkcs7);
+ dataBytes = crypto.charenc.UTF8.stringToBytes(serializer.stringify(msg));
+ dataEncripted = crypto.DES.encrypt(dataBytes, this._encryptionKey, {asBytes: true, mode: mode});
+ messageBuffer = new Buffer(crypto.util.bytesToHex(dataEncripted));
+ }
+ else {
+ messageBuffer = new Buffer(serializer.stringify(msg));
+ }
+ }
+ catch(Exception) {
+ throw "Message to complex to be sent";
+ }
+ this.socket.send(messageBuffer, 0, messageBuffer.length, this.port, this.group);
+ };
+
+ this.onWithReply = function (title, callback) {
+ callbacks.on(title, callback);
+ };
+
/**
* The pool system
*/
View
12 example.js
@@ -42,3 +42,15 @@ function Ob1(id) {
for(i = 0 ; i < numberOfObjects ; i++) {
myObject.addElementToPool(new Ob1(guidGenerator()), 5000);
}
+
+function mesgCallback1(data, sid) {
+ console.log("Received callback from " + sid + ", time to return data");
+ return "test" + sid;
+}
+
+function mesgCallback2(data, sid) {
+ console.log("Received reply on event : " + data + " from " + sid);
+}
+
+myObject.onWithReply("sendReply", mesgCallback1);
+myObject.sendWithCallback("sendReply", "blah", mesgCallback2, 3000);
View
108 modules/callbacks.js
@@ -0,0 +1,108 @@
+/**
+ * Created by Dan Harabagiu.
+ * Date: 2/24/12
+ * Time: 11:36 AM
+ */
+
+var callbackList = [];
+var onList = [];
+
+Array.prototype.removeElement = function (o) {
+ var idx = this.indexOf(o);
+ if (idx !== -1) {
+ this.splice(idx, 1);
+ return true;
+ }
+ else {
+ return false;
+ }
+};
+
+function checkTimeouts() {
+ var i, now, cleaningArray;
+
+ now = new Date().getTime();
+ cleaningArray = [];
+
+ for(i = 0 ; i < callbackList.length ; i++) {
+ if(callbackList[i].timeout < now) {
+ cleaningArray.push(callbackList[i]);
+ }
+ }
+
+ for(i = 0 ; i < cleaningArray ; i++) {
+ callbackList.removeElement(cleaningArray[i]);
+ }
+ cleaningArray.length = 0;
+}
+
+setInterval(checkTimeouts, 1000);
+
+/**
+ * Add a callback for a specific message ID
+ * @param string mesgId
+ * @param function callbackFunction
+ * @param number callbackTimeout
+ */
+function AddCallback(mesgId, callbackFunction, callbackTimeout) {
+ var callbackElement, now;
+
+ now = new Date().getTime();
+
+ callbackElement = {
+ id : mesgId,
+ func : callbackFunction,
+ timeout : now + callbackTimeout
+ };
+
+ callbackList.push(callbackElement);
+}
+
+/**
+ * Calls all the saved callbacks for a specific message ID
+ * @param string mesgId
+ * @param object data
+ */
+function ParseReply(mesgId, data, sid) {
+ var i;
+
+ for(i = 0 ; i < callbackList.length ; i++) {
+ if(callbackList[i].id.toString() === mesgId.toString() ) {
+ callbackList[i].func(data, sid);
+ }
+ }
+}
+
+/**
+ * Adds an event tracker and the repsective callback
+ * @param string title
+ * @param function callback
+ */
+function AddOnEvent(title, callback) {
+ onList.push({
+ event : title,
+ callback: callback
+ });
+}
+
+/**
+ * Gets a list of callbacks for a requested event
+ * @param string title
+ * @return array
+ */
+function GetOnEvent(title) {
+ var retList, i;
+
+ retList = [];
+ for(i = 0 ; i < onList.length ; i++) {
+ if(onList[i].event.toString() === title.toString()) {
+ retList.push(onList[i].callback);
+ }
+ }
+ return retList;
+}
+
+exports.add = AddCallback;
+exports.parse = ParseReply;
+exports.on = AddOnEvent;
+exports.getOnCallbacks = GetOnEvent;
View
4 package.json
@@ -2,7 +2,7 @@
"author" : "Dan Harabagiu <harabagiu.dan@gmail.com> (http://dan.harabagiu.net/)",
"name" : "cloudjs",
"description" : "A network distributed event system. Similar to node JS standard event system. A process pool, where objects can be added and ran at a periodic interval a predefined functions. An auto-balancing system, that migrate objects in the process pool, from one running instance to another, based on the load of each instance.",
- "version" : "0.0.7",
+ "version" : "0.0.8",
"homepage" : "http://dan.harabagiu.net/cloudjs",
"keywords" : ["udp", "broadcast", "realtime", "cloud", "event"],
"repository" : {
@@ -17,7 +17,7 @@
"engines" : {
"node" : ">0.4.0"
},
- "files" : ["cloudjs.js"],
+ "files" : ["cloudjs.js", "./modules/callbacks.js"],
"dependencies" : {
"JASON" : "0.1.2",
"cryptojs" : "2.5.3"
Please sign in to comment.
Something went wrong with that request. Please try again.