Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Re-write

  • Loading branch information...
commit 65a61a4d9a7728841786dff39811cdbeb359a775 1 parent 8c6ad0e
@Frans-Willem authored
View
5 BiStream.js
@@ -0,0 +1,5 @@
+var EventEmitter=require("events").EventEmitter;
+
+function createStreamPair() {
+
+}
View
80 ICPNodeProtocol.txt
@@ -0,0 +1,80 @@
+[command,[1,2,3,{},4,{}],
+1.Command (string or integer, see IPCNode.commands)
+2.Table of all used remote (e.g. owned by sender) objects.
+ Key = id
+ Value: type or [type,properties] (type being one of IPCNode.objectTypes)
+ Should call release on any not used.
+4+.Arguments all marshalled objects
+
+Marshalled values:
+ null, undefined, string, number, boolean -> stay the same
+ Array -> array with same rules applied
+ Object/function ->
+ Global -> {t:IPCNode.objectSource.global}
+ Object on receiving side: -> {t:IPCNode.objectSource.local,i:(id)}
+ Object on sending side: -> {t:IPCNode.objectSource.marshalled,i:(id)} //Should be in marshalled objects table!
+Object properties:
+ Table with property -> marshalled value. If marshalled value
+
+_localObjects table contains following objects:
+ {
+ id: id, //Key in _localObjects
+ refCount: number,
+ object: actual object //where object will also get a new properties __ipc_id_(IPCNode id) pointing back to that id.
+ }
+_remoteObjects table contains following objects:
+ {
+ id: id, //Key in _remoteObjects table
+ refCount: number, //Number of internal references
+ externalRefCount: number, //Number of external references (To protect from IPCNode.release to mess up internal references)
+ usedBy: {}, //Remote used by this object, id -> key, _remoteObjects entry -> value
+ uses: {}, //Remote objects this object uses, id -> key, _remoteObjects entry -> value
+ usedLocals: {} //Local objects this object uses (and keeps a reference on!) id -> key, _localObjects entry -> value
+ hasProperties: false, //Has it had properties applied already?
+ hasRequested: false, //Have we already requested properties?
+ type: "f" or "o"
+ stub: local representation of object,
+ isReady: boolean, //Is this object ready to be used? e.g. are all objects properly referenced?
+ readyCallbacks: array
+ }
+Stub objects contain:
+ {
+ __ipc_owner: owner IPC document
+ __ipc_object: item in remoteObjects table (see above)
+ }
+IPCNode._isRemoteObjectReady=function(remoteObject,presumeReady) {
+ if (typeof(presumeReady)==="object" && presumeReady[remoteObject.id])
+ return true;
+ if (remoteObject.isReady)
+ return true;
+ if (typeof(properties)!=="object")
+ return false;
+ presumeReady[remoteObject.id]=true;
+ var queue=[];
+ for (var key in remoteObject.properties)
+ queue.push(remoteObject.properties[k]);
+ var ret=true;
+ while (ret && queue.length>0) {
+ var current=q.pop();
+ if (typeof(current)==="function" || typeof(current)==="object")
+ continue;
+ if (Array.isArray(current)) {
+ current.forEach(function(x) { queue.push(x); });
+ continue;
+ }
+ if (typeof(current.__ipc_owner)!=="object")
+ continue;
+ if (!this._isRemoteObjectReady(current.__ipc_object,presumeReady))
+ ret=false;
+ }
+ delete presumeReady[remoteObject.id];
+ return ret;
+}
+
+When to actually release a remote object ?
+ refCount == 0?
+ for all objects in usedBy table:
+ can this be released (given that this object will be released?)
+ if (all yes)
+ release object, and for all also remote objects in properties, remove self from usedBy.
+
View
1,173 IPCNode.js
@@ -2,23 +2,35 @@ var sys=require("sys");
var EventEmitter=require("events").EventEmitter;
var IDProvider=require("./IDProvider").IDProvider;
+var isDebug=false;
+var defaultPrepareCount=0;//isDebug?-1:0;
+
+//Helper functions
+function getGlobal() {
+ return (function(){return this;})();
+}
+
function IPCNode() {
EventEmitter.call(this);
this._pauseBuffer=[];
this._buffer="";
this._idp=new IDProvider();
- this._localobjects={};
- this._remoteobjects={};
- this._requesting={};
+ this._localObjects={};
+ this._remoteObjects={};
this._id=IPCNode._idp.alloc();
+ this._waitingReleases=[];
+ this._holdingReleases=0;
}
IPCNode._idp=new IDProvider();
-//Readable stream
function Base(){};
Base.prototype=EventEmitter.prototype;
IPCNode.prototype=new Base();
+IPCNode.prototype.prepareCount=defaultPrepareCount;
+/*************************\
+* Readable stream methods *
+\*************************/
IPCNode.prototype._closed=false;
IPCNode.prototype._paused=false;
IPCNode.prototype._pauseBuffer=undefined; //Should be set in constructor
@@ -35,7 +47,9 @@ IPCNode.prototype.resume=function() {
IPCNode.prototype.destroy=function() {
this.end();
}
-//Writeable stream
+/**************************\
+* Writeable stream methods *
+\**************************/
IPCNode.prototype._buffer=undefined; //Should be set in constructor
IPCNode.prototype.writeable=true;
IPCNode.prototype.write=function(string,encoding) {
@@ -68,28 +82,34 @@ IPCNode.prototype.end=function(string,encoding) {
this.emit("close");
}
//See Readable Stream for destroy
-//Custom functions
-IPCNode.prototype._isClean=function() {
- //this._localobjects and this._remoteobjects should be empty.
- return (
- !Object.keys(this._localobjects).some(function(x) { return x==parseInt(x); }) &&
- !Object.keys(this._requesting).some(function(x) { return x==parseInt(x); }) &&
- !Object.keys(this._remoteobjects).some(function(x) { return x==parseInt(x); })
- );
-}
-IPCNode.prototype._cleanup=function() {
- //TODO: Clean up all marshalled objects
-}
+/***********\
+* Constants *
+\***********/
+IPCNode.commands={
+ register: isDebug?"register":0,
+ call: isDebug?"call":1,
+ release: isDebug?"release":2,
+ infoRequest: isDebug?"infoRequest":3,
+ infoResponse: isDebug?"infoResponse":4,
+};
+IPCNode.objectSource={
+ local: isDebug?"local":0,
+ marshalled: isDebug?"marshalled":1,
+ global: isDebug?"global":2
+};
+/****************\
+* Outgoing stuff *
+\****************/
IPCNode.prototype._emitError=function(err) {
this._closed=true;
this.emit("error",err);
}
-IPCNode.prototype._emitCommand=function(cmd) {
- this._emitData(JSON.stringify(cmd)+"\n");
+IPCNode.prototype._emitObject=function(obj) {
+ this._emitData(JSON.stringify(obj)+"\n");
}
IPCNode.prototype._emitData=function(data) {
if (this._closed)
- throw new Error("IPCNode is closed");
+ throw new Error("_emitData: IPCNode is closed: "+data.toString());
if (!this._paused) this.emit("data",data);
else {
if (typeof(this._pauseBuffer)!=="object")
@@ -97,519 +117,738 @@ IPCNode.prototype._emitData=function(data) {
this._pauseBuffer.push(data);
}
}
-IPCNode.prototype._onData=function(data) {
- if (this._closed)
- throw new Error("IPCNode is closed");
- try {
- return this._onCommand(JSON.parse(data));
- }
- catch(e) {
- this._emitError(e);
+IPCNode.prototype._emitReleases=function() {
+ var i,cur;
+ if (this._holdingReleases>0) {
+ for (var i=0; i<arguments.length; i++) {
+ cur=arguments[i];
+ if (cur!=parseInt(cur))
+ continue;
+ this._waitingReleases.push(cur);
+ }
+ } else {
+ var command=[IPCNode.commands.release];
+ for (var i=0; i<arguments.length; i++) {
+ cur=arguments[i];
+ if (cur!=parseInt(cur))
+ continue;
+ command.push(cur);
+ }
+ if (command.length>1)
+ this._emitObject(command);
+ this._checkClean();
}
}
-IPCNode.prototype._onCommand=function(cmd) {
- switch (cmd.m) {
- //Response
- case "rs":
- this._onResponse(cmd.i,cmd.t,cmd.p);
- break;
- //Request
- case "rq":
- this._onRequest(cmd.i);
- break;
- //Release
- case "rl":
- this._onRelease(cmd.i);
- break;
- //Call
- case "c":
- this._onCall(cmd.f,cmd.t,cmd.a);
- break;
- //Register
- case "rg":
- this._onRegister(cmd.n,cmd.o);
- break;
- default:
- throw new Error("Unknown method: "+m);
+IPCNode.prototype._holdReleases=function() {
+ this._holdingReleases++;
+}
+IPCNode.prototype._unholdReleases=function() {
+ if (this._holdingReleases == 0) throw new Error("_unholdReleases: not held");
+ if (--this._holdingReleases == 0) {
+ if (this._waitingReleases.length>0) {
+ var command=[IPCNode.commands.release].concat(this._waitingReleases);
+ this._waitingReleases=[];
+ this._emitObject(command);
+ }
+ this._checkClean();
}
}
-IPCNode.prototype._onRelease=function(id) {
- var info=this._localobjects[parseInt(id)];
- if (typeof(info)!=="object")
- throw new Error("Unknown ID: "+id);
- if (--info.refcount == 0) {
- //Release it completely
- delete info.object["__ipcinfo_"+this._id];
- delete this._localobjects[id];
- this._idp.free(id);
+IPCNode.prototype.register=function() {
+ return this._emitMarshalledCommand(IPCNode.commands.register,Array.prototype.slice.call(arguments));
+}
+IPCNode.prototype._call=function() {
+ return this._emitMarshalledCommand(IPCNode.commands.call,Array.prototype.slice.call(arguments));
+}
+IPCNode.prototype._onInfoRequest=function() {
+ var ids=Array.prototype.slice.call(arguments);
+ var ret={};
+ var self=this;
+ var objectTable={};
+ var marshalled=[];
+ function marshalLocalObjectCallback(localObject) {
+ var id=self._marshalLocalObject(localObject,objectTable);
+ marshalled.push(id);
+ return id;
}
- if (this._isClean())
- this.emit("clean");
+ //Marshal each one of them to the object table, but don't store the result
+ var requested=[];
+ ids.forEach(function(id) {
+ if (id!=parseInt(id))
+ return;
+ var localObject=self._localObjects[id];
+ if (typeof(localObject)!=="object")
+ throw new Error("Local object "+id+" not found");
+ //Marshal it, but don't store. Will make sure it ends up in the object Table
+ requested.push({id:marshalLocalObjectCallback(localObject.object),object:localObject.object});
+ });
+ //For all requested IDs, make sure the properties are set
+ requested.forEach(function(x) {
+ var id=x.id;
+ var object=x.object;
+ var currentInfo;
+ if (id!=parseInt(id))
+ return;
+ currentInfo=objectTable[id];
+ if (typeof(currentInfo)==="undefined" || typeof(currentInfo)==="object")
+ return; //Either non-existing, or already done
+ objectTable[id]=[currentInfo,self._marshalProperties(object,marshalLocalObjectCallback)];
+ });
+ //Attempt to prepare some extra objects
+ var toPrepare=IPCNode.prototype.prepareCount;
+ var i,id,object,currentInfo;
+ for (var i=0; i<marshalled.length && toPrepare!==0; i++) {
+ id=marshalled[i].id;
+ object=marshalled[i].object;
+ if (id!=parseInt(id))
+ continue;
+ currentInfo=objectTable[id];
+ if (typeof(currentInfo)==="undefined" || typeof(currentInfo)==="object")
+ continue; //Either non-existing, or already done
+ if (toPrepare>0)
+ toPrepare--;
+ objectTable[id]=[currentInfo,self._marshalProperties(object,marshalLocalObjectCallback)];
+ }
+ //Send
+ return this._emitObject([IPCNode.commands.infoResponse,objectTable]);
}
-IPCNode.prototype._onRequest=function(id) {
+/*************\
+* Marshalling *
+\*************/
+IPCNode.prototype._emitMarshalledCommand=function(cmd,args) {
var self=this;
- var info=self._localobjects[parseInt(id)];
- if (typeof(info)!=="object")
- throw new Error("Unknown ID: "+id);
- var props={};
- var obj=info.object;
- var type=typeof(obj);
- var res={m:"rs",i:id,t: type.substr(0,1),p: props};
- var todo=1;
- function checkDone() {
- if (todo==0)
- self._emitCommand(res);
+ var objectTable={};
+ var marshalled=[];
+ function marshalLocalObjectCallback(localObject) {
+ var id=self._marshalLocalObject(localObject,objectTable);
+ marshalled.push({id:id,object:localObject});
+ return id;
}
- for (var i in obj) {
- var x=obj[i];
- if (i.substr(0,2)!=="__" && (typeof(x)==="object" || typeof(x)==="function") && x!==null) {
- todo++;
- self._marshalObject(x,(function(i) {
- return function(m) {
- props[i]=m;
- todo--;
- checkDone();
- }
- })(i));
- }
+ var marshalledArgs=args.map(function(value) { return self._marshalValue(value,marshalLocalObjectCallback); });
+ //Send some extra properties over the wire
+ var toPrepare=IPCNode.prototype.prepareCount;
+ var i,id,object,currentInfo;
+ for (var i=0; i<marshalled.length && toPrepare!==0; i++) {
+ id=marshalled[i].id;
+ object=marshalled[i].object;
+ if (id!=parseInt(id))
+ continue;
+ currentInfo=objectTable[id];
+ if (typeof(currentInfo)==="undefined" || typeof(currentInfo)==="object")
+ continue; //Either non-existing, or already done
+ if (toPrepare>0)
+ toPrepare--;
+ objectTable[id]=[currentInfo,self._marshalProperties(object,marshalLocalObjectCallback)];
}
- todo--;
- checkDone();
+ this._emitObject([cmd,objectTable].concat(marshalledArgs));
}
-IPCNode.prototype._onResponse=function(id,type,props) {
- var id=parseInt(id);
- var req=this._requesting[id];
- if (typeof(req)!=="object")
- throw new Error("Invalid request ID in response")
- delete this._requesting[id];
- var info={type:type,props:props,refcount:1};
- this._remoteobjects[id]=info;
- while (req.length)
- req.shift()(info);
- this._releaseRemoteId(id,info);
-}
-IPCNode.prototype._marshalObject=function(o,callback) {
- var info=o["__ipcinfo_"+this._id];
- if (typeof(info)==="object") {
- if (info.owner) {
- this._localobjects[info.id].refcount++;
- return callback({o:true,i:info.id});
- } else {
- return callback({o:false,i:info.id});
+IPCNode.prototype._marshalValue=function(value,marshalLocalObjectCallback) {
+ var self=this;
+ if (value===null || value===undefined || typeof(value)==="string" || typeof(value)==="number" || typeof(value)==="boolean")
+ return value;
+ if (Array.isArray(value))
+ return value.map(function(subvalue) { return self._marshalValue(subvalue,marshalLocalObjectCallback); });
+ if (value===getGlobal())
+ return {t:IPCNode.objectSource.global};
+ if (typeof(value)==="object" || typeof(value)==="function") {
+ if (value.__ipc_owner == this) {
+ if (typeof(value.__ipc_object)!=="object")
+ throw new Error("Attempt to use disposed object");
+ return {t:IPCNode.objectSource.local,i:value.__ipc_object.id};
}
+ return {t:IPCNode.objectSource.marshalled,i:marshalLocalObjectCallback(value)};
+ }
+ throw new Error("Unable to marshal value: "+value);
+}
+IPCNode.prototype._marshalProperties=function(object,marshalLocalObjectCallback) {
+ var ret={};
+ for (var key in object) {
+ if (key.substr(0,6)=="__ipc_")
+ continue;
+ ret[key]=this._marshalValue(object[key],marshalLocalObjectCallback);
+ }
+ return ret;
+}
+IPCNode.prototype._marshalLocalObject=function(object,objectTable) {
+ var localObject=object["__ipc_info_"+this._id];
+ var id;
+ var type;
+ if (typeof(localObject)!=="object" || this._localObjects[localObject.id]!==localObject) {
+ id=this._idp.alloc();
+ localObject=object["__ipc_info_"+this._id]=this._localObjects[id]={id:id,refCount:1,object:object};
+ type=typeof(object);
+ objectTable[id]=type.substr(0,1);
+ return id;
} else {
- //Allocate new
- info={owner:true,id:this._idp.alloc()};
- o["__ipcinfo_"+this._id]=info;
- this._localobjects[info.id]={
- refcount: 1,
- object: o
- };
- return callback({o:true,i:info.id});
- }
-}
-IPCNode.prototype._marshal=function(o,callback) {
- if (Array.isArray(o)) {
- var self=this;
- var todo=1;
- var ret=[];
- function checkDoneArr() {
- try {
- if (todo==0)
- callback(ret);
- }
- catch(e) {
- self._emitError(e);
- }
- }
- for (var i=0; i<o.length; i++) {
- this._marshal(o[i],(function(i) {
- return function(m) {
- ret[i]=m;
- todo--;
- checkDoneArr();
- }
- })(i));
+ id=localObject.id;
+ if (typeof(objectTable[id])==="undefined") {
+ type=typeof(object);
+ objectTable[id]=type.substr(0,1);
+ localObject.refCount++;
}
- todo--;
- checkDoneArr();
- return;
+ return localObject.id;
}
- if (o===null || o===undefined || typeof(o)==="number" || typeof(o)==="string" || typeof(o)==="boolean")
- return callback(o);
- if (typeof(o)==="object") {
- var has_functions=Object.keys(o).some(function(x) { return ((typeof(o[x])==="function") && (Object.prototype[x] !== o[x])); });
- if (has_functions) {
- return this._marshalObject(o,function(m) {
- callback({t:"o",o:m});
- });
- } else {
- var self=this;
- var todo=1;
- var ret={};
- function checkDoneObj() {
- try {
- if (todo==0)
- callback({t:"j",o:ret});
- }
- catch(e) {
- self._emitError(e);
- }
- }
- for (var i in o) {
- this._marshal(o[i],(function(i) {
- return function(m) {
- ret[i]=m;
- todo--;
- checkDoneObj();
- }
- })(i));
- }
- todo--;
- checkDoneObj();
+}
+IPCNode.prototype._releaseLocals=function(ids) {
+ var self=this;
+ ids.forEach(function(id) {
+ if (id!=parseInt(id))
return;
+ var localObject=self._localObjects[id];
+ if (typeof(localObject)!=="object")
+ throw new Error("Unknown local ID "+id);
+ if (--localObject.refCount === 0) {
+ delete self._localObjects[id];
+ delete localObject.object["__ipc_id_"+self._id];
+ self._idp.free(parseInt(id));
}
+ });
+ if (ids.length>0)
+ this._checkClean();
+}
+IPCNode.prototype._onRelease=function() {
+ var ids=Array.prototype.slice.call(arguments);
+ this._releaseLocals(ids);
+}
+/****************\
+* Incoming stuff *
+\****************/
+IPCNode.prototype._onData=function(data) {
+ if (this._closed)
+ throw new Error("IPCNode is closed");
+ try {
+ return this._onObject(JSON.parse(data));
}
- if (typeof(o)==="function") {
- return this._marshalObject(o,function(m) {
- callback({t:"o",o:m});
- });
+ catch(e) {
+ this._emitError(e);
}
- throw new Error("Unable to marshal value: "+o);
}
-function getGlobal() {
- return (function(){return this;})();
+IPCNode.prototype._onObject=function(obj) {
+ if (!Array.isArray(obj))
+ throw new Error("Expected Array as object");
+ var cmd=obj[0];
+ switch (cmd) {
+ case IPCNode.commands.register: return this._onRegister.apply(this,obj.slice(1));
+ case IPCNode.commands.call: return this._onCall.apply(this,obj.slice(1));
+ case IPCNode.commands.release: return this._onRelease.apply(this,obj.slice(1));
+ case IPCNode.commands.infoRequest: return this._onInfoRequest.apply(this,obj.slice(1));
+ case IPCNode.commands.infoResponse: return this._onInfoResponse.apply(this,obj.slice(1));
+ default: throw new Error("Unsupported command: "+cmd);
+ }
}
-IPCNode.prototype._call=function(f,t,args) {
- if (this._closed)
- throw new Error("IPCNode was closed");
- var marshalledArgs=[];
- var marshalledF=undefined;
- var marshalledT=undefined;
- var todo=1;
+IPCNode.prototype._onRegister=function(marshalledObjectTable) {
var self=this;
- function checkDone() {
- try {
- if (todo==0)
- self._emitCommand({m:"c",f:marshalledF,t:marshalledT,a:marshalledArgs});
- }
- catch(e) {
- self._emitError(e);
+ this._unmarshalArguments(Array.prototype.slice.call(arguments),function(args) {
+ self.emit.apply(self,["register"].concat(args));
+ });
+}
+IPCNode.prototype._onInfoResponse=function(marshalledObjectTable) {
+ var self=this;
+ this._unmarshalArguments(Array.prototype.slice.call(arguments),function(args) {
+ //Don't do anything :)
+ });
+}
+IPCNode.prototype._onCall=function(marshalledObjectTable) {
+ var self=this;
+ this._unmarshalArguments(Array.prototype.slice.call(arguments),function(args) {
+ var f=args[0];
+ if (typeof(f)!=="function")
+ throw new Error("Can only call functions");
+ var t=args[1];
+ var a=args.slice(2);
+ f.apply(t,a);
+ });
+}
+/***************\
+* Unmarshalling *
+\***************/
+IPCNode.prototype._unmarshalArguments=function(args,resultCallback) {
+ var marshalledObjectTable=args.shift();
+ var objectTableCallbacks=[];
+ var objectTable;
+ var localObjects={};
+ var self=this;
+
+ function requestObjectTable(resultCallback) {
+ if (objectTable===undefined) objectTableCallbacks.push(resultCallback);
+ else resultCallback(objectTable);
+ }
+ function localObjectUnmarshaller(id,resultCallback) {
+ if (id!=parseInt(id))
+ throw new Error("ID is not numeric");
+ var localObject=localObjects[id];
+ if (typeof(localObject)!=="object") {
+ localObjects[id]=localObject=self._localObjects[id];
+ if (typeof(localObject)!=="object")
+ throw new Error("Unknown local id "+id);
+ localObject.refCount++;
}
+ resultCallback(localObject.object);
}
- todo++;
- //Marshal function
- self._marshalObject(f,function(m) {
- marshalledF=m;
- todo--;
- checkDone();
- });
- //Marshal this
- if (t===null || t===undefined || typeof(t)==="string") {
- marshalledT={t:"d",v:t};
- } else if (t===getGlobal()) {
- marshalledT={t:"g"};
- } else if (typeof(t)==="object") {
- todo++;
- self._marshal(t,function(m) {
- marshalledT={t:"m",m:m};
- todo--;
- checkDone();
+ function remoteObjectUnmarshaller(id,resultCallback) {
+ requestObjectTable(function(objectTable) {
+ self._unmarshalFromObjectTable(objectTable,id,resultCallback);
});
}
- //Marshal arguments
- for (var i=0; i<args.length; i++) {
- todo++;
- self._marshal(args[i],(function(i) {
- return function(m) {
- marshalledArgs[i]=m;
- todo--;
- checkDone();
- }
- })(i));
- }
- todo--;
- checkDone();
-}
-IPCNode.prototype._onCall=function(f,t,args) {
- var todo=1;
- var unmarshalledArgs=[];
- var unmarshalledF=undefined;
- var unmarshalledT=undefined;
+ this._holdReleases();
+ this._unmarshalArray(args,localObjectUnmarshaller,remoteObjectUnmarshaller,function(unmarshalledArgs) {
+ resultCallback(unmarshalledArgs);
+ //Free up object table
+ requestObjectTable(function(objectTable) {
+ self._disposeObjectTable(objectTable);
+ });
+ });
+ var unused=this._unmarshalObjectTable(marshalledObjectTable,function(unmarshalledObjectTable) {
+ objectTable=unmarshalledObjectTable;
+ objectTableCallbacks.forEach(function(c) {
+ c(unmarshalledObjectTable);
+ });
+ self._releaseLocals(Object.keys(localObjects).filter(function(x) { return x==parseInt(x); }));
+ });
+ this._emitReleases.apply(this,unused);
+ this._unholdReleases();
+}
+IPCNode.prototype._unmarshalValue=function(value,localObjectUnmarshaller,remoteObjectUnmarshaller,resultCallback) {
+ if (value===null || value===undefined || typeof(value)==="string" || typeof(value)==="number" || typeof(value)==="boolean")
+ return resultCallback(value);
+ if (Array.isArray(value))
+ return this._unmarshalArray(value,localObjectUnmarshaller,remoteObjectUnmarshaller,resultCallback);
+ if (typeof(value)==="object")
+ return this._unmarshalObject(value,localObjectUnmarshaller,remoteObjectUnmarshaller,resultCallback);
+ throw new Error("Unable to unmarshal: "+value);
+}
+IPCNode.prototype._unmarshalArray=function(arr,localObjectUnmarshaller,remoteObjectUnmarshaller,resultCallback) {
+ var ret=[];
var self=this;
+ var todo=1;
function checkDone() {
- try {
- if (todo==0) {
- unmarshalledF.apply(unmarshalledT,unmarshalledArgs);
- IPCNode.dispose(unmarshalledF);
- IPCNode.dispose(unmarshalledT);
- for (var i=0; i<unmarshalledArgs.length; i++)
- IPCNode.dispose(unmarshalledArgs[i]);
- }
- }
- catch(e) {
- self._emitError(e);
- }
+ if (todo===0)
+ resultCallback(ret);
}
- todo++;
- this._unmarshalObject(f,function(um) {
- unmarshalledF=um;
- todo--;
- checkDone();
- },true);
- if (t.t==="d")
- unmarshalledT=t.v;
- else if (t.t==="g")
- unmarshalledT=getGlobal();
- else if (t.t==="m") {
+ arr.forEach(function(value,key) {
todo++;
- this._unmarshal(t.m,function(um) {
- unmarshalledT=um;
+ self._unmarshalValue(value,localObjectUnmarshaller,remoteObjectUnmarshaller,function(unmarshalledValue) {
+ ret[key]=unmarshalledValue;
todo--;
checkDone();
- },true);
- }
- else throw new Error("Unable to unmarshal 'this' from "+t);
- for (var i=0; i<args.length; i++) {
- todo++;
- this._unmarshal(args[i],(function(i) {
- return function(um) {
- unmarshalledArgs[i]=um;
- todo--;
- checkDone();
- }
- })(i),true);
- }
+ });
+ });
todo--;
checkDone();
}
-IPCNode.prototype._unmarshalLocalObject=function(id,callback,releaseDuplicate) {
- //Unmarshal an object on my side
- var info=this._localobjects[parseInt(id)];
- if (typeof(info)!=="object")
- throw new Error("Unknown local id "+id);
- callback(info.object);
-}
-IPCNode.prototype._releaseRemoteId=function(id,info) {
- //Do nothing for now
- id=parseInt(id);
- if (typeof(info)!=="object")
- throw new Error("Unhandled error on releasing");
- if (--info.refcount == 0) {
- delete this._remoteobjects[id];
- this._emitCommand({m:"rl",i:id});
- }
- if (this._isClean())
- this.emit("clean");
+IPCNode.prototype._unmarshalObject=function(obj,localObjectUnmarshaller,remoteObjectUnmarshaller,resultCallback) {
+ switch (obj.t) {
+ case IPCNode.objectSource.global: return resultCallback(getGlobal());
+ case IPCNode.objectSource.local: return localObjectUnmarshaller(obj.i,resultCallback);
+ case IPCNode.objectSource.marshalled: return remoteObjectUnmarshaller(obj.i,resultCallback);
+ default:
+ throw new Error("Unknown object type: "+obj.t);
+ }
}
-function createIPCFunction() {
+function createStubFunction() {
return function() {
var f=arguments.callee;
var owner=f.__ipc_owner;
if (typeof(owner)!=="object")
- throw new Error("Function has been disposed");
- owner._call(f,this,Array.prototype.slice.call(arguments));
- }
-}
-IPCNode.reference=function(obj) {
- var owner=obj.__ipc_owner;
- if (typeof(owner)!=="object")
- return obj;
- var ret;
- //Will call back immediatly seeing as all info should be readily available
- owner._createRemoteObject(obj.__ipc_remote_id,obj.__ipc_owner_info,function(r) {
- ret=r;
- },false);
- return ret;
+ throw new Error("IPC object has been disposed");
+ var args=[f,this].concat(Array.prototype.slice.call(arguments));
+ owner._call.apply(owner,args);
+ };
+}
+IPCNode.prototype._unmarshalFromObjectTable=function(objectTable,id,resultCallback) {
+ if (id!=parseInt(id))
+ throw new Error("Unable to unmarshal ID "+id+", not numeric");
+ var remoteObject=objectTable[id];
+ if (typeof(remoteObject)!=="object")
+ throw new Error("Unable to unmarshal ID "+id+", not in table");
+ if (remoteObject.isReady)
+ return resultCallback(remoteObject.stub);
+ remoteObject.readyCallbacks.push(function() {
+ resultCallback(remoteObject.stub);
+ });
}
-IPCNode.dispose=function(obj) {
- if (obj.__ipc_disposed)
- throw new Error("IPC object already disposed");
- var owner=obj.__ipc_owner;
- if (typeof(owner)!=="object")
- return;
- var info=obj.__ipc_owner_info;
- var id=obj.__ipc_remote_id;
- var children=obj.__ipc_children;
- delete obj.__ipc_owner;
- delete obj.__ipc_owner_info;
- delete obj.__ipc_remote_id;
- delete obj.__ipc_children;
- delete obj["__ipcinfo_"+owner._id];
- obj.__ipc_disposed=true;
- owner._releaseRemoteId(id,info);
- for (var i=0; i<children.length; i++) {
- try {
- IPCNode.dispose(children[i]);
+IPCNode.prototype._unmarshalObjectTable=function(objectTable,resultCallback) {
+ var id;
+ var type;
+ var infoCache={};
+ var remoteObject;
+ var notUsed=[];
+ var unmarshalledTable={};
+ var stub;
+ var properties;
+ var requestProperties=[IPCNode.commands.infoRequest];
+ var todo=1;
+ //Step one: Ensure that a _remoteObject exists for all objects, and increase the reference count.
+ for (id in objectTable) {
+ if (id!=parseInt(id))
+ continue;
+ type=objectTable[id];
+ if (typeof(type)==="object") {
+ infoCache[id]=type[1];
+ type=type[0];
}
- catch(e) {
-
+ remoteObject=this._remoteObjects[id];
+ if (typeof(remoteObject)!=="object") {
+ switch (type) {
+ case "f": stub=createStubFunction(); break;
+ case "o": stub={}; break;
+ default: throw new Error("Unable to unmarshal object: unknown type "+type);
+ }
+ Object.defineProperty(stub,"__ipc_owner",{value:this,enumerable:false,configurable:true});
+ remoteObject=this._remoteObjects[id]=unmarshalledTable[id]={
+ id: id,
+ refCount: 1,
+ externalRefCount: 0,
+ usedBy: {},
+ uses: {},
+ usedLocals: {},
+ hasProperties: false,
+ hasRequested: false,
+ type: type,
+ stub: stub,
+ isReady: false,
+ readyCallbacks: []
+ };
+ Object.defineProperty(stub,"__ipc_object",{value:remoteObject,enumerable:false,configurable:true});
+ } else {
+ remoteObject.refCount++;
+ unmarshalledTable[id]=remoteObject;
+ notUsed.push(id);
}
}
-}
-IPCNode.prototype._createRemoteObject=function(id,info,callback,releaseDuplicate) {
- if (this._closed)
- throw new Error("IPCNode was closed");
- var ret;
- if (info.type==="f") ret=createIPCFunction();
- else if (info.type==="o") ret={};
- else throw new Error("Type "+t+" not supported");
- info.refcount++;
- var self=this;
- ret.__ipc_disposed=false;
- ret.__ipc_owner=self;
- ret.__ipc_owner_info=info;
- ret.__ipc_remote_id=id;
- ret.__ipc_children=[];
- ret["__ipcinfo_"+self._id]={owner:false,id:id};
- var props=info.props;
- var todo=1;
+ //Step 2: Apply the infoCache to all objects not yet having properties (and attempt to make them ready)
+ for (id in infoCache) {
+ if (id!=parseInt(id))
+ continue;
+ remoteObject=unmarshalledTable[id];
+ if (typeof(remoteObject)!=="object")
+ continue;
+ if (remoteObject.hasProperties)
+ continue;
+ this._applyProperties(remoteObject,infoCache[id],unmarshalledTable);
+ }
+ //Step 3: Request information for any object that does not have it yet.
+ for (id in unmarshalledTable) {
+ if (id!=parseInt(id))
+ continue;
+ remoteObject=unmarshalledTable[id];
+ if (typeof(remoteObject)!=="object")
+ continue;
+ if (!remoteObject.hasProperties && !remoteObject.hasRequested) {
+ requestProperties.push(remoteObject.id);
+ remoteObject.hasRequested=true;
+ }
+ }
+ if (requestProperties.length>1)
+ this._emitObject(requestProperties);
+ //Step 4: Check all objects if they're ready, and if not, wait for them
function checkDone() {
- if (todo==0)
- callback(ret);
+ if (todo===0)
+ resultCallback(unmarshalledTable);
}
- for (var propname in props) {
- if (propname.substr(0,2)=="__")
+ for (id in unmarshalledTable) {
+ if (id!=parseInt(id))
continue;
- todo++;
- self._unmarshalObject(props[propname],(function(propname) {
- return function(propvalue) {
- ret[propname]=propvalue;
- ret["__ipc_children"].push(propvalue);
+ remoteObject=unmarshalledTable[id];
+ if (typeof(remoteObject)!=="object")
+ continue;
+ if (!this._checkReady(remoteObject)) {
+ todo++;
+ remoteObject.readyCallbacks.push(function() {
todo--;
checkDone();
- }
- })(propname),releaseDuplicate);
+ });
+ }
}
todo--;
checkDone();
+ //Step 3: Return notUsed table, so command can do a release on those.
+ return notUsed;
+}
+IPCNode.prototype._disposeObjectTable=function(objectTable) {
+ var currentObject;
+ for (var id in objectTable) {
+ if (id!=parseInt(id))
+ continue;
+ currentObject=objectTable[id];
+ if (typeof(currentObject)!=="object")
+ continue;
+ this._releaseStub(currentObject);
+ }
+}
+IPCNode.prototype._addrefStub=function(remoteObject) {
+ remoteObject.refCount++;
}
-IPCNode.prototype._unmarshalRemoteObject=function(id,callback,releaseDuplicate) {
- //Unmarshal an object on the other side
- var id=parseInt(id);
+IPCNode.prototype._releaseStub=function(remoteObject) {
+ if (remoteObject.refCount<1)
+ throw new Error("Object is already fully released");
+ remoteObject.refCount--;
+ this._checkDispose(remoteObject);
+}
+IPCNode.prototype._applyProperties=function(remoteObject,properties,objectTable) {
+ var key,stub;
var self=this;
- function onInfo(info) {
- self._createRemoteObject(id,info,callback,releaseDuplicate);
- }
- var existing=this._remoteobjects[id];
- if (typeof(existing)=="object") {
- //Send a release command, we already have a reference that we're holding on to
- if (releaseDuplicate)
- this._emitCommand({m:"rl",i:id});
- return onInfo(existing);
- }
- var requesting=this._requesting[id];
- if (typeof(requesting)=="object")
- requesting.push(onInfo);
- this._requesting[id]=[onInfo];
- this._emitCommand({m:"rq",i:id});
-}
-IPCNode.prototype._unmarshalObject=function(info,callback,releaseDuplicate) {
- if (info.o) this._unmarshalRemoteObject(info.i,callback,releaseDuplicate);
- else this._unmarshalLocalObject(info.i,callback,releaseDuplicate);
-}
-IPCNode.prototype._unmarshal=function(o,callback,releaseDuplicate) {
- if (Array.isArray(o)) {
- var self=this;
- var ret=[];
- var todo=1;
- function checkDone() {
- if (todo==0)
- callback(ret);
- }
- for (var i in o) {
- todo++;
- this._unmarshal(o[i],(function(i) {
- return function(r) {
- ret[i]=r;
- todo--;
- checkDone();
- }
- })(i),releaseDuplicate);
+ remoteObject.hasProperties=true;
+ stub=remoteObject.stub;
+ function localObjectUnmarshaller(id,resultCallback) {
+ if (id!=parseInt(id))
+ throw new Error("ID is not numeric!");
+ var localObject=remoteObject.usedLocals[id];
+ if (typeof(localObject)!=="object") {
+ localObject=self._localObjects[id];
+ if (typeof(localObject)!=="object")
+ throw new Error("Local object "+id+" not found");
+ remoteObject.usedLocals[id]=localObject;
+ localObject.refCount++;
}
- todo--;
- checkDone();
- return;
+ resultCallback(localObject.object);
}
- if (typeof(o)==="string")
- return callback(o);
- if (typeof(o)==="number")
- return callback(o);
- if (typeof(o)==="object") {
- if (o.t==="j") {
- //Similar to Array
- o=o.o;
- var self=this;
- var ret={};
- var todo=1;
- function checkDone() {
- if (todo==0)
- callback(ret);
- }
- for (var i in o) {
- todo++;
- this._unmarshal(o[i],(function(i) {
- return function(r) {
- ret[i]=r;
- todo--;
- checkDone();
- }
- })(i),releaseDuplicate);
+ function remoteObjectUnmarshaller(id,resultCallback) {
+ if (id!=parseInt(id))
+ throw new Error("ID is not numeric!");
+ var referencedObject=objectTable[id];
+ if (typeof(referencedObject)!=="object")
+ throw new Error("Remote object with "+id+" not in object table");
+ remoteObject.uses[id]=referencedObject;
+ referencedObject.usedBy[remoteObject.id]=remoteObject;
+ resultCallback(referencedObject.stub);
+ }
+ for (var key in properties) {
+ if (!properties.hasOwnProperty(key))
+ continue;
+ this._unmarshalValue(properties[key],localObjectUnmarshaller,remoteObjectUnmarshaller,(function(key) {
+ return function(value) {
+ stub[key]=value;
}
- todo--;
- checkDone();
- return;
- } else if (o.t==="o") {
- return this._unmarshalObject(o.o,callback,releaseDuplicate);
- } else {
- throw new Error("Unknown object type "+o.t);
- }
+ })(key));
}
- throw new Error("Unable to unmarshal "+o);
}
-
-IPCNode.prototype.register=function(name,obj) {
+IPCNode.prototype._checkDispose=function(remoteObject) {
+ //Check entire tree through usedBy for non-zero reference counts
+ var q=[remoteObject];
+ var seen={}; //Objects already seen and checked
+ seen[remoteObject.id]=true;
+ var toDispose=[]; //Objects that should be disposed
+ var toCheck=[]; //Objects that were used by disposed objects that should be checked for disposing
+ var currentObject;
+ var useId;
+ var referencedObject;
var self=this;
- this._marshalObject(obj,function(marshalledObj) {
- self._emitCommand({m:"rg",n:name,o:marshalledObj});
+ var releases=[];
+ while (q.length>0) {
+ currentObject=q.shift();
+ if (currentObject.refCount>0)
+ return false;
+ toDispose.push(currentObject);
+ //Note: No need to check for hasProperties or hasRequested, if there is anything left still checking that, the refcount should be non-zero
+ //Add used-by to queue
+ for (useId in currentObject.usedBy) {
+ if (useId!=parseInt(useId)) {
+ continue;
+ }
+ if (seen[useId]) {
+ continue;
+ }
+ referencedObject=currentObject.usedBy[useId];
+ if (typeof(referencedObject)!=="object")
+ continue;
+ seen[useId]=true;
+ q.push(referencedObject);
+ }
+ }
+ //If at this point, we should dispose of everone in the toDispose list.
+ //Also, seen contains all IDs in this list.
+ this._holdReleases();
+ toDispose.forEach(function(disposeObject) {
+ var stub=disposeObject.stub;
+ var uses=disposeObject.uses;
+ var id=disposeObject.id;
+ var useId;
+ var referencedObject;
+ disposeObject.uses={};
+ //Since we don't keep a list, we can't remove properties.
+ //Could be a good thing: simple objects containing just simple stuff will still work fine even when disposed, only when calling any functions on it will it fail.
+ //Remove uses
+ for (useId in uses) {
+ if (useId!=parseInt(useId))
+ continue;
+ referencedObject=uses[useId];
+ if (typeof(referencedObject)!=="object")
+ continue;
+ delete referencedObject.usedBy[id];
+ if (!seen[useId]) {
+ seen[useId]=true;
+ toCheck.push(referencedObject);
+ }
+ }
+ //Unreference locals
+ var locals=Object.keys(disposeObject.usedLocals).filter(function(x) { return x==parseInt(x); });
+ disposeObject.usedLocals={};
+ self._releaseLocals(locals);
+ //Remove object
+ delete stub.__ipc_object;
+ //Remove from table
+ delete self._remoteObjects[id];
+ //Add to release table
+ releases.push(id);
+ });
+ toCheck.forEach(function(referencedObject) {
+ self._checkDispose(referencedObject);
});
+ this._emitReleases.apply(this,releases);
+ this._unholdReleases();
+}
+IPCNode.prototype._checkReady=function(remoteObject) {
+ var q=[remoteObject]; //Queue for BFS
+ var seen={}; //Objects already seen and checked
+ var toSignal=[]; //Objects that should be set to ready
+ var usedBy={};
+ var currentObject;
+ var useId;
+ var i;
+ var callbacks=[];
+ seen[remoteObject.id]=true;
+ while (q.length) {
+ currentObject=q.shift();
+ if (currentObject.isReady)
+ continue;
+ if (!currentObject.hasProperties)
+ return false;
+ toSignal.push(currentObject);
+ for (useId in currentObject.uses) {
+ if (seen[useId])
+ continue;
+ seen[useId]=true;
+ q.push(currentObject.uses[useId]);
+ }
+ }
+ //Set all objects in the path to ready
+ for (i=0; i<toSignal.length; i++) {
+ currentObject=toSignal[i];
+ //Set it to ready
+ currentObject.isReady=true;
+ //Keep a list of all objects that are using objects we're now signalling
+ for (useId in currentObject.usedBy) {
+ if (useId!=parseInt(useId))
+ continue;
+ usedBy[useId]=currentObject.usedBy[useId];
+ }
+ //Keep a list of all callbacks we should signal once ready.
+ callbacks=callbacks.concat(currentObject.readyCallbacks);
+ currentObject.readyCallbacks=[];
+ }
+ //Call all callbacks
+ for (i=0; i<callbacks.length; i++)
+ callbacks[i]();
+ //Check all objects that are using any of the objects we're using.
+ for (useId in usedBy) {
+ if (useId!=parseInt(useId))
+ continue;
+ currentObject=usedBy[useId];
+ if (typeof(currentObject)!=="object")
+ continue;
+ this._checkReady(currentObject);
+ }
+ return true;
}
-IPCNode.prototype._onRegister=function(name,remoteObject) {
- var self=this;
- this._unmarshalObject(remoteObject,function(localObject) {
- self.emit("register",name,localObject);
- IPCNode.dispose(localObject);
- },true);
+/*******************\
+* General functions *
+\*******************/
+IPCNode.prototype._checkClean=function() {
+ if (
+ Object.keys(this._localObjects).filter(function(x) { return x==parseInt(x); }).length == 0 &&
+ Object.keys(this._remoteObjects).filter(function(x) { return x==parseInt(x); }).length == 0 &&
+ this._holdingReleases==0 &&
+ this._waitingReleases.length==0
+ ) {
+ this.emit("clean");
+ }
+}
+/****\
+* *
+\****/
+
+IPCNode.reference=function(obj) {
+ var owner=obj.__ipc_owner;
+ if (typeof(owner)!=="object")
+ return obj;
+ var remoteObject=obj.__ipc_object;
+ if (typeof(remoteObject)!=="object")
+ throw new Error("Remote object was already disposed");
+ remoteObject.refCount++;
+ remoteObject.externalRefCount++;
+ return obj;
}
-IPCNode.sync=function(f) {
+IPCNode.dispose=function(obj) {
+ var owner=obj.__ipc_owner;
+ if (typeof(owner)!=="object")
+ return;
+ var remoteObject=obj.__ipc_object;
+ if (typeof(remoteObject)!=="object")
+ throw new Error("Remote object was already disposed");
+ if (remoteObject.externalRefCount<1)
+ throw new Error("Remote object was not referenced any more");
+ remoteObject.refCount--;
+ remoteObject.externalRefCount--;
+ owner._checkDispose(remoteObject);
+}
+exports.IPCNode=IPCNode;
+
+IPCNode.async=function(func) {
return function() {
var args=Array.prototype.slice.call(arguments);
- if (args.length<1)
- return;
- var callback=args.pop();
- callback(f.apply(this,args));
- }
+ var copy;
+ var last;
+ var caught;
+ if (args.length>0) {
+ last=args[args.length-1];
+ if (typeof(last)==="function") {
+ args[args.length-1]=(function(f) {
+ var copy=IPCNode.reference(f);
+ return function() {
+ copy.apply(this,Array.prototype.slice.call(arguments));
+ IPCNode.dispose(copy);
+ };
+ })(last);
+ }
+ }
+ try {
+ func.apply(this,args);
+ }
+ catch(e) {
+ caught=e;
+ }
+ if (caught!==undefined)
+ throw caught;
+ };
}
-IPCNode.async=function(f) {
+IPCNode.sync=function(func) {
return function() {
var args=Array.prototype.slice.call(arguments);
- if (args.length<1)
- return;
- var lastindex=args.length-1;
- var last=args[lastindex];
- if (typeof(last)==="function") {
- var copy=IPCNode.reference(last);
- var called=false;
- args[lastindex]=function() {
- if (called)
- throw new Error("Callback function called twice");
- called=true;
- copy.apply(this,Array.prototype.slice.call(arguments));
- IPCNode.dispose(copy);
+ var copy;
+ var last;
+ var caught;
+ var ret;
+ if (args.length>0) {
+ last=args[args.length-1];
+ if (typeof(last)==="function")
+ copy=IPCNode.reference(args.pop());
+ }
+ try {
+ ret=func.apply(this,args);
+ }
+ catch(e) {
+ caught=e;
+ }
+ if (copy!==undefined) {
+ try {
+ copy(ret);
+ }
+ catch(e) {
+ caught=e;
}
+ IPCNode.dispose(copy);
}
- f.apply(this,args);
- }
-}
-
-exports.IPCNode=IPCNode;
+ if (caught!==undefined)
+ throw caught;
+ };
+}
View
578 IPCNode.old.js
@@ -0,0 +1,578 @@
+var sys=require("sys");
+var EventEmitter=require("events").EventEmitter;
+var IDProvider=require("./IDProvider").IDProvider;
+
+//Helper functions
+function getGlobal() {
+ return (function(){return this;})();
+}
+
+function IPCNode() {
+ EventEmitter.call(this);
+ this._pauseBuffer=[];
+ this._buffer="";
+ this._idp=new IDProvider();
+ this._localobjects={};
+ this._remoteobjects={};
+ this._requesting={};
+ this._id=IPCNode._idp.alloc();
+}
+
+IPCNode._idp=new IDProvider();
+
+/*************************\
+* Readable stream methods *
+\*************************/
+function Base(){};
+Base.prototype=EventEmitter.prototype;
+IPCNode.prototype=new Base();
+IPCNode.prototype._closed=false;
+IPCNode.prototype._paused=false;
+IPCNode.prototype._pauseBuffer=undefined; //Should be set in constructor
+IPCNode.prototype.readable=true;
+IPCNode.prototype.setEncoding=function(encoding) {}; //Ignore for now
+IPCNode.prototype.pause=function() {
+ this._paused=true;
+}
+IPCNode.prototype.resume=function() {
+ this._paused=false;
+ while (!this._paused && this._pauseBuffer.length>0)
+ this.emit("data",this._pauseBuffer.shift());
+}
+IPCNode.prototype.destroy=function() {
+ this.end();
+}
+/**************************\
+* Writeable stream methods *
+\**************************/
+IPCNode.prototype._buffer=undefined; //Should be set in constructor
+IPCNode.prototype.writeable=true;
+IPCNode.prototype.write=function(string,encoding) {
+ if (string===undefined)
+ return true;
+ if (this._closed)
+ throw new Error("IPCNode was closed, unable to write: "+string);
+ this._buffer+=string;
+ var split;
+ while ((split=this._buffer.indexOf("\n"))!==-1) {
+ var line=this._buffer.substr(0,split);
+ this._buffer=this._buffer.substr(split+1);
+ try {
+ this._onData(line);
+ }
+ catch(e) {
+ this._emitError(e);
+ }
+ }
+ return true;
+}
+IPCNode.prototype.end=function(string,encoding) {
+ if (this._closed)
+ return;
+ this.write.apply(this,Array.prototype.slice.call(arguments,0));
+ this._closed=true;
+ this.writeable=false;
+ this.readable=false;
+ this.emit("end");
+ this.emit("close");
+}
+//See Readable Stream for destroy
+/****************************************\
+* Internal functions for stream handling *
+\****************************************/
+IPCNode.prototype._emitError=function(err) {
+ this._closed=true;
+ this.emit("error",err);
+}
+IPCNode.prototype._emitCommand=function(cmd) {
+ this._emitData(JSON.stringify(cmd)+"\n");
+}
+IPCNode.prototype._emitData=function(data) {
+ if (this._closed)
+ throw new Error("IPCNode is closed");
+ if (!this._paused) this.emit("data",data);
+ else {
+ if (typeof(this._pauseBuffer)!=="object")
+ this._pauseBuffer=[];
+ this._pauseBuffer.push(data);
+ }
+}
+IPCNode.prototype._onData=function(data) {
+ if (this._closed)
+ throw new Error("IPCNode is closed");
+ try {
+ return this._onCommand(JSON.parse(data));
+ }
+ catch(e) {
+ this._emitError(e);
+ }
+}
+IPCNode.commands={
+ request:"request",
+ response:"response",
+ release:"release",
+ call:"call",
+ register:"register"
+};
+IPCNode.prototype._onCommand=function(cmd) {
+ if (cmd.length<0)
+ return;
+ var c=cmd[0];
+ switch (c) {
+ case IPCNode.commands.response: return this._onResponse.apply(this,cmd.slice(1));
+ case IPCNode.commands.request: return this._onRequest.apply(this,cmd.slice(1));
+ case IPCNode.commands.release: return this._onRelease.apply(this,cmd.slice(1));
+ case IPCNode.commands.call: return this._onCall.apply(this,cmd.slice(1));
+ case IPCNode.commands.register: return this._onRegister.apply(this,cmd.slice(1));
+ default:
+ throw new Error("Unknown method: "+c+" "+cmd);
+ }
+}
+/*************************\
+* Marshalling information *
+\*************************/
+IPCNode.prototype._isClean=function() {
+ //sys.puts("_isClean: "+JSON.stringify(Object.keys(this._localobjects))+" "+JSON.stringify(Object.keys(this._requesting))+" "+JSON.stringify(Object.keys(this._remoteobjects)));
+ //this._localobjects and this._remoteobjects should be empty.
+ return (
+ !Object.keys(this._localobjects).some(function(x) { return x==parseInt(x); }) &&
+ !Object.keys(this._requesting).some(function(x) { return x==parseInt(x); }) &&
+ !Object.keys(this._remoteobjects).some(function(x) { return x==parseInt(x); })
+ );
+}
+IPCNode.prototype._cleanup=function() {
+ //TODO: Clean up all marshalled objects
+}
+/*****************************\
+* Remote Object Unmarshalling *
+\*****************************/
+IPCNode.prototype._createRemoteObject=function(remoteInfo,remoteObjectUnmarshaller,resultCallback) {
+ if (this._closed)
+ throw new Error("IPCNode was closed");
+ //Create stub function or object
+ var ret;
+ if (remoteInfo.type==="f") ret=createIPCFunction();
+ else if (remoteInfo.type==="o") ret={};
+ else throw new Error("Type "+t+" not supported");
+
+ var self=this;
+ ret.__ipc_disposed=false;
+ ret.__ipc_owner=self;
+ ret.__ipc_owner_info=remoteInfo;
+ ret.__ipc_children=[];
+ ret["__ipcinfo_"+self._id]={owner:false,id:remoteInfo.id};
+ var props=remoteInfo.props;
+ var todo=1;
+ function checkDone() {
+ if (todo==0)
+ resultCallback(ret);
+ }
+ for (var propname in props) {
+ if (propname.substr(0,2)=="__")
+ continue;
+ todo++;
+ self._unmarshal(props[propname],remoteObjectUnmarshaller,(function(propname) {
+ return function(propvalue) {
+ ret[propname]=propvalue;
+ ret["__ipc_children"].push(propvalue);
+ todo--;
+ checkDone();
+ }
+ })(propname));
+ }
+ todo--;
+ checkDone();
+}
+function createIPCFunction() {
+ return function() {
+ var me=arguments.callee;
+ var owner=me.__ipc_owner;
+ if (typeof(owner)!=="object")
+ throw new Error("Function has been disposed");
+ owner._call(me,this,Array.prototype.slice.call(arguments));
+ }
+}
+IPCNode.reference=function(obj) {
+ var owner=obj.__ipc_owner;
+ if (typeof(owner)!=="object")
+ return obj;
+ sys.puts("IPCNode.reference "+obj.__ipc_owner_info.id);
+ var ret;
+ var self=owner;
+ //Will call back immediately seeing as all info should be readily available
+ function remoteObjectUnmarshaller(id,resultCallback) {
+ return self._unmarshalRemoteObject(id,remoteObjectUnmarshaller,resultCallback);
+ }
+ //Increase copied refcount manually
+ obj.__ipc_owner_info.refcount++;
+ owner._createRemoteObject(obj.__ipc_owner_info,remoteObjectUnmarshaller,function(r) {
+ ret=r;
+ },false);
+ return ret;
+}
+IPCNode.dispose=function(obj) {
+ if (obj.__ipc_disposed)
+ throw new Error("IPC object already disposed");
+ var owner=obj.__ipc_owner;
+ if (typeof(owner)!=="object") {
+ return;
+ }
+ var remoteInfo=obj.__ipc_owner_info;
+ sys.puts("IPCNode.dispose "+remoteInfo.id);
+ var children=obj.__ipc_children;
+ delete obj.__ipc_owner;
+ delete obj.__ipc_owner_info;
+ delete obj.__ipc_children;
+ delete obj["__ipcinfo_"+owner._id];
+ obj.__ipc_disposed=true;
+ owner._releaseRemoteInfo(remoteInfo);
+ for (var i=0; i<children.length; i++) {
+ try {
+ IPCNode.dispose(children[i]);
+ }
+ catch(e) {
+
+ }
+ }
+}
+IPCNode.prototype._unmarshalRemoteObject=function(id,remoteObjectUnmarshaller,resultCallback) {
+ //Unmarshal an object on the other side
+ var id=parseInt(id);
+ var self=this;
+ this._requestRemoteInfo(id,function(remoteInfo) {
+ self._createRemoteObject(remoteInfo,remoteObjectUnmarshaller,resultCallback);
+ });
+}
+/*********************************\
+* Object marshalling *
+\*********************************/
+/**
+ * Will be called on a release request (called from _releaseRemoteId on other side)
+ * @author fw@hardijzer.nl
+ * @param {id1} First id
+ * @param {id2} Second id
+ * @param {idn} nth id
+ * @return undefined
+ */
+IPCNode.prototype._onRelease=function() {
+ var ids=Array.prototype.slice.call(arguments);
+ for (var i=0; i<ids.length; i++) {
+ var id=parseInt(ids[i]);
+ var info=this._localobjects[id];
+ if (typeof(info)!=="object")
+ throw new Error("Unknown ID: "+id);
+ if (--info.refcount == 0) {
+ sys.puts("onRelease: "+id+" fully released");
+ //Release it completely
+ delete info.object["__ipcinfo_"+this._id];
+ delete this._localobjects[id];
+ this._idp.free(id);
+ } else sys.puts("onRelease: "+id+" not fully released");
+ }
+ if (ids.length>0 && this._isClean())
+ this.emit("clean");
+}
+/**
+ * (Called from _requestRemoteProperties on other side)
+ */
+IPCNode.prototype._onRequest=function(id) {
+ var self=this;
+ var info=self._localobjects[parseInt(id)];
+ if (typeof(info)!=="object")
+ throw new Error("Unknown ID: "+id);
+ var props={};
+ var obj=info.object;
+ var type=typeof(obj);
+ var res=[IPCNode.commands.response,id,type.substr(0,1),props];
+ var seen={};
+ function refcountCallback(id) {
+ id=parseInt(id);
+ if (!seen[id]) {
+ seen[id]=true;
+ return true;
+ }
+ return false;
+ }
+ function objectMarshaller(o) {
+ return self._marshalObject(o,refcountCallback);
+ }
+ for (var i in obj) {
+ var x=obj[i];
+ if (i.substr(0,2)!=="__")
+ props[i]=self._marshal(x,objectMarshaller);
+ }
+ self._emitCommand(res);
+}
+/**
+ * Gets remote object information (calls _onRequest on other side)
+ * @param {id} Remote object id
+ * @param {callback} Callback to be called with information
+ */
+IPCNode.prototype._requestRemoteInfo=function(id,callback) {
+ id=parseInt(id);
+ var cached=this._remoteobjects[id];
+ if (typeof(cached)==="object") {
+ cached.refcount++;
+ callback(cached);
+ return;
+ }
+ var requesting=this._requesting[id];
+ if (typeof(requesting)==="object") {
+ requesting.push(callback);
+ return;
+ }
+ this._requesting[id]=[callback];
+ this._emitCommand([IPCNode.commands.request,id]);
+ return;
+}
+IPCNode.prototype._haveRemoteReference=function(id) {
+ id=parseInt(id);
+ return ((typeof(this._remoteobjects[id])==="object") || (typeof(this._requesting[id])==="object"));
+}
+/**
+ *
+ */
+IPCNode.prototype._onResponse=function(id,type,props) {
+ var id=parseInt(id);
+ var req=this._requesting[id];
+ if (typeof(req)!=="object")
+ throw new Error("Invalid request ID in response")
+ delete this._requesting[id];
+ var remoteInfo={id:id,refcount:1, type:type,props:props};
+ this._remoteobjects[id]=remoteInfo;
+ while (req.length>0) {
+ remoteInfo.refcount++;
+ req.shift()(remoteInfo);
+ }
+ this._releaseRemoteInfo(remoteInfo);
+}
+/**
+ * Release remote id (calls _onRelease on other side)
+ * @param {id} id of object
+ * @param {info} local information object
+ * @return undefined
+ */
+IPCNode.prototype._releaseRemoteInfo=function(remoteInfo) {
+ //Do nothing for now
+ if (typeof(remoteInfo)!=="object")
+ throw new Error("Unhandled error on releasing");
+ if (--remoteInfo.refcount == 0) {
+ delete this._remoteobjects[remoteInfo.id];
+ this._emitCommand([IPCNode.commands.release,remoteInfo.id]);
+ }
+ if (this._isClean())
+ this.emit("clean");
+}
+/*************************\
+* Local value marshalling *
+\*************************/
+IPCNode.prototype._marshal=function(o,objectMarshaller) {
+ if (Array.isArray(o)) {
+ var self=this;
+ return o.map(function(x) { return self._marshal(x,objectMarshaller); });
+ }
+ if (o===null || o===undefined || typeof(o)==="number" || typeof(o)==="string" || typeof(o)==="boolean")
+ return o;
+ if (typeof(o)==="object" || typeof(o)==="function")
+ return objectMarshaller(o);
+ throw new Error("Unable to marshal value: "+o);
+}
+IPCNode.objecttypes={
+ mine: "mine",
+ yours: "yours",
+ global: "global"
+};
+IPCNode.prototype._marshalObject=function(o,refcountCallback) {
+ if (o===getGlobal())
+ return {t:IPCNode.objecttypes.global};
+ var info=o["__ipcinfo_"+this._id];
+ if (typeof(info)==="object") {
+ if (info.owner) {
+ if (refcountCallback(info.id))
+ this._localobjects[info.id].refcount++;
+ return {t:IPCNode.objecttypes.mine,i:info.id};
+ } else {
+ return {t:IPCNode.objecttypes.yours,i:info.id};
+ }
+ } else {
+ //Allocate new
+ info={owner:true,id:this._idp.alloc()};
+ o["__ipcinfo_"+this._id]=info;
+ this._localobjects[info.id]={
+ refcount: refcountCallback(info.id)?1:0,
+ object: o
+ };
+ return {t:IPCNode.objecttypes.mine,i:info.id};
+ }
+}
+/************************\
+* Cross-boundary calling *
+\************************/
+IPCNode.prototype._call=function(f,t,args) {
+ if (this._closed)
+ throw new Error("IPCNode was closed");
+ var self=this;
+ var seen={};
+ function refcountCallback(id) {
+ id=parseInt(id);
+ if (!seen[id]) {
+ seen[id]=true;
+ return true;
+ }
+ return false;
+ }
+ function objectMarshaller(o) {
+ return self._marshalObject(o,refcountCallback);
+ }
+ var localArgs=[f,t].concat(args);
+ var marshalledArgs=localArgs.map(function(v) {return self._marshal(obj,objectMarshaller); });
+ self._emitCommand([IPCNode.commands.call,{}].concat(marshalledArgs));
+}
+IPCNode.prototype._onCall=function() {
+ var args=Array.prototype.slice.call(arguments,2);
+ var self=this;
+
+ this._unmarshalArguments(arguments,function(f,t) {
+ var args=Array.prototype.slice.call(arguments,2);
+ f.apply(t,args);
+ IPCNode.dispose(f);
+ IPCNode.dispose(t);
+ args.forEach(function(a) {
+ IPCNode.dispose(a);
+ });
+ });
+}
+IPCNode.prototype.register=function(name,obj) {
+ if (this._closed)
+ throw new Error("IPCNode was closed");
+ var self=this;
+ var seen={};
+ function refcountCallback(id) {
+ id=parseInt(id);
+ if (!seen[id]) {
+ seen[id]=true;
+ return true;
+ }
+ return false;
+ }
+ function objectMarshaller(o) {
+ return self._marshalObject(o,refcountCallback);
+ }
+ var args=Array.prototype.slice.call(arguments,0);
+ var marshalledArgs=args.map(function(v) {return self._marshal(obj,objectMarshaller); });
+ self._emitCommand([IPCNode.commands.register,{}].concat(marshalledArgs));
+}
+IPCNode.prototype._onRegister=function() {
+ var self=this;
+ this._unmarshalArguments(arguments,function(name,object) {
+ self.emit("register",name,object);
+ IPCNode.dispose(object);
+ });
+}
+/*************\
+* End section *
+\*************/
+IPCNode.prototype._unmarshalArguments=function(args,resultCallback) {
+ var infoCache=args[0];
+ var marshalledArgs=Array.prototype.slice.call(args,1);
+ var resultCallback=marshalledArgs.pop();
+ var ret=[];
+ var self=this;
+ function remoteObjectUnmarshaller(id,resultCallback) {
+ if (self._haveRemoteReference(id))
+ self._emitCommand([IPCNode.commands.release,id]);
+ self._unmarshalRemoteObject(id,remoteObjectUnmarshaller,resultCallback);
+ }
+ var todo=1;
+ function checkDone() {
+ if (todo==0)
+ resultCallback.apply(this,ret);
+ }
+ marshalledArgs.forEach(function(marshalledValue,key) {
+ todo++;
+ self._unmarshal(marshalledValue,remoteObjectUnmarshaller,function(value) {
+ ret[key]=value;
+ todo--;
+ checkDone();
+ });
+ });
+ todo--;
+ checkDone();
+}
+IPCNode.prototype._unmarshalLocalObject=function(id,callback,releaseDuplicate) {
+ //Unmarshal an object on my side
+ var info=this._localobjects[parseInt(id)];
+ if (typeof(info)!=="object")
+ throw new Error("Unknown local id "+id);
+ callback(info.object);
+}
+IPCNode.prototype._unmarshalObject=function(info,remoteObjectUnmarshaller,resultCallback) {
+ if (info.t===IPCNode.objecttypes.mine) remoteObjectUnmarshaller(info.i,resultCallback);
+ else if (info.t===IPCNode.objecttypes.yours) this._unmarshalLocalObject(info.i,resultCallback);
+ else if (info.t===IPCNode.objecttypes.global) return resultCallback(getGlobal());
+ else throw new Error("Unknown object-type: "+info.t);
+}
+IPCNode.prototype._unmarshal=function(o,remoteObjectUnmarshaller,resultCallback) {
+ if (Array.isArray(o)) {
+ var self=this;
+ var ret=[];
+ var todo=1;
+ function checkDone() {
+ if (todo==0)
+ resultCallback(ret);
+ }
+ o.forEach(function(value,key) {
+ self._unmarshal(value,objectUnmarshaller,function(unmarshalledValue) {
+ ret[key]=unmarshalledValue;
+ todo--;
+ checkDone();
+ });
+ todo++;
+ });
+ todo--;
+ checkDone();
+ return;
+ }
+ if (typeof(o)==="string")
+ return resultCallback(o);
+ if (typeof(o)==="number")
+ return resultCallback(o);
+ if (typeof(o)==="object")
+ return this._unmarshalObject(o,remoteObjectUnmarshaller,resultCallback);
+ throw new Error("Unable to unmarshal "+o);
+}
+/******************\
+* Helper functions *
+\******************/
+IPCNode.sync=function(f) {
+ return function() {
+ var args=Array.prototype.slice.call(arguments);
+ if (args.length<1)
+ return;
+ var callback=args.pop();
+ callback(f.apply(this,args));
+ }
+}
+IPCNode.async=function(f) {
+ return function() {
+ var args=Array.prototype.slice.call(arguments);
+ if (args.length<1)
+ return;
+ var lastindex=args.length-1;
+ var last=args[lastindex];
+ if (typeof(last)==="function") {
+ var copy=IPCNode.reference(last);
+ var called=false;
+ args[lastindex]=function() {
+ if (called)
+ throw new Error("Callback function called twice");
+ called=true;
+ copy.apply(this,Array.prototype.slice.call(arguments));
+ IPCNode.dispose(copy);
+ }
+ }
+ f.apply(this,args);
+ }
+}
+
+exports.IPCNode=IPCNode;
View
80 IPCNodeProtocol.txt
@@ -0,0 +1,80 @@
+[command,[1,2,3,{},4,{}],
+1.Command (string or integer, see IPCNode.commands)
+2.Table of all used remote (e.g. owned by sender) objects.
+ Key = id
+ Value: type or [type,properties] (type being one of IPCNode.objectTypes)
+ Should call release on any not used.
+4+.Arguments all marshalled objects
+
+Marshalled values:
+ null, undefined, string, number, boolean -> stay the same
+ Array -> array with same rules applied
+ Object/function ->
+ Global -> {t:IPCNode.objectSource.global}
+ Object on receiving side: -> {t:IPCNode.objectSource.local,i:(id)}
+ Object on sending side: -> {t:IPCNode.objectSource.marshalled,i:(id)} //Should be in marshalled objects table!
+Object properties:
+ Table with property -> marshalled value. If marshalled value
+
+_localObjects table contains following objects:
+ {
+ id: id, //Key in _localObjects
+ refCount: number,
+ object: actual object //where object will also get a new properties __ipc_id_(IPCNode id) pointing back to that id.
+ }
+_remoteObjects table contains following objects:
+ {
+ id: id, //Key in _remoteObjects table
+ refCount: number, //Number of internal references
+ externalRefCount: number, //Number of external references (To protect from IPCNode.release to mess up internal references)
+ usedBy: {}, //Remote used by this object, id -> key, _remoteObjects entry -> value
+ uses: {}, //Remote objects this object uses, id -> key, _remoteObjects entry -> value
+ usedLocals: {} //Local objects this object uses (and keeps a reference on!) id -> key, _localObjects entry -> value
+ hasProperties: false, //Has it had properties applied already?
+ hasRequested: false, //Have we already requested properties?
+ type: "f" or "o"
+ stub: local representation of object,
+ isReady: boolean, //Is this object ready to be used? e.g. are all objects properly referenced?
+ readyCallbacks: array
+ }
+Stub objects contain:
+ {
+ __ipc_owner: owner IPC document
+ __ipc_object: item in remoteObjects table (see above)
+ }
+IPCNode._isRemoteObjectReady=function(remoteObject,presumeReady) {
+ if (typeof(presumeReady)==="object" && presumeReady[remoteObject.id])
+ return true;
+ if (remoteObject.isReady)
+ return true;
+ if (typeof(properties)!=="object")
+ return false;
+ presumeReady[remoteObject.id]=true;
+ var queue=[];
+ for (var key in remoteObject.properties)
+ queue.push(remoteObject.properties[k]);
+ var ret=true;
+ while (ret && queue.length>0) {
+ var current=q.pop();
+ if (typeof(current)==="function" || typeof(current)==="object")
+ continue;
+ if (Array.isArray(current)) {
+ current.forEach(function(x) { queue.push(x); });
+ continue;
+ }
+ if (typeof(current.__ipc_owner)!=="object")
+ continue;
+ if (!this._isRemoteObjectReady(current.__ipc_object,presumeReady))
+ ret=false;
+ }
+ delete presumeReady[remoteObject.id];
+ return ret;
+}
+
+When to actually release a remote object ?
+ refCount == 0?
+ for all objects in usedBy table:
+ can this be released (given that this object will be released?)
+ if (all yes)
+ release object, and for all also remote objects in properties, remove self from usedBy.
+
View
28 examples/sockets/IPCNodeClient.js
@@ -2,11 +2,32 @@ var sys=require("sys");
var net=require("net");
var IPCNode=require("../../IPCNode").IPCNode;
+var isDebug=true;
+
var client=net.createConnection(81234);
var ipc=new IPCNode();
client.on("connect",function() {
- sys.pump(client,ipc,function() { ipc.end(); sys.puts("Connection closed"); });
- sys.pump(ipc,client,function() { client.end(); client.destroy() });
+ if (!isDebug) {
+ sys.pump(client,ipc,function() { ipc.end(); sys.puts("Connection closed"); });
+ sys.pump(ipc,client,function() { client.end(); client.destroy() });
+ } else {
+ client.on("data",function(data) {
+ data.toString().split("\n").forEach(function(line) {
+ if (line.length<1)
+ return;
+ sys.puts(">> "+line);
+ });
+ ipc.write(data);
+ });
+ ipc.on("data",function(data) {
+ data.toString().split("\n").forEach(function(line) {
+ if (line.length<1)
+ return;
+ sys.puts("<< "+line);
+ });
+ client.write(data);
+ });
+ }
});
client.on("error",function(e) {
sys.puts("Client error: "+e.toString());
@@ -30,4 +51,7 @@ ipc.on("register",function(name,obj) {
IPCNode.dispose(copy);
});
}
+});
+ipc.on("clean",function() {
+ sys.puts("IPC channel clean");
});
View
4 examples/sockets/IPCNodeServer.js
@@ -34,11 +34,11 @@ net.createServer(function(client) {
sys.pump(ipc,client,function() { ipc.end(); });
sys.pump(client,ipc,function() { client.end(); client.destroy(); });
- ipc.on("error",function(e) {
+ /*ipc.on("error",function(e) {
sys.puts("IPC error: "+e.toString());
client.end();
client.destroy();
- });
+ });*/
ipc.on("clean",function(from) {
sys.puts("IPC stream is clean, no more cross-process oject references");
ipc.end();
Please sign in to comment.
Something went wrong with that request. Please try again.