Skip to content

Commit

Permalink
Merge pull request #661 from hudl/fix-cancelquery
Browse files Browse the repository at this point in the history
Fix cancelling queries for Redshift/Postgres
  • Loading branch information
arikfr committed Nov 24, 2015
2 parents 0f9f9a2 + 5b9b186 commit 0343fa7
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
4 changes: 4 additions & 0 deletions redash/query_runner/__init__.py
Expand Up @@ -9,6 +9,7 @@
__all__ = [
'ValidationError',
'BaseQueryRunner',
'InterruptException',
'TYPE_DATETIME',
'TYPE_BOOLEAN',
'TYPE_INTEGER',
Expand Down Expand Up @@ -38,6 +39,9 @@
TYPE_DATE
])

class InterruptException(Exception):
pass

class BaseQueryRunner(object):
def __init__(self, configuration):
jsonschema.validate(configuration, self.configuration_schema())
Expand Down
2 changes: 1 addition & 1 deletion redash/query_runner/pg.py
Expand Up @@ -142,7 +142,7 @@ def run_query(self, query):
logging.exception(e)
error = e.message
json_data = None
except KeyboardInterrupt:
except (KeyboardInterrupt, InterruptException):
connection.cancel()
error = "Query cancelled by user."
json_data = None
Expand Down
8 changes: 6 additions & 2 deletions redash/tasks.py
@@ -1,5 +1,6 @@
import time
import logging
import signal
from flask.ext.mail import Message
import redis
from celery import Task
Expand All @@ -8,7 +9,7 @@
from redash import redis_connection, models, statsd_client, settings, utils, mail
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.query_runner import get_query_runner
from redash.query_runner import get_query_runner, InterruptException

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -132,7 +133,7 @@ def ready(self):
return self._async_result.ready()

def cancel(self):
return self._async_result.revoke(terminate=True)
return self._async_result.revoke(terminate=True, signal='SIGINT')

@staticmethod
def _job_lock_id(query_hash, data_source_id):
Expand Down Expand Up @@ -263,9 +264,12 @@ def check_alerts_for_query(self, query_id):

mail.send(message)

def signal_handler(*args):
raise InterruptException

@celery.task(bind=True, base=BaseTask, track_started=True)
def execute_query(self, query, data_source_id, metadata):
signal.signal(signal.SIGINT, signal_handler)
start_time = time.time()

logger.info("Loading data source (%d)...", data_source_id)
Expand Down

0 comments on commit 0343fa7

Please sign in to comment.