Permalink
Browse files

Flow is now the internal api

  • Loading branch information...
1 parent 9c7aaf0 commit 2020726ddbd55ccbd50e870d3e5130fa87f96a35 @cloudshift committed Mar 12, 2012
View
@@ -108,4 +108,44 @@ class Channel {
return new cloudshift.channel.TChannelClient();
}
#end
+
+ public static function
+ chanID(pkt:Pkt<Dynamic>) {
+ return pkt.m.ch;
+ }
+
+ public static function
+ operation(pkt:Pkt<Dynamic>) {
+ return pkt.m.op;
+ }
+
+ public static function
+ payload(pkt:Pkt<Dynamic>) {
+ return pkt.p;
+ }
+
+ public static function
+ setPayload(pkt:Pkt<Dynamic>,pl:Dynamic) {
+ pkt.p = pl;
+ }
+
+ public static function
+ sessID(pkt:Pkt<Dynamic>) {
+ return pkt.s;
+ }
+
+ public static function oldmeta(pkt:Pkt<Dynamic>) {
+ return pkt.m;
+ }
+
+ public static function meta(pkt:Pkt<Dynamic>):Dynamic {
+ return pkt.m.um;
+ }
+
+ public static function
+ createPkt<T>(userData:T,sessID:String,chan:String,op="m",meta:Dynamic=null):Pkt<T> {
+ return { p:userData,s:sessID,m:{ch:chan,op:op,um:meta} };
+ }
+
+
}
@@ -5,7 +5,6 @@ import cloudshift.Core;
import cloudshift.Session;
import cloudshift.Channel;
import cloudshift.channel.Flow;
-import cloudshift.channel.InternalApi;
class ClientSinkImpl extends SinkImpl {
View
@@ -8,9 +8,23 @@ import cloudshift.Channel;
#if nodejs
import cloudshift.Http;
-import cloudshift.channel.InternalApi;
#end
+interface MessageQ {
+ function append(pkt:Dynamic):Void;
+ function setFlusher(cb:MessageQ->Bool):Void;
+ function sessID():String;
+ function deQueue():Array<Dynamic>;
+}
+
+interface ConduitSession {
+ function append(pkt:Dynamic):Void;
+ function flusher(flush:MessageQ->Bool):Void;
+ function subscriptions():Hash<Void->Void>;
+ function shutDown():Void;
+ var lastConnection(default,default):Float;
+ var sessID(default,default):String;
+}
typedef ConduitClientStart = {
var host:String;
@@ -52,15 +66,6 @@ enum SinkEvent {
function direct<T>(sessID:String):Chan<T>;
}
-
- /*
-typedef QuickFlow = {
- var conduit:Conduit;
- var session:SessionMgr;
- var sink:Sink;
-}
- */
-
class Flow {
public static var PUSH = Core.CSROOT+"p";
@@ -1,24 +0,0 @@
-
-package cloudshift.channel;
-
-import cloudshift.Core;
-import cloudshift.channel.Flow;
-
-interface MessageQ {
- function append(pkt:Dynamic):Void;
- function setFlusher(cb:MessageQ->Bool):Void;
- function sessID():String;
- function deQueue():Array<Dynamic>;
-}
-
-interface ConduitSession {
- function append(pkt:Dynamic):Void;
- function flusher(flush:MessageQ->Bool):Void;
- function subscriptions():Hash<Void->Void>;
- function shutDown():Void;
- var lastConnection(default,default):Float;
- var sessID(default,default):String;
-}
-
-class InternalApi {}
-
@@ -1,7 +1,6 @@
package cloudshift.channel;
import cloudshift.channel.Flow;
-import cloudshift.channel.InternalApi;
using cloudshift.Mixin;
@@ -5,7 +5,6 @@ import cloudshift.Core;
using cloudshift.Mixin;
import cloudshift.Channel;
import cloudshift.channel.Flow;
-import cloudshift.channel.InternalApi;
using cloudshift.channel.Flow;
@@ -6,7 +6,6 @@ import cloudshift.Channel;
import cloudshift.channel.Flow;
import cloudshift.Session;
-import cloudshift.channel.InternalApi;
import cloudshift.core.ObservableImpl;
using cloudshift.Mixin;
using cloudshift.channel.Flow;
@@ -7,7 +7,6 @@ import cloudshift.Session;
import cloudshift.Channel;
import cloudshift.channel.Flow;
-import cloudshift.channel.InternalApi;
import cloudshift.Http;
import cloudshift.http.HttpImpl;
import cloudshift.core.ObservableImpl;
@@ -3,7 +3,6 @@ package cloudshift.channel;
import cloudshift.Http;
import cloudshift.channel.Flow;
-import cloudshift.channel.InternalApi;
import cloudshift.channel.MessageQImpl;
using cloudshift.Mixin;
@@ -1,63 +0,0 @@
-
-package cloudshift.channel;
-
-import cloudshift.Core;
-import cloudshift.Http;
-import cloudshift.Session;
-import cloudshift.channel.Flow;
-
-using cloudshift.Mixin;
-
-class QuickFlowImpl implements Part<HttpServer,QuickFlow,Dynamic> {
-
- public var part_:Part_<HttpServer,QuickFlow,Dynamic>;
-
- #if nodejs
- public var http:HttpServer;
- #end
- public var session:SessionMgr;
- public var conduit:Conduit;
- public var sink:Sink;
-
- public function new() {
- part_ = Core.part(this);
- }
-
- public function
- start_(http:HttpServer) {
- trace("starting quickflow imple");
- #if nodejs
- var sess = Session.manager();
- #else
- var sess = Session.client();
- #end
-
- var oc = Core.outcome();
- sess.start(http)
- .bad(function(reason) {
- oc.resolve(Left(reason));
- })
- .flatMap(function(sess) {
- session = sess;
- return Flow.pushConduit(sess).start({});
- })
- .flatMap(function(push) {
- conduit = push;
- return Flow.sink(push.session()).start(push);
- })
- .good(function(s) {
- sink = s;
- var forTyper:QuickFlow = this;
- oc.resolve(Right(forTyper));
- });
-
- return oc;
- }
-
- public function
- stop_(?s) {
- var f:Outcome<String,Dynamic> = Core.outcome();
- return f;
- }
-
-}
@@ -6,7 +6,6 @@ import cloudshift.Session;
import cloudshift.Channel;
import cloudshift.channel.Flow;
import cloudshift.Http;
-import cloudshift.channel.InternalApi;
using cloudshift.Mixin;
@@ -4,7 +4,6 @@ package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Channel;
import cloudshift.channel.Flow;
-import cloudshift.channel.InternalApi;
import cloudshift.core.ObservableImpl;
using cloudshift.Mixin;
@@ -5,8 +5,8 @@ import cloudshift.Core;
import cloudshift.Channel;
import cloudshift.Http;
import cloudshift.Session;
-import cloudshift.channel.Flow;
using cloudshift.Mixin;
+import cloudshift.channel.Flow;
class TChannelServer implements ChannelServer,implements Part<Dynamic,String,ChannelServer,ChannelEvent> {
public var part_:Part_<Dynamic,String,ChannelServer,ChannelEvent>;

0 comments on commit 2020726

Please sign in to comment.