diff --git a/.travis.yml b/.travis.yml
index a0a1b03f313c..dcf700fd0c8c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,6 @@
-
 language: python
+services:
+  - redis-server
 addons:
   code_climate:
     repo_token: 5f3a06c425eef7be4b43627d7d07a3e46c45bdc07155217825ff7c49cb6a470c
diff --git a/dev-reqs.txt b/dev-reqs.txt
index 370aaa11e8cd..c53fdefa1ea4 100644
--- a/dev-reqs.txt
+++ b/dev-reqs.txt
@@ -9,6 +9,7 @@ psycopg2
 pylint
 pythrifthiveapi
 pyyaml
+redis
 statsd
 # Also install everything we need to build Sphinx docs
 -r dev-reqs-for-docs.txt
diff --git a/setup.py b/setup.py
index 17ebe43ee81f..507eec22755c 100644
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ def get_git_sha():
     scripts=['superset/bin/superset'],
     install_requires=[
         'boto3==1.4.4',
-        'celery==3.1.25',
+        'celery==4.0.2',
         'colorama==0.3.9',
         'cryptography==1.7.2',
         'flask-appbuilder==1.9.0',
@@ -80,6 +80,7 @@ def get_git_sha():
         'coverage',
         'mock',
         'nose',
+        'redis',
     ],
     author='Maxime Beauchemin',
     author_email='maximebeauchemin@gmail.com',
diff --git a/superset/assets/javascripts/SqlLab/actions.js b/superset/assets/javascripts/SqlLab/actions.js
index 1a52939b278d..4203a757420b 100644
--- a/superset/assets/javascripts/SqlLab/actions.js
+++ b/superset/assets/javascripts/SqlLab/actions.js
@@ -155,7 +155,7 @@ export function runQuery(query) {
       } else if (msg === null) {
         msg = `[${textStatus}] ${errorThrown}`;
       }
-      if (msg.indexOf('The CSRF token is missing') > 0) {
+      if (msg.indexOf('CSRF token') > 0) {
         msg = 'Your session timed out, please refresh your page and try again.';
       }
       dispatch(queryFailed(query, msg));
diff --git a/superset/assets/javascripts/SqlLab/components/QueryStateLabel.jsx b/superset/assets/javascripts/SqlLab/components/QueryStateLabel.jsx
new file mode 100644
index 000000000000..c5433dd525b9
--- /dev/null
+++ b/superset/assets/javascripts/SqlLab/components/QueryStateLabel.jsx
@@ -0,0 +1,19 @@
+import React from 'react';
+import PropTypes from 'prop-types';
+import { Label } from 'react-bootstrap';
+
+import { STATE_BSSTYLE_MAP } from '../constants';
+
+const propTypes = {
+  query: PropTypes.object.isRequired,
+};
+
+export default function QueryStateLabel({ query }) {
+  const bsStyle = STATE_BSSTYLE_MAP[query.state];
+  return (
+    <Label className="m-r-3" bsStyle={bsStyle}>
+      {query.state}
+    </Label>
+  );
+}
+QueryStateLabel.propTypes = propTypes;
diff --git a/superset/assets/javascripts/SqlLab/components/QueryTable.jsx b/superset/assets/javascripts/SqlLab/components/QueryTable.jsx
index f05116904833..7f5f22955922 100644
--- a/superset/assets/javascripts/SqlLab/components/QueryTable.jsx
+++ b/superset/assets/javascripts/SqlLab/components/QueryTable.jsx
@@ -9,9 +9,9 @@ import VisualizeModal from './VisualizeModal';
 import ResultSet from './ResultSet';
 import ModalTrigger from '../../components/ModalTrigger';
 import HighlightedSql from './HighlightedSql';
-import { STATE_BSSTYLE_MAP } from '../constants';
 import { fDuration } from '../../modules/dates';
 import { storeQuery } from '../../../utils/common';
+import QueryStateLabel from './QueryStateLabel';
 
 const propTypes = {
   columns: PropTypes.array,
@@ -164,9 +164,7 @@ class QueryTable extends React.PureComponent {
       }
       q.state = (
         <div>
-          <span className={'m-r-3 label label-' + STATE_BSSTYLE_MAP[q.state]}>
-            {q.state}
-          </span>
+          <QueryStateLabel query={query} />
           {errorTooltip}
         </div>
       );
diff --git a/superset/assets/javascripts/SqlLab/components/ResultSet.jsx b/superset/assets/javascripts/SqlLab/components/ResultSet.jsx
index 7d79a1a44542..c9814ec214ad 100644
--- a/superset/assets/javascripts/SqlLab/components/ResultSet.jsx
+++ b/superset/assets/javascripts/SqlLab/components/ResultSet.jsx
@@ -6,6 +6,7 @@ import shortid from 'shortid';
 import VisualizeModal from './VisualizeModal';
 import HighlightedSql from './HighlightedSql';
 import FilterableTable from '../../components/FilterableTable/FilterableTable';
+import QueryStateLabel from './QueryStateLabel';
 
 const propTypes = {
   actions: PropTypes.object,
@@ -165,6 +166,7 @@ export default class ResultSet extends React.PureComponent {
     return (
      <div>
         <img className="loading" alt="Loading..." src="/static/assets/images/loading.gif" />
+        <QueryStateLabel query={query} />
         {progressBar}
       </div>
     );
diff --git a/superset/assets/javascripts/SqlLab/reducers.js b/superset/assets/javascripts/SqlLab/reducers.js
index 2f5461ab8b27..7bef4546c55e 100644
--- a/superset/assets/javascripts/SqlLab/reducers.js
+++ b/superset/assets/javascripts/SqlLab/reducers.js
@@ -237,8 +237,7 @@ export const sqlLabReducer = function (state, action) {
      for (const id in action.alteredQueries) {
        const changedQuery = action.alteredQueries[id];
        if (!state.queries.hasOwnProperty(id) ||
-            (state.queries[id].changedOn !== changedQuery.changedOn &&
-             state.queries[id].state !== 'stopped')) {
+            state.queries[id].state !== 'stopped') {
          if (changedQuery.changedOn > queriesLastUpdate) {
            queriesLastUpdate = changedQuery.changedOn;
          }
diff --git a/superset/assets/package.json b/superset/assets/package.json
index 6368832abc7f..6ef38ead2152 100644
--- a/superset/assets/package.json
+++ b/superset/assets/package.json
@@ -63,6 +63,7 @@
     "prop-types": "^15.5.8",
     "react": "^15.5.1",
     "react-ace": "^5.0.1",
+    "react-addons-css-transition-group": "^15.6.0",
     "react-addons-shallow-compare": "^15.4.2",
     "react-alert": "^2.0.1",
     "react-bootstrap": "^0.31.0",
diff --git a/superset/assets/spec/javascripts/sqllab/QueryStateLabel_spec.jsx b/superset/assets/spec/javascripts/sqllab/QueryStateLabel_spec.jsx
new file mode 100644
index 000000000000..fc15baa484f4
--- /dev/null
+++ b/superset/assets/spec/javascripts/sqllab/QueryStateLabel_spec.jsx
@@ -0,0 +1,29 @@
+import React from 'react';
+import { Label } from 'react-bootstrap';
+import { shallow } from 'enzyme';
+import { describe, it } from 'mocha';
+import { expect } from 'chai';
+
+import QueryStateLabel from '../../../javascripts/SqlLab/components/QueryStateLabel';
+
+describe('QueryStateLabel', () => {
+  const mockedProps = {
+    query: {
+      state: 'running',
+    },
+  };
+  it('is valid', () => {
+    expect(
+      React.isValidElement(<QueryStateLabel />),
+    ).to.equal(true);
+  });
+  it('is valid with props', () => {
+    expect(
+      React.isValidElement(<QueryStateLabel {...mockedProps} />),
+    ).to.equal(true);
+  });
+  it('has a Label', () => {
+    const wrapper = shallow(<QueryStateLabel {...mockedProps} />);
+    expect(wrapper.find(Label)).to.have.length(1);
+  });
+});
diff --git a/superset/config.py b/superset/config.py
index a350db23b258..46d8596426e8 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -260,6 +260,10 @@ class CeleryConfig(object):
 # SQLLAB_DEFAULT_DBID
 SQLLAB_DEFAULT_DBID = None
 
+# The MAX duration (in seconds) a query can run for before being killed
+# by celery.
+SQLLAB_ASYNC_TIME_LIMIT_SEC = 10
+
 # An instantiated derivative of werkzeug.contrib.cache.BaseCache
 # if enabled, it can be used to store the results of long-running queries
 # in SQL Lab by using the "Run Async" button/feature
diff --git a/superset/models/core.py b/superset/models/core.py
index 49f95d7669b5..5527f11d4529 100644
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -30,6 +30,7 @@
 )
 from sqlalchemy.orm import relationship
 from sqlalchemy.orm.session import make_transient
+from sqlalchemy.pool import NullPool
 from sqlalchemy.sql import text
 from sqlalchemy.sql.expression import TextAsFrom
 from sqlalchemy_utils import EncryptedType
@@ -560,10 +561,12 @@ def set_sqlalchemy_uri(self, uri):
         conn.password = password_mask if conn.password else None
         self.sqlalchemy_uri = str(conn)  # hides the password
 
-    def get_sqla_engine(self, schema=None):
+    def get_sqla_engine(self, schema=None, nullpool=False):
         extra = self.get_extra()
         uri = make_url(self.sqlalchemy_uri_decrypted)
         params = extra.get('engine_params', {})
+        if nullpool:
+            params['poolclass'] = NullPool
         uri = self.db_engine_spec.adjust_database_uri(uri, schema)
         return create_engine(uri, **params)
 
diff --git a/superset/sql_lab.py b/superset/sql_lab.py
index 176ff4cdbdf4..4b0bd863bcd0 100644
--- a/superset/sql_lab.py
+++ b/superset/sql_lab.py
@@ -1,4 +1,3 @@
-import celery
 from time import sleep
 from datetime import datetime
 import json
@@ -7,6 +6,7 @@
 import sqlalchemy
 import uuid
 
+from celery.exceptions import SoftTimeLimitExceeded
 from sqlalchemy.pool import NullPool
 from sqlalchemy.orm import sessionmaker
 
@@ -20,6 +20,12 @@
 config = app.config
 celery_app = get_celery_app(config)
+stats_logger = app.config.get('STATS_LOGGER')
+SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600)
+
+
+class SqlLabException(Exception):
+    pass
 
 
 def dedup(l, suffix='__'):
@@ -43,28 +49,63 @@ def dedup(l, suffix='__'):
     return new_l
 
 
-@celery_app.task(bind=True)
-def get_sql_results(self, query_id, return_results=True, store_results=False):
-    """Executes the sql query returns the results."""
-    if not self.request.called_directly:
+def get_query(query_id, session, retry_count=5):
+    """Attempts to get the query and retries if it cannot"""
+    query = None
+    attempt = 0
+    while not query and attempt < retry_count:
+        try:
+            query = session.query(Query).filter_by(id=query_id).one()
+        except Exception:
+            attempt += 1
+            logging.error(
+                "Query with id `{}` could not be retrieved".format(query_id))
+            stats_logger.incr('error_attempting_orm_query_' + str(attempt))
+            logging.error("Sleeping for a sec before retrying...")
+            sleep(1)
+    if not query:
+        stats_logger.incr('error_failed_at_getting_orm_query')
+        raise SqlLabException("Failed at getting query")
+    return query
+
+
+def get_session(nullpool):
+    if nullpool:
         engine = sqlalchemy.create_engine(
             app.config.get('SQLALCHEMY_DATABASE_URI'), poolclass=NullPool)
         session_class = sessionmaker()
         session_class.configure(bind=engine)
-        session = session_class()
+        return session_class()
     else:
         session = db.session()
         session.commit()  # HACK
+        return session
+
+
+@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
+def get_sql_results(
+        ctask, query_id, return_results=True, store_results=False):
+    """Executes the sql query returns the results."""
     try:
-        query = session.query(Query).filter_by(id=query_id).one()
+        return execute_sql(
+            ctask, query_id, return_results, store_results)
     except Exception as e:
-        logging.error(
-            "Query with id `{}` could not be retrieved".format(query_id))
-        logging.error("Sleeping for a sec and retrying...")
-        # Nasty hack to get around a race condition where the worker
-        # cannot find the query it's supposed to run
-        sleep(1)
-        query = session.query(Query).filter_by(id=query_id).one()
+        logging.exception(e)
+        stats_logger.incr('error_sqllab_unhandled')
+        sesh = get_session(not ctask.request.called_directly)
+        query = get_query(query_id, sesh)
+        query.error_message = str(e)
+        query.status = QueryStatus.FAILED
+        query.tmp_table_name = None
+        sesh.commit()
+
+
+def execute_sql(ctask, query_id, return_results=True, store_results=False):
+    """Executes the sql query returns the results."""
+    session = get_session(not ctask.request.called_directly)
+
+    query = get_query(query_id, session)
+    payload = dict(query_id=query_id)
 
     database = query.database
     db_engine_spec = database.db_engine_spec
@@ -76,22 +117,27 @@ def handle_error(msg):
         query.status = QueryStatus.FAILED
         query.tmp_table_name = None
         session.commit()
-        raise Exception(query.error_message)
+        payload.update({
+            'status': query.status,
+            'error_message': msg,
+        })
+        return payload
 
     if store_results and not results_backend:
-        handle_error("Results backend isn't configured.")
+        return handle_error("Results backend isn't configured.")
 
     # Limit enforced only for retrieving the data, not for the CTA queries.
     superset_query = SupersetQuery(query.sql)
     executed_sql = superset_query.stripped()
     if not superset_query.is_select() and not database.allow_dml:
-        handle_error(
+        return handle_error(
             "Only `SELECT` statements are allowed against this database")
     if query.select_as_cta:
         if not superset_query.is_select():
-            handle_error(
+            return handle_error(
                 "Only `SELECT` statements can be used with the CREATE TABLE "
                 "feature.")
+            return
         if not query.tmp_table_name:
             start_dttm = datetime.fromtimestamp(query.start_time)
             query.tmp_table_name = 'tmp_{}_table_{}'.format(
@@ -112,7 +158,7 @@ def handle_error(msg):
         except Exception as e:
             logging.exception(e)
             msg = "Template rendering failed: " + utils.error_msg_from_exception(e)
-            handle_error(msg)
+            return handle_error(msg)
 
     query.executed_sql = executed_sql
     query.status = QueryStatus.RUNNING
@@ -121,28 +167,31 @@
     session.commit()
     logging.info("Set query to 'running'")
 
+    engine = database.get_sqla_engine(
+        schema=query.schema, nullpool=not ctask.request.called_directly)
     try:
-        engine = database.get_sqla_engine(schema=query.schema)
+        engine = database.get_sqla_engine(
+            schema=query.schema, nullpool=not ctask.request.called_directly)
         conn = engine.raw_connection()
         cursor = conn.cursor()
         logging.info("Running query: \n{}".format(executed_sql))
         logging.info(query.executed_sql)
         cursor.execute(
             query.executed_sql, **db_engine_spec.cursor_execute_kwargs)
-    except Exception as e:
-        logging.exception(e)
-        conn.close()
-        handle_error(db_engine_spec.extract_error_message(e))
-
-    try:
         logging.info("Handling cursor")
         db_engine_spec.handle_cursor(cursor, query, session)
         logging.info("Fetching data: {}".format(query.to_dict()))
         data = db_engine_spec.fetch_data(cursor, query.limit)
+    except SoftTimeLimitExceeded as e:
+        logging.exception(e)
+        conn.close()
+        return handle_error(
+            "SQL Lab timeout. This environment's policy is to kill queries "
+            "after {} seconds.".format(SQLLAB_TIMEOUT))
     except Exception as e:
         logging.exception(e)
         conn.close()
-        handle_error(db_engine_spec.extract_error_message(e))
+        return handle_error(db_engine_spec.extract_error_message(e))
 
     conn.commit()
     conn.close()
@@ -175,19 +224,17 @@ def handle_error(msg):
         session.merge(query)
         session.flush()
 
-    payload = {
-        'query_id': query.id,
+    payload.update({
         'status': query.status,
         'data': cdf.data if cdf.data else [],
         'columns': cdf.columns if cdf.columns else [],
         'query': query.to_dict(),
-    }
-    payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
-
+    })
     if store_results:
         key = '{}'.format(uuid.uuid4())
         logging.info("Storing results in results backend, key: {}".format(key))
-        results_backend.set(key, utils.zlib_compress(payload))
+        json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
+        results_backend.set(key, utils.zlib_compress(json_payload))
         query.results_key = key
         query.end_result_backend_time = utils.now_as_float()
 
diff --git a/superset/stats_logger.py b/superset/stats_logger.py
index b79cc8ffa70d..76eb7675c579 100644
--- a/superset/stats_logger.py
+++ b/superset/stats_logger.py
@@ -1,3 +1,6 @@
+from colorama import Fore, Style
+import logging
+
 
 class BaseStatsLogger(object):
     """Base class for logging realtime events"""
@@ -26,13 +29,18 @@ def gauge(self, key):
 
 class DummyStatsLogger(BaseStatsLogger):
     def incr(self, key):
-        pass
+        logging.info(
+            Fore.CYAN + "[stats_logger] (incr) " + key + Style.RESET_ALL)
 
     def decr(self, key):
-        pass
-
-    def gauge(self, key):
-        pass
+        logging.info(
+            Fore.CYAN + "[stats_logger] (decr) " + key + Style.RESET_ALL)
+
+    def gauge(self, key, value):
+        logging.info((
+            Fore.CYAN + "[stats_logger] (gauge) "
+            "{key} | {value}" + Style.RESET_ALL).format(**locals())
+        )
 
 
 try:
diff --git a/superset/views/core.py b/superset/views/core.py
index 8f6f81715659..8a5de069efe1 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -2020,7 +2020,7 @@ def sql_json(self):
             except Exception as e:
                 logging.exception(e)
                 msg = (
-                    "Failed to start remote query on worker. "
+                    "Failed to start remote query on a worker. "
                     "Tell your administrator to verify the availability of "
                     "the message queue."
                )
@@ -2048,10 +2048,13 @@ def sql_json(self):
             # pylint: disable=no-value-for-parameter
             data = sql_lab.get_sql_results(
                 query_id=query_id, return_results=True)
+            payload = json.dumps(data, default=utils.json_iso_dttm_ser)
         except Exception as e:
             logging.exception(e)
             return json_error_response("{}".format(e))
-        return json_success(data)
+        if data.get('status') == QueryStatus.FAILED:
+            return json_error_response(payload)
+        return json_success(payload)
 
     @has_access
     @expose("/csv/")
diff --git a/tests/superset_test_config.py b/tests/superset_test_config.py
index 89b2c40c4c3e..6e6426dd4284 100644
--- a/tests/superset_test_config.py
+++ b/tests/superset_test_config.py
@@ -10,7 +10,6 @@
 if 'SUPERSET__SQLALCHEMY_DATABASE_URI' in os.environ:
     SQLALCHEMY_DATABASE_URI = os.environ.get('SUPERSET__SQLALCHEMY_DATABASE_URI')
 
-SQL_CELERY_DB_FILE_PATH = os.path.join(DATA_DIR, 'celerydb.sqlite')
 SQL_CELERY_RESULTS_DB_FILE_PATH = os.path.join(DATA_DIR, 'celery_results.sqlite')
 SQL_SELECT_AS_CTA = True
 SQL_MAX_ROW = 666
@@ -24,7 +23,7 @@
 
 
 class CeleryConfig(object):
-    BROKER_URL = 'sqla+sqlite:///' + SQL_CELERY_DB_FILE_PATH
+    BROKER_URL = 'redis://localhost'
     CELERY_IMPORTS = ('superset.sql_lab', )
     CELERY_RESULT_BACKEND = 'db+sqlite:///' + SQL_CELERY_RESULTS_DB_FILE_PATH
     CELERY_ANNOTATIONS = {'sql_lab.add': {'rate_limit': '10/s'}}