Permalink
Browse files

moved repipe to its own module

  • Loading branch information...
1 parent 6975cd3 commit be8e6b9249a21e6da73b9978b31a9c40b87e4adc @soldair soldair committed Feb 16, 2014
Showing with 12 additions and 135 deletions.
  1. +8 −3 lib/api.js
  2. +0 −75 lib/repipe.js
  3. +4 −3 package.json
  4. +0 −54 test/streamresume.js
View
@@ -3,7 +3,7 @@
var through = require('through');
//var rest = require('./resthttp');
var connection = require('./connection');
-var repipe = require('./repipe');
+var repipe = require('repipe');
module.exports = function(config,reconnect){
// make sure
@@ -66,7 +66,10 @@ module.exports = function(config,reconnect){
}
};
var s = through();
- repipe(s,function(last,done){
+ repipe(s,function(err,last,done){
+ console.log('repipe got error? ',err,' should i repipe?');
+ if(err && err.code != 'E_MDM') return done(err);
+
getConnection(function(err,con){
if(err) return done(err);
done(false,con.mdm.createReadStream(obj));
@@ -96,7 +99,9 @@ module.exports = function(config,reconnect){
}
};
var s = through();
- repipe(s,function(last){
+ repipe(s,function(err,last,done){
+ if(err && err.code != 'E_MDM') return done(err);
+
if(last) o.start = last.key;
getConnection(function(err,con){
if(err) return done(err);
View
@@ -1,75 +0,0 @@
-
-// todo decouple from our mdm implmentation and publish as own module.
-// no need to use disconnect. on error should always call for more work but pass (error first,last)
-
-module.exports = repipe;
-
-function repipe(s,makeSource){
- // the last value from the source stream
- function work(last){
- makeSource(last,function(err,source){
- if(err) {
- return s.emit('error',err);
- }
- // not returning a source is how you stop the repipe.
- if(!source) {
- var e = new Error('repipe. no source stream');
- e.code = "E_REPIPE";
- return s.emit('error',e);
- }
-
- resumeable(last,source,s);
- source.on('disconnect',function(last){
- work(last);
- });
-
- });
- }
-
- work();
-}
-
-
-// its kinda like pipe except everythig bubbles.
-function resumeable(last,source,s){
- source.on('data',function(data){
- last = data;
- s.write(data);
- })
-
- source.on('error',function(e){
- if(e && e.code == "E_MDM") {
- source.emit('disconnect',last,e);
- source.disconnected = true;
- source.end();
- } else {
- s.emit('error',e);
- }
- })
-
- var onPause = function(){
- source.pause();
- };
-
- var onDrain = function(){
- source.resume();
- };
-
- var onEnd = function(){
- source.end();
- };
-
- s.on('pause',onPause);
- s.on('drain',onDrain);
- s.on('end',onEnd);
-
- source.on('end',function(){
- s.removeListener('pause',onPause);
- s.removeListener('drain',onDrain);
- s.removeListener('end',onEnd);
- if(!source.disconnected) s.end();// done for real.
- })
- return s;
-}
-
-
View
@@ -5,8 +5,8 @@
"main": "index.js",
"scripts": {
"test": "tape test/*.js",
- "build":"browserify browser.js -o pinoccio-api.js",
- "prepublish":"npm run build"
+ "build": "browserify browser.js -o pinoccio-api.js",
+ "prepublish": "npm run build"
},
"bin": {
"pinoccio": "./bin/pinoccio.js"
@@ -45,7 +45,8 @@
"mux-demux": "~3.7.8",
"reconnect": "~2.0.0",
"cookie": "~0.1.0",
- "engine.io-options-from-url": "0.0.3"
+ "engine.io-options-from-url": "0.0.3",
+ "repipe": "~0.0.0"
},
"devDependencies": {
"tape": "~2.3.2",
View
@@ -1,54 +0,0 @@
-
-var through = require('through');
-var repipe = require('../lib/repipe')
-
-var lastread = false;
-var s = through(function(data){
- lastread = data;
- console.log('last item: ',lastread);
- this.queue(data);
-});
-
-var lastout = false;;
-var sout = through(function(data){
- lastout = data;
- console.log('out>',data)
-});
-
-s.pipe(sout);
-
-repipe(s,function(startFromHere){
- return makeNumbersStream(startFromHere)
-});
-
-
-
-function makeNumbersStream(start){
- var source = through();
-
- var i = start||0,t;
- t = setInterval(function(){
- ++i;
- source.write(i);
- console.log('write',i);
-
- if(i === 97){
-
- source.end();// im done.
-
- } else if(i%5 === 0){
- var e = new Error('unexpected disconnection');
- e.code = "E_MDM";
- source.emit('error',e);
- clearInterval(t);
- }
-
- },1);
-
- source.on('end',function(){
- clearInterval(t);
- });
-
- return source;
-}
-

0 comments on commit be8e6b9

Please sign in to comment.