Navigation Menu

Skip to content

Commit

Permalink
load: add Loader
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Apr 25, 2014
1 parent 9ecce5a commit fcc9597
Showing 1 changed file with 73 additions and 66 deletions.
139 changes: 73 additions & 66 deletions lib/adapter/api/groonga.js
Expand Up @@ -5,20 +5,83 @@ var statusCodes = {
SUCCESS: 0
};

function createResponse(statusCode, startTimeInMilliseconds, body) {
var elapsedTimeInMilliseconds = Date.now() - startTimeInMilliseconds;
function Loader(request, response, connection) {
this._request = request;
this._response = response;
this._connection = connection;
}

Loader.prototype.run = function run() {
this._jsonParser = new jsonParser();
this._statusCode = statusCodes.SUCCESS;
this._startTimeInMilliseconds = Date.now();
this._nRecords = 0;
this._nResponses = 0;
this._nAdded = 0;
this._isEnd = false;

this._jsonParser.onValue = this._onValue.bind(this);
this._request.on('data', function(chunk) {
this._jsonParser.write(chunk);
}.bind(this));
this._request.once('end', function() {
this._isEnd = true;

if (this._nRecords == 0) {
this._sendResponse([0]);
}
}.bind(this));
};

Loader.prototype._onValue = function(value) {
if (this._jsonParser.stack.length != 1) {
return;
}

this._nRecords++;

var message = {
table: this._request.query.table, // TODO: validate it
values: {}
};
Object.keys(value).forEach(function(key) {
if (key == '_key') {
message.key = value[key];
} else {
message.values[key] = value[key];
}
});

this._connection.emit('add', message, function(error, message) {
this._nResponses++;
if (error) {
// TODO: Output to error log file.
console.error("/d/load: failed to add a record", message);
} else {
var succeeded = message;
if (succeeded) {
this._nAdded++;
}
}
if (this._isEnd && this._nRecords == this._nResponses) {
this._sendResponse([this._nAdded]);
}
}.bind(this));
};

Loader.prototype._createResponse = function(body) {
var elapsedTimeInMilliseconds = Date.now() - this._startTimeInMilliseconds;
var header = [
statusCode,
startTimeInMilliseconds / 1000,
this._statusCode,
this._startTimeInMilliseconds / 1000,
elapsedTimeInMilliseconds / 1000,
];
return [header, body];
};

function sendResponse(response, statusCode, startTimeInMilliseconds, body) {
var groongaResponse =
createResponse(statusCode, startTimeInMilliseconds, body);
response.jsonp(groongaResponse);
Loader.prototype._sendResponse = function sendResponse(body) {
var groongaResponse = this._createResponse(body);
this._response.jsonp(groongaResponse);
};

module.exports = {
Expand All @@ -32,64 +95,8 @@ module.exports = {
method: 'POST',
path: '/d/load',
onRequest: function(request, connection, response) {
var parser = new jsonParser();
var nRecords = 0;
var nResponses = 0;
var nAdded = 0;
var isEnd = false;
var startTimeInMilliseconds = Date.now();

parser.onValue = function(value) {
if (parser.stack.length != 1) {
return;
}

nRecords++;

var message = {
table: request.query.table, // TODO: validate it
values: {}
};
Object.keys(value).forEach(function(key) {
if (key == '_key') {
message.key = value[key];
} else {
message.values[key] = value[key];
}
});

connection.emit('add', message, function(error, message) {
nResponses++;
if (error) {
// TODO: Output to error log file.
console.error("/d/load: failed to add a record", message);
} else {
var succeeded = message;
if (succeeded) {
nAdded++;
}
}
if (isEnd && nRecords == nResponses) {
sendResponse(response,
statusCodes.SUCCESS,
startTimeInMilliseconds,
[nAdded]);
}
});
};
request.on('data', function(chunk) {
parser.write(chunk);
});
request.once('end', function() {
isEnd = true;

if (nRecords == 0) {
sendResponse(response,
statusCodes.SUCCESS,
startTimeInMilliseconds,
[0]);
}
});
var loader = new Loader(request, response, connection);
loader.run();
}
})
};

0 comments on commit fcc9597

Please sign in to comment.