Permalink
Browse files

renane Drain to Incoming, _conduit[] goes to _conduit for now

  • Loading branch information...
1 parent 3a8bd4b commit 806e820915a18ffb08df5e980be09178f711c957 @cloudshift committed Mar 19, 2012
@@ -33,7 +33,7 @@ typedef ConduitClientStart = {
}
enum ConduitEvent {
- Drain(pkt:Pkt<Dynamic>,sessID:String,cb:Either<String,String>->Void);
+ Incoming(pkt:Pkt<Dynamic>,sessID:String,cb:Either<String,String>->Void);
ConduitSessionExpire(sessID:String);
ConduitNoConnection(sessID:String);
}
@@ -55,6 +55,7 @@ interface Conduit implements Part<Dynamic,String,Conduit,ConduitEvent> {
enum SinkEvent {
Authorize(sessID:String,chan:Chan<Dynamic>,cb:Either<String,String>->Void);
ConnectionClose(sessID:String);
+ Outgoing(sessID:String,pkt:Dynamic,chan:String,meta:Dynamic);
}
interface Sink implements Part<Conduit,String,Sink,SinkEvent> {
@@ -73,8 +74,8 @@ class Flow {
#if CS_SERVER
public static function
- sink(sessionMgr:SessionMgr):Sink {
- return new cloudshift.channel.ServerSinkImpl(sessionMgr);
+ sink():Sink {
+ return new cloudshift.channel.ServerSinkImpl();
}
public static function
@@ -88,13 +89,12 @@ class Flow {
public static function
pushConduit():Conduit {
- trace("inst PushClientImpl");
return new cloudshift.channel.PushClientImpl();
}
public static function
- sink(sessID:String):Sink {
- return new cloudshift.channel.ClientSinkImpl(sessID);
+ sink():Sink {
+ return new cloudshift.channel.ClientSinkImpl();
}
#end
@@ -9,28 +9,33 @@ using cloudshift.Mixin;
class PushClientImpl implements Conduit {
public var part_:Part_<ConduitClientStart,String,Conduit,ConduitEvent>;
+
+ var _sessID:String;
var _host:String;
var _port:Int;
var _url:String;
- var _sessID:String;
var _parted:Bool;
-
+
public function new() {
part_ = Core.part(this);
}
public function start_(cs:ConduitClientStart,?oc:Outcome<String,Conduit>) {
if (oc == null)
oc = Core.outcome();
-
+
_url = "http://"+js.Lib.window.location.host;
+
+ trace("Setting client sessID tp :"+cs.sessID);
_sessID = cs.sessID;
_parted = false;
remoteInit(function(ignore) {
stop_(function(d) {
var soc = Core.outcome();
remoteClose(function(o) {
+ _sessID = null;
+ _parted = true;
soc.resolve(o);
});
return soc;
@@ -61,7 +66,7 @@ class PushClientImpl implements Conduit {
}
public function
- pump(sessID:String,userData:Dynamic,chanID:String,meta:Dynamic) {
+ pump(dummy:String,userData:Dynamic,chanID:String,meta:Dynamic) {
var
ud = Channel.createPkt(userData,_sessID,chanID,"m",meta),
pl = haxe.Serializer.run(ud),
@@ -98,7 +103,7 @@ class PushClientImpl implements Conduit {
function
handlePoll(pkts:Array<Pkt<Dynamic>>) {
for (p in pkts) {
- notify(Drain(p,_sessID,function(e) { }));
+ notify(Incoming(p,_sessID,function(e) { }));
}
poll();
}
@@ -132,6 +137,8 @@ class PushClientImpl implements Conduit {
function
client(cmd:String,userData:Dynamic,chanID:String,cb:Dynamic->Void) {
+
+ trace("Doing client req with "+_sessID);
var
ud = Channel.createPkt(userData,_sessID,chanID,cmd),
pl = haxe.Serializer.run(ud),
@@ -86,7 +86,8 @@ class PushListenerImpl implements Conduit {
_sessMgr.exists(sessID,function(exists) {
if (!exists) {
- write(res,ERROR,"no session");
+ Core.error("No session for "+sessID,"PUSH");
+ write(res,ERROR,"no session:"+sessID);
return;
}
@@ -128,7 +129,7 @@ class PushListenerImpl implements Conduit {
});
default:
- notify(Drain(pkt,sessID,function(sr:Either<String,String>) {
+ notify(Incoming(pkt,sessID,function(sr:Either<String,String>) {
write(res,OK,sr);
}));
}
@@ -12,42 +12,51 @@ using cloudshift.Channel;
class SinkImpl implements Sink {
public var part_:Part_<Conduit,String,Sink,SinkEvent>;
- var _chans:Hash<Chan<Dynamic>>;
- var _conduit:Array<Conduit>;
+ var _chans:Hash<Chan<Dynamic>>;
+ var _conduit:Conduit;
var _myfill:Dynamic->String->Dynamic->Void;
public function new() {
part_ = Core.part(this);
}
public function start_(c:Conduit,?oc:Outcome<String,Sink>) {
+
if (oc == null)
oc = Core.outcome();
- _conduit = [];
+
_chans = new Hash();
+ _conduit = c;
+
addConduit(c);
stop_(function(d) {
var soc = Core.outcome();
- _conduit[0].stop();
- soc.resolve(Right(""));
+ _conduit.stop().outcome(function(c) {
+ _conduit = null;
+ _chans = null;
+ soc.resolve(Right(""));
+ });
+
return soc;
});
- oc.resolve(Right(cast(this,Sink)));
+ oc.resolve(Right(untyped this));
return oc;
}
public function addConduit(c:Conduit) {
- _conduit.push(c);
- c.observe(function(f) {
+
+ // the sink observes the conduit ...
+
+ var removeConduitOb = c.observe(function(f) {
switch(f) {
- case Drain(pkt,sessID,cb):
+ case Incoming(pkt,sessID,cb):
var
pID = pkt.chanID(),
pip = _chans.get(pID),
op = pkt.operation();
-
+
if (pip == null)
pip = chan(pID);
@@ -63,14 +72,31 @@ class SinkImpl implements Sink {
case ConduitNoConnection(sessID),ConduitSessionExpire(sessID):
notify(ConnectionClose(sessID));
}
-
});
+
+ /*
+ trace("adding conduit sink observer");
+ var removeSinkOb = observe(function(s) {
+ switch(s) {
+ case Outgoing(sessID,pkt,chan,meta):
+ trace("should be pumping, sess is:"+sessID);
+ c.pump(sessID,pkt,chan,meta);
+ default:
+ }
+ });
+
+ _conduitCleaners.push(function() {
+ removeSinkOb();
+ removeConduitOb();
+ });
+ */
}
public function
chan<T>(pID):Chan<T> {
var ch = _chans.get(pID) ;
if (ch == null) {
+ trace("Creating new channel:"+pID);
ch = new ChanImpl<T>(pID);
_chans.set(pID,ch);
if (_myfill != null) {
@@ -102,22 +128,22 @@ class SinkImpl implements Sink {
throw "SinkImp:removeAllSubs, should be overridden";
}
- function reqMsg(pipe:Chan<Dynamic>,pkt:Pkt<Dynamic>) {
- pipe._defaultFill(pkt,"",null);
+ function reqMsg(chan:Chan<Dynamic>,pkt:Pkt<Dynamic>) {
+ chan._defaultFill(pkt,"",null);
}
public function direct<T>(sessID:String):Chan<T> {
return chan(Core.CSROOT+"direct/"+sessID);
}
public function
- authorize<T>(pipe:Chan<T>):Outcome<String,Chan<T>> {
+ authorize<T>(chan:Chan<T>):Outcome<String,Chan<T>> {
var oc = Core.outcome();
- _conduit[0].authorize(pipe.pid())
+ _conduit.authorize(chan.pid())
.deliver(function(conduitAuthorized) {
switch(conduitAuthorized) {
case Right(_):
- oc.resolve(Right(pipe));
+ oc.resolve(Right(chan));
case Left(msg):
oc.resolve(Left(msg));
}

0 comments on commit 806e820

Please sign in to comment.