Browse files

Test driving the new plata dynamo lib

  • Loading branch information...
1 parent 118a396 commit 927837d33012a91a1e76a24c5932eea10df90511 @imlucas committed Dec 9, 2012
Showing with 122 additions and 197 deletions.
  1. +1 −1 .jshintrc
  2. +121 −196 index.js
View
2 .jshintrc
@@ -41,7 +41,7 @@
"noempty": true,
"nonew": false,
"nomen": true,
- "onevar": true,
+ "onevar": false,
"plusplus": false,
"trailing": true,
"white": false,
View
317 index.js
@@ -1,6 +1,6 @@
"use strict";
-var dynamo = require("dynamo"),
+var aws = require("plata"),
when = require("when"),
sequence = require("sequence"),
_ = require("underscore"),
@@ -85,25 +85,16 @@ Model.prototype.batch = function(){
// Actually connect to dynamo or magneto.
Model.prototype.getDB = function(key, secret){
+ this.db = aws.dynamo;
+ aws.connect({'key': key, 'secret': secret});
+ log.debug('Dynamo client created.');
+
if(process.env.MAMBO_BACKEND === "magneto"){
log.debug('Using magneto');
- // Connect to Magneto
- this.client = dynamo.createClient();
- this.client.useSession = false;
- this.db = this.client.get(this.region || "us-east-1");
- this.db.host = "localhost";
this.db.port = process.env.MAGNETO_PORT || 8081;
+ this.db.host = "localhost:" + this.port;
log.debug('Connected to magneto on ' +this.db.host+ ':' + this.db.port);
}
- else {
- // Connect to DynamoDB
- this.client = dynamo.createClient({
- 'accessKeyId': key,
- 'secretAccessKey': secret
- });
- this.db = this.client.get(this.region || "us-east-1");
- log.debug('Dynamo client created.');
- }
return this.db;
};
@@ -117,8 +108,10 @@ Model.prototype.connect = function(key, secret, prefix, region){
log.debug('Reading schemas...');
this.schemas.forEach(function(schema){
var tableName = (this.prefix || "") + schema.tableName,
- table = this.db.get(tableName);
+ table = {};
+ // @todo (lucas) Now that this is on plata, dont think we need
+ // most/all of this craziness.
log.silly('Table name: ' + tableName);
_.extend(table, {
@@ -162,31 +155,20 @@ Model.prototype.createAll = function(){
// Checks if all tables exist in magneto. If a table doesn't exist
// it will be created.
Model.prototype.ensureTableMagneto = function(alias){
- var d = when.defer();
- sequence(this).then(function(next){
- this.db.listTables({}, next);
- }).then(function(next, err, data){
- if(!d.rejectIfError(err)){
- next(data);
- }
- }).then(function(next, data){
+ return this.db.listTables().then(function(next, data){
if(data.TableNames.indexOf(this.table(alias).name) !== -1){
- return d.resolve(false);
+ return false;
}
- this.db.add({
- 'name': this.table(alias).name,
- 'schema': this.schema(alias).schema,
- 'throughput': {
- 'read': 10,
- 'write': 10
- }
- }).save(function(err, table){
- if(!d.rejectIfError(err)){
- d.resolve(true);
+ var req = {
+ 'TableName': this.table(alias).name,
+ 'KeySchema': this.table(alias).key,
+ 'ProvisionedThroughput':{
+ 'ReadCapacityUnits':5,
+ 'WriteCapacityUnits':10
}
- });
+ };
+ return this.db.createTable(req);
});
- return d.promise;
};
// Low level get item wrapper.
@@ -199,8 +181,7 @@ Model.prototype.ensureTableMagneto = function(alias){
// - object. If empty, get all attributes.
// - consistentRead: boolean
Model.prototype.get = function(alias, hash, range, attributesToGet, consistentRead){
- var d = when.defer(),
- table = this.table(alias),
+ var table = this.table(alias),
schema = this.schema(alias),
request;
log.debug('Get `'+alias+'` with hash `'+hash+'` and range `'+range+'`');
@@ -229,20 +210,14 @@ Model.prototype.get = function(alias, hash, range, attributesToGet, consistentRe
}
log.silly('Built GET_ITEM request: ' + util.inspect(request, false, 5));
-
- // Make the request
- this.db.getItem(request, function(err, data){
- log.silly('GET_ITEM returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- return d.resolve((data.Item !== undefined) ?
- this.schema(alias).import(data.Item) : null);
- }
- else{
- log.error('GET_ITEM: ' + err.message + '\n' + err.stack);
- }
- }.bind(this));
-
- return d.promise;
+ return this.db.getItem(request).then(function(data){
+ log.silly('GET_ITEM returned: data: ' + util.inspect(data, false, 5));
+ return (data.Item !== undefined) ?
+ this.schema(alias).import(data.Item) : null;
+ }.bind(this), function(err){
+ log.error('GET_ITEM: ' + err.message + '\n' + err.stack);
+ return err;
+ });
};
// Lowlevel delete item wrapper
@@ -261,8 +236,7 @@ Model.prototype.delete = function(alias, hash, opts){
log.debug('Delete `'+alias+'` with hash `'+hash+'` and range `'+opts.range+'`');
- var d = when.defer(),
- table = this.table(alias),
+ var table = this.table(alias),
request = {
'TableName': table.name,
'Key': {
@@ -299,16 +273,13 @@ Model.prototype.delete = function(alias, hash, opts){
log.silly('Built DELETE_ITEM request: ' + util.inspect(request, false, 5));
// Make the request
- this.db.deleteItem(request, function(err, data){
- log.silly('DELETE_ITEM returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- return d.resolve(data);
- }
- else{
- log.error('DELETE_ITEM: ' + err.message + '\n' + err.stack);
- }
- }.bind(this));
- return d.promise;
+ return this.db.deleteItem(request).then(function(data){
+ log.silly('DELETE_ITEM returned: ' + util.inspect(data, false, 5));
+ return data;
+ }.bind(this), function(err){
+ log.error('DELETE_ITEM: ' + err.message + '\n' + err.stack);
+ return err;
+ });
};
var sortObjects = function(objects, values, property){
@@ -362,8 +333,7 @@ var sortObjects = function(objects, values, property){
// ]
Model.prototype.batchGet = function(req){
log.debug('Batch get ' + util.inspect(req, false, 5));
- var d = when.defer(),
- request = {
+ var request = {
'RequestItems': {}
},
results = {},
@@ -398,31 +368,29 @@ Model.prototype.batchGet = function(req){
log.silly('Built DELETE_ITEM request: ' + util.inspect(request, false, 5));
// Make the request
- this.db.batchGetItem(request, function(err, data){
- log.silly('BATCH_GET returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- // translate the response from dynamo format to exfm format
- req.forEach(function(tableData){
- var table = this.table(tableData.alias),
- schema = this.schema(tableData.alias),
- items = data.Responses[table.name].Items;
-
- results[tableData.alias] = items.map(function(dynamoObj){
- return schema.import(dynamoObj);
- }.bind(this));
-
- // Sort the results
- results[tableData.alias] = sortObjects(results[tableData.alias],
- tableData.hashes, table.hashName);
+ return this.db.batchGetItem(request).then(function(data){
+ log.silly('BATCH_GET returned: ' + util.inspect(data, false, 5));
+
+ // translate the response from dynamo format to exfm format
+ req.forEach(function(tableData){
+ var table = this.table(tableData.alias),
+ schema = this.schema(tableData.alias),
+ items = data.Responses[table.name].Items;
+ results[tableData.alias] = items.map(function(dynamoObj){
+ return schema.import(dynamoObj);
}.bind(this));
- d.resolve(results);
- }
- else{
- log.error('BATCH_GET: ' + err.message + '\n' + err.stack);
- }
- }.bind(this));
- return d.promise;
+
+ // Sort the results
+ results[tableData.alias] = sortObjects(results[tableData.alias],
+ tableData.hashes, table.hashName);
+
+ }.bind(this));
+ return results;
+ }.bind(this), function(err){
+ log.error('BATCH_GET: ' + err.message + '\n' + err.stack);
+ return err;
+ });
};
@@ -447,8 +415,7 @@ Model.prototype.batchGet = function(req){
// );
Model.prototype.batchWrite = function(puts, deletes){
log.debug('Batch write: puts`'+util.inspect(puts, false, 10)+'`, deletes`'+util.inspect(deletes, false, 10)+'` ');
- var d = when.defer(),
- self = this,
+ var self = this,
req = {
'RequestItems': {}
},
@@ -497,24 +464,17 @@ Model.prototype.batchWrite = function(puts, deletes){
}
log.silly('Built BATCH_WRITE request: ' + util.inspect(req, false, 10));
-
- this.db.batchWriteItem(req, function(err, data){
- log.silly('BATCH_WRITE returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- var success = {};
-
- Object.keys(data.Responses).forEach(function(tableName){
- success[self.tableNameToAlias(tableName)] = data.Responses[tableName].ConsumedCapacityUnits;
- });
-
- d.resolve({'success': success,
- 'unprocessed': data.UnprocessedItems});
- }
- else{
- log.error('BATCH_WRITE: ' + err.message + '\n' + err.stack);
- }
+ return this.db.batchWriteItem(req, function(err, data){
+ log.silly('BATCH_WRITE returned: ' + util.inspect(data, false, 5));
+ var success = {};
+ Object.keys(data.Responses).forEach(function(tableName){
+ success[self.tableNameToAlias(tableName)] = data.Responses[tableName].ConsumedCapacityUnits;
+ });
+ return {'success': success,'unprocessed': data.UnprocessedItems};
+ }, function(err){
+ log.error('BATCH_WRITE: ' + err.message + '\n' + err.stack);
+ return err;
});
- return d.promise;
};
// http://docs.amazonwebservices.com/amazondynamodb/latest/developerguide/API_PutItem.html
@@ -547,8 +507,7 @@ Model.prototype.batchWrite = function(puts, deletes){
// returnValues: See AWS docs for an explanation.
Model.prototype.put = function(alias, obj, expected, returnOldValues){
log.debug('Put `'+alias+'` '+ util.inspect(obj, false, 10));
- var d = when.defer(),
- table = this.table(alias),
+ var table = this.table(alias),
request,
schema = this.schema(alias),
clean = schema.export(obj);
@@ -577,16 +536,13 @@ Model.prototype.put = function(alias, obj, expected, returnOldValues){
log.silly('Built PUT request: ' + util.inspect(request, false, 10));
// Make the request
- this.db.putItem(request, function(err, data){
- log.silly('PUT returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- return d.resolve(obj);
- }
- else{
- log.error('PUT: ' + err.message + '\n' + err.stack);
- }
+ return this.db.putItem(request).then(function(data){
+ log.silly('PUT returned: ' + util.inspect(data, false, 5));
+ return obj;
+ }, function(err){
+ log.error('PUT: ' + err.message + '\n' + err.stack);
+ return err;
});
- return d.promise;
};
// usage:
@@ -609,8 +565,7 @@ Model.prototype.updateItem = function(alias, hash, attrs, opts){
log.debug('Update `'+alias+'` with hash `'+hash+'` and range `'+opts.range+'`');
log.debug(util.inspect(attrs, false, 5));
- var d = when.defer(),
- response = [],
+ var response = [],
table = this.table(alias),
schema = this.schema(alias),
request = {
@@ -672,19 +627,16 @@ Model.prototype.updateItem = function(alias, hash, attrs, opts){
log.silly('Built UPDATE_ITEM request: ' + util.inspect(request, false, 10));
// Make the request
- this.db.updateItem(request, function(err, data){
- log.silly('UPDATE_ITEM returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- if (opts.returnValues !== undefined) {
- return d.resolve(schema.import(data.Attributes));
- }
- return d.resolve(data);
- }
- else{
- log.error('UPDATE_ITEM: ' + err.message + '\n' + err.stack);
+ return this.db.updateItem(request, function(err, data){
+ log.silly('UPDATE_ITEM returned: ' + util.inspect(data, false, 5));
+ if (opts.returnValues !== undefined) {
+ return schema.import(data.Attributes);
}
- }.bind(this));
- return d.promise;
+ return data;
+ }, function(err){
+ log.error('UPDATE_ITEM: ' + err.message + '\n' + err.stack);
+ return err;
+ });
};
// usage:
@@ -711,8 +663,7 @@ Model.prototype.query = function(alias, hash, opts){
log.debug('Query `'+alias+'` with hash `'+hash+'` and range `'+opts.range+'`');
log.silly('Query options: ' + util.inspect(opts, false, 5));
- var d = when.defer(),
- response = [],
+ var response = [],
table = this.table(alias),
schema = this.schema(alias),
request = {
@@ -780,25 +731,21 @@ Model.prototype.query = function(alias, hash, opts){
log.silly('Built QUERY request: ' + util.inspect(request, false, 10));
// Make the request
- this.db.query(request, function(err, data){
- log.silly('QUERY returned: err: ' + err + ', data: ' + util.inspect(data, false, 5));
- if(!d.rejectIfError(err)){
- var items = data.Items.map(function(item){
- return schema.import(item);
- }.bind(this));
- return d.resolve(items);
- }
- else{
- log.error('QUERY: ' + err.message + '\n' + err.stack);
- }
- }.bind(this));
- return d.promise;
+ return this.db.query(request, function(err, data){
+ log.silly('QUERY returned: ' + util.inspect(data, false, 5));
+ return data.Items.map(function(item){
+ return schema.import(item);
+ });
+ }, function(err){
+ log.error('QUERY: ' + err.message + '\n' + err.stack);
+ return err;
+ });
};
// # DANGER: THIS WILL DROP YOUR TABLES AND SHOULD ONLY BE USED IN TESTING.
Model.prototype.recreateTable = function(alias) {
- var d = when.defer(),
+ var self = this,
table = this.table(alias),
tableRequest = {
'TableName': table.name
@@ -808,55 +755,34 @@ Model.prototype.recreateTable = function(alias) {
// if (process.env.NODE_ENV !== 'testing') {
// throw new Error('Can only recreate a table in testing environment');
// }
- sequence(this).then(function(next){
- this.db.describeTable(tableRequest, function(err, data){
- if (!err) {
- next(data);
- }
- else {
- throw new Error(err);
- }
- });
- }).then(function(next, data){
- tableDescription = data;
- this.db.deleteTable(tableRequest, function(err, data){
- if (!err) {
- next(data);
- }
- else {
- throw new Error(err);
- }
- });
- }).then(function(next, data){
- tableRequest.KeySchema = tableDescription.Table.KeySchema;
- tableRequest.ProvisionedThroughput = tableDescription.Table.ProvisionedThroughput;
- this.isTableDeleted(table.name).then(next);
-
- }).then(function(next){
- this.db.createTable(tableRequest, function(err, data){
- if (!err) {
- return next(data);
- }
- else {
- throw new Error(err);
- }
+ return this.db.describeTable(tableRequest)
+ .then(function(next, data){
+ tableDescription = data;
+ return self.db.deleteTable(tableRequest);
+ })
+ .then(function(){
+ tableRequest.KeySchema = tableDescription.Table.KeySchema;
+ tableRequest.ProvisionedThroughput = tableDescription.Table.ProvisionedThroughput;
+ return self.isTableDeleted(table.name);
+ })
+ .then(function(){
+ return self.db.createTable(tableRequest);
+ })
+ .then(function(){
+ return self.isTableActive(table.name);
+ })
+ .then(function(){
+ return true;
});
- }).then(function(next, data){
- this.isTableActive(table.name).then(function(){
- d.resolve(true);
- });
- });
- return d.promise;
};
Model.prototype.isTableDeleted = function(tableName){
var d = when.defer(),
self = this;
- this.db.describeTable({
- 'TableName': tableName
- }, function(err, data){
- if (data === undefined) {
- return d.resolve(true);
+
+ this.db.describeTable({'TableName': tableName}).then(function(data){
+ if (!data) {
+ d.resolve(true);
}
else {
setTimeout(function(){
@@ -872,9 +798,8 @@ Model.prototype.isTableDeleted = function(tableName){
Model.prototype.isTableActive = function(tableName){
var d = when.defer(),
self = this;
- this.db.describeTable({
- 'TableName': tableName
- }, function(err, data){
+
+ this.db.describeTable({'TableName': tableName}).then(function(data){
if (data.Table.TableStatus === 'ACTIVE') {
return d.resolve(true);
}

0 comments on commit 927837d

Please sign in to comment.