Skip to content
Browse files

Channel module supercedes and simplifies Flow - flow still used inter…

…nally
  • Loading branch information...
1 parent 8e733d0 commit 5c9b2a1f4de3cb1a5a8231239b6942dacee4146c @cloudshift committed Mar 5, 2012
Showing with 302 additions and 0 deletions.
  1. +73 −0 cloudshift/Channel.hx
  2. +92 −0 cloudshift/channel/TChannelClient.hx
  3. +137 −0 cloudshift/channel/TChannelServer.hx
View
73 cloudshift/Channel.hx
@@ -0,0 +1,73 @@
+
+/**
+ Author: Ritchie Turner, cloudshift.cl 2012
+ Licence: MIT
+**/
+
+package cloudshift;
+
+import cloudshift.Core;
+#if CS_SERVER
+import cloudshift.Http;
+#end
+
+import cloudshift.Session;
+import cloudshift.Flow;
+
+typedef Chan<T> = Pipe<T>;
+
+
+#if CS_SERVER
+
+enum ChannelEvent {
+ EAuthorize(sessID:String,chan:Chan<Dynamic>,reply:Either<String,String>->Void);
+ ESession(event:ESessionOp);
+}
+
+interface ChannelServer implements Part<Dynamic,String,ChannelServer,ChannelEvent> {
+ function addHttpServer(http:HttpServer):ChannelServer;
+ function addHostPort(host:String,port:Int):ChannelServer;
+ function addSessionMgr(sessionMgr:SessionMgr):ChannelServer;
+ function addChannelAuth(cb:String->Chan<Dynamic>->(Either<String,String>->Void)->Void):ChannelServer;
+ function addSessionAuth(cb:ESessionOp->Void):ChannelServer;
+ function channel<T>(chanID:String):Chan<T>;
+}
+
+#end
+
+#if CS_BROWSER
+
+enum ChannelClientEvent {
+
+}
+
+enum ChannelClientError {
+ UserLoggedIn;
+ CantStartSessionClient;
+}
+
+interface ChannelClient implements Part<Dynamic,ChannelClientError,ChannelClient,ESession> {
+ function channel<T>(id:String):Outcome<String,Chan<T>>;
+ function logout():Void;
+ function unsub(chan:Chan<Dynamic>):Void;
+
+}
+
+#end
+
+class Channel {
+
+ #if CS_SERVER
+ public static function
+ server():ChannelServer {
+ return new cloudshift.channel.TChannelServer();
+ }
+ #end
+
+ #if CS_BROWSER
+ public static function
+ client():ChannelClient {
+ return new cloudshift.channel.TChannelClient();
+ }
+ #end
+}
View
92 cloudshift/channel/TChannelClient.hx
@@ -0,0 +1,92 @@
+
+/**
+ hxc: -D CS_BROWSER
+
+*/
+package cloudshift.channel;
+
+import cloudshift.Core;
+import cloudshift.Channel;
+import cloudshift.Session;
+import cloudshift.Flow;
+
+using cloudshift.Mixin;
+
+class TChannelClient implements ChannelClient,
+ implements Part<Dynamic,ChannelClientError,ChannelClient,ESession> {
+
+ public var part_:Part_<Dynamic,ChannelClientError,ChannelClient,ESession>;
+ var _session:SessionClient;
+ var _conduit:Conduit;
+ var _sink:Sink;
+ var _host:String;
+ var _port:Int;
+
+ public function
+ new() {
+ part_ = Core.part(this);
+ }
+
+ public function
+ start_(d:Dynamic,?oc:Outcome<ChannelClientError,ChannelClient>) {
+ if (oc == null)
+ oc = Core.outcome();
+
+ trace("starting client");
+ Session.client()
+ .start({host:"localhost",port:3844})
+ .oflatMap(function(sess) {
+ trace("got sess");
+ _session = sess;
+ var pcOutcome = Core.outcome();
+ _session.login(d).deliver(function(esess) {
+ switch(esess) {
+ case UserOk(sessID):
+ trace("user ok="+sessID);
+ Flow.pushConduit().start({host:"localhost",port:8082,sessID:sessID},pcOutcome);
+ default:
+ pcOutcome.cancel();
+ oc.resolve(Left(UserLoggedIn));
+ }
+ });
+ return pcOutcome;
+ },function(reason) {
+ oc.resolve(Left(CantStartSessionClient));
+ })
+ .oflatMap(function(conduit) {
+ _conduit = conduit;
+ trace("got conduit");
+ return Flow.sink(_session).start(_conduit);
+ })
+ .outcome(function(sink) {
+ _sink = sink;
+ trace("got sink");
+
+ oc.resolve(Right(cast this));
+ });
+
+ return oc;
+ }
+
+ public function
+ stop_(?d:Dynamic) {
+ var oc = Core.outcome();
+ oc.resolve(Right(d));
+ return oc;
+ }
+
+ public function
+ channel<T>(id:String):Outcome<String,Chan<T>> {
+ var oc = Core.outcome();
+ _sink.authorize(_sink.pipe(id)).deliver(cast oc.resolve);
+ return oc;
+ }
+
+ public function logout() {
+ _session.logout();
+ }
+
+ public function unsub(chan:Chan<Dynamic>) {
+ _sink.removePipe(chan);
+ }
+}
View
137 cloudshift/channel/TChannelServer.hx
@@ -0,0 +1,137 @@
+
+package cloudshift.channel;
+
+import cloudshift.Core;
+import cloudshift.Channel;
+import cloudshift.Http;
+import cloudshift.Session;
+import cloudshift.Flow;
+using cloudshift.Mixin;
+
+class TChannelServer implements ChannelServer,implements Part<Dynamic,String,ChannelServer,ChannelEvent> {
+ public var part_:Part_<Dynamic,String,ChannelServer,ChannelEvent>;
+ var _http:HttpServer;
+ var _session:SessionMgr;
+ var _conduit:Conduit;
+ var _sink:Sink;
+ var _host:String;
+ var _port:Int;
+ var _channelAuth:String->Chan<Dynamic>->(Either<String,String>->Void)->Void;
+ var _sessionAuth:ESessionOp->Void;
+
+ public function
+ new() {
+ part_ = Core.part(this);
+ }
+
+ public function
+ start_(d:Dynamic,?oc:Outcome<String,ChannelServer>) {
+ if (oc == null)
+ oc = Core.outcome();
+
+ if (_session == null)
+ _session = Session.manager();
+
+ /* provide an http if the user does not */
+ if (_http == null) {
+ _http = Http.server();
+ if (_host == null)
+ _host = "localhost";
+ if (_port == null)
+ _port = 3844;
+
+ _http.start({host:_host,port:_port}).outcome(function(http) {
+ gotHttp(oc);
+ });
+ } else {
+ /* use the users, assuming it's started */
+ gotHttp(oc);
+ }
+
+ return oc;
+ }
+
+ public function
+ gotHttp(oc:Outcome<String,ChannelServer>) {
+ trace("starting session");
+ _session.start(_http)
+ .oflatMap(function(sess) {
+ _session = sess;
+ var myoc:Outcome<String,Conduit> = Core.outcome();
+ trace("got session");
+ if (_sessionAuth == null) {
+ myoc.cancel();
+ } else {
+ _session.observe(_sessionAuth);
+ Flow.pushConduit(sess).start({},myoc);
+ }
+ return myoc;
+ },function(err) { trace("aha1 "+err); })
+ .oflatMap(function(push) {
+ var myoc:Outcome<String,Sink> = Core.outcome();
+ _conduit = push;
+ trace("got conduit");
+ Flow.sink(push.session()).start(push,myoc);
+ return myoc;
+ },function(err) { trace("aha2 "+err); })
+ .outcome(function(sink) {
+ _sink = sink;
+ if (_channelAuth != null) {
+ _sink.observe(function(dd:SinkEvent) {
+ switch(dd) {
+ case Authorize(sessID,chan,reply):
+ _channelAuth(sessID,chan,reply);
+ case ConnectionClose(sessID):
+ }
+ });
+ }
+ trace("got sink");
+ oc.resolve(Right(cast this));
+ });
+ }
+
+ public function
+ stop_(?d:Dynamic) {
+ var oc = Core.outcome();
+ oc.resolve(Right(d));
+ return oc;
+ }
+
+ public function
+ addHttpServer(http):ChannelServer {
+ _http = http;
+ return this;
+ }
+
+ public function
+ addHostPort(host:String,port:Int):ChannelServer {
+ _host = host;
+ _port = port;
+ return this;
+ }
+
+ public function
+ addSessionMgr(sessMgr:SessionMgr):ChannelServer {
+ _session = sessMgr;
+ return this;
+ }
+
+ public function
+ channel<T>(chanID:String):Chan<T> {
+ return _sink.pipe(chanID);
+ }
+
+ public function
+ addChannelAuth(cb:String->Chan<Dynamic>->(Either<String,String>->Void)->Void):ChannelServer {
+ _channelAuth = cb;
+ return this;
+ }
+
+ public function
+ addSessionAuth(cb:ESessionOp->Void):ChannelServer {
+ _sessionAuth = cb;
+ return this;
+ }
+
+
+}

0 comments on commit 5c9b2a1

Please sign in to comment.
Something went wrong with that request. Please try again.