Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store errors in new analysis catalog table #109

Merged
merged 3 commits
Jul 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/analysis.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function AnalysisFactory(DatabaseServiceClass) {

AnalysisFactory.prototype.create = function(configuration, definition, callback) {
debug('Using configuration=%j', configuration);
var databaseService = new this.DatabaseServiceClass(configuration.db, configuration.batch);
var databaseService = new this.DatabaseServiceClass(configuration.user, configuration.db, configuration.batch);

async.waterfall(
[
Expand Down
9 changes: 7 additions & 2 deletions lib/postgresql/batch-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ function runQueries(queryRunner, queries, callback) {
return callback(new Error('BatchClient inline execution requires a valid QueryRunner instance'));
}

var jobId = '646f6e65-772f-3c33-6279-726f63686f61';

async.eachSeries(
queries,
function(query, done) {
Expand All @@ -67,14 +69,17 @@ function runQueries(queryRunner, queries, callback) {
queryRunner.run(query.query, writeQuery, function (err) {
if (err) {
if (query.onerror) {
return queryRunner.run(query.onerror, writeQuery, done);
var onerrorQuery = query.onerror.replace(/<%=\s*job_id\s*%>/g, jobId);
onerrorQuery = onerrorQuery.replace(/<%=\s*error_message\s*%>/g, err.message);
return queryRunner.run(onerrorQuery, writeQuery, done);
}

return done(err);
}

if (query.onsuccess) {
return queryRunner.run(query.onsuccess, writeQuery, done);
var onsuccessQuery = query.onsuccess.replace(/<%=\s*job_id\s*%>/g, jobId);
return queryRunner.run(onsuccessQuery, writeQuery, done);
}

done();
Expand Down
52 changes: 38 additions & 14 deletions lib/service/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var createQueryTemplate = dot.template([
'LIMIT 0'
].join('\n'));

function DatabaseService(dbParams, batchParams) {
function DatabaseService(user, dbParams, batchParams) {
this.user = user;
this.queryRunner = new QueryRunner(dbParams);
this.queryParser = new QueryParser(this.queryRunner);

Expand Down Expand Up @@ -126,6 +127,7 @@ DatabaseService.prototype.enqueue = function(queries, callback) {

var registerNodeQueryTemplate = dot.template([
'SELECT',
' \'{{=it._user}}\',',
' \'{{=it._nodeId}}\',',
' $json_def${{=it._analysisDef}}$json_def$::json,',
' {{=it._inputNodes}},',
Expand Down Expand Up @@ -153,11 +155,13 @@ DatabaseService.prototype.registerAnalysisInCatalog = function(analysis, callbac
if (!Array.isArray(nodes) || nodes.length === 0 || analysis.getRoot().type === Source.TYPE) {
return callback(null);
}
var user = this.user;
var nodesSql = [];
nodes.forEach(function(node) {
debug(node.type, node.getUpdatedAt());
var updatedAt = node.getUpdatedAt();
nodesSql.push(registerNodeQueryTemplate({
_user: user,
_nodeId: node.id(),
_inputNodes: pgArray(
node.getInputNodes().map(function(node) { return node.id(); }).map(pgQuoteCastMapper()), 'char[40]'
Expand All @@ -168,7 +172,7 @@ DatabaseService.prototype.registerAnalysisInCatalog = function(analysis, callbac
}));
});
var sql = 'INSERT INTO cdb_analysis_catalog' +
'(node_id, analysis_def, input_nodes, updated_at, status)\n' + nodesSql.join('\nUNION ALL\n') +
'(username, node_id, analysis_def, input_nodes, updated_at, status)\n' + nodesSql.join('\nUNION ALL\n') +
'ON CONFLICT (node_id) DO NOTHING';

debug('Registering node with SQL: %s', sql);
Expand All @@ -190,15 +194,35 @@ function populateCacheTableQuery(targetTableName, outputQuery) {
return 'INSERT INTO ' + targetTableName + ' ' + outputQuery;
}

function updateNodeAtAnalysisCatalogQuery(nodeId, columns) {
    // Generic UPDATE builder for the analysis catalog: applies the given
    // `column = value` assignment strings to the row identified by nodeId.
    // NOTE(review): nodeId and the column assignments are interpolated
    // directly into the SQL text — callers must only pass trusted,
    // internally generated values, never user input.
    return [
        'UPDATE cdb_analysis_catalog SET',
        columns.join(','),
        'WHERE node_id = \'' + nodeId + '\''
    ].join('\n');
}

function updateNodeStatusAtAnalysisCatalogQuery(nodeId, status) {
    // Convenience wrapper: update only the node's status column, touching
    // updated_at so consumers can see when the transition happened.
    var assignments = [
        'status = \'' + status + '\'',
        'updated_at = NOW()'
    ];
    return updateNodeAtAnalysisCatalogQuery(nodeId, assignments);
}

function updateNodeAtAnalysisCatalogForJobResultQuery(nodeId, status, isError) {
    // Record the outcome of a batch job on a catalog node: new status, the
    // job that last touched it (the <%= job_id %> placeholder is filled in
    // by the batch client before execution), and a fresh updated_at.
    var assignments = ['status = \'' + status + '\''];
    assignments.push('last_modified_by = \'<%= job_id %>\'');
    assignments.push('updated_at = NOW()');

    // On failure also persist the error message; dollar-quoting sidesteps
    // escaping problems with arbitrary message content.
    if (isError) {
        assignments.push(
            'last_error_message = $last_error_message$<%= error_message %>$last_error_message$'
        );
    }

    return updateNodeAtAnalysisCatalogQuery(nodeId, assignments);
}

DatabaseService.prototype.queueAnalysisOperations = function(analysis, callback) {
if (analysis.getRoot().type === Source.TYPE) {
return callback(null);
Expand Down Expand Up @@ -269,33 +293,33 @@ DatabaseService.prototype.queueAnalysisOperations = function(analysis, callback)

var asyncQueries = [];

// we want to mark all nodes updated_at to NOW() and status to pending before doing any work on them
// we want to mark all nodes updated_at to NOW() before doing any work on them
nodesToQueueForUpdate.forEach(function(nodeToUpdate) {
nodeToUpdate.setStatusFromInputNodes();
asyncQueries.push({
query: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), nodeToUpdate.getStatus()),
onerror: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.FAILED)
query: updateNodeStatusAtAnalysisCatalogQuery(nodeToUpdate.id(), nodeToUpdate.getStatus()),
onerror: updateNodeStatusAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.FAILED)
});
});

nodesToQueueForUpdate.forEach(function(nodeToUpdate) {
if (nodeToUpdate.shouldCacheQuery()) {
var targetTableName = nodeToUpdate.getTargetTable();
asyncQueries.push({
query: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.RUNNING)
query: updateNodeStatusAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.RUNNING)
});
asyncQueries.push({
query: transactionQuery([
deleteFromCacheTableQuery(targetTableName),
populateCacheTableQuery(targetTableName, nodeToUpdate.sql())
]),
onsuccess: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.READY),
onerror: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.FAILED)
onsuccess: updateNodeAtAnalysisCatalogForJobResultQuery(nodeToUpdate.id(), Node.STATUS.READY),
onerror: updateNodeAtAnalysisCatalogForJobResultQuery(nodeToUpdate.id(), Node.STATUS.FAILED, true)
});
} else {
asyncQueries.push({
query: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.READY),
onerror: updateNodeAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.FAILED)
query: updateNodeStatusAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.READY),
onerror: updateNodeStatusAtAnalysisCatalogQuery(nodeToUpdate.id(), Node.STATUS.FAILED)
});
}
});
Expand Down
16 changes: 15 additions & 1 deletion test/fixtures/cdb_analysis_catalog.sql
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
drop table if exists cdb_analysis_catalog;
create table cdb_analysis_catalog (
    -- useful for multi account deployments
    username text,
    -- md5 hex hash
    node_id char(40) CONSTRAINT cdb_analysis_catalog_pkey PRIMARY KEY,
    -- being json allows to do queries like analysis_def->>'type' = 'buffer'
    analysis_def json NOT NULL,
    -- can reference other nodes in this very same table, allowing recursive queries
    input_nodes char(40) ARRAY NOT NULL DEFAULT '{}',
    status TEXT NOT NULL DEFAULT 'pending',
    CONSTRAINT valid_status CHECK (
        status IN ( 'pending', 'waiting', 'running', 'canceled', 'failed', 'ready' )
    ),
    created_at timestamp with time zone NOT NULL DEFAULT now(),
    -- should be updated when some operation was performed in the node
    -- and anything associated to it might have changed
    updated_at timestamp with time zone DEFAULT NULL,
    -- should register last time the node was used
    used_at timestamp with time zone NOT NULL DEFAULT now(),
    -- should register the number of times the node was used
    hits NUMERIC DEFAULT 0,
    -- should register what was the last node using current node
    last_used_from char(40),
    -- last job modifying the node
    last_modified_by uuid,
    -- store error message for failures
    last_error_message text
);

--GRANT SELECT, UPDATE, INSERT, DELETE ON TABLE cartodb.cdb_analysis_catalog TO "development_cartodb_user_359a4d9f-a063-4130-9674-799e90960886";
Expand Down