Skip to content

Commit

Permalink
spanner: various bug fixes + test coverage (#2313)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and stephenplusplus committed May 31, 2017
1 parent 0ef2737 commit 563ec3b
Show file tree
Hide file tree
Showing 17 changed files with 3,981 additions and 808 deletions.
86 changes: 86 additions & 0 deletions packages/spanner/src/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,89 @@ function encode(value) {
}

codec.encode = encode;

/**
* Get the corresponding Spanner data type.
*
* @private
*
* @param {*} field - The field value.
* @return {string}
*
* @example
* Database.getType_(NaN);
* // 'float64'
*/
function getType(field) {
if (is.bool(field)) {
return 'bool';
}

var isSpecialNumber = is.infinite(field) ||
(is.number(field) && isNaN(field));

if (is.decimal(field) || isSpecialNumber || field instanceof Float) {
return 'float64';
}

if (is.number(field) || field instanceof Int) {
return 'int64';
}

if (is.string(field)) {
return 'string';
}

if (Buffer.isBuffer(field)) {
return 'bytes';
}

if (is.date(field)) {
return 'timestamp';
}

if (field instanceof SpannerDate) {
return 'date';
}

if (is.array(field)) {
var child;

for (var i = 0; i < field.length; i++) {
child = field[i];

if (!is.nil(child)) {
break;
}
}

return {
type: 'array',
child: getType(child)
};
}

return 'unspecified';
}

codec.getType = getType;

/**
* A list of available Spanner types. The index of said type in Array aligns
* with the type code that query params require.
*
* @private
*/
var TYPES = [
'unspecified',
'bool',
'int64',
'float64',
'timestamp',
'date',
'string',
'bytes',
'array'
];

codec.TYPES = TYPES;
108 changes: 102 additions & 6 deletions packages/spanner/src/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,24 @@ Database.prototype.getSchema = function(callback) {
* database.run(query, function(err, rows) {});
*
* //-
* // If you need to enforce a specific param type, a types map can be provided.
* // This is typically useful if your param value can be null.
* //-
* var query = {
* sql: 'SELECT * FROM Singers WHERE name = @name AND id = @id',
* params: {
* id: spanner.int(8),
* name: null
* },
* types: {
* id: 'int64',
* name: 'string'
* }
* };
*
* database.run(query, function(err, rows) {});
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* database.run(query).then(function(data) {
Expand Down Expand Up @@ -577,6 +595,25 @@ Database.prototype.run = function(query, options, callback) {
* .on('end', function() {});
*
* //-
* // If you need to enforce a specific param type, a types map can be provided.
* // This is typically useful if your param value can be null.
* //-
* var query = {
* sql: 'SELECT * FROM Singers WHERE name = @name',
* params: {
* name: 'Eddie Wilson'
* },
* types: {
* name: 'string'
* }
* };
*
* database.runStream(query)
* .on('error', function(err) {})
* .on('data', function(row) {})
* .on('end', function() {});
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
Expand All @@ -598,11 +635,19 @@ Database.prototype.runStream = function(query, options) {
session: this.formattedName_
});

var fields = {};
var prop;

if (reqOpts.params) {
var fields = {};
reqOpts.types = reqOpts.types || {};

for (var prop in reqOpts.params) {
for (prop in reqOpts.params) {
var field = reqOpts.params[prop];

if (!reqOpts.types[prop]) {
reqOpts.types[prop] = codec.getType(field);
}

fields[prop] = codec.encode(field);
}

Expand All @@ -611,9 +656,48 @@ Database.prototype.runStream = function(query, options) {
};
}

if (reqOpts.types) {
var types = {};

for (prop in reqOpts.types) {
var type = reqOpts.types[prop];
var childType;
var child;

// if a type is an ARRAY, then we'll accept an object specifying
// the type and the child type
if (is.object(type)) {
childType = type.child;
child = codec.TYPES.indexOf(childType);
type = type.type;
}

var code = codec.TYPES.indexOf(type);

if (code === -1) {
throw new Error('Unknown param type: ' + type);
}

types[prop] = { code: code };

if (child === -1) {
throw new Error('Unknown param type: ' + childType);
}

if (is.number(child)) {
types[prop].arrayElementType = { code: child };
}
}

reqOpts.paramTypes = types;
delete reqOpts.types;
}

if (options) {
reqOpts.transaction = {
begin: TransactionRequest.formatTimestampOptions_(options)
singleUse: {
readOnly: TransactionRequest.formatTimestampOptions_(options)
}
};
}

Expand Down Expand Up @@ -647,6 +731,10 @@ Database.prototype.runStream = function(query, options) {
* @resource [Timestamp Bounds](https://cloud.google.com/spanner/docs/timestamp-bounds)
*
* @param {object=} options - [Transaction options](https://cloud.google.com/spanner/docs/timestamp-bounds).
* @param {number} options.timeout - Specify a timeout for the transaction. The
* transaction will be ran in its entirety, however if an abort error is
* returned the transaction will be retried if the timeout has not been met.
* Default: `60000` (milliseconds)
* @param {boolean} options.readOnly - Specifies if the transaction is
* read-only.
* @param {number} options.exactStaleness - Executes all reads at the timestamp
Expand Down Expand Up @@ -721,15 +809,23 @@ Database.prototype.runTransaction = function(options, runFn) {
options = null;
}

options = extend({}, options);

this.getTransaction_(options, function(err, transaction) {
if (err) {
runFn(err);
return;
}

transaction.run_(function() {
runFn(null, transaction);
});
transaction.beginTime_ = Date.now();
transaction.runFn_ = runFn;

if (options && options.timeout) {
transaction.timeout_ = options.timeout;
delete options.timeout;
}

runFn(null, transaction);
});
};

Expand Down
17 changes: 12 additions & 5 deletions packages/spanner/src/partial-result-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,15 @@ function partialResultStream(requestFn) {
});

var rowChunks = [];
var metadata;

var userStream = streamEvents(through.obj(function(row, _, next) {
var formattedRows = [];

if (row.metadata) {
metadata = row.metadata;
}

if (row.chunkedValue) {
rowChunks.push(row);
next();
Expand All @@ -94,9 +99,11 @@ function partialResultStream(requestFn) {

if (rowChunks.length > 0) {
// Done getting all the chunks. Put them together.
formattedRows.push(new RowBuilder(rowChunks.concat(row)).toJSON());
var builder = new RowBuilder(metadata, rowChunks.concat(row));
formattedRows = formattedRows.concat(builder.toJSON());
rowChunks.length = 0;
} else {
var formattedRow = partialResultStream.formatRow_(row);
var formattedRow = partialResultStream.formatRow_(metadata, row);
var multipleRows = is.array(formattedRow[0]);

if (multipleRows) {
Expand Down Expand Up @@ -160,16 +167,16 @@ function partialResultStream(requestFn) {
*
* @param {object} row - A `PartialResultSet` object.
*/
partialResultStream.formatRow_ = function(row) {
var fields = row.metadata.rowType.fields;
partialResultStream.formatRow_ = function(metadata, row) {
var fields = metadata.rowType.fields;

if (row.values.length > fields.length) {
// More than one row exists. Return an array of formatted rows.
var valueSets = chunk(row.values, fields.length);

return valueSets.map(function(valueSet) {
row.values = valueSet;
return partialResultStream.formatRow_(row);
return partialResultStream.formatRow_(metadata, row);
});
}

Expand Down
39 changes: 31 additions & 8 deletions packages/spanner/src/row-builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ var is = require('is');
*
* @private
*/
function RowBuilder(chunks) {
function RowBuilder(metadata, chunks) {
this.metadata = metadata;
this.fields = this.metadata.rowType.fields;
this.chunks = chunks;
this.rows = [[]];

this.metadata = chunks[0].metadata;
this.fields = this.metadata.rowType.fields;
this.rows = [[]];

Object.defineProperty(this, 'currentRow', {
get: function() {
Expand All @@ -41,6 +41,23 @@ function RowBuilder(chunks) {
});
}

/**
* Extracts value from chunk.
*/
RowBuilder.getValue = function(obj) {
var value = obj;

if (obj && obj.kind) {
value = obj[obj.kind];
}

if (value && value.values) {
value = value.values;
}

return value;
};

/**
* Format a value into the expected structure, e.g. turn struct values into an
* object.
Expand All @@ -67,6 +84,10 @@ RowBuilder.formatValue = function(field, value) {
*/
RowBuilder.merge = function(type, head, tail) {
var code = type.code;

head = RowBuilder.getValue(head);
tail = RowBuilder.getValue(tail);

var isMergeable = !is.nil(head) && !is.nil(tail) && code !== 'FLOAT64';
var merged = [];
var mergedItems;
Expand Down Expand Up @@ -98,10 +119,10 @@ RowBuilder.merge = function(type, head, tail) {
*/
RowBuilder.prototype.append = function(value) {
if (this.currentRow.length === this.fields.length) {
this.rows.push([value]);
} else {
this.currentRow.push(value);
this.rows.push([]);
}

this.currentRow.push(value);
};

/**
Expand All @@ -123,7 +144,9 @@ RowBuilder.prototype.build = function() {
merged.forEach(self.append.bind(self));
}

chunk.values.forEach(self.append.bind(self));
chunk.values.map(RowBuilder.getValue)
.forEach(self.append.bind(self));

previousChunk = chunk;
});
};
Expand Down
Loading

0 comments on commit 563ec3b

Please sign in to comment.