Skip to content

Commit

Permalink
Implement stop query functionality. (#2387)
Browse files Browse the repository at this point in the history
* Implement stop query functionality.

* Address comments
  • Loading branch information
bkyryliuk committed Mar 13, 2017
1 parent 0779da6 commit 6160a3f
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 3 deletions.
18 changes: 18 additions & 0 deletions superset/assets/javascripts/SqlLab/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ export function runQuery(query) {
};
}

export function postStopQuery(query) {
return function (dispatch) {
const stopQueryUrl = '/superset/stop_query/';
const stopQueryRequestData = { client_id: query.id };
$.ajax({
type: 'POST',
dataType: 'json',
url: stopQueryUrl,
data: stopQueryRequestData,
success() {
if (!query.runAsync) {
dispatch(stopQuery(query));
}
},
});
};
}

export function setDatabases(databases) {
return { type: SET_DATABASES, databases };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class SqlEditor extends React.PureComponent {
this.props.actions.setActiveSouthPaneTab('Results');
}
stopQuery() {
this.props.actions.stopQuery(this.props.latestQuery);
this.props.actions.postStopQuery(this.props.latestQuery);
}
createTableAs() {
this.startQuery(true, true);
Expand Down
19 changes: 18 additions & 1 deletion superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sqlalchemy import select
from sqlalchemy.sql import text
from superset.utils import SupersetTemplateException
from superset.utils import QueryStatus
from flask_babel import lazy_gettext as _

Grain = namedtuple('Grain', 'name label function')
Expand Down Expand Up @@ -272,6 +273,12 @@ class PrestoEngineSpec(BaseEngineSpec):
"date_add('day', 1, CAST({col} AS TIMESTAMP))))"),
)

@classmethod
def patch(cls):
from pyhive import presto
from superset.db_engines import presto as patched_presto
presto.Cursor.cancel = patched_presto.cancel

@classmethod
def sql_preprocessor(cls, sql):
return sql.replace('%', '%%')
Expand Down Expand Up @@ -342,6 +349,12 @@ def handle_cursor(cls, cursor, query, session):
while polled:
# Update the object and wait for the kill signal.
stats = polled.get('stats', {})

query = session.query(type(query)).filter_by(id=query.id).one()
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

if stats:
completed_splits = float(stats.get('completedSplits'))
total_splits = float(stats.get('totalSplits'))
Expand Down Expand Up @@ -566,13 +579,17 @@ def progress(cls, logs):
def handle_cursor(cls, cursor, query, session):
"""Updates progress information"""
from pyhive import hive
print("PATCHED TCLIService {}".format(hive.TCLIService.__file__))
unfinished_states = (
hive.ttypes.TOperationState.INITIALIZED_STATE,
hive.ttypes.TOperationState.RUNNING_STATE,
)
polled = cursor.poll()
while polled.operationState in unfinished_states:
query = session.query(type(query)).filter_by(id=query.id)
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

resp = cursor.fetch_logs()
if resp and resp.log:
progress = cls.progress(resp.log)
Expand Down
19 changes: 19 additions & 0 deletions superset/db_engines/presto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from pyhive import presto


# TODO(bogdan): Remove this when new pyhive release will be available.
def cancel(self):
if self._state == self._STATE_NONE:
raise presto.ProgrammingError("No query yet")
if self._nextUri is None:
assert self._state == self._STATE_FINISHED, \
"Should be finished if nextUri is None"
return

response = presto.requests.delete(self._nextUri)
if response.status_code != presto.requests.codes.no_content:
fmt = "Unexpected status code after cancel {}\n{}"
raise presto.OperationalError(
fmt.format(response.status_code, response.content))
self._state = self._STATE_FINISHED
return
7 changes: 7 additions & 0 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ def handle_error(msg):
conn.commit()
conn.close()

if query.status == utils.QueryStatus.STOPPED:
return json.dumps({
'query_id': query.id,
'status': query.status,
'query': query.to_dict(),
}, default=utils.json_iso_dttm_ser)

column_names = (
[col[0] for col in cursor.description] if cursor.description else [])
column_names = dedup(column_names)
Expand Down
2 changes: 1 addition & 1 deletion superset/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class QueryStatus(object):

"""Enum-type class for query statuses"""

CANCELLED = 'cancelled'
STOPPED = 'stopped'
FAILED = 'failed'
PENDING = 'pending'
RUNNING = 'running'
Expand Down
14 changes: 14 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1960,6 +1960,20 @@ def results(self, key):
return json_success(
json.dumps(payload_json, default=utils.json_iso_dttm_ser))

@has_access_api
@expose("/stop_query/", methods=['POST'])
@log_this
def stop_query(self):
client_id = request.form.get('client_id')
query = db.session.query(models.Query).filter_by(
client_id=client_id).one()
if query.user_id != g.user.id:
return json_error_response(
"Only original author can stop the query.")
query.status = utils.QueryStatus.STOPPED
db.session.commit()
return Response(201)

@has_access_api
@expose("/sql_json/", methods=['POST', 'GET'])
@log_this
Expand Down

0 comments on commit 6160a3f

Please sign in to comment.