Permalink
Browse files

don't skip the other q's when no callback exists

  • Loading branch information...
cloudshift committed Feb 14, 2012
1 parent e9d7f48 commit da753f15beb42ee8f484028995f065aa87d25afb
Showing with 12 additions and 28 deletions.
  1. +0 −3 cloudshift/flow/InternalApi.hx
  2. +12 −23 cloudshift/flow/MessageQImpl.hx
  3. +0 −2 cloudshift/flow/PushSessionImpl.hx
@@ -4,12 +4,9 @@ package cloudshift.flow;
import cloudshift.Core;
import cloudshift.Flow;
-
interface MessageQ {
function append(pkt:Dynamic):Void;
function setFlusher(cb:MessageQ->Bool):Void;
- function startFlushing(sessID:String):Void;
- function stopFlushing():Void;
function sessID():String;
function deQueue():Array<Dynamic>;
}
@@ -9,6 +9,7 @@ class MessageQImpl implements MessageQ {
static var timer:Int = -1;
static var waitingQs:Array<MessageQ> = [];
+ static var deferredQs:Array<MessageQ> = [];
static var flusher:MessageQ->Bool;
var _mq:Array<Dynamic>;
@@ -25,18 +26,20 @@ class MessageQImpl implements MessageQ {
static function initFlush() {
if (timer == -1) {
timer = js.Node.setInterval(function() {
- try {
- while (waitingQs.length > 0) {
- var mq:MessageQ = waitingQs.shift();
- if (!flusher(mq) ) {
- waitingQs.push(mq);
- break;
- }
+ while (waitingQs.length > 0) {
+ var mq:MessageQ = waitingQs.shift();
+ if (!flusher(mq) ) {
+ deferredQs.push(mq);
}
- } catch(exc:Dynamic) {
- Core.info("Got exc:"+exc);
}
},200,null);
+
+
+ var deferredTimer = js.Node.setInterval(function() {
+ if (deferredQs.length > 1)
+ waitingQs.push(deferredQs.shift());
+ },200,null);
+
}
}
@@ -65,18 +68,4 @@ class MessageQImpl implements MessageQ {
public inline function sessID() { return _sessID; }
- public function
- startFlushing(sessID:String) {
- /*
- if (flusher != null) {
- timer = js.Node.setInterval(function() {
- },250,null);
- }
- */
- }
-
- public function
- stopFlushing() {
- //js.Node.clearInterval(timer);
- }
}
@@ -34,12 +34,10 @@ class PushSessionImpl implements ConduitSession {
public function
flusher(flush:MessageQ->Bool) {
mq.setFlusher(flush);
- mq.startFlushing(sessID);
}
public function
shutDown() {
- mq.stopFlushing();
mq = null;
trace("removed all subs for "+sessID);
subs.values().foreach(function(f) f());

0 comments on commit da753f1

Please sign in to comment.