refresh_queries shouldn't break because of a single query having a bad schedule object #4163

Merged
9 commits merged on Mar 1, 2020
@@ -36,6 +36,7 @@
     json_loads,
     mustache_render,
     base_url,
+    sentry,
 )
 from redash.utils.configuration import ConfigurationContainer
 from redash.models.parameterized_query import ParameterizedQuery
@@ -630,34 +631,39 @@ def outdated_queries(cls):
         scheduled_queries_executions.refresh()

         for query in queries:
-            if query.schedule["interval"] is None:
-                continue
+            try:
+                if query.schedule.get("disabled"):
+                    continue

-            if query.schedule["until"] is not None:
-                schedule_until = pytz.utc.localize(
-                    datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
-                )
+                if query.schedule["until"]:
+                    schedule_until = pytz.utc.localize(
+                        datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
+                    )

-                if schedule_until <= now:
-                    continue
+                    if schedule_until <= now:
+                        continue

+                retrieved_at = scheduled_queries_executions.get(query.id) or (
+                    query.latest_query_data and query.latest_query_data.retrieved_at
+                )

-            if query.latest_query_data:
-                retrieved_at = query.latest_query_data.retrieved_at
-            else:
-                retrieved_at = now
-
-            retrieved_at = scheduled_queries_executions.get(query.id) or retrieved_at
-
-            if should_schedule_next(
-                retrieved_at,
-                now,
-                query.schedule["interval"],
-                query.schedule["time"],
-                query.schedule["day_of_week"],
-                query.schedule_failures,
-            ):
-                key = "{}:{}".format(query.query_hash, query.data_source_id)
-                outdated_queries[key] = query
+                if should_schedule_next(
+                    retrieved_at or now,
+                    now,
+                    query.schedule["interval"],
+                    query.schedule["time"],
+                    query.schedule["day_of_week"],
+                    query.schedule_failures,
+                ):
+                    key = "{}:{}".format(query.query_hash, query.data_source_id)
+                    outdated_queries[key] = query
+            except Exception as e:
+                query.schedule["disabled"] = True
+                db.session.commit()
+
+                message = "Could not determine if query %d is outdated due to %s. The schedule for this query has been disabled." % (query.id, repr(e))
+                logging.info(message)
+                sentry.capture_message(message)

         return list(outdated_queries.values())
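
Note on the hunk above: a minimal sketch of the failure mode it guards against. The schedule dict shape follows the code shown here; the sample values are invented.

# A query whose schedule object is malformed, e.g. missing the "until" key:
schedule = {"interval": 3600, "disabled": False}

try:
    if schedule["until"]:              # raises KeyError on the malformed schedule
        pass
except Exception as e:
    schedule["disabled"] = True        # only this query gets disabled;
    print(repr(e))                     # the outdated_queries loop keeps going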

@@ -5,6 +5,7 @@
 from rq import get_current_job
 from rq.job import JobStatus
 from rq.timeouts import JobTimeoutException
+from rq.exceptions import NoSuchJobError

 from redash import models, redis_connection, settings
 from redash.query_runner import InterruptException
@@ -43,16 +44,22 @@ def enqueue_query(
             job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
             if job_id:
                 logger.info("[%s] Found existing job: %s", query_hash, job_id)
+                job_complete = None

-                job = Job.fetch(job_id)
-
-                status = job.get_status()
-                if status in [JobStatus.FINISHED, JobStatus.FAILED]:
-                    logger.info(
-                        "[%s] job found is ready (%s), removing lock",
-                        query_hash,
-                        status,
-                    )
+                try:
+                    job = Job.fetch(job_id)
+                    job_exists = True
+                    status = job.get_status()
+                    job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED]
+
+                    if job_complete:
+                        message = "job found is complete (%s)" % status
+                except NoSuchJobError:
+                    message = "job found has expired"
+                    job_exists = False
+
+                if job_complete or not job_exists:
+                    logger.info("[%s] %s, removing lock", query_hash, message)
                     redis_connection.delete(_job_lock_id(query_hash, data_source.id))
                     job = None
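
For context: rq's Job.fetch() raises NoSuchJobError once a job's Redis hash has expired, which is the case the try/except above now handles. A standalone sketch (lock_is_stale and redis_conn are illustrative names, not part of the PR):

from rq.job import Job
from rq.exceptions import NoSuchJobError

def lock_is_stale(job_id, redis_conn):
    """True when the job a lock points at has finished, failed, or no longer exists."""
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except NoSuchJobError:
        return True                                  # job hash expired; clear the lock
    return job.get_status() in ("finished", "failed")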

@@ -8,7 +8,7 @@
     QueryDetachedFromDataSourceError,
 )
 from redash.tasks.failure_report import track_failure
-from redash.utils import json_dumps
+from redash.utils import json_dumps, sentry
 from redash.worker import job, get_job_logger

 from .execution import enqueue_query
@@ -27,85 +27,79 @@ def empty_schedules():
     logger.info("Deleted %d schedules.", len(queries))


-def refresh_queries():
-    logger.info("Refreshing queries...")
-
-    outdated_queries_count = 0
-    query_ids = []
-
-    with statsd_client.timer("manager.outdated_queries_lookup"):
-        for query in models.Query.outdated_queries():
-            if settings.FEATURE_DISABLE_REFRESH_QUERIES:
-                logger.info("Disabled refresh queries.")
-            elif query.org.is_disabled:
-                logger.debug(
-                    "Skipping refresh of %s because org is disabled.", query.id
-                )
-            elif query.data_source is None:
-                logger.debug(
-                    "Skipping refresh of %s because the datasource is none.", query.id
-                )
-            elif query.data_source.paused:
-                logger.debug(
-                    "Skipping refresh of %s because datasource - %s is paused (%s).",
-                    query.id,
-                    query.data_source.name,
-                    query.data_source.pause_reason,
-                )
-            else:
-                query_text = query.query_text
-
-                parameters = {p["name"]: p.get("value") for p in query.parameters}
-                if any(parameters):
-                    try:
-                        query_text = query.parameterized.apply(parameters).query
-                    except InvalidParameterError as e:
-                        error = u"Skipping refresh of {} because of invalid parameters: {}".format(
-                            query.id, str(e)
-                        )
-                        track_failure(query, error)
-                        continue
-                    except QueryDetachedFromDataSourceError as e:
-                        error = (
-                            "Skipping refresh of {} because a related dropdown "
-                            "query ({}) is unattached to any datasource."
-                        ).format(query.id, e.query_id)
-                        track_failure(query, error)
-                        continue
-
-                enqueue_query(
-                    query_text,
-                    query.data_source,
-                    query.user_id,
-                    scheduled_query=query,
-                    metadata={"Query ID": query.id, "Username": "Scheduled"},
-                )
-
-                query_ids.append(query.id)
-                outdated_queries_count += 1
-
-    statsd_client.gauge("manager.outdated_queries", outdated_queries_count)
-
-    logger.info(
-        "Done refreshing queries. Found %d outdated queries: %s"
-        % (outdated_queries_count, query_ids)
-    )
-
-    status = redis_connection.hgetall("redash:status")
-    now = time.time()
+def _should_refresh_query(query):
+    if settings.FEATURE_DISABLE_REFRESH_QUERIES:
+        logger.info("Disabled refresh queries.")
+        return False
+    elif query.org.is_disabled:
+        logger.debug("Skipping refresh of %s because org is disabled.", query.id)
+        return False
+    elif query.data_source is None:
+        logger.debug("Skipping refresh of %s because the datasource is none.", query.id)
+        return False
+    elif query.data_source.paused:
+        logger.debug(
+            "Skipping refresh of %s because datasource - %s is paused (%s).",
+            query.id,
+            query.data_source.name,
+            query.data_source.pause_reason,
+        )
+        return False
+    else:
+        return True
+
+
+def _apply_default_parameters(query):
+    parameters = {p["name"]: p.get("value") for p in query.parameters}
+    if any(parameters):
+        try:
+            return query.parameterized.apply(parameters).query
+        except InvalidParameterError as e:
+            error = u"Skipping refresh of {} because of invalid parameters: {}".format(
+                query.id, str(e)
+            )
+            track_failure(query, error)
+            raise
+        except QueryDetachedFromDataSourceError as e:
+            error = (
+                "Skipping refresh of {} because a related dropdown "
+                "query ({}) is unattached to any datasource."
+            ).format(query.id, e.query_id)
+            track_failure(query, error)
+            raise
+    else:
+        return query.query_text

-    redis_connection.hmset(
-        "redash:status",
-        {
-            "outdated_queries_count": outdated_queries_count,
-            "last_refresh_at": now,
-            "query_ids": json_dumps(query_ids),
-        },
-    )
-
-    statsd_client.gauge(
-        "manager.seconds_since_refresh", now - float(status.get("last_refresh_at", now))
-    )
+
+def refresh_queries():
+    logger.info("Refreshing queries...")
+    enqueued = []
+    for query in models.Query.outdated_queries():
+        if not _should_refresh_query(query):
+            continue
+
+        try:
+            enqueue_query(
+                _apply_default_parameters(query),
+                query.data_source,
+                query.user_id,
+                scheduled_query=query,
+                metadata={"Query ID": query.id, "Username": "Scheduled"},
+            )
+            enqueued.append(query)
+        except Exception as e:
+            message = "Could not enqueue query %d due to %s" % (query.id, repr(e))
+            logging.info(message)
+            sentry.capture_message(message)
+
+    status = {
+        "outdated_queries_count": len(enqueued),
+        "last_refresh_at": time.time(),
+        "query_ids": json_dumps([q.id for q in enqueued]),
+    }
+
+    redis_connection.hmset("redash:status", status)
+    logger.info("Done refreshing queries: %s" % status)


 def cleanup_query_results():
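
The net effect of the per-query try/except in the new refresh_queries above: a query that fails to enqueue is reported and skipped instead of aborting the whole batch. A generic sketch of that pattern (names are illustrative, not Redash code):

def enqueue_all(queries, enqueue_one):
    enqueued = []
    for q in queries:
        try:
            enqueue_one(q)                          # may raise for a single bad query
            enqueued.append(q)
        except Exception as e:
            print("skipping %r: %r" % (q, e))       # reported, loop continues
    return enqueued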
@@ -1,4 +1,5 @@
 import sentry_sdk
+from funcy import iffy
 from sentry_sdk.integrations.flask import FlaskIntegration
 from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
 from sentry_sdk.integrations.redis import RedisIntegration
@@ -33,3 +34,6 @@ def init():
             RqIntegration(),
         ],
     )
+
+
+capture_message = iffy(lambda _: settings.SENTRY_DSN, sentry_sdk.capture_message)
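
funcy's iffy(pred, action) builds a function that applies action only when pred is truthy and otherwise passes the value through, so the new capture_message behaves roughly like this sketch (not the actual module code):

def capture_message(message):
    if settings.SENTRY_DSN:                           # only report when Sentry is configured
        return sentry_sdk.capture_message(message)
    return message                                    # otherwise a pass-through no-op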
@@ -4,6 +4,7 @@
 from mock import patch, Mock

 from rq import Connection
+from rq.exceptions import NoSuchJobError

 from tests import BaseTestCase
 from redash import redis_connection, rq_redis_connection, models
@@ -67,6 +68,33 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):

         self.assertEqual(1, enqueue.call_count)

+    def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):
+        query = self.factory.create_query()
+
+        with Connection(rq_redis_connection):
+            enqueue_query(
+                query.query_text,
+                query.data_source,
+                query.user_id,
+                False,
+                query,
+                {"Username": "Arik", "Query ID": query.id},
+            )
+
+            # "expire" the previous job
+            fetch_job.side_effect = NoSuchJobError
+
+            enqueue_query(
+                query.query_text,
+                query.data_source,
+                query.user_id,
+                False,
+                query,
+                {"Username": "Arik", "Query ID": query.id},
+            )
+
+        self.assertEqual(2, enqueue.call_count)
+
     @patch("redash.settings.dynamic_settings.query_time_limit", return_value=60)
     def test_limits_query_time(self, _, enqueue, __):
         query = self.factory.create_query()
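
The new test above simulates an expired job by giving the patched job-fetch mock a side_effect; a minimal standalone illustration of that mock mechanism (not tied to the Redash test fixtures):

from mock import Mock
from rq.exceptions import NoSuchJobError

fetch = Mock()
fetch.side_effect = NoSuchJobError      # every call now raises, as if the job hash expired

try:
    fetch("some-job-id")
except NoSuchJobError:
    print("lock holder is gone; safe to enqueue again")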