Permalink
Browse files

repiping to resume disconnected stats streamns

from  the place where they got disconnected
  • Loading branch information...
1 parent 14bf926 commit b17ef80a27c7773859cd03aa245f50e9ea3b7b2e @soldair soldair committed Feb 14, 2014
Showing with 71 additions and 4 deletions.
  1. +8 −4 lib/api.js
  2. +63 −0 lib/repipe.js
View
12 lib/api.js
@@ -56,11 +56,15 @@ module.exports = function(config,reconnect){
o.tail defaults true with no end
*/
var s = through();
- getConnection(function(err,con){
- con.mdm.createReadStream({type:'stats',})
- });
+ function call(){
+ getConnection(function(err,con){
+ var rstream = con.mdm.createReadStream({type:'stats',args:o});
+ rstream.pipe()
+
+ });
+ }
- return through(); // resume!
+ return s; // resume!
}
};
View
63 lib/repipe.js
@@ -0,0 +1,63 @@
+
+// no need to use disconnect. on error should always call for more work but pass (error first,last)
+
+module.exports = repipe;
+
+function repipe(s,makeSource){
+ // the last value from the source stream
+ function work(last){
+ var source = makeSource(last);
+ resumeable(last,source,s);
+ source.on('disconnect',function(last){
+ work(last);
+ })
+ }
+
+ work();
+}
+
+
+// its kinda like pipe except everythig bubbles.
+function resumeable(last,source,s){
+ source.on('data',function(data){
+ last = data;
+ s.write(data);
+ })
+
+ source.on('error',function(e){
+ console.log('ignore error 1')
+ if(e && e.code == "E_MDM") {
+ source.emit('disconnect',last,e);
+ source.disconnected = true;
+ source.end();
+ } else {
+ s.emit('error',e);
+ }
+ })
+
+ var onPause = function(){
+ source.pause();
+ };
+
+ var onDrain = function(){
+ source.resume();
+ };
+
+ var onEnd = function(){
+ source.end();
+ };
+
+ s.on('pause',onPause);
+ s.on('drain',onDrain);
+ s.on('end',onEnd);
+
+ source.on('end',function(){
+ s.removeListener('pause',onPause);
+ s.removeListener('drain',onDrain);
+ s.removeListener('end',onEnd);
+ if(!source.disconnected) s.end();// done for real.
+ })
+ return s;
+}
+
+

0 comments on commit b17ef80

Please sign in to comment.