Skip to content
Browse files

add error type parameter

  • Loading branch information...
1 parent 14a3359 commit 97c28ffc788ffc1594fc5c8423bdbd667d9219e5 @cloudshift committed Mar 5, 2012
View
25 cloudshift/Flow.hx
@@ -34,15 +34,15 @@ enum ConduitEvent {
ConduitNoConnection(sessID:String);
}
-#if nodejs
-interface Conduit implements Part<Dynamic,Conduit,ConduitEvent> {
-#else
- interface Conduit implements Part<ConduitClientStart,Conduit,ConduitEvent> {
+#if CS_SERVER
+interface Conduit implements Part<Dynamic,String,Conduit,ConduitEvent> {
+#elseif CS_BROWSER
+ interface Conduit implements Part<ConduitClientStart,String,Conduit,ConduitEvent> {
#end
function authorize(pipeID:String):Future<Either<String,String>>;
function leave(pipeID:String):Future<Either<String,String>>;
function pump(sessID:String,pkt:Dynamic,chanID:String,meta:Dynamic):Void;
- #if nodejs
+ #if CS_SERVER
function subscriptions(sessID:String):Hash<Void->Void>;
function session():SessionMgr;
#end
@@ -53,7 +53,7 @@ enum SinkEvent {
ConnectionClose(sessID:String);
}
-interface Sink implements Part<Conduit,Sink,SinkEvent> {
+ interface Sink implements Part<Conduit,String,Sink,SinkEvent> {
function pipe<T>(chanID:String):Pipe<T>;
function addConduit(conduit:Conduit):Void ;
function pipeFromId(chanID:String):Option<Pipe<Dynamic>>;
@@ -62,7 +62,7 @@ interface Sink implements Part<Conduit,Sink,SinkEvent> {
function direct<T>(sessID:String):Pipe<T>;
}
-interface Pipe<T> { //implements Part<Dynamic,Pipe<T>,Pkt<T>> {
+interface Pipe<T> {
// internal use only
var _fill:Dynamic->String->Dynamic->Void;
function _defaultFill<T>(o:Dynamic,chanID:String,meta:Dynamic):Void;
@@ -80,17 +80,19 @@ interface Pipe<T> { //implements Part<Dynamic,Pipe<T>,Pkt<T>> {
function peek(cb:EOperation->Void):Void;
}
+ /*
typedef QuickFlow = {
var conduit:Conduit;
var session:SessionMgr;
var sink:Sink;
}
+ */
class Flow {
public static var PUSH = Core.CSROOT+"p";
- #if nodejs
+ #if CS_SERVER
public static function
sink(sessionMgr:SessionMgr):Sink {
@@ -104,17 +106,20 @@ class Flow {
return pl;
}
-
+
+ /*
public static function
quickFlow() {
return new cloudshift.flow.QuickFlowImpl();
}
+ */
- #else
+ #elseif CS_BROWSER
public static function
pushConduit():Conduit {
+ trace("inst PushClientImpl");
return new cloudshift.flow.PushClientImpl();
}
View
2 cloudshift/Http.hx
@@ -28,7 +28,7 @@ enum HttpEvents {
Close;
}
-interface HttpServer implements Part<HostPort,HttpServer,HttpEvents> {
+interface HttpServer implements Part<HostPort,String,HttpServer,HttpEvents> {
function fields(req:js.Node.NodeHttpServerReq,cb:TFields,?uploadDir:String):Void;
function serve(path:String,req:NodeHttpServerReq,resp:NodeHttpServerResp,?statusCode:Int):Void;
function serveNoCache(path:String,req:NodeHttpServerReq,resp:NodeHttpServerResp,?statusCode:Int):Void;
View
23 cloudshift/Session.hx
@@ -2,7 +2,8 @@
package cloudshift;
import cloudshift.Core;
-#if nodejs
+
+#if CS_SERVER
import cloudshift.Http;
#end
@@ -19,34 +20,38 @@ enum ESessionOp {
Signup(pkt:Dynamic,cb:ESession->Void) ;
}
-interface SessionMgr implements Part<Dynamic,SessionMgr,ESessionOp> {
+#if CS_SERVER
+
+interface SessionMgr implements Part<HttpServer,String,SessionMgr,ESessionOp> {
function exists(sessID:String,cb:Bool->Void):Void;
function stash(sessID:String,key:String,?val:Dynamic):Option<Dynamic>;
function logout(sessID:String,cb:ESession->Void):Void;
- #if nodejs
function http():HttpServer;
- #end
}
-interface SessionClient implements Part<HostPort,SessionClient,ESession> {
+#elseif CS_BROWSER
+
+interface SessionClient implements Part<Dynamic,String,SessionClient,ESession> {
function login(pkt:Dynamic):Future<ESession>;
function logout():Future<ESession>;
function signup(pkt:Dynamic):Future<ESession>;
function sessID():String;
function stash(key:String,?val:Dynamic):Option<Dynamic>;
}
+#end
+
class Session {
public static var REMOTE = Core.CSROOT+"__r";
- #if nodejs
+ #if CS_SERVER
public static function
- manager(http:HttpServer):SessionMgr {
- return new cloudshift.session.SessionMgrImpl(http);
+ manager():SessionMgr {
+ return new cloudshift.session.SessionMgrImpl();
}
- #else
+ #elseif CS_BROWSER
public static function
client() {
return new cloudshift.session.SessionClientImpl();
View
27 cloudshift/core/PartBaseImpl.hx
@@ -3,7 +3,7 @@ package cloudshift.core;
import cloudshift.Core;
using cloudshift.Mixin;
-class PartBaseImpl<S,R,E> implements Part_<S,R,E> {
+class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
public var partID(default,null):String;
public var _events:Observable<EPartState<E>>;
@@ -43,23 +43,22 @@ class PartBaseImpl<S,R,E> implements Part_<S,R,E> {
_events.observe(cb);
}
- public function start(d:S):Outcome<String,R> {
- var p:Outcome<String,R> = null;
+ public function start(d:S,?oc:Outcome<B,G>):Outcome<B,G> {
+ var p:Outcome<B,G> = null;
- p = parent.start_(d);
+ p = parent.start_(d,oc);
checkErr("start",p);
- p.onError(function(msg) {
- _events.notify(Error(msg));
- });
-
- p.deliver(function(outcome) {
+ p.outcome(function(outcome) {
#if debug
Core.info("Part started:"+Type.getClassName(Type.getClass(parent)));
#end
_events.notify(Started);
+ },function(msg) {
+ return _events.notify(Error(Std.string(msg)));
});
+
return p;
}
@@ -69,18 +68,16 @@ class PartBaseImpl<S,R,E> implements Part_<S,R,E> {
checkErr("stop",p);
- p.onError(function(msg) {
- _events.notify(Error(msg));
- });
-
- p.deliver(function(outcome) {
+ p.outcome(function(outcome) {
_events.notify(Stopped);
+ },function(msg) {
+ _events.notify(Error(msg));
});
return p;
}
- function checkErr(type,outcome:Outcome<String,Dynamic>) {
+ function checkErr(type,outcome:Outcome<Dynamic,Dynamic>) {
if (outcome == null)
throw Type.getClassName(Type.getClass(parent)) +" should not return null for "+type +" function";
return outcome;
View
16 cloudshift/flow/PushClientImpl.hx
@@ -10,7 +10,7 @@ using cloudshift.Mixin;
using cloudshift.Flow;
class PushClientImpl implements Conduit {
- public var part_:Part_<ConduitClientStart,Conduit,ConduitEvent>;
+ public var part_:Part_<ConduitClientStart,String,Conduit,ConduitEvent>;
var _host:String;
var _port:Int;
var _url:String;
@@ -21,16 +21,22 @@ class PushClientImpl implements Conduit {
part_ = Core.part(this);
}
- public function start_(cs:ConduitClientStart) {
- var prm = Core.outcome();
+ public function start_(cs:ConduitClientStart,?oc:Outcome<String,Conduit>) {
+ if (oc == null) {
+ trace("creating new oc");
+ oc = Core.outcome();
+ } else
+ Core.info("using existing outcome");
+
_url = "http://"+js.Lib.window.location.host;
_sessID = cs.sessID;
_parted = false;
remoteInit(function(ignore) {
- prm.resolve(Right(cast(this,Conduit)));
+ trace("got remote init, resolving "+_sessID);
+ oc.resolve(Right(cast(this,Conduit)));
poll();
});
- return prm;
+ return oc;
}
public function stop_(?d:Dynamic) {
View
12 cloudshift/flow/PushListenerImpl.hx
@@ -25,7 +25,7 @@ class PushListenerImpl implements Conduit {
public static inline var ERROR = 500;
public static inline var OK = 200;
- public var part_:Part_<Dynamic,Conduit,ConduitEvent>;
+ public var part_:Part_<Dynamic,String,Conduit,ConduitEvent>;
var _callbacks:Hash<Array<Callback<Dynamic>>>;
var _sessMgr:SessionMgr;
@@ -56,10 +56,12 @@ class PushListenerImpl implements Conduit {
},SESSION_EXPIRE,null);
}
- public function start_(d:Dynamic) {
- var prm = Core.outcome();
- prm.resolve(Right(cast(this,Conduit)));
- return prm;
+ public function start_(d:Dynamic,?oc:Outcome<String,Conduit>) {
+ if (oc == null)
+ oc = Core.outcome();
+
+ oc.resolve(Right(cast this));
+ return oc;
}
public function stop_(?d:Dynamic):Outcome<String,Dynamic> {
View
11 cloudshift/flow/SinkImpl.hx
@@ -10,7 +10,7 @@ using cloudshift.Mixin;
using cloudshift.Flow;
class SinkImpl implements Sink {
- public var part_:Part_<Conduit,Sink,SinkEvent>;
+ public var part_:Part_<Conduit,String,Sink,SinkEvent>;
var _pipes:Hash<Pipe<Dynamic>>;
var _conduit:Array<Conduit>;
@@ -20,13 +20,14 @@ class SinkImpl implements Sink {
part_ = Core.part(this);
}
- public function start_(c:Conduit) {
- var prm = Core.outcome();
+ public function start_(c:Conduit,?oc:Outcome<String,Sink>) {
+ if (oc == null)
+ oc = Core.outcome();
_conduit = [];
_pipes = new Hash();
addConduit(c);
- prm.resolve(Right(cast(this,Sink)));
- return prm;
+ oc.resolve(Right(cast(this,Sink)));
+ return oc;
}
public function stop_(?d):Outcome<String,Dynamic> {
View
14 cloudshift/http/HttpImpl.hx
@@ -17,8 +17,8 @@ private typedef Cache = {
var buf:NodeBuffer;
}
-class HttpImpl implements HttpServer,implements Part<HostPort,HttpServer,HttpEvents> {
- public var part_:Part_<HostPort,HttpServer,HttpEvents>;
+class HttpImpl implements HttpServer,implements Part<HostPort,String,HttpServer,HttpEvents> {
+ public var part_:Part_<HostPort,String,HttpServer,HttpEvents>;
var _server:NodeHttpServer;
var _cache:Hash<Cache>;
@@ -57,8 +57,10 @@ class HttpImpl implements HttpServer,implements Part<HostPort,HttpServer,HttpEve
}
public function
- start_(d:HostPort) {
- var p = Core.outcome();
+ start_(d:HostPort,?oc:Outcome<String,HttpServer>) {
+ if (oc == null)
+ oc = Core.outcome();
+
_server = Node.http.createServer(function(req,resp) {
var
url = req.url,
@@ -96,10 +98,10 @@ class HttpImpl implements HttpServer,implements Part<HostPort,HttpServer,HttpEve
*/
_server.listen(d.port,d.host,function() {
- p.resolve(Right(cast(this,HttpServer)));
+ oc.resolve(Right(cast(this,HttpServer)));
});
- return p;
+ return oc;
}
public function stop_(?d) {
View
15 cloudshift/session/SessionClientImpl.hx
@@ -5,27 +5,28 @@ import cloudshift.Core;
import cloudshift.Session;
using cloudshift.Mixin;
-private class SessionProxy extends haxe.remoting.AsyncProxy<cloudshift.session.SessionMgrImpl> { }
+private class SessionProxy extends haxe.remoting.AsyncProxy<cloudshift.session.SessionMgrProxy> { }
-class SessionClientImpl implements Part<HostPort,SessionClient,ESession>,implements SessionClient {
+class SessionClientImpl implements Part<Dynamic,String,SessionClient,ESession>,implements SessionClient {
var _sessID:String;
var _proxy:SessionProxy;
var _stash:Hash<Dynamic>;
- public var part_:Part_<HostPort,SessionClient,ESession>;
+ public var part_:Part_<Dynamic,String,SessionClient,ESession>;
public function new() {
part_ = Core.part(this);
}
public function
- start_(hp:HostPort) {
- var prm = Core.outcome();
+ start_(d:Dynamic,?oc:Outcome<String,SessionClient>) {
+ if (oc == null)
+ oc = Core.outcome();
var cnx = haxe.remoting.HttpAsyncConnection.urlConnect("http://"+js.Lib.window.location.host+Session.REMOTE);
cnx.setErrorHandler( function(err) trace("Error : "+Std.string(err)) );
_proxy = new SessionProxy(cnx.Auth);
- prm.resolve(Right(cast(this,SessionClient)));
- return prm;
+ oc.resolve(Right(cast this));
+ return oc;
}
public function
View
50 cloudshift/session/SessionMgrImpl.hx
@@ -3,63 +3,63 @@ package cloudshift.session;
import cloudshift.Core;
import cloudshift.Session;
-#if nodejs
import cloudshift.Http;
import cloudshift.Remote;
-#end
using cloudshift.Mixin;
-class SessionMgrImpl implements Part<Dynamic,SessionMgr,ESessionOp>, implements SessionMgr {
+import cloudshift.session.SessionMgrProxy;
- static var sessions = new Hash<Hash<Dynamic>>();
+class SessionMgrImpl extends SessionMgrProxy,
+implements Part<HttpServer,String,SessionMgr,ESessionOp>, implements SessionMgr {
- #if nodejs
- public var part_:Part_<Dynamic,SessionMgr,ESessionOp>;
-
+ static var sessions = new Hash<Hash<Dynamic>>();
+ public var part_:Part_<HttpServer,String,SessionMgr,ESessionOp>;
var _http:HttpServer;
- public function new(http:HttpServer) {
+ public function new() {
part_ = Core.part(this);
+ }
+
+ public function start_(http:HttpServer,?oc:Outcome<String,SessionMgr>) {
+ if (oc == null)
+ oc = Core.outcome();
+
_http = http;
var remote = Remote.provider("Auth",this);
_http.handler(new EReg(Session.REMOTE,""),remote.httpHandler);
+
+ oc.resolve(Right(cast(this,SessionMgr)));
+ return oc;
}
- public function http() {
- return _http;
- }
-
- #end
-
- public function start_(d:Dynamic) {
- trace("in session start");
- var prm = Core.outcome();
- prm.resolve(Right(cast(this,SessionMgr)));
- return prm;
- }
-
+
public function
stop_(?d:Dynamic):Outcome<String,Dynamic> {
return null;
}
-
- public function
+
+ public function http() {
+ return _http;
+ }
+
+ override public function
login(pkt:Dynamic,cb:ESession->Void):Void {
+ trace("trying login:"+ pkt.stringify());
notify(Login(pkt,function(status:ESession) {
respond(status,cb);
}));
}
- public function
+ override public function
signup(pkt:Dynamic,cb:ESession->Void):Void {
notify(Signup(pkt,function(status:ESession) {
respond(status,cb);
}));
}
- public function
+ override public function
logout(sessID:String,cb:ESession->Void):Void {
notify(Logout(sessID,function(status:ESession) {
switch(status) {
View
1 cloudshift/sys/WriteStreamImpl.hx
@@ -5,6 +5,7 @@ import cloudshift.Core;
import cloudshift.Sys;
using cloudshift.Mixin;
+
import js.Node;
class WriteStreamImpl extends cloudshift.core.ObservableImpl<SysWriteStreamEvents> ,implements SysWriteStream {

0 comments on commit 97c28ff

Please sign in to comment.
Something went wrong with that request. Please try again.