Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Flow fully deprecated from public api

  • Loading branch information...
commit 9c7aaf0b920e29269dc2d983c1e40deef7803e7a 1 parent d8658d0
@cloudshift authored
View
31 cloudshift/Channel.hx
@@ -12,9 +12,36 @@ import cloudshift.Http;
#end
import cloudshift.Session;
-import cloudshift.Flow;
-typedef Chan<T> = Pipe<T>;
+typedef TMeta = {
+ var ch:String;
+ var op:String;
+ var um:Dynamic; /* user meta data */
+}
+
+typedef Pkt<T> = {
+ var s:String; // session ID
+ var p:T; // payload
+ var m:TMeta; // meta
+}
+
+interface Chan<T> {
+ // internal use only
+ var _fill:Dynamic->String->Dynamic->Void;
+ function _defaultFill<T>(o:Dynamic,chanID:String,meta:Dynamic):Void;
+
+ // public
+ function pub(o:T,?meta:Dynamic):Void;
+ function sub(cb:T->Void,?info:Dynamic):Void->Void;
+ function subPkt(cb:Pkt<T>->Void,?info:Dynamic):Void->Void;
+ function filter(cb:T->Null<T>):Void->Void;
+ function filterPkt(cb:Pkt<T>->Null<Pkt<T>>):Void->Void;
+ function pid():String;
+ function subs():Array<Dynamic>;
+ function removeAllSubs():Void;
+ function route<P>(chan:Chan<P>,?map:T->P):Void->Void;
+ function peek(cb:EOperation->Void):Void;
+}
interface ChannelProvider {
function channel<T>(chanID:String):Outcome<String,Chan<T>>;
View
86 cloudshift/Core.hx
@@ -17,17 +17,17 @@ enum Either<A, B> {
Right(v: B);
}
-enum EOperation {
- Add(info:Option<Dynamic>);
- Del(info:Option<Dynamic>);
-}
-
enum ELogLevel {
I(s:String);
W(s:String);
E(s:String);
}
+enum EOperation {
+ Add(info:Option<Dynamic>);
+ Del(info:Option<Dynamic>);
+}
+
interface Observable<T> {
var preNotify:T->Dynamic;
function notify(o:T):Void;
@@ -38,10 +38,8 @@ interface Observable<T> {
}
interface Future<T> {
- var sequence:Int;
function resolve(t: T): Future<T>;
function deliver(f: T -> Void): Future<T>;
- function deliverMe(f:Future<T>-> Void): Future<T>;
function isCanceled(): Bool;
function ifCanceled(f: Void -> Void): Future<T>;
function allowCancelOnlyIf(f: Void -> Bool): Future<T>;
@@ -66,10 +64,18 @@ enum EPartState<E> {
Except(e:Dynamic);
}
+typedef PartInfo = {
+ var name:String;
+ var ver:String;
+ var auth:String;
+}
+
// (S) start param object, (B) bad return type, (G) good return type, (E) event enum
interface Part_<S,B,G,E> {
var _events:Observable<EPartState<E>>;
var partID(default,null):String;
+ var state:EPartState<E>;
+ var info:PartInfo;
function start(d:S,?oc:Outcome<B,G>):Outcome<B,G>;
function stop(d:Dynamic):Outcome<String,Dynamic>;
function observe(cb:E->Void,?info:Dynamic):Void->Void;
@@ -90,6 +96,22 @@ typedef AnyPart = Part<Dynamic,Dynamic,Dynamic,Dynamic>;
class Core {
public static var CSROOT = "/__cs/";
+
+ public static function
+ init() {
+ logInit();
+ #if nodejs
+ Sys.events().observe(function(e) {
+ switch(e) {
+ case ProcessUncaughtException(exc):
+ trace(exc);
+ trace(haxe.Stack.exceptionStack());
+ case ProcessExit:
+ case SigInt(sig):
+ }
+ });
+ #end
+ }
public static inline function
future<T>():Future<T> {
@@ -102,13 +124,8 @@ class Core {
}
public static function
- part<S,B,G,E>(parent:Dynamic):Part_<S,B,G,E> {
- return new cloudshift.core.PartBaseImpl(parent);
- }
-
- public static function
- waitFor(toJoin:Array<Future<Dynamic>>):Future<Array<Dynamic>> {
- return cloudshift.core.FutureImpl.waitFor(toJoin);
+ part<S,B,G,E>(parent:Dynamic,?info:PartInfo):Part_<S,B,G,E> {
+ return new cloudshift.core.PartBaseImpl(parent,info);
}
public static function cancelledFuture() {
@@ -126,7 +143,7 @@ class Core {
}
inline static public function
- logTo(?fileName:String) {
+ logInit(?fileName:String) {
LogImpl.init(fileName);
}
@@ -191,7 +208,44 @@ class Core {
});
return oc;
}
-
+
+ public static
+ function waitFor(toJoin:Array<Future<Dynamic>>):Future<Array<Dynamic>> {
+ var
+ count = toJoin.length,
+ fut = Core.future();
+
+ toJoin.foreach(function(xprm:Future<Dynamic>) {
+ if(!Std.is(xprm,Future)) {
+ throw "not a future:"+xprm;
+ }
+
+ xprm.deliver(function(r:Dynamic) {
+ count--;
+ if (count == 0) {
+ fut.resolve(toJoin.map(function(el) {
+ return el.value().get();
+ }));
+ }
+ });
+ });
+ return fut;
+ }
+
+ public static function
+ listParts() {
+ cloudshift.core.PartBaseImpl.runningParts.foreach(function(p) {
+ if (p.info() != null) {
+ trace(p.info());
+ }
+ });
+ }
+
+ public static function
+ assert( cond : Bool, ?pos : haxe.PosInfos ) {
+ if( !cond )
+ Core.error("Assert failed in "+pos.className+"::"+pos.methodName,pos);
+ }
}
View
11 cloudshift/Mixin.hx
@@ -640,6 +640,17 @@ class PartX {
part.part_._events.observe(cb);
}
+ public static function
+ state<S,B,G,E>(part:Part<S,B,G,E>):EPartState<E> {
+ return part.part_.state;
+ }
+
+ public static function
+ info<S,B,G,E>(part:Part<S,B,G,E>) {
+ return part.part_.info;
+ }
+
+
}
class OutcomeX {
View
13 cloudshift/flow/ClientSinkImpl.hx → cloudshift/channel/ClientSinkImpl.hx
@@ -1,10 +1,11 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Session;
-import cloudshift.Flow;
-import cloudshift.flow.InternalApi;
+import cloudshift.Channel;
+import cloudshift.channel.Flow;
+import cloudshift.channel.InternalApi;
class ClientSinkImpl extends SinkImpl {
@@ -23,15 +24,15 @@ class ClientSinkImpl extends SinkImpl {
}
override function
- removePipe<T>(pipe:Pipe<T>) {
- super.removePipe(pipe);
+ removeChan<T>(pipe:Chan<T>) {
+ super.removeChan(pipe);
reqUnsub(_sessID,pipe,function(cb) {
Core.info("ok unsubbed:"+pipe.pid());
});
}
override function
- reqUnsub(sessId,pipe:Pipe<Dynamic>,cb:Either<String,String>->Void) {
+ reqUnsub(sessId,pipe:Chan<Dynamic>,cb:Either<String,String>->Void) {
_conduit[0].leave(pipe.pid()).deliver(cb);
}
}
View
57 cloudshift/Flow.hx → cloudshift/channel/Flow.hx
@@ -1,26 +1,16 @@
-package cloudshift;
+package cloudshift.channel;
import cloudshift.Core;
using cloudshift.Mixin;
import cloudshift.Session;
+import cloudshift.Channel;
#if nodejs
import cloudshift.Http;
-import cloudshift.flow.InternalApi;
+import cloudshift.channel.InternalApi;
#end
-typedef TMeta = {
- var ch:String;
- var op:String;
- var um:Dynamic; /* user meta data */
-}
-
-typedef Pkt<T> = {
- var s:String; // session ID
- var p:T; // payload
- var m:TMeta; // meta
-}
typedef ConduitClientStart = {
var host:String;
@@ -49,36 +39,19 @@ interface Conduit implements Part<Dynamic,String,Conduit,ConduitEvent> {
}
enum SinkEvent {
- Authorize(sessID:String,pipe:Pipe<Dynamic>,cb:Either<String,String>->Void);
+ Authorize(sessID:String,chan:Chan<Dynamic>,cb:Either<String,String>->Void);
ConnectionClose(sessID:String);
}
interface Sink implements Part<Conduit,String,Sink,SinkEvent> {
- function pipe<T>(chanID:String):Pipe<T>;
+ function chan<T>(chanID:String):Chan<T>;
function addConduit(conduit:Conduit):Void ;
- function pipeFromId(chanID:String):Option<Pipe<Dynamic>>;
- function authorize<T>(pipe:Pipe<T>):Outcome<String,Pipe<T>>;
- function removePipe<T>(p:Pipe<T>):Void;
- function direct<T>(sessID:String):Pipe<T>;
+ function chanFromId(chanID:String):Option<Chan<Dynamic>>;
+ function authorize<T>(chan:Chan<T>):Outcome<String,Chan<T>>;
+ function removeChan<T>(p:Chan<T>):Void;
+ function direct<T>(sessID:String):Chan<T>;
}
-interface Pipe<T> {
- // internal use only
- var _fill:Dynamic->String->Dynamic->Void;
- function _defaultFill<T>(o:Dynamic,chanID:String,meta:Dynamic):Void;
-
- // public
- function fill(o:T,?meta:Dynamic):Void;
- function drain(cb:T->Void,?info:Dynamic):Void->Void;
- function drainPkt(cb:Pkt<T>->Void,?info:Dynamic):Void->Void;
- function filter(cb:T->Null<T>):Void->Void;
- function filterPkt(cb:Pkt<T>->Null<Pkt<T>>):Void->Void;
- function pid():String;
- function drains():Array<Dynamic>;
- function removeAllDrains():Void;
- function divert<P>(chan:Pipe<P>,?map:T->P):Void->Void;
- function peek(cb:EOperation->Void):Void;
-}
/*
typedef QuickFlow = {
@@ -96,12 +69,12 @@ class Flow {
public static function
sink(sessionMgr:SessionMgr):Sink {
- return new cloudshift.flow.ServerSinkImpl(sessionMgr);
+ return new cloudshift.channel.ServerSinkImpl(sessionMgr);
}
public static function
pushConduit(sessionMgr:SessionMgr):Conduit {
- var pl = new cloudshift.flow.PushListenerImpl(sessionMgr);
+ var pl = new cloudshift.channel.PushListenerImpl(sessionMgr);
sessionMgr.http().handler(new EReg(Flow.PUSH,""),pl.postHandler);
return pl;
}
@@ -110,7 +83,7 @@ class Flow {
/*
public static function
quickFlow() {
- return new cloudshift.flow.QuickFlowImpl();
+ return new cloudshift.channel.QuickFlowImpl();
}
*/
@@ -120,18 +93,18 @@ class Flow {
public static function
pushConduit():Conduit {
trace("inst PushClientImpl");
- return new cloudshift.flow.PushClientImpl();
+ return new cloudshift.channel.PushClientImpl();
}
public static function
sink(sessID:String):Sink {
- return new cloudshift.flow.ClientSinkImpl(sessID);
+ return new cloudshift.channel.ClientSinkImpl(sessID);
}
#end
public static function
- pipeID(pkt:Pkt<Dynamic>) {
+ chanID(pkt:Pkt<Dynamic>) {
return pkt.m.ch;
}
View
4 cloudshift/flow/InternalApi.hx → cloudshift/channel/InternalApi.hx
@@ -1,8 +1,8 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
-import cloudshift.Flow;
+import cloudshift.channel.Flow;
interface MessageQ {
function append(pkt:Dynamic):Void;
View
6 cloudshift/flow/MessageQImpl.hx → cloudshift/channel/MessageQImpl.hx
@@ -1,7 +1,7 @@
-package cloudshift.flow;
-import cloudshift.Flow;
-import cloudshift.flow.InternalApi;
+package cloudshift.channel;
+import cloudshift.channel.Flow;
+import cloudshift.channel.InternalApi;
using cloudshift.Mixin;
View
31 cloudshift/flow/PipeImpl.hx → cloudshift/channel/PipeImpl.hx
@@ -1,14 +1,15 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
using cloudshift.Mixin;
-import cloudshift.Flow;
-import cloudshift.flow.InternalApi;
+import cloudshift.Channel;
+import cloudshift.channel.Flow;
+import cloudshift.channel.InternalApi;
-using cloudshift.Flow;
+using cloudshift.channel.Flow;
-class PipeImpl<T> implements Pipe<T> {
+class PipeImpl<T> implements Chan<T> {
var event_:cloudshift.core.ObservableImpl<Pkt<T>>;
@@ -42,14 +43,14 @@ class PipeImpl<T> implements Pipe<T> {
}
public function
- drain(cb:T->Void,?info:Dynamic):Void->Void {
+ sub(cb:T->Void,?info:Dynamic):Void->Void {
return event_.observe(function(pkt:Pkt<T>) {
cb(pkt.p);
},info);
}
public function
- drainPkt(cb:Pkt<T>->Void,?info:Dynamic):Void->Void {
+ subPkt(cb:Pkt<T>->Void,?info:Dynamic):Void->Void {
return event_.observe(cb,info);
}
@@ -59,7 +60,7 @@ class PipeImpl<T> implements Pipe<T> {
}
public function
- removeAllDrains() {
+ removeAllSubs() {
event_.removePeers();
}
@@ -69,7 +70,7 @@ class PipeImpl<T> implements Pipe<T> {
}
public function
- fill(msg:T,?meta:Dynamic):Void {
+ pub(msg:T,?meta:Dynamic):Void {
_fill(msg,_pID,meta);
}
@@ -110,21 +111,21 @@ class PipeImpl<T> implements Pipe<T> {
}
public function
- drains():Array<Dynamic> {
+ subs():Array<Dynamic> {
return event_.peers();
}
public function pid() {return _pID;}
public function
- divert<P>(chan:Pipe<P>,?map:T->P):Void->Void {
+ route<P>(chan:Chan<P>,?map:T->P):Void->Void {
if (map != null) {
- return drain(function(o) {
- chan.fill(map(o));
+ return sub(function(o) {
+ chan.pub(map(o));
});
} else {
- return drain(function(o) {
- chan.fill(cast o);
+ return sub(function(o) {
+ chan.pub(cast o);
});
}
}
View
10 cloudshift/flow/PushClientImpl.hx → cloudshift/channel/PushClientImpl.hx
@@ -1,13 +1,15 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
-import cloudshift.Flow;
+import cloudshift.Channel;
+
+import cloudshift.channel.Flow;
import cloudshift.Session;
-import cloudshift.flow.InternalApi;
+import cloudshift.channel.InternalApi;
import cloudshift.core.ObservableImpl;
using cloudshift.Mixin;
-using cloudshift.Flow;
+using cloudshift.channel.Flow;
class PushClientImpl implements Conduit {
public var part_:Part_<ConduitClientStart,String,Conduit,ConduitEvent>;
View
11 cloudshift/flow/PushListenerImpl.hx → cloudshift/channel/PushListenerImpl.hx
@@ -1,15 +1,18 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Session;
-import cloudshift.Flow;
-import cloudshift.flow.InternalApi;
+
+import cloudshift.Channel;
+
+import cloudshift.channel.Flow;
+import cloudshift.channel.InternalApi;
import cloudshift.Http;
import cloudshift.http.HttpImpl;
import cloudshift.core.ObservableImpl;
-using cloudshift.Flow;
+using cloudshift.channel.Flow;
using cloudshift.Mixin;
import js.Node;
View
8 cloudshift/flow/PushSessionImpl.hx → cloudshift/channel/PushSessionImpl.hx
@@ -1,10 +1,10 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Http;
-import cloudshift.Flow;
-import cloudshift.flow.InternalApi;
-import cloudshift.flow.MessageQImpl;
+import cloudshift.channel.Flow;
+import cloudshift.channel.InternalApi;
+import cloudshift.channel.MessageQImpl;
using cloudshift.Mixin;
View
4 cloudshift/flow/QuickFlowImpl.hx → cloudshift/channel/QuickFlowImpl.hx
@@ -1,10 +1,10 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Http;
import cloudshift.Session;
-import cloudshift.Flow;
+import cloudshift.channel.Flow;
using cloudshift.Mixin;
View
23 cloudshift/flow/ServerSinkImpl.hx → cloudshift/channel/ServerSinkImpl.hx
@@ -1,11 +1,12 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Session;
-import cloudshift.Flow;
+import cloudshift.Channel;
+import cloudshift.channel.Flow;
import cloudshift.Http;
-import cloudshift.flow.InternalApi;
+import cloudshift.channel.InternalApi;
using cloudshift.Mixin;
@@ -20,12 +21,12 @@ class ServerSinkImpl extends SinkImpl {
}
override function
- reqSub(sessID:String,pipe:Pipe<Dynamic>,cb:Either<String,String>->Void) {
- var pID = pipe.pid();
- notify(Authorize(sessID,pipe,function(e:Either<String,String>) {
+ reqSub(sessID:String,chan:Chan<Dynamic>,cb:Either<String,String>->Void) {
+ var pID = chan.pid();
+ notify(Authorize(sessID,chan,function(e:Either<String,String>) {
switch(e) {
case Right(_):
- var unsub = pipe.drain(function(payload:Dynamic) {
+ var unsub = chan.sub(function(payload:Dynamic) {
_conduit[0].pump(sessID,payload,pID,null);
});
@@ -39,10 +40,10 @@ class ServerSinkImpl extends SinkImpl {
}
override function
- reqUnsub(sessID:String,pipe:Pipe<Dynamic>,cb:Either<String,String>->Void) {
- switch(_conduit[0].subscriptions(sessID).getOption(pipe.pid())) {
+ reqUnsub(sessID:String,chan:Chan<Dynamic>,cb:Either<String,String>->Void) {
+ switch(_conduit[0].subscriptions(sessID).getOption(chan.pid())) {
case Some(f):
- trace("remove function for "+pipe.pid());
+ trace("remove function for "+chan.pid());
f();
cb(Right(""));
case None:
@@ -67,7 +68,7 @@ class ServerSinkImpl extends SinkImpl {
world */
function myfill(payload,chanID,meta) {
- _pipes.get(chanID)._defaultFill(Flow.createPkt(payload,"server",chanID,meta),chanID,null);
+ _chans.get(chanID)._defaultFill(Flow.createPkt(payload,"server",chanID,meta),chanID,null);
}
}
View
45 cloudshift/flow/SinkImpl.hx → cloudshift/channel/SinkImpl.hx
@@ -1,18 +1,19 @@
-package cloudshift.flow;
+package cloudshift.channel;
import cloudshift.Core;
-import cloudshift.Flow;
-import cloudshift.flow.InternalApi;
+import cloudshift.Channel;
+import cloudshift.channel.Flow;
+import cloudshift.channel.InternalApi;
import cloudshift.core.ObservableImpl;
using cloudshift.Mixin;
-using cloudshift.Flow;
+using cloudshift.channel.Flow;
class SinkImpl implements Sink {
public var part_:Part_<Conduit,String,Sink,SinkEvent>;
- var _pipes:Hash<Pipe<Dynamic>>;
+ var _chans:Hash<Chan<Dynamic>>;
var _conduit:Array<Conduit>;
var _myfill:Dynamic->String->Dynamic->Void;
@@ -24,7 +25,7 @@ class SinkImpl implements Sink {
if (oc == null)
oc = Core.outcome();
_conduit = [];
- _pipes = new Hash();
+ _chans = new Hash();
addConduit(c);
oc.resolve(Right(cast(this,Sink)));
return oc;
@@ -40,12 +41,12 @@ class SinkImpl implements Sink {
switch(f) {
case Drain(pkt,sessID,cb):
var
- pID = pkt.pipeID(),
- pip = _pipes.get(pID),
+ pID = pkt.chanID(),
+ pip = _chans.get(pID),
op = pkt.operation();
if (pip == null)
- pip = pipe(pID);
+ pip = chan(pID);
switch(op) {
case "s": // subscribe
@@ -64,11 +65,11 @@ class SinkImpl implements Sink {
}
public function
- pipe<T>(pID):Pipe<T> {
- var ch = _pipes.get(pID) ;
+ chan<T>(pID):Chan<T> {
+ var ch = _chans.get(pID) ;
if (ch == null) {
ch = new PipeImpl<T>(pID);
- _pipes.set(pID,ch);
+ _chans.set(pID,ch);
if (_myfill != null) {
ch._fill = _myfill;
}
@@ -77,20 +78,20 @@ class SinkImpl implements Sink {
}
public function
- removePipe<T>(p:Pipe<T>) {
- _pipes.remove(p.pid());
+ removeChan<T>(p:Chan<T>) {
+ _chans.remove(p.pid());
}
public function
- pipeFromId(pID:String):Option<Pipe<Dynamic>> {
- return _pipes.getOption(pID);
+ chanFromId(pID:String):Option<Chan<Dynamic>> {
+ return _chans.getOption(pID);
}
- function reqSub(sessID:String,chan:Pipe<Dynamic>,cb:Either<String,String>->Void) {
+ function reqSub(sessID:String,chan:Chan<Dynamic>,cb:Either<String,String>->Void) {
throw "SinkImp:reqSub, should be overridden";
}
- function reqUnsub(sessID,chan:Pipe<Dynamic>,cb:Either<String,String>->Void) {
+ function reqUnsub(sessID,chan:Chan<Dynamic>,cb:Either<String,String>->Void) {
throw "SinkImp:reqUnsub, should be overridden";
}
@@ -98,16 +99,16 @@ class SinkImpl implements Sink {
throw "SinkImp:removeAllSubs, should be overridden";
}
- function reqMsg(pipe:Pipe<Dynamic>,pkt:Pkt<Dynamic>) {
+ function reqMsg(pipe:Chan<Dynamic>,pkt:Pkt<Dynamic>) {
pipe._defaultFill(pkt,"",null);
}
- public function direct<T>(sessID:String):Pipe<T> {
- return pipe("/__cs/"+sessID);
+ public function direct<T>(sessID:String):Chan<T> {
+ return chan("/__cs/"+sessID);
}
public function
- authorize<T>(pipe:Pipe<T>):Outcome<String,Pipe<T>> {
+ authorize<T>(pipe:Chan<T>):Outcome<String,Chan<T>> {
var oc = Core.outcome();
trace("auth :"+pipe.pid());
_conduit[0].authorize(pipe.pid())
View
8 cloudshift/channel/TChannelClient.hx
@@ -8,7 +8,7 @@ package cloudshift.channel;
import cloudshift.Core;
import cloudshift.Channel;
import cloudshift.Session;
-import cloudshift.Flow;
+import cloudshift.channel.Flow;
using cloudshift.Mixin;
@@ -57,7 +57,7 @@ class TChannelClient implements ChannelClient,
public function
channel<T>(id:String):Outcome<String,Chan<T>> {
var oc = Core.outcome();
- _sink.authorize(_sink.pipe(id)).outcome(function(val) {
+ _sink.authorize(_sink.chan(id)).outcome(function(val) {
oc.resolve(Right(val));
});
return oc;
@@ -65,11 +65,11 @@ class TChannelClient implements ChannelClient,
public function direct<T>(sessID:String):Outcome<String,Chan<T>> {
var oc = Core.outcome();
- _sink.authorize(_sink.pipe(sessID)).deliver(cast oc.resolve);
+ _sink.authorize(_sink.chan(sessID)).deliver(cast oc.resolve);
return oc;
}
public function unsub(chan:Chan<Dynamic>) {
- _sink.removePipe(chan);
+ _sink.removeChan(chan);
}
}
View
44 cloudshift/channel/TChannelServer.hx
@@ -5,7 +5,7 @@ import cloudshift.Core;
import cloudshift.Channel;
import cloudshift.Http;
import cloudshift.Session;
-import cloudshift.Flow;
+import cloudshift.channel.Flow;
using cloudshift.Mixin;
class TChannelServer implements ChannelServer,implements Part<Dynamic,String,ChannelServer,ChannelEvent> {
@@ -21,7 +21,11 @@ class TChannelServer implements ChannelServer,implements Part<Dynamic,String,Cha
public function
new() {
- part_ = Core.part(this);
+ part_ = Core.part(this,{
+ name:"Channel Server",
+ ver:"0.1",
+ auth:"Cloudshift"
+ });
}
public function
@@ -29,53 +33,50 @@ class TChannelServer implements ChannelServer,implements Part<Dynamic,String,Cha
if (oc == null)
oc = Core.outcome();
- if (_session == null)
+ if (_session == null) {
_session = Session.manager();
-
+ Core.info("created session");
+ }
/* provide an http if the user does not */
if (_http == null) {
_http = Http.server();
if (_host == null)
_host = "localhost";
if (_port == null)
- _port = 3844;
-
+ _port = 8082;
+
_http.start({host:_host,port:_port}).outcome(function(http) {
gotHttp(oc);
});
} else {
/* use the users, assuming it's started */
- gotHttp(oc);
+ if (_http.state() == Started) {
+ gotHttp(oc);
+ } else {
+ Core.error("you need to start the http server");
+ }
}
-
return oc;
}
public function
gotHttp(oc:Outcome<String,ChannelServer>) {
- trace("starting session");
_session.start(_http)
.oflatMap(function(sess) {
_session = sess;
var myoc:Outcome<String,Conduit> = Core.outcome();
- trace("got session");
- /*
- if (_sessionAuth == null) {
- myoc.cancel();
- oc.resolve(Left("need session auth"));
- } else {
+ if (_sessionAuth != null) {
_session.observe(_sessionAuth);
-
-
+ } else {
+ Core.warn("you may need to add a session authorizer");
}
- */
- Flow.pushConduit(sess).start({},myoc);
+
+ Flow.pushConduit(sess).start({},myoc);
return myoc;
})
.oflatMap(function(push) {
var myoc:Outcome<String,Sink> = Core.outcome();
_conduit = push;
- trace("got conduit");
Flow.sink(push.session()).start(push,myoc);
return myoc;
})
@@ -90,7 +91,6 @@ class TChannelServer implements ChannelServer,implements Part<Dynamic,String,Cha
}
});
}
- trace("got sink");
oc.resolve(Right(cast this));
});
}
@@ -125,7 +125,7 @@ class TChannelServer implements ChannelServer,implements Part<Dynamic,String,Cha
channel<T>(chanID:String):Outcome<String,Chan<T>> {
var oc = Core.outcome();
// on server getting a pipe is sync
- oc.resolve(Right(_sink.pipe(chanID)));
+ oc.resolve(Right(_sink.chan(chanID)));
return oc;
}
View
66 cloudshift/core/FutureImpl.hx
@@ -7,7 +7,6 @@ import cloudshift.Core;
using cloudshift.Mixin;
class FutureImpl<T> implements Future<T> {
- public var sequence:Int;
var _listeners: Array<T -> Void>;
var _result: T;
var _isSet: Bool;
@@ -132,16 +131,6 @@ class FutureImpl<T> implements Future<T> {
return this;
}
-
- public function deliverMe(f:Future<T>-> Void): Future<T> {
- if (isCanceled()) return this;
- else if (isDelivered()) f(this);
- else _listeners.push(function(g) {
- f(this);
- });
- return this;
- }
-
/** Uses the specified function to transform the result of this future into
* a different value, returning a future of that value.
@@ -233,58 +222,5 @@ class FutureImpl<T> implements Future<T> {
public static function create<T>(): Future<T> {
return new FutureImpl<T>();
- }
-
- /*
- public static function waitFor(toJoin:Array<Future<Dynamic>>):Future<Array<Dynamic>> {
- var
- joinLen = toJoin.length,
- myprm = create(),
- combined:Array<{seq:Int,val:Dynamic}> = [],
- sequence = 0;
-
- toJoin.foreach(function(xprm:Dynamic) {
- if(!Std.is(xprm,Future)) {
- throw "not a promise:"+xprm;
- }
-
- xprm.sequence = sequence++;
- xprm.deliverMe(function(r:Dynamic) {
- combined.push({ seq:r.sequence,val:r._result});
- if (combined.length == joinLen) {
- combined.sort(function(x,y) { return x.seq - y.seq; });
-
- //trace("combined :"+combined.map(function(el) { return el.seq; }).stringify());
- myprm.resolve(combined.map(function(el) { return el.val; }));
- }
- });
- });
-
- return myprm;
- }
- */
-
- public static
- function waitFor(toJoin:Array<Future<Dynamic>>):Future<Array<Dynamic>> {
- var
- count = toJoin.length,
- fut = Core.future();
-
- toJoin.foreach(function(xprm:Future<Dynamic>) {
- if(!Std.is(xprm,Future)) {
- throw "not a future:"+xprm;
- }
-
- xprm.deliver(function(r:Dynamic) {
- count--;
- if (count == 0) {
- fut.resolve(toJoin.map(function(el) {
- return el.value().get();
- }));
- }
- });
- });
- return fut;
- }
-
+ }
}
View
5 cloudshift/core/LogImpl.hx
@@ -28,6 +28,7 @@ class LogImpl {
public static function
init(?fileName:String) {
+ haxe.Log.trace = myTrace;
#if (!macro && nodejs)
if ( fileName != null) {
logFileFD = Node.fs.openSync(fileName,"a+",438);
@@ -82,12 +83,12 @@ class LogImpl {
static public function
info(msg:Dynamic,category="",?inf:haxe.PosInfos) {
- doTrace("info",category,msg,inf);
+ doTrace("info ",category,msg,inf);
}
static public function
warn(msg:Dynamic,category="",?inf:haxe.PosInfos) {
- doTrace("warn",category,msg,inf);
+ doTrace("warn ",category,msg,inf);
}
static public function
View
13 cloudshift/core/PartBaseImpl.hx
@@ -4,12 +4,18 @@ import cloudshift.Core;
using cloudshift.Mixin;
class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
+ public static var runningParts:Array<AnyPart> = [];
+
public var partID(default,null):String;
public var _events:Observable<EPartState<E>>;
+ public var state:EPartState<E>;
+ public var info:PartInfo;
var parent:Dynamic;
- public function new(parent:Dynamic) {
+
+ public function new(parent:Dynamic,?info:PartInfo) {
this.parent = parent;
+ this.info = info;
partID = Type.getClassName(Type.getClass(parent));
_events = Core.event();
#if debug
@@ -54,6 +60,10 @@ class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
#if debug
Core.info("Part started:"+Type.getClassName(Type.getClass(parent)));
#end
+ state = Started;
+
+ runningParts.push(parent);
+
_events.notify(Started);
},function(msg) {
return _events.notify(Error(Std.string(msg)));
@@ -69,6 +79,7 @@ class PartBaseImpl<S,B,G,E> implements Part_<S,B,G,E> {
checkErr("stop",p);
p.outcome(function(outcome) {
+ state = Stopped;
_events.notify(Stopped);
},function(msg) {
_events.notify(Error(msg));
View
30 usage/chat/ChatClient.hx
@@ -1,5 +1,6 @@
import cloudshift.Core;
+import cloudshift.Session;
import cloudshift.Channel;
using cloudshift.Mixin;
@@ -20,15 +21,24 @@ class ChatClient {
}
function login(nick:String) {
- Channel.client()
- .start(nick)
- .outcome(function(client) {
- _chanClient = client;
- trace("starting room");
- startRoom(nick,client);
- },function(reason) {
- trace(reason);
- });
+ Session.client().start({}).outcome(function(sess) {
+ sess.login(nick).deliver(function(es) {
+ switch(es) {
+ case UserOk(sessID):
+ trace("got sessID:"+sessID);
+ Channel.client()
+ .start(sessID)
+ .outcome(function(client) {
+ _chanClient = client;
+ trace("starting room");
+ startRoom(nick,client);
+ },function(reason) {
+ trace(reason);
+ });
+ default:
+ }
+ });
+ });
}
public function
@@ -56,7 +66,7 @@ class ChatClient {
function logout() {
- _chanClient.logout();
+ //_chanClient.logout();
_chanClient.unsub(_room);
ChatUi.reset();
}
View
40 usage/chat/ChatServer.hx
@@ -4,7 +4,6 @@
import cloudshift.Core;
import cloudshift.Http;
-import cloudshift.Flow;
import cloudshift.Session;
using cloudshift.Mixin;
import cloudshift.Channel;
@@ -20,6 +19,7 @@ class ChatServer {
}
public function new() {
+ Core.init();
Http.server()
.root("www")
.start({host:"localhost",port:8082})
@@ -42,9 +42,10 @@ class ChatServer {
}
function sessAuth(event:ESessionOp) {
+ trace("authing");
switch(event) {
case Login(pkt,reply):
-
+ trace("logging in with "+pkt);
var lp:LoginPkt = pkt;
if (nicks.exists(lp.nick)) {
trace("user exists");
@@ -74,23 +75,26 @@ class ChatServer {
public function
startRooms(cs:ChannelServer) {
- var room:Chan<MsgTypes> = cs.channel("/chat/room");
- room.filter(function(o) {
- switch(o) {
- case Chat(nick,msg):
- return Chat(nick.toUpperCase(),msg);
- default:
- }
- return o;
- });
+ Core.listParts();
+ cs.channel("/chat/room").outcome(function(room) {
+ trace("added rooms");
+ room.filter(function(o) {
+ switch(o) {
+ case Chat(nick,msg):
+ return Chat(nick.toUpperCase(),msg);
+ default:
+ }
+ return o;
+ });
- room.peek(function(pe) {
- switch(pe) {
- case Add(i):
- room.fill(Chat("bot","someone entered"));
- case Del(i):
- room.fill(Chat("bot","someone left"));
- }
+ room.peek(function(pe) {
+ switch(pe) {
+ case Add(i):
+ room.fill(Chat("bot","someone entered"));
+ case Del(i):
+ room.fill(Chat("bot","someone left"));
+ }
+ });
});
}
Please sign in to comment.
Something went wrong with that request. Please try again.