Permalink
Browse files

cleanup of Session, in particular client just deals with an outcome

  • Loading branch information...
1 parent 96415f0 commit 273f29038776628cdcdebfacea1c9c542d0499b9 @cloudshift committed Mar 19, 2012
View
@@ -55,16 +55,13 @@ enum ChannelEvent {
ESession(event:ESessionOp);
}
+//typedef ChannelServerPrms = {http:HttpServer,sessMgr:SessionMgr};
+
interface ChannelServer
implements ChannelProvider,
- implements Part<Dynamic,String,ChannelServer,ChannelEvent> {
+ implements Part<SessionMgr,String,ChannelServer,ChannelEvent> {
- function addHttpServer(http:HttpServer):ChannelServer;
- function addHostPort(host:String,port:Int):ChannelServer;
- function addSessionMgr(sessionMgr:SessionMgr):ChannelServer;
function addChannelAuth(cb:String->Chan<Dynamic>->(Either<String,String>->Void)->Void):ChannelServer;
- function addSessionAuth(cb:ESessionOp->Void):ChannelServer;
- function session():SessionMgr;
function channel<T>(chanID:String):Outcome<String,Chan<T>>;
function direct<T>(sessID:String):Outcome<String,Chan<T>>;
}
View
@@ -23,20 +23,23 @@ enum ESessionOp {
#if CS_SERVER
interface SessionMgr implements Part<HttpServer,String,SessionMgr,ESessionOp> {
+ function authorize(cb:ESessionOp->Void):Void->Void;
function exists(sessID:String,cb:Bool->Void):Void;
- function stash(sessID:String,key:String,?val:Dynamic):Option<Dynamic>;
function logout(sessID:String,cb:ESession->Void):Void;
function http():HttpServer;
}
#elseif CS_BROWSER
interface SessionClient implements Part<Dynamic,String,SessionClient,ESession> {
- function login(pkt:Dynamic):Future<ESession>;
- function logout():Future<ESession>;
- function signup(pkt:Dynamic):Future<ESession>;
+ /**
+ ErrMsg on Left
+ SessionID on Right
+ */
+ function login(pkt:Dynamic):Outcome<String,String>;
+ function logout():Outcome<String,String>;
+ function signup(pkt:Dynamic):Outcome<String,String>;
function sessID():String;
- function stash(key:String,?val:Dynamic):Option<Dynamic>;
}
#end
@@ -16,7 +16,6 @@ class TChannelClient implements ChannelClient,
implements Part<String,ChannelClientError,ChannelClient,ESession> {
public var part_:Part_<String,ChannelClientError,ChannelClient,ESession>;
- var _conduit:Conduit;
var _sink:Sink;
var _host:String;
var _port:Int;
@@ -31,14 +30,24 @@ class TChannelClient implements ChannelClient,
if (oc == null)
oc = Core.outcome();
+ trace("init conduit with sessID:"+sessID);
Flow.pushConduit().start({host:"localhost",port:8082,sessID:sessID})
.oflatMap(function(conduit) {
- _conduit = conduit;
- return Flow.sink(sessID).start(_conduit);
+ trace("--> setting sink with "+sessID+" new conduit ="+conduit);
+ return Flow.sink().start(conduit);
})
.outcome(function(sink) {
_sink = sink;
- trace("got sink");
+ trace("SET NEW SINK");
+ stop_(function(d) {
+ var soc = Core.outcome();
+ _sink.stop().outcome(function(el) {
+ _sink = null;
+ trace("STOPPING SINK");
+ soc.resolve(Right(""));
+ });
+ return soc;
+ });
oc.resolve(Right(cast this));
});
@@ -55,7 +64,7 @@ class TChannelClient implements ChannelClient,
return _sink.authorize(_sink.chan(sessID));
}
- public function unsub(chan:Chan<Dynamic>) {
+ public function removeChannel(chan:Chan<Dynamic>) {
_sink.removeChan(chan);
}
}
@@ -8,16 +8,13 @@ import cloudshift.Session;
using cloudshift.Mixin;
import cloudshift.channel.Flow;
-class TChannelServer implements ChannelServer,implements Part<Dynamic,String,ChannelServer,ChannelEvent> {
- public var part_:Part_<Dynamic,String,ChannelServer,ChannelEvent>;
- var _http:HttpServer;
- var _session:SessionMgr;
+class TChannelServer implements ChannelServer,implements Part<SessionMgr,String,ChannelServer,ChannelEvent> {
+ public var part_:Part_<SessionMgr,String,ChannelServer,ChannelEvent>;
var _conduit:Conduit;
var _sink:Sink;
var _host:String;
var _port:Int;
var _channelAuth:String->Chan<Dynamic>->(Either<String,String>->Void)->Void;
- var _sessionAuth:ESessionOp->Void;
public function
new() {
@@ -29,88 +26,33 @@ class TChannelServer implements ChannelServer,implements Part<Dynamic,String,Cha
}
public function
- start_(d:Dynamic,?oc:Outcome<String,ChannelServer>) {
+ start_(sessMgr:SessionMgr,?oc:Outcome<String,ChannelServer>) {
if (oc == null)
oc = Core.outcome();
-
- if (_session == null) {
- _session = Session.manager();
- }
- /* provide an http if the user does not */
- if (_http == null) {
- _http = Http.server();
- if (_host == null)
- _host = "localhost";
- if (_port == null)
- _port = 8082;
-
- _http.start({host:_host,port:_port}).outcome(function(http) {
- gotHttp(oc);
- });
- } else {
- /* use the users, assuming it's started */
- if (_http.state() == Started) {
- gotHttp(oc);
- } else {
- Core.error("you need to start the http server");
- }
- }
- return oc;
- }
-
- public function
- gotHttp(oc:Outcome<String,ChannelServer>) {
- _session.start(_http)
- .oflatMap(function(sess) {
- _session = sess;
- var myoc:Outcome<String,Conduit> = Core.outcome();
- if (_sessionAuth != null) {
- _session.observe(_sessionAuth);
- } else {
- Core.warn("you may need to add a session authorizer");
- }
-
- Flow.pushConduit(sess).start({},myoc);
- return myoc;
- })
+
+ Flow.pushConduit(sessMgr).start({})
.oflatMap(function(push) {
- var myoc:Outcome<String,Sink> = Core.outcome();
_conduit = push;
- Flow.sink(push.session()).start(push,myoc);
- return myoc;
+ return Flow.sink().start(push);
})
.outcome(function(sink) {
_sink = sink;
if (_channelAuth != null) {
_sink.observe(function(dd:SinkEvent) {
switch(dd) {
case Authorize(sessID,chan,reply):
- _channelAuth(sessID,chan,reply);
+ if (_channelAuth == null)
+ reply(Right(""));
+ else
+ _channelAuth(sessID,chan,reply);
case ConnectionClose(sessID):
+ case Outgoing(_,_,_,_):
}
});
}
oc.resolve(Right(cast this));
});
- }
-
- public function
- addHttpServer(http):ChannelServer {
- _http = http;
- return this;
- }
-
- public function
- addHostPort(host:String,port:Int):ChannelServer {
- _host = host;
- _port = port;
- return this;
- }
-
- public function
- addSessionMgr(sessMgr:SessionMgr):ChannelServer {
- _session = sessMgr;
- return this;
+ return oc;
}
public function
@@ -133,14 +75,4 @@ class TChannelServer implements ChannelServer,implements Part<Dynamic,String,Cha
return this;
}
- public function
- addSessionAuth(cb:ESessionOp->Void):ChannelServer {
- _sessionAuth = cb;
- return this;
- }
-
- public function session() {
- return _session;
- }
-
}
@@ -29,60 +29,48 @@ class SessionClientImpl implements Part<Dynamic,String,SessionClient,ESession>,i
return oc;
}
- function doLogin(p:Future<ESession>,e:ESession) {
+ function doLogin(p:Outcome<String,String>,e:ESession) {
switch(e) {
case UserOk(sid):
_sessID = sid;
+ trace("set sessID ok");
+ p.resolve(Right(_sessID));
default:
+ p.resolve(Left(Std.string(e)));
}
notify(e);
- p.resolve(e);
}
public function
- login(pkt:Dynamic):Future<ESession> {
+ login(pkt:Dynamic):Outcome<String,String> {
var p = Core.future();
_proxy.login(pkt,callback(doLogin,p));
return p;
}
public function
- logout():Future<ESession> {
+ signup(pkt):Outcome<String,String> {
+ var p = Core.future();
+ _proxy.signup(pkt,callback(doLogin,p));
+ return p;
+ }
+
+ public function
+ logout():Outcome<String,String> {
var p = Core.future();
_proxy.logout(_sessID,function(e:ESession) {
switch(e) {
case UserRemoved:
- "user removed".info();
+ p.resolve(Right(""));
default:
+ p.resolve(Left("Can't remove user:"+Std.string(e)));
}
_sessID = null;
notify(e);
- p.resolve(e);
});
return p;
}
-
- public function
- signup(pkt):Future<ESession> {
- var p = Core.future();
- _proxy.signup(pkt,callback(doLogin,p));
- return p;
- }
-
+
public function sessID() { return _sessID; }
- public function
- stash(key:String,?val:Dynamic):Option<Dynamic> {
- if (_stash == null) {
- _stash = new Hash();
- }
-
- if (val == null)
- return _stash.getOption(key);
-
- _stash.set(key,val);
-
- return None;
- }
-
}
@@ -13,7 +13,7 @@ import cloudshift.session.SessionMgrProxy;
class SessionMgrImpl extends SessionMgrProxy,
implements Part<HttpServer,String,SessionMgr,ESessionOp>, implements SessionMgr {
- static var sessions = new Hash<Hash<Dynamic>>();
+ static var sessions = new Hash<Int>();
public var part_:Part_<HttpServer,String,SessionMgr,ESessionOp>;
var _http:HttpServer;
@@ -30,17 +30,18 @@ implements Part<HttpServer,String,SessionMgr,ESessionOp>, implements SessionMgr
_http.handler(new EReg(Session.REMOTE,""),remote.httpHandler);
- oc.resolve(Right(cast(this,SessionMgr)));
+ oc.resolve(Right(untyped this));
return oc;
}
public function http() {
return _http;
}
+ // proxy interface ......
+
override public function
login(pkt:Dynamic,cb:ESession->Void):Void {
- trace("trying login:"+ pkt.stringify());
notify(Login(pkt,function(status:ESession) {
respond(status,cb);
}));
@@ -58,14 +59,23 @@ implements Part<HttpServer,String,SessionMgr,ESessionOp>, implements SessionMgr
notify(Logout(sessID,function(status:ESession) {
switch(status) {
case UserRemoved:
- if (sessions.exists(sessID))
+ if (sessions.exists(sessID)) {
+ trace("removing sessID:"+sessID);
sessions.remove(sessID);
+
+ }
default:
}
cb(status);
}));
}
-
+
+ // public interface ...
+
+ public function authorize(cb:ESessionOp->Void):Void->Void {
+ return observe(cb);
+ }
+
public function
exists(sessID:String,cb:Bool->Void):Void {
cb(sessions.exists(sessID));
@@ -75,9 +85,9 @@ implements Part<HttpServer,String,SessionMgr,ESessionOp>, implements SessionMgr
respond(status:ESession,cb:ESession->Void) {
switch(status) {
case UserOk(sessID):
- if (!sessions.exists(sessID)) { // stash may have created it already
- Core.info("creating sess stash for "+sessID);
- sessions.set(sessID,new Hash());
+ if (!sessions.exists(sessID)) {
+ sessions.set(sessID,1);
+ trace("Adding sessionID:"+sessID);
}
case UserExists:
case UserRemoved:
@@ -86,21 +96,4 @@ implements Part<HttpServer,String,SessionMgr,ESessionOp>, implements SessionMgr
cb(status);
}
- public function
- stash(sessID:String,key:String,?val:Dynamic):Option<Dynamic> {
- var s = sessions.get(sessID);
-
- if (s == null) {
- s = new Hash();
- sessions.set(sessID,s);
- }
-
- if (val == null)
- return s.getOption(key);
-
- s.set(key,val);
-
- return None;
- }
-
}

0 comments on commit 273f290

Please sign in to comment.