datastore: get runs as a stream
Stephen Sawchuk committed Jul 30, 2015
1 parent f872342 commit 85f00ae
Showing 2 changed files with 85 additions and 70 deletions.
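
The diff below reworks DatastoreRequest#get: a single key now yields a single
entity, an array of keys yields an array of entities, and omitting the callback
returns a readable object stream; the autoPaginate/nextQuery pagination options
are removed. A rough usage sketch based on the updated JSDoc and system tests
below (the dataset setup via gcloud.datastore.dataset and the key values are
illustrative assumptions, not part of this commit):

var gcloud = require('gcloud');

// Hypothetical project configuration.
var dataset = gcloud.datastore.dataset({ projectId: 'my-project' });

var key = dataset.key(['Company', 123]);

// Single key: the callback now receives the entity directly, not an array.
dataset.get(key, function(err, entity) {});

// Multiple keys: the callback receives an array of entities.
dataset.get([key, dataset.key(['Company', 456])], function(err, entities) {});

// No callback: a readable object stream of entities is returned.
dataset.get([key])
  .on('error', function(err) {})
  .on('data', function(entity) {})
  .on('end', function() {});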
97 changes: 48 additions & 49 deletions lib/datastore/request.js
@@ -20,13 +20,14 @@

'use strict';

var extend = require('extend');
var request = require('request').defaults({
pool: {
maxSockets: Infinity
}
});

var through = require('through2');

/**
* @type {module:datastore/entity}
* @private
@@ -117,11 +118,7 @@ function DatastoreRequest() {}
* //-
* var key = dataset.key(['Company', 123]);
*
* transaction.get(key, function(err, entities) {
* if (!err) {
* var entity = entities[0];
* }
* });
* transaction.get(key, function(err, entity) {});
*
* //-
* // Get multiple entities at once.
@@ -132,33 +129,6 @@ function DatastoreRequest() {}
* ], function(err, entities) {});
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* function onApiResponse(err, entities, nextQuery, apiResponse) {
* if (err) {
* console.error(err);
* return;
* }
*
* // `entities` is an array of results.
*
* if (nextQuery) {
* transaction.get(nextQuery, onApiResponse);
* }
* }
*
* var options = {
* keys: [
* dataset.key(['Company', 123]),
* // ...
* ],
* autoPaginate: false
* };
*
* transaction.get(options, onApiResponse);
*
* //-
* // Get the entities as a readable object stream.
* //-
* var keys = [
@@ -175,39 +145,68 @@ function DatastoreRequest() {}
* // All entities retrieved.
* });
*/
DatastoreRequest.prototype.get = function(options, callback) {
if (options instanceof entity.Key || util.is(options, 'array')) {
options = {
keys: util.arrayize(options)
};
DatastoreRequest.prototype.get = function(keys, callback) {
var self = this;

var isStreamMode = !callback;
var stream;

if (isStreamMode) {
stream = through.obj();
}

if (!options.keys) {
var isSingleLookup = keys instanceof entity.Key;
keys = util.arrayize(keys).map(entity.keyToKeyProto);

if (keys.length === 0) {
throw new Error('At least one Key object is required.');
}

var request = {
key: options.keys.map(entity.keyToKeyProto)
key: keys
};

this.makeReq_('lookup', request, function(err, resp) {
var entities = [];
this.makeReq_('lookup', request, onApiResponse);

function onApiResponse(err, resp) {
if (err) {
callback(err, null, null, resp);
if (isStreamMode) {
stream.emit('error', err, resp);
} else {
callback(err, null, resp);
}
return;
}

var entities = entity.formatArray(resp.found);
var results = entity.formatArray(resp.found);

if (isStreamMode) {
results.forEach(function(entity) {
stream.push(entity);
});
} else {
entities = entities.concat(results);
}

var nextQuery = null;
var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);

if (nextKeys.length > 0) {
nextQuery = extend(true, {}, options);
nextQuery.keys = nextQuery.keys.concat(nextKeys);
self.get(nextKeys, onApiResponse);
return;
}

callback(null, entities, nextQuery, resp);
});
if (isStreamMode) {
stream.push(null);
stream.end();
} else {
callback(null, isSingleLookup ? entities[0] : entities, resp);
}
}

if (isStreamMode) {
return stream;
}
};

/**
@@ -819,6 +818,6 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
* These methods can be used with either a callback or as a readable object
* stream. `streamRouter` is used to add this dual behavior.
*/
streamRouter.extend(DatastoreRequest, ['get', 'runQuery']);
streamRouter.extend(DatastoreRequest, 'runQuery');

module.exports = DatastoreRequest;
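
Because get() now implements its own stream mode, streamRouter.extend above
only wraps runQuery. The pattern the new get() follows is: push formatted
results into a through2 object stream, re-issue the lookup for any deferred
keys, and end the stream once nothing is deferred. A simplified, self-contained
sketch of that flow (not the library's code; lookup(keys, callback) is an
assumed stand-in for makeReq_('lookup', ...)):

var through = require('through2');

// lookup(keyProtos, callback) is assumed to perform the Datastore lookup RPC
// and call back with { found: [...], deferred: [...] }.
function getAsStream(lookup, keys) {
  var stream = through.obj();

  function onApiResponse(err, resp) {
    if (err) {
      stream.emit('error', err);
      return;
    }

    // Push each found entity to the consumer as soon as it arrives.
    resp.found.forEach(function(entity) {
      stream.push(entity);
    });

    if (resp.deferred && resp.deferred.length > 0) {
      // The API may defer some keys; look them up before ending the stream.
      lookup(resp.deferred, onApiResponse);
      return;
    }

    stream.push(null); // signal the end of the object stream
  }

  lookup(keys, onApiResponse);

  return stream;
}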
58 changes: 37 additions & 21 deletions system-test/datastore.js
@@ -52,10 +52,9 @@ describe('datastore', function() {
ds.save({ key: postKey, data: post }, function(err) {
assert.ifError(err);

ds.get(postKey, function(err, entities) {
ds.get(postKey, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
@@ -69,10 +68,9 @@
ds.save({ key: postKey, data: post }, function(err) {
assert.ifError(err);

ds.get(postKey, function(err, entities) {
ds.get(postKey, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
@@ -92,10 +90,9 @@
var assignedId = postKey.path[1];
assert(assignedId);

ds.get(postKey, function(err, entities) {
ds.get(postKey, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, data);

ds.delete(ds.key(['Post', assignedId]), done);
@@ -112,10 +109,9 @@
// The key's path should now be complete.
assert(postKey.path[1]);

ds.get(postKey, function(err, entities) {
ds.get(postKey, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
@@ -135,10 +131,9 @@
ds.save({ key: postKey, method: 'insert', data: post }, function(err) {
assert.notEqual(err, null); // should fail insert

ds.get(postKey, function(err, entities) {
ds.get(postKey, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, post);

ds.delete(postKey, done);
@@ -188,6 +183,33 @@ describe('datastore', function() {
});
});

it('should get multiple entities in a stream', function(done) {
var key1 = ds.key('Post');
var key2 = ds.key('Post');

ds.save([
{ key: key1, data: post },
{ key: key2, data: post }
], function(err) {
assert.ifError(err);

var firstKey = ds.key(['Post', key1.path[1]]);
var secondKey = ds.key(['Post', key2.path[1]]);

var numEntitiesEmitted = 0;

ds.get([firstKey, secondKey])
.on('error', done)
.on('data', function() {
numEntitiesEmitted++;
})
.on('end', function() {
assert.strictEqual(numEntitiesEmitted, 2);

ds.delete([firstKey, secondKey], done);
});
});
});
});

it('should save keys as a part of entity and query by key', function(done) {
@@ -510,10 +532,9 @@ describe('datastore', function() {
}, function(err) {
assert.ifError(err);

ds.get(key, function(err, entities) {
ds.get(key, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.deepEqual(entity.data, obj);

ds.delete(key, done);
@@ -550,22 +571,17 @@
async.parallel([
// The key queued for deletion should have been deleted.
function(done) {
ds.get(deleteKey, function(err, entities) {
ds.get(deleteKey, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.equal(typeof entity, 'undefined');

done();
});
},

// Data should have been updated on the key.
function(done) {
ds.get(key, function(err, entities) {
ds.get(key, function(err, entity) {
assert.ifError(err);

var entity = entities[0];
assert.equal(entity.data.rating, 10);
done();
});
@@ -597,9 +613,9 @@
assert.ifError(err);

// Should not return a result.
ds.get(key, function(err, entities) {
ds.get(key, function(err, entity) {
assert.ifError(err);
assert.strictEqual(entities.length, 0);
assert.strictEqual(entity, undefined);

// Incomplete key should have been given an id.
assert.equal(incompleteKey.path.length, 2);
