Skip to content

Commit

Permalink
Stabilizing Celery / SQL Lab
Browse files Browse the repository at this point in the history
* upgrade celery to 4.0.2
* using Redis for unit tests (sqla broker not supported in Celery 4)
* Setting Celery's soft_time_limit based on `SQLLAB_ASYNC_TIME_LIMIT_SEC` config
* Better error handling in async tasks
* Better statsd logging in async tasks
* show [pending/running] query status in Results tab
* systematically using sqla NullPool on worker (async) to limit number
  of database connections
  • Loading branch information
mistercrunch committed Jun 20, 2017
1 parent b9915e7 commit 5f2796b
Show file tree
Hide file tree
Showing 16 changed files with 167 additions and 52 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -1,5 +1,6 @@

language: python
services:
- redis-server
addons:
code_climate:
repo_token: 5f3a06c425eef7be4b43627d7d07a3e46c45bdc07155217825ff7c49cb6a470c
Expand Down
1 change: 1 addition & 0 deletions dev-reqs.txt
Expand Up @@ -9,6 +9,7 @@ psycopg2
pylint
pythrifthiveapi
pyyaml
redis
statsd
# Also install everything we need to build Sphinx docs
-r dev-reqs-for-docs.txt
3 changes: 2 additions & 1 deletion setup.py
Expand Up @@ -43,7 +43,7 @@ def get_git_sha():
scripts=['superset/bin/superset'],
install_requires=[
'boto3==1.4.4',
'celery==3.1.25',
'celery==4.0.2',
'colorama==0.3.9',
'cryptography==1.7.2',
'flask-appbuilder==1.9.0',
Expand Down Expand Up @@ -80,6 +80,7 @@ def get_git_sha():
'coverage',
'mock',
'nose',
'redis',
],
author='Maxime Beauchemin',
author_email='maximebeauchemin@gmail.com',
Expand Down
2 changes: 1 addition & 1 deletion superset/assets/javascripts/SqlLab/actions.js
Expand Up @@ -155,7 +155,7 @@ export function runQuery(query) {
} else if (msg === null) {
msg = `[${textStatus}] ${errorThrown}`;
}
if (msg.indexOf('The CSRF token is missing') > 0) {
if (msg.indexOf('CSRF token') > 0) {
msg = 'Your session timed out, please refresh your page and try again.';
}
dispatch(queryFailed(query, msg));
Expand Down
19 changes: 19 additions & 0 deletions superset/assets/javascripts/SqlLab/components/QueryStateLabel.jsx
@@ -0,0 +1,19 @@
import React from 'react';
import PropTypes from 'prop-types';
import { Label } from 'react-bootstrap';

import { STATE_BSSTYLE_MAP } from '../constants';

const propTypes = {
query: PropTypes.object.isRequired,
};

export default function QueryStateLabel({ query }) {
const bsStyle = STATE_BSSTYLE_MAP[query.state];
return (
<Label className="m-r-3" bsStyle={bsStyle}>
{query.state}
</Label>
);
}
QueryStateLabel.propTypes = propTypes;
6 changes: 2 additions & 4 deletions superset/assets/javascripts/SqlLab/components/QueryTable.jsx
Expand Up @@ -9,9 +9,9 @@ import VisualizeModal from './VisualizeModal';
import ResultSet from './ResultSet';
import ModalTrigger from '../../components/ModalTrigger';
import HighlightedSql from './HighlightedSql';
import { STATE_BSSTYLE_MAP } from '../constants';
import { fDuration } from '../../modules/dates';
import { storeQuery } from '../../../utils/common';
import QueryStateLabel from './QueryStateLabel';

const propTypes = {
columns: PropTypes.array,
Expand Down Expand Up @@ -164,9 +164,7 @@ class QueryTable extends React.PureComponent {
}
q.state = (
<div>
<span className={'m-r-3 label label-' + STATE_BSSTYLE_MAP[q.state]}>
{q.state}
</span>
<QueryStateLabel query={query} />
{errorTooltip}
</div>
);
Expand Down
2 changes: 2 additions & 0 deletions superset/assets/javascripts/SqlLab/components/ResultSet.jsx
Expand Up @@ -6,6 +6,7 @@ import shortid from 'shortid';
import VisualizeModal from './VisualizeModal';
import HighlightedSql from './HighlightedSql';
import FilterableTable from '../../components/FilterableTable/FilterableTable';
import QueryStateLabel from './QueryStateLabel';

const propTypes = {
actions: PropTypes.object,
Expand Down Expand Up @@ -165,6 +166,7 @@ export default class ResultSet extends React.PureComponent {
return (
<div>
<img className="loading" alt="Loading..." src="/static/assets/images/loading.gif" />
<QueryStateLabel query={query} />
{progressBar}
</div>
);
Expand Down
3 changes: 1 addition & 2 deletions superset/assets/javascripts/SqlLab/reducers.js
Expand Up @@ -237,8 +237,7 @@ export const sqlLabReducer = function (state, action) {
for (const id in action.alteredQueries) {
const changedQuery = action.alteredQueries[id];
if (!state.queries.hasOwnProperty(id) ||
(state.queries[id].changedOn !== changedQuery.changedOn &&
state.queries[id].state !== 'stopped')) {
state.queries[id].state !== 'stopped') {
if (changedQuery.changedOn > queriesLastUpdate) {
queriesLastUpdate = changedQuery.changedOn;
}
Expand Down
1 change: 1 addition & 0 deletions superset/assets/package.json
Expand Up @@ -63,6 +63,7 @@
"prop-types": "^15.5.8",
"react": "^15.5.1",
"react-ace": "^5.0.1",
"react-addons-css-transition-group": "^15.6.0",
"react-addons-shallow-compare": "^15.4.2",
"react-alert": "^2.0.1",
"react-bootstrap": "^0.31.0",
Expand Down
29 changes: 29 additions & 0 deletions superset/assets/spec/javascripts/sqllab/QueryStateLabel_spec.jsx
@@ -0,0 +1,29 @@
import React from 'react';
import { Label } from 'react-bootstrap';
import { shallow } from 'enzyme';
import { describe, it } from 'mocha';
import { expect } from 'chai';

import QueryStateLabel from '../../../javascripts/SqlLab/components/QueryStateLabel';

describe('SavedQuery', () => {
const mockedProps = {
query: {
state: 'running',
},
};
it('is valid', () => {
expect(
React.isValidElement(<QueryStateLabel />),
).to.equal(true);
});
it('is valid with props', () => {
expect(
React.isValidElement(<QueryStateLabel {...mockedProps} />),
).to.equal(true);
});
it('has an Overlay and a Popover', () => {
const wrapper = shallow(<QueryStateLabel {...mockedProps} />);
expect(wrapper.find(Label)).to.have.length(1);
});
});
4 changes: 4 additions & 0 deletions superset/config.py
Expand Up @@ -260,6 +260,10 @@ class CeleryConfig(object):
# SQLLAB_DEFAULT_DBID
SQLLAB_DEFAULT_DBID = None

# The MAX duration (in seconds) a query can run for before being killed
# by celery.
SQLLAB_ASYNC_TIME_LIMIT_SEC = 10

# An instantiated derivative of werkzeug.contrib.cache.BaseCache
# if enabled, it can be used to store the results of long-running queries
# in SQL Lab by using the "Run Async" button/feature
Expand Down
5 changes: 4 additions & 1 deletion superset/models/core.py
Expand Up @@ -30,6 +30,7 @@
)
from sqlalchemy.orm import relationship
from sqlalchemy.orm.session import make_transient
from sqlalchemy.pool import NullPool
from sqlalchemy.sql import text
from sqlalchemy.sql.expression import TextAsFrom
from sqlalchemy_utils import EncryptedType
Expand Down Expand Up @@ -560,10 +561,12 @@ def set_sqlalchemy_uri(self, uri):
conn.password = password_mask if conn.password else None
self.sqlalchemy_uri = str(conn) # hides the password

def get_sqla_engine(self, schema=None):
def get_sqla_engine(self, schema=None, nullpool=False):
extra = self.get_extra()
uri = make_url(self.sqlalchemy_uri_decrypted)
params = extra.get('engine_params', {})
if nullpool:
params['poolclass'] = NullPool
uri = self.db_engine_spec.adjust_database_uri(uri, schema)
return create_engine(uri, **params)

Expand Down
113 changes: 80 additions & 33 deletions superset/sql_lab.py
@@ -1,4 +1,3 @@
import celery
from time import sleep
from datetime import datetime
import json
Expand All @@ -7,6 +6,7 @@
import sqlalchemy
import uuid

from celery.exceptions import SoftTimeLimitExceeded
from sqlalchemy.pool import NullPool
from sqlalchemy.orm import sessionmaker

Expand All @@ -20,6 +20,12 @@

config = app.config
celery_app = get_celery_app(config)
stats_logger = app.config.get('STATS_LOGGER')
SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600)


class SqlLabException(Exception):
pass


def dedup(l, suffix='__'):
Expand All @@ -43,28 +49,63 @@ def dedup(l, suffix='__'):
return new_l


@celery_app.task(bind=True)
def get_sql_results(self, query_id, return_results=True, store_results=False):
"""Executes the sql query returns the results."""
if not self.request.called_directly:
def get_query(query_id, session, retry_count=5):
    """Attempts to get the query and retries if it cannot be found.

    Works around a race condition where the Celery worker may start
    before the transaction that created the query row was committed.

    :param query_id: primary key of the ``Query`` ORM row to fetch
    :param session: SQLAlchemy session used for the lookup
    :param retry_count: number of attempts before giving up
    :raises SqlLabException: if the query cannot be found after retrying
    """
    for attempt in range(1, retry_count + 1):
        try:
            return session.query(Query).filter_by(id=query_id).one()
        except Exception:
            # Query row not visible yet (or transient DB error); record
            # the attempt and back off for a second before retrying.
            logging.error(
                "Query with id `{}` could not be retrieved".format(query_id))
            stats_logger.incr('error_attempting_orm_query_' + str(attempt))
            logging.error("Sleeping for a sec before retrying...")
            sleep(1)
    stats_logger.incr('error_failed_at_getting_orm_query')
    raise SqlLabException("Failed at getting query")


def get_session(nullpool):
    """Creates a new SQLAlchemy session for the Superset metadata DB.

    :param nullpool: when True (async Celery worker), build a dedicated
        engine using ``NullPool`` so workers do not hold on to pooled
        database connections
    :returns: a SQLAlchemy session object
    """
    if nullpool:
        engine = sqlalchemy.create_engine(
            app.config.get('SQLALCHEMY_DATABASE_URI'), poolclass=NullPool)
        session_class = sessionmaker()
        session_class.configure(bind=engine)
        # Instantiate exactly one session; previously a throwaway session
        # was created and leaked before returning a second new one.
        return session_class()
    session = db.session()
    session.commit()  # HACK
    return session


@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
def get_sql_results(
ctask, query_id, return_results=True, store_results=False):
"""Executes the sql query returns the results."""
try:
query = session.query(Query).filter_by(id=query_id).one()
return execute_sql(
ctask, query_id, return_results, store_results)
except Exception as e:
logging.error(
"Query with id `{}` could not be retrieved".format(query_id))
logging.error("Sleeping for a sec and retrying...")
# Nasty hack to get around a race condition where the worker
# cannot find the query it's supposed to run
sleep(1)
query = session.query(Query).filter_by(id=query_id).one()
logging.exception(e)
stats_logger.incr('error_sqllab_unhandled')
sesh = get_session(not ctask.request.called_directly)
query = get_query(query_id, sesh)
query.error_message = str(e)
query.status = QueryStatus.FAILED
query.tmp_table_name = None
sesh.commit()


def execute_sql(ctask, query_id, return_results=True, store_results=False):
"""Executes the sql query returns the results."""
session = get_session(not ctask.request.called_directly)

query = get_query(query_id, session)
payload = dict(query_id=query_id)

database = query.database
db_engine_spec = database.db_engine_spec
Expand All @@ -76,22 +117,27 @@ def handle_error(msg):
query.status = QueryStatus.FAILED
query.tmp_table_name = None
session.commit()
raise Exception(query.error_message)
payload.update({
'status': query.status,
'error_essage': msg,
})
return payload

if store_results and not results_backend:
handle_error("Results backend isn't configured.")
return handle_error("Results backend isn't configured.")

# Limit enforced only for retrieving the data, not for the CTA queries.
superset_query = SupersetQuery(query.sql)
executed_sql = superset_query.stripped()
if not superset_query.is_select() and not database.allow_dml:
handle_error(
return handle_error(
"Only `SELECT` statements are allowed against this database")
if query.select_as_cta:
if not superset_query.is_select():
handle_error(
return handle_error(
"Only `SELECT` statements can be used with the CREATE TABLE "
"feature.")
return
if not query.tmp_table_name:
start_dttm = datetime.fromtimestamp(query.start_time)
query.tmp_table_name = 'tmp_{}_table_{}'.format(
Expand All @@ -112,7 +158,7 @@ def handle_error(msg):
except Exception as e:
logging.exception(e)
msg = "Template rendering failed: " + utils.error_msg_from_exception(e)
handle_error(msg)
return handle_error(msg)

query.executed_sql = executed_sql
query.status = QueryStatus.RUNNING
Expand All @@ -121,28 +167,31 @@ def handle_error(msg):
session.commit()
logging.info("Set query to 'running'")

engine = database.get_sqla_engine(
schema=query.schema, nullpool=not ctask.request.called_directly)
try:
engine = database.get_sqla_engine(schema=query.schema)
engine = database.get_sqla_engine(
schema=query.schema, nullpool=not ctask.request.called_directly)
conn = engine.raw_connection()
cursor = conn.cursor()
logging.info("Running query: \n{}".format(executed_sql))
logging.info(query.executed_sql)
cursor.execute(
query.executed_sql, **db_engine_spec.cursor_execute_kwargs)
except Exception as e:
logging.exception(e)
conn.close()
handle_error(db_engine_spec.extract_error_message(e))

try:
logging.info("Handling cursor")
db_engine_spec.handle_cursor(cursor, query, session)
logging.info("Fetching data: {}".format(query.to_dict()))
data = db_engine_spec.fetch_data(cursor, query.limit)
except SoftTimeLimitExceeded as e:
logging.exception(e)
conn.close()
return handle_error(
"SQL Lab timeout. This environment's policy is to kill queries "
"after {} seconds.".format(SQLLAB_TIMEOUT))
except Exception as e:
logging.exception(e)
conn.close()
handle_error(db_engine_spec.extract_error_message(e))
return handle_error(db_engine_spec.extract_error_message(e))

conn.commit()
conn.close()
Expand Down Expand Up @@ -175,19 +224,17 @@ def handle_error(msg):
session.merge(query)
session.flush()

payload = {
'query_id': query.id,
payload.update({
'status': query.status,
'data': cdf.data if cdf.data else [],
'columns': cdf.columns if cdf.columns else [],
'query': query.to_dict(),
}
payload = json.dumps(payload, default=utils.json_iso_dttm_ser)

})
if store_results:
key = '{}'.format(uuid.uuid4())
logging.info("Storing results in results backend, key: {}".format(key))
results_backend.set(key, utils.zlib_compress(payload))
json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
results_backend.set(key, utils.zlib_compress(json_payload))
query.results_key = key
query.end_result_backend_time = utils.now_as_float()

Expand Down

0 comments on commit 5f2796b

Please sign in to comment.