Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

sync stream works great

  • Loading branch information...
commit 6975cd393c0d17e802e40da8bc6a4941659f3f81 1 parent 4b662e5
@soldair soldair authored
Showing with 65 additions and 25 deletions.
  1. +2 −2 browser.js
  2. +47 −12 lib/api.js
  3. +16 −11 lib/repipe.js
View
4 browser.js
@@ -75,8 +75,8 @@ module.exports = window.pinoccioAPI = function(opts){
};
// start is optional. you will not get state events for those that have occured before start.
- api.sync = function(account,start){
- return a.sync(account,start);
+ api.sync = function(start){
+ return a.sync(api.token,start);
};
// request some or all stats by account.
View
59 lib/api.js
@@ -20,22 +20,25 @@ module.exports = function(config,reconnect){
var tries = this.restRetries
,z = this;
+ var streamKey;
var timer = setTimeout(function(){
var e = new Error("call timedout");
e.code = "E_TIMEOUT";
_cb(e);
+ if(streamKey) delete expectingStream[k];
},z.restTimeout);
var _cb = function(err,data){
- console.log('rest response> ',err,data,raw)
+ console.log('rest response> ',err,data)
if(err && tries > 0) return call();
if(err) return cb(err);
if(data.stream){
- // im expecting to follow up with a new strem connection for this callback response.
- //
+ // im expecting to follow up with a new stream connection for this callback response.
+ z.expectingStream[data.stream] = _cb;
+ streamKey = data.stream;
} else {
cb(data.error,data.data);
clearTimeout(timer);
@@ -44,26 +47,35 @@ module.exports = function(config,reconnect){
function call(){
--tries;
-
getConnection(function(err,con){
if(err) return _cb(err);
con.rest(obj,_cb);
});
-
};
call();
},
// sync the account's data in realtime
- sync:function(account){
+ sync:function(token,start){
+ var obj = {
+ type:"rest"
+ ,args:{
+ url: '/v1/sync',
+ data: { token: token },
+ method: 'get'
+ }
+ };
var s = through();
- repipe(s,function(last){
- return con.mdm.createReadStream({type:'sync',args:{account:account,start:last}});
+ repipe(s,function(last,done){
+ getConnection(function(err,con){
+ if(err) return done(err);
+ done(false,con.mdm.createReadStream(obj));
+ });
});
return s;
},
// stream stats data
- stats:function(o){
+ stats:function(token,o){
/*
o.troop
o.scout
@@ -72,10 +84,24 @@ module.exports = function(config,reconnect){
o.end = then
o.tail defaults true with no end
*/
+
+ o.token = token;
+
+ var obj = {
+ type:"rest"
+ ,args:{
+ url: '/v1/stats',
+ data: o,
+ method: 'get'
+ }
+ };
var s = through();
repipe(s,function(last){
- if(last) o.start = last.key;
- return con.mdm.createReadStream({type:'stats',args:o});
+ if(last) o.start = last.key;
+ getConnection(function(err,con){
+ if(err) return done(err);
+ done(false,con.mdm.createReadStream(obj));
+ });
});
return s; // resume!
@@ -93,13 +119,22 @@ module.exports = function(config,reconnect){
o.connection.mdm.on('connection',function(stream){
// the server has opened a stream from me.
if(stream.meta && stream.meta.type == "rest-stream") {
-
+ if(o.expectingStream[stream.meta.id]) {
+ o.expectingStream[stream.meta.id](false,stream);
+ delete o.expectingStream[stream.meta.id];
+ }
};
});
while(pending.length) pending.shift()(false,o.connection);
}).on('disconnect',function(){
o.connection = false;
+ Object.keys(o.expectingStream).forEach(function(k){
+ var e = new Error('disconnect before start');
+ e.code = "E_DISCONNECT";
+ o.expectingStream[k](e);
+ delete o.expectingStream[k];
+ });
})
}
View
27 lib/repipe.js
@@ -7,17 +7,22 @@ module.exports = repipe;
function repipe(s,makeSource){
// the last value from the source stream
function work(last){
- var source = makeSource(last);
- // not returning a source is how you stop the repipe.
- if(!source) {
- var e = new Error('repipe. no source stream');
- e.code = "E_REPIPE";
- return s.emit('error',e);
- }
-
- resumeable(last,source,s);
- source.on('disconnect',function(last){
- work(last);
+ makeSource(last,function(err,source){
+ if(err) {
+ return s.emit('error',err);
+ }
+ // not returning a source is how you stop the repipe.
+ if(!source) {
+ var e = new Error('repipe. no source stream');
+ e.code = "E_REPIPE";
+ return s.emit('error',e);
+ }
+
+ resumeable(last,source,s);
+ source.on('disconnect',function(last){
+ work(last);
+ });
+
});
}
Please sign in to comment.
Something went wrong with that request. Please try again.