Permalink
Browse files

add direct (private) messaging per session ID

  • Loading branch information...
1 parent da753f1 commit f25762a33e2a0262c9b4c7c4f6e6b3ea2c6e193e @cloudshift committed Feb 14, 2012
Showing with 12 additions and 6 deletions.
  1. +2 −2 cloudshift/Flow.hx
  2. +6 −4 cloudshift/flow/PushListenerImpl.hx
  3. +4 −0 cloudshift/flow/SinkImpl.hx
View
@@ -44,7 +44,7 @@ interface Conduit implements Part<Dynamic,Conduit,ConduitEvent> {
function pump(sessID:String,pkt:Dynamic,chanID:String,meta:Dynamic):Void;
#if nodejs
function subscriptions(sessID:String):Hash<Void->Void>;
- function session():SessionMgr;
+ function session():SessionMgr;
#end
}
@@ -59,6 +59,7 @@ interface Sink implements Part<Conduit,Sink,SinkEvent> {
function pipeFromId(chanID:String):Option<Pipe<Dynamic>>;
function authorize<T>(pipe:Pipe<T>):Future<Either<String,Pipe<T>>>;
function removePipe<T>(p:Pipe<T>):Void;
+ function direct<T>(sessID:String):Pipe<T>;
}
interface Pipe<T> { //implements Part<Dynamic,Pipe<T>,Pkt<T>> {
@@ -79,7 +80,6 @@ interface Pipe<T> { //implements Part<Dynamic,Pipe<T>,Pkt<T>> {
function peek(cb:EOperation->Void):Void;
}
-
typedef QuickFlow = {
var conduit:Conduit;
var session:SessionMgr;
@@ -41,8 +41,7 @@ class PushListenerImpl implements Conduit {
var sessID = s.sessID;
switch(_callbacks.getOption(sessID)) {
case Some(cbs):
- var goodCbs = cbs.filter(function(cb) return cb.fn != null);
-
+ var goodCbs = cbs.filter(function(cb) return cb.fn != null);
if (goodCbs.length > 0) {
if (now - s.lastConnection > SESSION_EXPIRE) {
removeSession(s);
@@ -68,7 +67,6 @@ class PushListenerImpl implements Conduit {
return prm;
}
-
function removeSession(session) {
if (session != null) {
Core.info("removing session");
@@ -115,7 +113,6 @@ class PushListenerImpl implements Conduit {
req.connection.once('close',function() {
var cbs = _callbacks.get(sessID);
while (cbs != null && cbs.length > 0) {
- Core.info("I'm closing the bastard");
var cb = cbs.shift();
cb.fn([]);
cb.fn = null;
@@ -155,6 +152,11 @@ class PushListenerImpl implements Conduit {
}
public function
+ direct(sessID:String,payload:Dynamic) {
+ _sessions.get(sessID).append(Flow.createPkt(payload,sessID,"/__cs/direct"));
+ }
+
+ public function
subscriptions(sessID:String):Hash<Void->Void> {
var sess = _sessions.get(sessID);
if (sess != null)
@@ -101,6 +101,10 @@ class SinkImpl implements Sink {
pipe._defaultFill(pkt,"",null);
}
+ public function direct<T>(sessID:String):Pipe<T> {
+ return pipe("/__cs/"+sessID);
+ }
+
public function
authorize<T>(pipe:Pipe<T>):Future<Either<String,Pipe<T>>> {
var prm = Core.future();

0 comments on commit f25762a

Please sign in to comment.