Permalink
Browse files

update for new part stop/start model

  • Loading branch information...
1 parent 295cf1c commit 04e5a556356906330081f0a362db95248e46d99c @cloudshift committed Mar 14, 2012
View
@@ -76,19 +76,21 @@ interface Part_<S,B,G,E> {
var partID(default,null):String;
var state:EPartState<E>;
var info:PartInfo;
+ var sstopper:Dynamic->Outcome<String,Dynamic>;
+
function start(d:S,?oc:Outcome<B,G>):Outcome<B,G>;
function stop(d:Dynamic):Outcome<String,Dynamic>;
function observe(cb:E->Void,?info:Dynamic):Void->Void;
function notify(e:E):Void;
function observeState(cb:EPartState<E>->Void):Void;
function notifyState(s:EPartState<E>):Void;
function peer():Dynamic;
+ function setStop(cb:Dynamic->Outcome<String,Dynamic>):Void;
}
interface Part<S,B,G,E> {
var part_:Part_<S,B,G,E>;
function start_(p:S,?oc:Outcome<B,G>):Outcome<B,G>;
- function stop_(?d:Dynamic):Outcome<String,Dynamic>;
}
typedef AnyPart = Part<Dynamic,Dynamic,Dynamic,Dynamic>;
View
@@ -623,6 +623,10 @@ class PartX {
public static function stop<S,B,G,E>(part:Part<S,B,G,E>,?data:Dynamic):Outcome<String,Dynamic> {
return part.part_.stop(data);
}
+
+ public static function stop_<S,B,G,E>(part:Part<S,B,G,E>,cb:Dynamic->Outcome<String,Dynamic>) {
+ part.part_.setStop(cb);
+ }
public static function observe<S,B,G,E>(part:Part<S,B,G,E>,cb:E->Void) {
part.part_.observe(cb);
@@ -3,12 +3,9 @@ package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Channel;
-
import cloudshift.channel.Flow;
import cloudshift.Session;
-import cloudshift.core.ObservableImpl;
using cloudshift.Mixin;
-using cloudshift.channel.Flow;
class PushClientImpl implements Conduit {
public var part_:Part_<ConduitClientStart,String,Conduit,ConduitEvent>;
@@ -29,21 +26,21 @@ class PushClientImpl implements Conduit {
_url = "http://"+js.Lib.window.location.host;
_sessID = cs.sessID;
_parted = false;
+
remoteInit(function(ignore) {
- trace("got remote init, resolving "+_sessID);
+ stop_(function(d) {
+ var soc = Core.outcome();
+ remoteClose(function(o) {
+ soc.resolve(o);
+ });
+ return soc;
+ });
+
oc.resolve(Right(cast(this,Conduit)));
poll();
});
return oc;
}
-
- public function stop_(?d:Dynamic) {
- var oc = Core.outcome();
- remoteClose(function(o) {
- oc.resolve(o);
- });
- return oc;
- }
public function
authorize(pipeID:String):Future<Either<String,String>> {
@@ -119,7 +116,6 @@ class PushClientImpl implements Conduit {
function
remoteUnSub(chanID:String,cb:Dynamic->Void) {
client("u",{},chanID,function(o) {
- trace("Remove unsub:"+o);
cb(o);
});
}
@@ -26,14 +26,18 @@ class SinkImpl implements Sink {
_conduit = [];
_chans = new Hash();
addConduit(c);
+
+ stop_(function(d) {
+ var soc = Core.outcome();
+ _conduit[0].stop();
+ soc.resolve(Right(""));
+ return soc;
+ });
+
oc.resolve(Right(cast(this,Sink)));
return oc;
}
- public function stop_(?d):Outcome<String,Dynamic> {
- return _conduit[0].stop();
- }
-
public function addConduit(c:Conduit) {
_conduit.push(c);
c.observe(function(f) {
@@ -103,21 +107,18 @@ class SinkImpl implements Sink {
}
public function direct<T>(sessID:String):Chan<T> {
- return chan("/__cs/"+sessID);
+ return chan(Core.CSROOT+"direct/"+sessID);
}
public function
authorize<T>(pipe:Chan<T>):Outcome<String,Chan<T>> {
var oc = Core.outcome();
- trace("auth :"+pipe.pid());
_conduit[0].authorize(pipe.pid())
.deliver(function(conduitAuthorized) {
switch(conduitAuthorized) {
case Right(_):
- trace("got a pipe back");
oc.resolve(Right(pipe));
case Left(msg):
- trace("got a message back");
oc.resolve(Left(msg));
}
});
@@ -10,6 +10,7 @@ class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
public var _events:Observable<EPartState<E>>;
public var state:EPartState<E>;
public var info:PartInfo;
+ public var sstopper:Dynamic->Outcome<String,Dynamic>;
var parent:Dynamic;
@@ -19,8 +20,13 @@ class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
partID = Type.getClassName(Type.getClass(parent));
_events = Core.event();
#if debug
- Core.info("Part created:"+Type.getClassName(Type.getClass(parent)));
+ partInfo("created");
#end
+
+ sstopper = function(d) {
+ throw "Default stop function called for :"+partID + ". Add a stop function with stop_(stopFunction)";
+ return null;
+ }
}
public function peer():Dynamic {
@@ -50,16 +56,13 @@ class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
}
public function start(d:S,?oc:Outcome<B,G>):Outcome<B,G> {
- var p:Outcome<B,G> = null;
- p = parent.start_(d,oc);
+ var p:Outcome<B,G> = parent.start_(d,oc);
checkErr("start",p);
p.outcome(function(outcome) {
-#if debug
- Core.info("Part started:"+Type.getClassName(Type.getClass(parent)));
-#end
+ partInfo("started");
state = Started;
runningParts.push(parent);
@@ -73,24 +76,35 @@ class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
}
public function stop(d:Dynamic) {
- var p:Outcome<String,Dynamic> = null;
- p = parent.stop_(d);
+ var p = sstopper(d);
checkErr("stop",p);
p.outcome(function(outcome) {
state = Stopped;
+ partInfo("stopped");
_events.notify(Stopped);
},function(msg) {
_events.notify(Error(msg));
- });
-
+ });
+
return p;
}
+
+ public function setStop(cb:Dynamic->Outcome<String,Dynamic>) {
+ partInfo("with user stop_()");
+ sstopper = cb;
+ }
+
+ function partInfo(info:String) {
+#if debug
+ Core.info(partID+" -> "+info,"PART");
+#end
+ }
function checkErr(type,outcome:Outcome<Dynamic,Dynamic>) {
if (outcome == null)
- throw Type.getClassName(Type.getClass(parent)) +" should not return null for "+type +" function";
+ throw partID +" should not return null for "+type +" function";
return outcome;
}
@@ -20,7 +20,6 @@ private typedef Cache = {
class HttpImpl implements HttpServer,implements Part<HostPort,String,HttpServer,HttpEvents> {
public var part_:Part_<HostPort,String,HttpServer,HttpEvents>;
- var _server:NodeHttpServer;
var _cache:Hash<Cache>;
var _getHandler:String->NodeHttpServerReq->NodeHttpServerResp->Int->Void;
var _routes:Array<{re:EReg,handler:THandler}>;
@@ -61,7 +60,7 @@ class HttpImpl implements HttpServer,implements Part<HostPort,String,HttpServer,
if (oc == null)
oc = Core.outcome();
- _server = Node.http.createServer(function(req,resp) {
+ var server = Node.http.createServer(function(req,resp) {
var
url = req.url,
match = false;
@@ -97,22 +96,24 @@ class HttpImpl implements HttpServer,implements Part<HostPort,String,HttpServer,
});
*/
- _server.listen(d.port,d.host,function() {
+ server.listen(d.port,d.host,function() {
+
+ stop_(function(d) {
+ var p = Core.outcome();
+ server.close();
+ //_server.on("close",function() {
+ p.resolve(Right(cast this));
+ // });
+ return p;
+ });
+
+
oc.resolve(Right(cast(this,HttpServer)));
});
return oc;
}
- public function stop_(?d) {
- var p = Core.outcome();
- _server.close();
- //_server.on("close",function() {
- p.resolve(Right(cast this));
- // });
- return p;
- }
-
public function
handler(r:EReg,handler:THandler):HttpServer {
_routes.push({re:r,handler:handler});

0 comments on commit 04e5a55

Please sign in to comment.