
Peas in a #862

Merged
merged 84 commits into from about 1 year ago

4 participants

Jeremie Miller Kristján Pétursson Thomas Muldowney Justin Parker
Jeremie Miller

just prepping this for merge, place to look/review at changes

temas and others added some commits
Thomas Muldowney temas Turn on some debug df16d01
Thomas Muldowney temas Raw log the synclet start/stop 7e27350
Thomas Muldowney temas Log more around pipeline inject 75c7e29
Thomas Muldowney temas Make the dMap not grow stack and tick friendly fc1b1e9
Kristján Pétursson kristjan Make friendsPump.vpump asynchronous
This is blocking up the whole pipeline.
cc53ace
Kristján Pétursson kristjan Fix missing async callback f26eb84
Kristján Pétursson kristjan No seriously, fix async callback d82c048
Thomas Muldowney temas Show the inject length for locals 6b6c904
Thomas Muldowney temas Make sure we have a valid base to length 01170d5
Thomas Muldowney temas Log the exit from an inject d6c6109
Thomas Muldowney temas Basic pump logging for debugging bf2f91c
Thomas Muldowney temas Fix the base in the logging for the pump 0788de8
Thomas Muldowney temas Actually give us a string f6a1e81
Thomas Muldowney temas Fix the logic here bb35e5c
Thomas Muldowney temas Trying to find the stuck bits d946a0a
Thomas Muldowney temas Fix the pump id in the logging bf972e9
Thomas Muldowney temas Short circuit some inject cases 4c1939f
Thomas Muldowney temas Stray code ef27e16
Kristján Pétursson kristjan Simplify firstRun check
This construction is silly and angers lint.
8eb03c0
Kristján Pétursson kristjan Fix missing cbEach in dMap.pump
Didn't catch all our paths. This was causing a hang during the pipeline.
a635f7f
Kristján Pétursson kristjan Fix numbering in pipeline logging
1-based counting was confusing and we were talking about the wrong pumps. Now
the `injector` is 0.
af136a4
Thomas Muldowney temas isArray... my friend 2df3790
Thomas Muldowney temas Late prep once we have routes and some debug 74b5093
Thomas Muldowney temas More specific ijod timings 1fe8ded
Kristján Pétursson kristjan Redo Foursquare checkins synclet
Another recursive monstrosity destroyed.

The strategy here is to start at `now` on the first run and use the earliest
known checkin time with the API's `beforeTimestamp` to page backwards. Meantime,
we keep track on the side of the latest checkin we've seen. When we hit either
the end of history or a checkin we've seen before, we clear the cursor, set the
"we've seen this" time to the latest known checkin, and start over.

Since it looks more and more like large chunks of data hose the pipeline, I
modulated page size based on whether it's the first sync or not. At first sync,
we want to move as fast as we can, so we max out Foursquare's API at 250. Later
on, it's unlikely you've checked in 100 places since we last synced, so we can
drop to the smaller page size and save ourselves effort.
95b41fc
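The backwards-paging cursor described in that commit message can be sketched roughly as follows. This is a sketch under assumptions: `config.seenUntil`, `config.latest`, `config.before`, and `createdAt` are hypothetical names, not the synclet's actual fields.

```javascript
// Sketch of the Foursquare backwards-paging cursor: page back from `before`,
// track the latest checkin seen on the side, and reset once we hit history
// we already have.
function nextState(config, page, pageSize) {
  config = config || {};
  var latest = config.latest || 0;
  var earliest = null;
  var hitSeen = false;
  page.forEach(function (checkin) {
    if (checkin.createdAt > latest) latest = checkin.createdAt;
    if (checkin.createdAt <= (config.seenUntil || 0)) hitSeen = true;
    if (earliest === null || checkin.createdAt < earliest) earliest = checkin.createdAt;
  });
  if (hitSeen || page.length < pageSize) {
    // end of history, or overlap with a previous run: clear the cursor and
    // remember the newest checkin we've seen so the next run stops there
    return { seenUntil: latest, latest: latest, before: null };
  }
  // otherwise keep paging backwards via the API's beforeTimestamp
  return { seenUntil: config.seenUntil || 0, latest: latest, before: earliest };
}
```

Each run would pass `before` as the API's `beforeTimestamp` until it comes back `null`, at which point the cycle starts over from `now`.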
Thomas Muldowney temas Log addData timing more specifically 574de21
Thomas Muldowney temas Synchronize addData abd97c0
Thomas Muldowney temas We don't have an err here 0de062f
Thomas Muldowney temas Missed a callback removal 4fb2400
Thomas Muldowney temas Bit more sync cea681c
Thomas Muldowney temas Fix the instrument for s3 put timing 25ac7f0
Thomas Muldowney temas No ijod events right now c905750
Thomas Muldowney temas Move most of the console.log to logger.debug ee20ecb
Justin Parker jparkrr make the facebook personal feed page back in time
This is done by saving the oldest item on the page and passing it back
as 'until' on the next iteration. 'Since' is always passed in so we
page back until the last time we synced, so existing profiles will
need their configs reset.
2347ba4
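The until/since windowing described above might be outlined like this; `buildFeedUrl`, `nextUntil`, and the field names are illustrative, not the synclet's actual code.

```javascript
// Page the feed back in time: `since` pins the window at the last sync,
// `until` (the oldest item from the previous page) walks backwards.
function buildFeedUrl(base, since, until) {
  var u = base + '?since=' + since;
  if (until) u += '&until=' + until;
  return u;
}

function nextUntil(items) {
  // save the oldest item on the page to pass back as `until` next time
  if (!items || items.length === 0) return null;
  return Math.min.apply(null, items.map(function (i) { return i.created_time; }));
}
```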
Justin Parker jparkrr clean up a few things and ensure we can page forward 3d013cf
Justin Parker

working on writing some new tests. The old ones were specific to the old method of syncing.

kristjan and others added some commits
Kristján Pétursson kristjan Merge branch 'master' into peas-in-a
* master:
  More package.json updates
  Updates for package.json
  add optional tumblr_params for posting
  No more logger.verbose
  Add localLevel, logger.vital
  Improvements to logger
  prefix all logger.warn calls with WARNING.
  Don't crash testSynclet when missing a config
  Be super cautious with the pipeline

Conflicts:
	lib/podClient.js
799839f
Kristján Pétursson kristjan Update Foursquare checkins test
New URLs getting hit
520db6b
Thomas Muldowney temas Bail early when things fail 5d64b8f
Thomas Muldowney temas Disable pipeline to test just the synclet run speeds 3672935
Justin Parker jparkrr only page back if we got a new entry a4d6fea
Justin Parker jparkrr update tests to not use old next variable 6d3a305
Justin Parker jparkrr remove extra comments 4e47d3f
Justin Parker jparkrr small changes for clarity e47ca6a
Justin Parker jparkrr make sure we always save since c649812
Kristján Pétursson kristjan Page through Facebook Page data
This one is rare for users, but if they have a lot of pages, it was recursing.
The `getAdministeredPages` function felt like an extraneous layer, but as I
remove the other uses of `getPages`, I suspect there will be a useful pattern to
consolidate.
f89c67e
Kristján Pétursson kristjan Initialize sandboxed configs in testSynclet
This matches what `taskman` does, which is to initialize empty objects in the
case that any config data is absent. This avoids us having to check or set
`pi.config` at the beginning of every synclet.
cfb39b2
Kristján Pétursson kristjan Make Facebook photos page
I didn't make albums page on the assumption that very few people will have
*that* many of them. We also haven't paged any FQL yet.

I wanted to just update the album's `since` time as we gathered photos, but it
interacts poorly with `limit`. Basically, `limit` is applied first, so on the
second call, we get nothing back.

Instead, I parse the `after` out of Facebook's returned paging URL and use that
on the next call. I wanted to use the whole URL they give back, but it doesn't
include an `access_token` and this way fit better with existing code like
`apiUrl`.
680dfcd
Kristján Pétursson kristjan Update Facebook synclet tests
Just mashing around URLs for Fakeweb.

I'm checking `album.since` with `_.isNumber` as opposed to just `if
(album.since)` because we set it to `0` on the first run and otherwise it wouldn't
be included.
cc44fd6
Kristján Pétursson kristjan Merge branch 'optimize-facebook' into peas-in-a
* optimize-facebook:
  Update Facebook synclet tests
  Make Facebook photos page
  Initialize sandboxed configs in testSynclet
  Page through Facebook Page data
be16513
Kristján Pétursson kristjan Separate pods' external and listening ports
In production, we want the pods' API hosts up on some port (say, 8070), but the
load balancer listening for HTTPS on 443, so this lets us split those up.
`listenPort` is the one the server will spin up on, and `port` is the one the
client uses.
ffd758d
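A minimal sketch of consuming the split; the fallback to `port` is an assumption for illustration, not necessarily the PR's behavior.

```javascript
// The server binds listenPort (e.g. 8070 behind the LB), while clients dial
// port (e.g. 443 on the load balancer). Assumed fallback: use port when
// listenPort is unset.
function serverPort(pods) {
  return pods.listenPort || pods.port;
}
```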
Kristján Pétursson kristjan Revert "Disable pipeline to test just the synclet run speeds"
This reverts commit 3672935.
fb185c6
Kristján Pétursson kristjan Up number of workers used by profile sync script
We've had good results this high. Probably something we want configurable later.
7868ea0
Kristján Pétursson kristjan Make sync-profiles load everything in the DB
Since we're testing pods more holistically now, it's easier to do this than
build a file to load. We may want to accept a file later to do specific sets,
and all manner of other options, but for now it's simple.
1f16803
Justin Parker jparkrr Merge branch 'facebook-feed-paging' into peas-in-a bca8e23
Kristján Pétursson kristjan Add backlog reporting script
This asks `taskman` what the current backlog looks like and reports it to
statsd. If you want to run it a while, put it in a tmux session somewhere.
721c5c9
Kristján Pétursson kristjan Make sync-profiles script take a pod number
Now that there are more than one, we need to be able to specify where to kick
the sync.
25b5d2f
Kristján Pétursson kristjan Catch errors in Facebook photos synclet
When I rewrote this, I forgot to watch for incoming errors. I hope it's a real
error causing `photos` to be null, but just in case I've checked that explicitly
too.
74bfac2
Kristján Pétursson kristjan Fix testSynclet when there's no return data
It was assuming the presence of `data` in the synclet's return value, but that's
not always there.

The script now also prints out the details of any exceptions that occur, because
that's a little useful.
51b68d2
Kristján Pétursson kristjan Fix infinite requeue of shutterfly photos
Now if the albums synclet never succeeds (maybe thanks to bad auth), we won't
immediately reenqueue the photos synclet forever. After three tries (30 seconds
in a clear queue), it'll treat the lack of albums as an error and back off
appropriately.
f2fe6eb
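The bounded retry described above could be sketched like this; `MAX_TRIES` and `config.albumTries` are hypothetical names.

```javascript
var MAX_TRIES = 3;

// If the albums synclet hasn't produced albums yet, requeue quietly a few
// times; after MAX_TRIES, report a real error so the scheduler backs off.
function checkAlbums(config, albums) {
  if (albums && albums.length) {
    config.albumTries = 0;
    return { ok: true };
  }
  config.albumTries = (config.albumTries || 0) + 1;
  if (config.albumTries >= MAX_TRIES) {
    return { ok: false, err: new Error('no albums after ' + config.albumTries + ' tries') };
  }
  return { ok: false };
}
```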
Kristján Pétursson kristjan Make testSynclet handle no data returned
Thought we would have caught this earlier, but sometimes when there's no data at
all returned from the synclet, this would bail.
08fee9c
Kristján Pétursson kristjan Print the final config after testSynclet runs
A lot of the time, this last state has a useful and important difference from
the paging states, so this output is nice.
ddbf1cd
Kristján Pétursson kristjan Print total entries fetched by testSynclet
Just handy to have at the end there for basic consistency checking.
412bc9e
Kristján Pétursson kristjan Don't crash testSynclet when nothing's returned
Synclets don't really have to return anything at all, but `testSynclet` would
bail looking for `data.config`. Now it's forgiving.
9132240
Kristján Pétursson kristjan Merge branch 'improve-testSynclet' into peas-in-a
* improve-testSynclet:
  Don't crash testSynclet when nothing's returned
  Print total entries fetched by testSynclet
  Print the final config after testSynclet runs
64bc048
Kristján Pétursson kristjan Split FB likes into separate synclets
Putting them together doesn't really help anything, but it does conflate their
errors (which I don't think were being properly reported before). Splitting them
up isolates them for easier management and debugging.
ef6345b
Kristján Pétursson kristjan Remove recursion in Facebook url_likes
Now the synclet yields while paging. This requires a little more state
management as we try to notice URLs we've already seen, but it's not bad.
d74c2f3
Kristján Pétursson kristjan Remove recursion from Facebook stream_likes
Facebook returns arbitrarily different batches of Likes depending on page size,
so to lose as few as possible, we're using a larger page size here. If the
amount of data coming back in the Posts becomes too much, we can cut them out
https://developers.facebook.com/docs/reference/api/field_expansion/.
c8367dd
Kristján Pétursson kristjan Remove recursion from Facebook page_likes
This will be the only likes synclet that pages forward in time unless we decide
it's important to reverse it. The reason being it's the only source of likes
that lets us page based on timestamp, and it's easier, more readable, and less
likely to have bugs if we just use that to run one direction.
591fccf
Kristján Pétursson kristjan Merge branch 'better-like-it' into peas-in-a
* better-like-it:
  Remove recursion from Facebook page_likes
  Remove recursion from Facebook stream_likes
  Remove recursion in Facebook url_likes
  Split FB likes into separate synclets
9f63fa5
Jeremie Miller quartzjer fix merge conflict 3e49c46
lib/dMap.js
@@ -187,17 +187,19 @@ exports.types = function (type, profiles) {
187 187
188 188 // run a specific service pumps, usually last
189 189 exports.pump = function (cset, callback) {
190   - cset.forEach(function (entry) {
  190 + async.forEachSeries(cset, function (entry, cbStep) {
3
Kristján Pétursson
kristjan added a note

@temas Why move to async for this? Seems we're calling N + 1 nextTicks when we could call it once on callback after a synchronous forEach, or toss out this function's callback completely and just return something. Or if we want all that, why not run in parallel?
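The synchronous shape this note suggests, a plain `forEach` with a single callback at the end, would look roughly like the following sketch (not the repo's actual `dMap.pump`):

```javascript
// One synchronous pass over the changeset, then exactly one callback:
// no per-entry nextTick scheduling.
function pump(cset, runPumpsFor, callback) {
  cset.forEach(function (entry) {
    runPumpsFor(entry);
  });
  callback(null, cset);
}
```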

Kristján Pétursson
kristjan added a note

Ah, this was part of the 'make pumps async' dive.

Thomas Muldowney
temas added a note

Correct on the second note.

Kristján Pétursson kristjan commented on the diff
lib/ijod.js
@@ -544,10 +556,10 @@ IJOD.prototype.addData = function(arg, callback) {
544 556 ]});
545 557
546 558 // if there's types, insert each of them too for filtering
547   - if (!arg.data || !arg.types) return callback();
  559 + if (!arg.data || !arg.types) return;
548 560
549 561 // TODO: This doesn't call any async code and can be converted
1
Kristján Pétursson
kristjan added a note

Comment's no longer relevant

Kristján Pétursson

This is quite a hodgepodge, but I don't see anything that shouldn't go in. :+1:

Thomas Muldowney
temas commented

I feel like this needs master merged in before it can be really analyzed. Also, it's probably worth making sure the synclet changes are on master before that merge happens. I wouldn't say go forward until this is green to merge again.

Jeremie Miller quartzjer merged commit 94cbd52 into from
Jeremie Miller quartzjer deleted the branch

Showing 84 unique commits by 4 authors.

Apr 15, 2013
Jeremie Miller quartzjer fix conflict f0eedb0
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a 216fd19
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a b77da10
Apr 16, 2013
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a ce2a3b4
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a 74a48e4
Apr 17, 2013
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a cb1a75c
Jeremie Miller quartzjer need dummy commit c3771b8
Jeremie Miller quartzjer fix merge conflict 7780ee4
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a 209a40d
Apr 18, 2013
Jeremie Miller quartzjer Merge branch 'master' of github.com:Singly/hallway into peas-in-a cfde367
Jeremie Miller quartzjer don't require this pump anymore 62db856
Jeremie Miller quartzjer revert to sync 8b1653b
Jeremie Miller quartzjer remove dmap pump bce1757
Jeremie Miller quartzjer log these 588595c
1  Config/defaults.json
@@ -62,6 +62,7 @@
62 62 "domain": "localhost",
63 63 "secure": false,
64 64 "listenIP": "0.0.0.0",
  65 + "listenPort": 8070,
65 66 "port": 8070,
66 67 "auth": {
67 68 "user": "hal",
4 hallwayd.js
@@ -44,7 +44,7 @@ http.globalAgent.maxSockets = 2048;
44 44 https.globalAgent.maxSockets = 2048;
45 45
46 46 function startAPIHost(cbDone) {
47   - logger.vital("Starting an API host");
  47 + logger.vital("Starting an API hostess");
48 48
49 49 var webservice = require('webservice');
50 50
@@ -89,7 +89,7 @@ function startNexus(cbDone) {
89 89 function startPod(cbDone) {
90 90 logger.vital('Starting a Pod so HAL can\'t hear us.');
91 91 require('podService').startService(
92   - lconfig.pods.port,
  92 + lconfig.pods.listenPort,
93 93 lconfig.pods.listenIP,
94 94 cbDone
95 95 );
15 lib/dMap.js
@@ -185,21 +185,6 @@ exports.types = function (type, profiles) {
185 185 return ret;
186 186 };
187 187
188   -// run a specific service pumps, usually last
189   -exports.pump = function (cset, callback) {
190   - cset.forEach(function (entry) {
191   - var r = idr.parse(entry.idr);
192   - var svc = maps[r.host] || maps.system;
193   - if (!svc.pumps) return;
194   - // run all for now, TODO need selectivity here
195   - Object.keys(svc.pumps).forEach(function (name) {
196   - if (!svc.pumps[name][r.protocol]) return;
197   - svc.pumps[name][r.protocol](entry);
198   - });
199   - });
200   - callback(null, cset);
201   -};
202   -
203 188 // return the integer code for the given string type services start at 100, to
204 189 // find next one just:
205 190 // $ grep 'exports.ptype' lib/services/*/map.js
2  lib/friendsPump.js
@@ -41,7 +41,7 @@ exports.vpump = function(cset, auth, cbDone) {
41 41 var ndx = {};
42 42 var ver = reversion(auth);
43 43 dMap.types('contacts').forEach(function(key) { ndx[key] = "contact"; });
44   - cset.forEach(function(entry) {
  44 + cset.forEach(cset, function(entry){
45 45 var types = dMap.typeOf(entry.idr);
46 46 if (types.indexOf('contacts') >= 0) entry._v = ver;
47 47 });
2  lib/ijod-s3.js
@@ -105,7 +105,7 @@ exports.backend = function () {
105 105 }
106 106
107 107 if (res.statusCode === 200) {
108   - instruments.timing({"s3.getOne": (Date.now() - startTime)}).send();
  108 + instruments.timing({"s3.put": (Date.now() - startTime)}).send();
109 109 cb(null);
110 110 } else {
111 111 var msg = "";
131 lib/ijod.js
@@ -402,10 +402,11 @@ IJOD.prototype.startAddTransaction = function() {
402 402 */
403 403 };
404 404
405   -IJOD.prototype.commitAddTransaction = function(cbDone) {
  405 +IJOD.prototype.commitAddTransaction = function(timings, cbDone) {
406 406 if (!this.transactionItems || this.transactionItems.length === 0) {
407 407 return cbDone();
408 408 }
  409 + var startTime = Date.now();
409 410 //console.log("Committing %d items", this.transactionItems.length);
410 411 var totalSize = this.transactionItems.reduce(function(prev, cur, idx, arr) {
411 412 return prev + arr[idx].length;
@@ -414,39 +415,41 @@ IJOD.prototype.commitAddTransaction = function(cbDone) {
414 415 var writeBuffer = new Buffer(totalSize);
415 416 var idx = 0;
416 417 var self = this;
417   - lutil.forEachSeries(self.transactionItems, function(item, cb) {
  418 + self.transactionItems.forEach(function(item) {
418 419 item.copy(writeBuffer, idx);
419 420 idx += item.length;
420   - cb();
421   - }, function(err) {
422   - var startTime = Date.now();
423   - backend.put(self.path, writeBuffer, function (err) {
424   - writeBuffer = null;
425   - if (!err) {
426   - logger.debug("Saving %d entries to ijod", self.transactionQueries.length);
427   - // compose everything together, gnarly dude
428   - var allSql = "";
429   - var allBinds = [];
430   - var id;
431   - self.transactionQueries.forEach(function(query){
432   - id = query.id; // just need one of them since all in the same base
433   - allSql += query.sql + '; ';
434   - allBinds = allBinds.concat(query.binds);
435   - });
436   - dbMod(id, allSql, allBinds, function(err){
437   - if(err) logger.warn("err bulk writing",err);
438   - instruments.timing({
439   - "ijod.save_time": (Date.now() - startTime)
440   - }).send();
441   - instruments.increment("ijod.puts").send();
442   - self.transactionItems = null;
443   - cbDone();
444   - });
445   - } else {
446   - instruments.increment("ijod.put_errors").send();
447   - self.abortAddTransaction(cbDone);
448   - }
449   - });
  421 + });
  422 + timings["commit-prep"] = Date.now() - startTime;
  423 + startTime = Date.now();
  424 + backend.put(self.path, writeBuffer, function (err) {
  425 + timings["commit-backend"] = Date.now() - startTime;
  426 + startTime = Date.now();
  427 + writeBuffer = null;
  428 + if (!err) {
  429 + logger.debug("Saving %d entries to ijod", self.transactionQueries.length);
  430 + // compose everything together, gnarly dude
  431 + var allSql = "";
  432 + var allBinds = [];
  433 + var id;
  434 + self.transactionQueries.forEach(function(query){
  435 + id = query.id; // just need one of them since all in the same base
  436 + allSql += query.sql + '; ';
  437 + allBinds = allBinds.concat(query.binds);
  438 + });
  439 + dbMod(id, allSql, allBinds, function(err){
  440 + timings["commit-db"] = Date.now() - startTime;
  441 + if(err) logger.warn("err bulk writing",err);
  442 + instruments.timing({
  443 + "ijod.save_time": (Date.now() - startTime)
  444 + }).send();
  445 + instruments.increment("ijod.puts").send();
  446 + self.transactionItems = null;
  447 + cbDone();
  448 + });
  449 + } else {
  450 + instruments.increment("ijod.put_errors").send();
  451 + self.abortAddTransaction(cbDone);
  452 + }
450 453 });
451 454 };
452 455
@@ -462,16 +465,19 @@ IJOD.prototype.abortAddTransaction = function(cbDone) {
462 465 };
463 466
464 467 // takes arg of at least an id and data, callback(err) when done
465   -IJOD.prototype.addData = function(arg, callback) {
  468 +IJOD.prototype.addData = function(timings, arg) {
466 469 if (!arg || !arg.idr) return callback("invalid arg");
467 470 var tmpJson = JSON.stringify(arg);
468 471 var hash = arg.hash ? arg.hash : mmh.murmur128HexSync(tmpJson);
469 472 delete arg.hash;
  473 + var start = Date.now();
470 474 // ENTRY NORMALIZATION HAPPENS HERE
471 475 if (!arg.at) arg.at = Date.now();
472 476 arg.id = idr.id(arg.idr);
473 477 arg.idr = idr.toString(arg.idr);
474 478 var ll = dMap.get('ll',arg.data,arg.idr) || [null,null];
  479 + timings["add-dMap"] += Date.now() - start;
  480 + start = Date.now();
475 481 // build our participant matching binary string
476 482 var par = null;
477 483 var participants = parget(arg);
@@ -507,10 +513,16 @@ IJOD.prototype.addData = function(arg, callback) {
507 513 } else {
508 514 qsql += ', ?';
509 515 }
  516 + timings["add-friends"] += Date.now() - start;
  517 + start = Date.now();
510 518 var self = this;
511 519 this.startAddTransaction();
512 520 tmpJson = JSON.stringify(arg);
  521 + timings["add-json-stringify"] += Date.now() - start;
  522 + start = Date.now();
513 523 var gzdata = zlib.compress(new Buffer(tmpJson+"\n"));
  524 + timings["add-zlib"] += Date.now() - start;
  525 + start = Date.now();
514 526 self.transactionItems.push(gzdata);
515 527 var offset = self.len;
516 528 self.len += gzdata.length;
@@ -544,10 +556,10 @@ IJOD.prototype.addData = function(arg, callback) {
544 556 ]});
545 557
546 558 // if there's types, insert each of them too for filtering
547   - if (!arg.data || !arg.types) return callback();
  559 + if (!arg.data || !arg.types) return;
548 560
549 561 // TODO: This doesn't call any async code and can be converted
550   - async.forEachSeries(Object.keys(arg.types), function(type, cb) {
  562 + Object.keys(arg.types).forEach(function(type) {
551 563 var i2 = idr.clone(arg.idr);
552 564 i2.protocol = type;
553 565 instruments.increment("data.types." + type).send();
@@ -571,8 +583,10 @@ IJOD.prototype.addData = function(arg, callback) {
571 583 par
572 584 ]});
573 585
574   - cb();
575   - }, callback);
  586 + timings["add-sql"] += Date.now() - start; start = Date.now();
  587 + });
  588 +
  589 + return;
576 590 };
577 591
578 592 /// Get a single entry from an IJOD, requested by specific IDR
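Unlike the commit timings, the `add-*` buckets above use `+=` because `addData` runs once per entry and the batch wants totals across the whole run. A sketch of the accumulate-across-calls pattern (the bucket name and the stand-in work are illustrative):

```javascript
// Each call adds its phase duration into a shared bucket so the
// batch reports totals, not just the last entry's numbers.
function addData(timings, entry) {
  var start = Date.now();
  JSON.stringify(entry); // stand-in for the real serialization work
  timings['add-json-stringify'] += Date.now() - start;
}

var timings = { 'add-json-stringify': 0 }; // buckets must start at 0
[{ a: 1 }, { b: 2 }, { c: 3 }].forEach(function (entry) {
  addData(timings, entry);
});
```

The buckets must be pre-initialized to 0 (as `batchSmartAdd` now does), since `undefined += n` yields `NaN`.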
@@ -765,7 +779,13 @@ exports.batchSmartAdd = function(entries, callback) {
765 779 var ij = new IJOD(pidPath);
766 780 logger.debug("Batch smart add", pidPath, entries.length);
767 781
768   - var timings = {};
  782 + var timings = {
  783 + "add-dMap" : 0,
  784 + "add-friends" : 0,
  785 + "add-json-stringify" : 0,
  786 + "add-zlib" : 0,
  787 + "add-sql" : 0
  788 + };
769 789 function handleError(msg) {
770 790 logger.error("Batch smart add error: %s", msg);
771 791 if (exports.debug) {
@@ -790,9 +810,9 @@ exports.batchSmartAdd = function(entries, callback) {
790 810 });
791 811 ij.startAddTransaction();
792 812 start = Date.now();
793   - async.forEachSeries(entries, function(entry, cb) {
  813 + var error = null; entries.forEach(function(entry) {
794 814 if (!entry) {
795   - return process.nextTick(cb);
  815 + return;
796 816 }
797 817 var entryIdrHash = idr.hash(entry.idr);
798 818 if (knownIds[entryIdrHash]) {
@@ -801,33 +821,28 @@ exports.batchSmartAdd = function(entries, callback) {
801 821 // If the id and hashes match it's the same!
802 822 if (hash === knownIds[entryIdrHash]) {
803 823 instruments.increment("ijod.skipped_on_hash").send();
804   - return process.nextTick(cb);
  824 + return;
805 825 } else {
806 826 entry.hash = hash;
807 827 entry.updated = true;
808 828 }
809 829 }
810 830 // XXX This might be able to convert to a regular forEach?
811   - ij.addData(entry, function() { process.nextTick(cb); });
812   - }, function(error) {
813   - timings.addData = Date.now() - start;
814   - if (error) {
815   - ij.abortAddTransaction(function() {
816   - handleError(error);
817   - });
818   -
819   - // This is OK because handleError calls callback()
820   - return;
821   - }
  831 + error = ij.addData(timings, entry) || error;
  832 + });
822 833
823   - start = Date.now();
824   - ij.commitAddTransaction(function(error) {
825   - timings.commit = Date.now() - start;
826   - callback(error, timings);
  834 + timings.addData = Date.now() - start;
  835 + if (error) {
  836 + ij.abortAddTransaction(function() {
  837 + handleError(error);
827 838 });
  839 + }
828 840
829   - //console.log("Batch done: %d", (Date.now() - t));
830   - }); // forEachSeries(entries)
  841 + start = Date.now();
  842 + ij.commitAddTransaction(timings, function(error) {
  843 + timings.commit = Date.now() - start;
  844 + callback(error, timings);
  845 + });
831 846 });
832 847 };
833 848
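The batch above skips entries whose stored hash matches the freshly computed one, and flags the rest as updated. A standalone sketch of that skip logic, using JSON string identity as a stand-in for the real `mmh.murmur128HexSync`:

```javascript
// Skip entries whose content hash matches what we already stored;
// mark changed entries so downstream pumps treat them as updates.
function filterChanged(entries, knownHashes, hashFn) {
  var out = [];
  entries.forEach(function (entry) {
    var hash = hashFn(JSON.stringify(entry.data));
    if (knownHashes[entry.id] === hash) return; // unchanged, skip
    if (entry.id in knownHashes) entry.updated = true;
    out.push(entry);
  });
  return out;
}

var identity = function (s) { return s; }; // stand-in hash function
var known = { a: '{"v":1}' };
var changed = filterChanged([
  { id: 'a', data: { v: 1 } }, // same hash, skipped
  { id: 'a', data: { v: 2 } }, // known id, new hash: updated
  { id: 'b', data: { v: 3 } }  // brand new
], known, identity);
```

Only the two changed entries survive, and the modified known entry carries `updated = true`.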
1  lib/nexusClient.js
@@ -212,6 +212,7 @@ exports.pipelineInject = function(base, entries, auth, cbDone) {
212 212 },
213 213 auth:lconfig.nexus.auth
214 214 };
  215 + logger.debug("Injecting to nexus");
215 216 request.post(params, function(err, res, body) {
216 217 if (err) {
217 218 logger.warn("Error pipeline injecting with the nexus");
16 lib/pipeline.js
@@ -12,12 +12,15 @@ var profileManager = require('profileManager');
12 12 var push = require('push');
13 13 var resolve = require('services/links/resolve');
14 14
15   -function wrapPumps(timings, pumps) {
  15 +function wrapPumps(timings, pumps, base) {
16 16 var newPumps = [];
17 17 var start = Date.now();
18 18
  19 + var pumpPos = 0;
19 20 pumps.forEach(function (pump) {
20 21 newPumps.push(function (cset, cb) {
  22 + logger.debug("Pump %d done for %s took %d", pumpPos, idr.toString(base), Date.now() - start);
  23 + ++pumpPos;
21 24 timings.push(Date.now() - start);
22 25 start = Date.now();
23 26
@@ -42,10 +45,10 @@ function pumper(entries, services, auth, cbDone) {
42 45 // NOTICE: temporarily disabling, determining relevancy and cost/impact of usage on everything
43 46 // resolve.pump,
44 47 // oembed.pump,
45   - dMap.pump,
  48 +// dMap.pump,
46 49 function (cset, cb) { friendsPump.vpump(cset, auth, cb); },
47 50 IJOD.pump,
48   - ijodEvent.pump,
  51 + //ijodEvent.pump,
49 52 function (cset, cb) { friendsPump.bump(cset, auth, cb); },
50 53 function (cset, cb) { push.pump(cset, auth, cb); }
51 54 ];
@@ -66,15 +69,14 @@ function pumper(entries, services, auth, cbDone) {
66 69 pumps.push(function (arg, cbStatsDone) {
67 70 instruments.timing({ 'pipeline.run': Date.now() - pipelineStart }).send();
68 71
69   - process.nextTick(function () {
70   - cbStatsDone(null, arg);
71   - });
  72 + cbStatsDone(null, arg);
72 73 });
73 74
74 75 var timings = [];
75   - pumps = wrapPumps(timings, pumps);
  76 + pumps = wrapPumps(timings, pumps, idr.base(entries[0].idr));
76 77
77 78 async.waterfall(pumps, function (err) {
  79 + logger.debug("All pumps done: %j", timings);
78 80 timings.shift();
79 81 cbDone(err, timings);
80 82 });
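`wrapPumps` above interposes a shim around every pump so each step of the waterfall records how long the previous step took. A self-contained sketch of the wrapping idea, with a tiny series runner standing in for `async.waterfall`:

```javascript
// Wrap each pipeline step so we record the elapsed time since the
// previous step finished, mirroring wrapPumps in lib/pipeline.js.
function wrapPumps(timings, pumps) {
  var start = Date.now();
  return pumps.map(function (pump) {
    return function (cset, cb) {
      timings.push(Date.now() - start);
      start = Date.now();
      pump(cset, cb);
    };
  });
}

// Tiny waterfall stand-in: run the wrapped pumps in series.
function runSeries(pumps, cset, done) {
  var i = 0;
  (function next(err, out) {
    if (err || i === pumps.length) return done(err, out);
    pumps[i++](out, next);
  })(null, cset);
}

var timings = [];
var wrapped = wrapPumps(timings, [
  function (cset, cb) { cb(null, cset.concat('a')); },
  function (cset, cb) { cb(null, cset.concat('b')); }
]);
runSeries(wrapped, [], function (err, cset) {
  // timings[0] covers setup before pump 0; timings[1] covers pump 0
});
```

This is why `pumper` shifts the first element off `timings` before reporting: the first sample measures time before any pump ran.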
30 lib/podClient.js
@@ -249,10 +249,8 @@ exports.pipelineInject = function(bases, auth, cbDone) {
249 249 var completeTimings = [];
250 250 var lengths = [];
251 251
252   - if (!bases || typeof(bases) !== 'object') {
253   - logger.debug(bases);
254   - return cbDone(new Error('The bases argument was invalid'));
255   - }
  252 + logger.debug("Need to inject for %j", Object.keys(bases || {}));
  253 +
256 254 // For each base we need to check the potential pid and send it out
257 255 async.forEach(Object.keys(bases), function(base, cbStep) {
258 256 if (!base || !bases[base]) return cbStep();
@@ -261,7 +259,7 @@ exports.pipelineInject = function(bases, auth, cbDone) {
261 259 if (pid) {
262 260 // Oh we have a pid check for the pod id
263 261 profileManager.loadProfile(pid, function(err, profile) {
264   - //console.log("Inject to %j", profile);
  262 + logger.debug("Inject to %j", profile);
265 263 if (!err && !profile) logger.warn('Profile missing for', base, pid);
266 264 if (err || !profile || !profile.pod) {
267 265 if (role === "apihost" && lconfig.pods.enabled) {
@@ -271,15 +269,27 @@ exports.pipelineInject = function(bases, auth, cbDone) {
271 269 cbStep(err);
272 270 });
273 271 } else {
274   - //console.log("Running a local inject!");
  272 + logger.debug("Running a local inject for %s of %d entries!", base, Array.isArray(bases[base]) ? bases[base].length : 0);
  273 + if (!Array.isArray(bases[base])) {
  274 + logger.error("Busted entry is not an array");
  275 + return cbStep();
  276 + }
  277 +
  278 + if (bases[base].length === 0) {
  279 + // Skip it all, nothing to do
  280 + return cbStep();
  281 + }
  282 +
275 283 var singleBase = {};
276 284 singleBase[base] = bases[base];
277 285 return pipeline.inject(singleBase, auth, function(err, timings) {
  286 + logger.debug("Inject complete for %s", base);
278 287 if (timings) completeTimings.push(timings);
279 288 cbStep(err);
280 289 });
281 290 }
282 291 }
  292 + logger.debug("Sending the inject to a pod");
283 293 // Send it to the pod
284 294 podRequest(podUrl(profile.pod, "/pipelineInject"), {
285 295 method: "POST",
@@ -297,6 +307,7 @@ exports.pipelineInject = function(bases, auth, cbDone) {
297 307 });
298 308 });
299 309 } else {
  310 + logger.debug("Fallback inject");
300 311 return nexusClient.pipelineInject(base, bases[base], auth, function(err, timings) {
301 312 if (timings) completeTimings.push(timings);
302 313 cbStep(err);
@@ -304,9 +315,9 @@ exports.pipelineInject = function(bases, auth, cbDone) {
304 315 }
305 316 }, function(err) {
306 317 //console.log(err);
307   - //console.log(Object.keys(bases));
308   - //console.log(lengths);
309   - //console.log(completeTimings);
  318 + logger.debug(Object.keys(bases));
  319 + logger.debug(lengths);
  320 + logger.debug(completeTimings);
310 321 if (completeTimings && completeTimings.length > 0) {
311 322 completeTimings = completeTimings.reduce(function(prev, cur, index, array) {
312 323 var res = [];
@@ -335,6 +346,7 @@ function podRequest(url, params, callback) {
335 346 auth: lconfig.pods.auth,
336 347 json: true
337 348 }, params);
  349 + logger.info("podRequest",url,params);
338 350 return request(url, params, callback);
339 351 }
340 352
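Each local inject above returns its own per-pump timings array, and the `reduce` over `completeTimings` folds them into one. The combine body isn't shown in this hunk, so this is an assumed element-wise sum over arrays that may differ in length:

```javascript
// Fold several per-pump timing arrays into one array by summing
// matching positions; missing positions count as 0.
function combineTimings(all) {
  return all.reduce(function (prev, cur) {
    var res = [];
    var len = Math.max(prev.length, cur.length);
    for (var i = 0; i < len; i++) {
      res.push((prev[i] || 0) + (cur[i] || 0));
    }
    return res;
  }, []);
}

var total = combineTimings([[5, 10], [2, 3, 4]]);
// total is [7, 13, 4]
```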
25 lib/push.js
@@ -27,22 +27,27 @@ exports.pump = function(cset, auth, cbDone) {
27 27 // extract all the bases into clumps
28 28 var bases = {};
29 29 var oembeds = {};
30   - cset.forEach(function(entry) {
31   - var base = idr.toString(idr.base(entry.idr));
32   - // index the oembeds for easy lookup, and skip them from pushing
33   - if (base === 'oembed:links/oembed') {
34   - oembeds[entry.idr] = entry;
35   - return;
36   - }
37   - if (!bases[base]) bases[base] = [];
38   - if (entry.saved) bases[base].push(entry); // only new entries have a .hash
39   - });
40 30
  31 + logger.debug("Checking routes");
41 32 // and begins our async cascade
42 33 getRoutes(auth, function(routes) {
  34 + logger.debug("Got routes %j", routes);
43 35 if (Object.keys(routes).length === 0) {
44 36 return cbDone(null, cset);
45 37 }
  38 +
  39 + // Let's prep now that we have routes
  40 + cset.forEach(function(entry) {
  41 + var base = idr.toString(idr.base(entry.idr));
  42 + // index the oembeds for easy lookup, and skip them from pushing
  43 + if (base === 'oembed:links/oembed') {
  44 + oembeds[entry.idr] = entry;
  45 + return;
  46 + }
  47 + if (!bases[base]) bases[base] = [];
  48 + if (entry.saved) bases[base].push(entry); // only new entries have a .hash
  49 + });
  50 +
46 51 // do each isolated clustering in parallel
47 52 logger.debug("ROUTES",routes);
48 53 logger.debug("BASES",bases);
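The reorder above defers the base/oembed bucketing until routes are known to exist, since it is wasted work when there are none. The bucketing itself can be sketched as follows, with `idr.base()` simplified to a string split:

```javascript
// Bucket entries by their base idr, diverting oembeds into their own
// index and keeping only saved entries for pushing.
function clumpByBase(cset) {
  var bases = {};
  var oembeds = {};
  cset.forEach(function (entry) {
    var base = entry.idr.split('#')[0]; // simplified idr.base()
    if (base === 'oembed:links/oembed') {
      oembeds[entry.idr] = entry;
      return;
    }
    if (!bases[base]) bases[base] = [];
    if (entry.saved) bases[base].push(entry);
  });
  return { bases: bases, oembeds: oembeds };
}

var out = clumpByBase([
  { idr: 'post:u@test/feed#1', saved: true },
  { idr: 'post:u@test/feed#2' }, // not saved, so not pushed
  { idr: 'oembed:links/oembed#x' }
]);
```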
72 lib/services/facebook/feed.js
@@ -10,55 +10,73 @@
10 10 var fb = require('./lib.js');
11 11
12 12 exports.sync = function(pi, cb) {
  13 + var data = {};
  14 + var myID = pi.auth.pid.split('@')[0];
  15 + var base = 'post:' + pi.auth.pid + '/feed';
  16 + var baseSelf = base + '_self';
  17 + var baseOthers = base + '_others';
  18 +
13 19 var args = {
14   - id : "me",
15   - type : "feed",
  20 + id : 'me',
  21 + type : 'feed',
16 22 limit : 200,
17 23 accessToken : pi.auth.accessToken
18 24 };
19   - var resp = {data: {}, config: {}};
20 25
21 26 if (!pi.config) pi.config = {};
  27 + if (typeof pi.config.since === 'undefined') pi.config.since = 0;
  28 +
  29 + if (pi.config.paging) {
  30 + args.until = pi.config.pagingMax;
  31 + }
  32 +
  33 + // by always passing `since` we can stop paging back when no data is returned
22 34 if (pi.config.since) args.since = pi.config.since;
23   - if (pi.config.next) args.page = pi.config.next;
24 35
25 36 fb.getPostPage(args, function(err, posts){
26 37 if(err) return cb(err);
27   - if(!Array.isArray(posts.data)) return cb("No posts array");
28   -
29   - var myID = pi.auth.pid.match(/(\d+)@/)[1];
30   - var base = 'post:' + pi.auth.pid + '/feed';
31   - var baseSelf = base + '_self';
32   - var baseOthers = base + '_others';
  38 + if(!Array.isArray(posts.data)) return cb('No posts array');
33 39
34   - resp.data[base] = posts.data;
35   - resp.data[baseSelf] = [];
36   - resp.data[baseOthers] = [];
  40 + data[base] = posts.data;
  41 + data[baseSelf] = [];
  42 + data[baseOthers] = [];
37 43
38   - var since = args.since || 0;
  44 + var newest = pi.config.newest || pi.config.since || 0;
  45 + var oldest;
39 46 posts.data.forEach(function(post){
40   - // Find the newest
41   - if (post.updated_time > since) since = post.updated_time;
  47 + if (post.updated_time > newest) newest = post.updated_time;
  48 + if (!oldest || post.updated_time < oldest) oldest = post.updated_time;
  49 +
42 50 // Sort my posts from everyone else's
43   - var postedBy = (post.from && post.from.id === myID) ? baseSelf : baseOthers;
44   - resp.data[postedBy].push(post);
  51 + if (post.from && post.from.id === myID) {
  52 + data[baseSelf].push(post);
  53 + } else {
  54 + data[baseOthers].push(post);
  55 + }
45 56 });
46   - resp.config.since = since;
47   -
48 57
49 58 var auth = {accessToken : pi.auth.accessToken};
50 59 fb.getPostPhotos(auth, posts, function(err, photos) {
51   - if(photos) resp.data['photo:' + pi.auth.pid + '/home_photos'] = photos;
  60 + if(photos) data['photo:' + pi.auth.pid + '/home_photos'] = photos;
52 61
53   - // if we got full limit and we're paging through, always use that
54   - if (posts.data.length !== 0 && posts.paging && posts.paging.next) {
55   - resp.config.next = posts.paging.next;
56   - resp.config.nextRun = -1;
  62 + if (posts.data.length !== 0 || oldest > pi.config.since) {
  63 + // we want the next run to include entries before the oldest entry
  64 + pi.config = {
  65 + paging : true,
  66 + pagingMax : oldest - 1,
  67 + newest : newest,
  68 + since : pi.config.since,
  69 + nextRun : -1
  70 + };
57 71 } else {
58   - resp.config.next = false;
  72 + // no data was returned so paging stops
  73 + pi.config = {
  74 + paging : false,
  75 + since : newest
  76 + };
59 77 }
60 78
61   - cb(null, resp);
  79 + cb(null, {data: data, config: pi.config});
62 80 });
63 81 });
64 82 };
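The rewritten synclet pages backward through the feed by asking for posts strictly older than the oldest one seen (`until = oldest - 1`) while pinning `since`, stopping once a page comes back empty. A toy model of that loop over an in-memory feed, where `fetchPage` stands in for `fb.getPostPage`:

```javascript
// Model of backward paging: each "request" returns posts with
// since < updated_time <= until, newest first, up to a page limit.
function fetchPage(feed, since, until, limit) {
  return feed
    .filter(function (p) {
      return p.updated_time > since &&
             (until === null || p.updated_time <= until);
    })
    .sort(function (a, b) { return b.updated_time - a.updated_time; })
    .slice(0, limit);
}

function syncAll(feed, since, limit) {
  var seen = [];
  var until = null;
  for (;;) {
    var page = fetchPage(feed, since, until, limit);
    if (page.length === 0) break; // no data returned: paging stops
    seen = seen.concat(page);
    var oldest = page[page.length - 1].updated_time;
    until = oldest - 1; // next request pages further back
  }
  return seen;
}

var feed = [4, 3, 2, 1].map(function (t) { return { updated_time: t }; });
var posts = syncAll(feed, 0, 2);
// pages come back as [4,3], then [2,1], then empty
```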
2  lib/services/facebook/lib.js
@@ -258,6 +258,8 @@ exports.getPostPage = function(arg, cbDone) {
258 258 limit: arg.limit || PAGE_SIZE
259 259 };
260 260 if (arg.since) params.since = arg.since;
  261 + if (arg.until) params.until = arg.until;
  262 +
261 263 var uri = exports.apiUrl(arg, '/' + arg.id + '/' + arg.type, params);
262 264 getPage(uri, cbDone);
263 265 };
6 lib/services/facebook/photos.js
@@ -82,7 +82,11 @@ exports.sync = function (pi, cb) {
82 82 if (album.after) params.after = album.after; // Paging cursor
83 83
84 84 fb.getDataPage(path, params, function (err, photos, cursor) {
85   - if(err || !photos) return cb(err, pi);
  85 + if (err) return cb(err);
  86 +
  87 + if (!photos || !Array.isArray(photos.data)) {
  88 + return cb(new Error('No photo data returned for ' + pi.auth.pid));
  89 + }
86 90
87 91 photos.data.forEach(function(photo) {
88 92 photo._album_id = album.object_id;
107 lib/services/foursquare/checkins.js
@@ -8,60 +8,71 @@
8 8 */
9 9
10 10 var request = require('request');
11   -var checkins_limit = 250;
  11 +var _ = require('underscore');
  12 +
12 13 var util = require('util');
13 14
  15 +var PAGE_SIZE = 100;
  16 +var FIRST_SYNC_PAGE_SIZE = 250;
  17 +
14 18 exports.sync = function(pi, cb) {
15   - if (!pi.config.checkinsThrough) pi.config.checkinsThrough = 0;
16   - getCheckins(pi.config, pi.auth.profile.id, pi.auth.accessToken, 0, function(err, checkins) {
17   - var data = {};
18   - data['checkin:'+pi.auth.pid+'/checkins'] = checkins;
19   - cb(err, {data:data, config:pi.config});
20   - });
21   -};
  19 + if (!pi.config) pi.config = {};
22 20
23   -function getCheckins(config, userID, token, offset, callback, checkins) {
24   - if (!checkins) checkins = [];
25   - var latest = config.checkinsThrough;
26   - latest += 1; //"afterTimestamp" is really "afterOrEqualToTimestamp"
27   - request.get({
28   - uri:'https://api.foursquare.com/v2/users/self/checkins.json' +
29   - '?limit=' + checkins_limit + '&offset=' + offset +
30   - '&oauth_token=' + token + '&afterTimestamp=' + latest,
31   - json:true
32   - },
33   - function(err, resp, js) {
34   - if (err) return callback(err);
35   - if (resp.statusCode !== 200) {
36   - return callback(
37   - new Error("status code " + resp.statusCode + " " + util.inspect(js))
38   - );
39   - }
40   - if (!js || !js.response || !js.response.checkins) {
41   - return callback(
42   - new Error("missing response.checkins: " + util.inspect(js))
  21 + var lastSync = pi.config.lastSync || 0;
  22 + var cursor = pi.config.cursor; // In the middle of paging, if present
  23 + // Page ASAP during first sync, then save effort
  24 + var pageSize = (lastSync > 0) ? PAGE_SIZE : FIRST_SYNC_PAGE_SIZE;
  25 +
  26 + var query = {
  27 + limit : pageSize,
  28 + oauth_token : pi.auth.accessToken,
  29 + v : '20130318'
  30 + };
  31 + if (cursor) query.beforeTimestamp = cursor;
  32 +
  33 + request.get('https://api.foursquare.com/v2/users/self/checkins.json', {
  34 + qs: query,
  35 + json: true
  36 + }, function(err, response, body) {
  37 + if (err) return cb(err);
  38 + if (response.statusCode !== 200) {
  39 + return cb(
  40 + new Error('Bad HTTP status ' + response.statusCode + '. ' + util.inspect(body))
43 41 );
44 42 }
45   - var response = js.response;
46   - if (!(response.checkins && response.checkins.items)) { //we got nothing
47   - if (checkins.length > 0) config.checkinsThrough = checkins[0].createdAt;
48   - return callback(err, checkins);
  43 + var checkins = body && body.response && body.response.checkins;
  44 + if (!checkins) {
  45 + return cb(new Error('No checkins in response. ' + util.inspect(body)));
49 46 }
50   - var newCheckins = response.checkins.items;
51   - checkins = checkins.concat(newCheckins);
52   - if (newCheckins && newCheckins.length === checkins_limit) {
53   - getCheckins(
54   - config,
55   - userID,
56   - token,
57   - offset + checkins_limit,
58   - callback,
59   - checkins
60   - );
61   - }
62   - else {
63   - if (checkins[0]) config.checkinsThrough = checkins[0].createdAt;
64   - callback(err, checkins);
  47 + checkins = checkins.items || [];
  48 +
  49 + var resp = {
  50 + config : {},
  51 + data : {}
  52 + };
  53 +
  54 + var newCheckins = checkins.filter(function(checkin) {
  55 + return checkin.createdAt > lastSync;
  56 + });
  57 + resp.data['checkin:' + pi.auth.pid + '/checkins'] = newCheckins;
  58 +
  59 + var times = _.pluck(checkins, 'createdAt');
  60 +
  61 + // Track the latest checkin we know of.
  62 + // When we finish paging, we'll start again here.
  63 + var latestNewTime = _.max(times);
  64 + var latestKnown = pi.config.latestKnown || 0;
  65 + resp.config.latestKnown = _.max([latestNewTime, latestKnown]);
  66 +
  67 + // Check if we've crossed into checkins we already have
  68 + if (checkins.length === 0 || newCheckins.length < checkins.length) {
  69 + resp.config.cursor = null; // Done with this window
  70 + resp.config.lastSync = resp.config.latestKnown;
  71 + } else {
  72 + resp.config.cursor = _.min(times);
  73 + resp.config.nextRun = -1; // Still paging
65 74 }
  75 +
  76 + return cb(null, resp);
66 77 });
67   -}
  78 +};
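The new checkins synclet pages backward with `beforeTimestamp` but only keeps checkins newer than `lastSync`, and it knows it is done as soon as a page contains anything already seen. The stopping rule in isolation, with underscore's `_.min`/`_.max` replaced by `Math` equivalents:

```javascript
// Decide the next paging state from one page of checkins.
function nextConfig(config, checkins) {
  var lastSync = config.lastSync || 0;
  var fresh = checkins.filter(function (c) { return c.createdAt > lastSync; });
  var times = checkins.map(function (c) { return c.createdAt; });
  var latestKnown = Math.max.apply(null, times.concat(config.latestKnown || 0));
  if (checkins.length === 0 || fresh.length < checkins.length) {
    // crossed into data we already have: finish this window
    return { cursor: null, lastSync: latestKnown, latestKnown: latestKnown };
  }
  // a full page of fresh data: keep paging back from the oldest
  return {
    cursor      : Math.min.apply(null, times),
    nextRun     : -1,
    latestKnown : latestKnown
  };
}

var paging = nextConfig({ lastSync: 10 }, [{ createdAt: 30 }, { createdAt: 20 }]);
// all fresh: keep paging back from createdAt 20
var done = nextConfig({ lastSync: 10 }, [{ createdAt: 12 }, { createdAt: 8 }]);
// overlap with known data: window complete, lastSync advances
```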
7 lib/services/shutterfly/photos.js
@@ -15,8 +15,11 @@ function queueAlbums(pi, callback) {
15 15 }
16 16
17 17 exports.sync = function (pi, callback) {
18   - if (!pi.config.lastAlbumsSync) {
  18 + if (!pi.config.albumWait) pi.config.albumWait = 0;
  19 +
  20 + if (!pi.config.lastAlbumsSync && pi.config.albumWait < 3) {
19 21 pi.config.nextRun = 10; // Give the albums synclet 10 seconds
  22 + pi.config.albumWait++;
20 23 return callback(null, pi);
21 24 }
22 25
@@ -24,7 +27,7 @@ exports.sync = function (pi, callback) {
24 27 if (err) return callback(err, pi);
25 28
26 29 if (pi.config.albums.length === 0) {
27   - return callback('No albums to sync', pi);
  30 + return callback(new Error('No albums to sync'), pi);
28 31 }
29 32
30 33 var albumID = pi.config.albums.shift();
6 lib/taskman-ng.js
@@ -69,7 +69,7 @@ var lastHeartbeat = Date.now();
69 69 function checkLag(tag) {
70 70 var lag = toobusy.lag();
71 71 if (lag > 100) {
72   - console.log(tag + " lag: " + lag + " is toobusy? " + toobusy());
  72 + logger.debug(tag + " lag: " + lag + " is toobusy? " + toobusy());
73 73 }
74 74 }
75 75
@@ -325,7 +325,7 @@ function runTask(profileInfo, task, fnCallback) {
325 325 }
326 326
327 327 // This is the first run if no tstart is defined or it's 0
328   - var firstRun = !(task.tstart > 0);
  328 + var firstRun = !task.tstart;
329 329
330 330 // Make sure tstart is initialized to current time
331 331 task.tstart = Date.now();
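The simplified check needs care: per the comment, a task is on its first run when `tstart` is missing or 0, but a plain numeric comparison treats a missing `tstart` differently from zero, since `undefined <= 0` is `false` in JavaScript. A quick demonstration:

```javascript
// Comparisons with undefined always yield false, so a bare numeric
// check misses the "never started" case; falsiness catches both.
var task = {};                       // tstart never set
var byComparison = task.tstart <= 0; // false: undefined <= 0
var byTruthiness = !task.tstart;     // true: undefined is falsy
var byNegation = !(task.tstart > 0); // true: the original construction
```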
@@ -371,6 +371,7 @@ function runTask(profileInfo, task, fnCallback) {
371 371 var cfg = response.config;
372 372
373 373 function finishCleanup(err) {
  374 + logger.debug("Cleaning up synclet: %s", task.idr);
374 375 var configUpdate = cfg;
375 376 // if it is sandboxed, namespace it to this synclet's name
376 377 if (sandboxed) {
@@ -453,6 +454,7 @@ function runTask(profileInfo, task, fnCallback) {
453 454
454 455 // In case something in the synclet immediately barfs...
455 456 try {
  457 + logger.debug("Starting synclet: %s", task.idr);
456 458 servezas.synclet(task.service, task.synclet).sync(profileInfo, cleanup);
457 459 } catch (E) {
458 460 cleanup(E);
5 scripts/ijoder.js
@@ -75,7 +75,12 @@ console.log("Table: " + table);
75 75 console.log("Query: " + query);
76 76
77 77 partition.readFrom(idr, function (parts) {
  78 + console.log(parts);
78 79 parts[0].dal.query(query, [], function (err, rows) {
  80 + if (err || rows.length === 0) {
  81 + console.log("IDR not found");
  82 + return;
  83 + }
79 84 backend.get(rows[0].path, rows[0].offset, rows[0].len, function (err, buf) {