Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add environment variable for query retry #5791

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions redash/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,15 @@
# Time limit (in seconds) for scheduled queries. Set this to -1 to execute
# without a time limit.
SCHEDULED_QUERY_TIME_LIMIT = int(
    os.environ.get("REDASH_SCHEDULED_QUERY_TIME_LIMIT", -1)
)
# Optional comma-separated list of retry delays (in seconds) for failed
# scheduled queries, e.g. "60,300,900". Left as None when unset, meaning
# no retries are configured.
SCHEDULED_QUERY_RETRY_INTERVALS = os.environ.get(
    "REDASH_SCHEDULED_QUERY_RETRY_INTERVALS", None
)
if SCHEDULED_QUERY_RETRY_INTERVALS:
    # Split on commas before converting: iterating the raw string would
    # yield single characters (including ","), crashing int() on any
    # multi-digit value. sorted() also normalizes to an ascending list.
    SCHEDULED_QUERY_RETRY_INTERVALS = sorted(
        int(interval) for interval in SCHEDULED_QUERY_RETRY_INTERVALS.split(",")
    )

# Time limit (in seconds) for adhoc queries. Set this to -1 to execute without a time limit.
ADHOC_QUERY_TIME_LIMIT = int(os.environ.get("REDASH_ADHOC_QUERY_TIME_LIMIT", -1))
# NOTE: the original read "REDASH_ADHOC_QUERY_RETRY_INTERVALS " (trailing
# space in the variable name), so the setting could never take effect.
ADHOC_QUERY_RETRY_INTERVALS = os.environ.get(
    "REDASH_ADHOC_QUERY_RETRY_INTERVALS", None
)
if ADHOC_QUERY_RETRY_INTERVALS:
    ADHOC_QUERY_RETRY_INTERVALS = sorted(
        int(interval) for interval in ADHOC_QUERY_RETRY_INTERVALS.split(",")
    )

JOB_EXPIRY_TIME = int(os.environ.get("REDASH_JOB_EXPIRY_TIME", 3600 * 12))
JOB_DEFAULT_FAILURE_TTL = int(
Expand Down
14 changes: 12 additions & 2 deletions redash/settings/dynamic_settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import defaultdict


# Replace this method with your own implementation in case you want to limit the time limit on certain queries or users.
def query_time_limit(is_scheduled, user_id, org_id):
from redash import settings
Expand All @@ -10,6 +11,15 @@ def query_time_limit(is_scheduled, user_id, org_id):
return settings.ADHOC_QUERY_TIME_LIMIT


def query_retry_intervals(is_scheduled):
    """Return the configured retry intervals for a query run.

    Gives the scheduled-query intervals when *is_scheduled* is truthy,
    otherwise the adhoc-query intervals. Either may be None when the
    corresponding environment variable is unset.
    """
    from redash import settings

    return (
        settings.SCHEDULED_QUERY_RETRY_INTERVALS
        if is_scheduled
        else settings.ADHOC_QUERY_RETRY_INTERVALS
    )


def periodic_jobs():
"""Schedule any custom periodic jobs here. For example:

Expand Down Expand Up @@ -58,6 +68,6 @@ def database_key_definitions(default):

return definitions

# Since you can define custom primary key types using `database_key_definitions`, you may want to load certain extensions when creating the database.
# Since you can define custom primary key types using `database_key_definitions`, you may want to load certain extensions when creating the database.
# To do so, simply add the name of the extension you'd like to load to this list.
database_extensions = []
database_extensions = []
15 changes: 15 additions & 0 deletions redash/tasks/queries/execution.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import signal
import time
from typing import List
import redis
import dataclasses

from rq import get_current_job
from rq.job import JobStatus
Expand All @@ -27,6 +29,12 @@ def _unlock(query_hash, data_source_id):
redis_connection.delete(_job_lock_id(query_hash, data_source_id))


# Lightweight stand-in mirroring rq's Retry object: rq only reads the
# `max` and `intervals` attributes when scheduling retries (see the
# quoted rq/queue.py `create_job` — `retry.max` / `retry.intervals`).
@dataclasses.dataclass
class Retry:
    # Maximum number of retry attempts; set to len(intervals) by the caller.
    max: int
    # Delay (seconds) before each successive retry attempt.
    intervals: List[int]


def enqueue_query(
query, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}
):
Expand Down Expand Up @@ -82,6 +90,12 @@ def enqueue_query(
time_limit = settings.dynamic_settings.query_time_limit(
scheduled_query, user_id, data_source.org_id
)
intervals = settings.dynamic_settings.query_retry_intervals(scheduled_query)
if intervals:
retry = Retry(max=len(intervals), intervals=intervals)
Copy link
Author

@surgachsurgach surgachsurgach Jul 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by rq/queue.py

    def enqueue(self, f, *args, **kwargs):
        """Creates a job to represent the delayed function call and enqueues it."""

        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)

        return self.enqueue_call(
            func=f, args=args, kwargs=kwargs, timeout=timeout,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            description=description, depends_on=depends_on, job_id=job_id,
            at_front=at_front, meta=meta, retry=retry
        )

    def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                     result_ttl=None, ttl=None, failure_ttl=None,
                     description=None, depends_on=None, job_id=None,
                     at_front=False, meta=None, retry=None):
        """Creates a job to represent the delayed function call and enqueues
        it.
        It is much like `.enqueue()`, except that it takes the function's args
        and kwargs as explicit arguments.  Any kwargs passed to this function
        contain options for RQ itself.
        """

        job = self.create_job(
            func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
            failure_ttl=failure_ttl, description=description, depends_on=depends_on,
            job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
            retry=retry
        )
    def create_job(self, func, args=None, kwargs=None, timeout=None,
                   result_ttl=None, ttl=None, failure_ttl=None,
                   description=None, depends_on=None, job_id=None,
                   meta=None, status=JobStatus.QUEUED, retry=None):
        """Creates a job based on parameters given."""
        timeout = parse_timeout(timeout)

        if timeout is None:
            timeout = self._default_timeout
        elif timeout == 0:
            raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')

        result_ttl = parse_timeout(result_ttl)
        failure_ttl = parse_timeout(failure_ttl)

        ttl = parse_timeout(ttl)
        if ttl is not None and ttl <= 0:
            raise ValueError('Job ttl must be greater than 0')

        job = self.job_class.create(
            func, args=args, kwargs=kwargs, connection=self.connection,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            status=status, description=description,
            depends_on=depends_on, timeout=timeout, id=job_id,
            origin=self.name, meta=meta, serializer=self.serializer
        )

        if retry:
            job.retries_left = retry.max
            job.retry_intervals = retry.intervals

        return job

Copy link
Author

@surgachsurgach surgachsurgach Jul 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and
rq/worker.py v1.5.0

    def handle_job_failure(self, job, queue, started_job_registry=None,
                           exc_string=''):
        """Handles the failure or an executing job by:
            1. Setting the job status to failed
            2. Removing the job from StartedJobRegistry
            3. Setting the workers current job to None
            4. Add the job to FailedJobRegistry
        """
        self.log.debug('Handling failed execution of job %s', job.id)
        with self.connection.pipeline() as pipeline:
            if started_job_registry is None:
                started_job_registry = StartedJobRegistry(
                    job.origin,
                    self.connection,
                    job_class=self.job_class
                )

            # Requeue/reschedule if retry is configured
            if job.retries_left and job.retries_left > 0:
                retry = True
                retry_interval = job.get_retry_interval()
                job.retries_left = job.retries_left - 1
            else:
                retry = False
                job.set_status(JobStatus.FAILED, pipeline=pipeline)

else:
retry = None

metadata["Queue"] = queue_name

queue = Queue(queue_name)
Expand All @@ -91,6 +105,7 @@ def enqueue_query(
"is_api_key": is_api_key,
"job_timeout": time_limit,
"failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL,
"retry": retry,
"meta": {
"data_source_id": data_source.id,
"org_id": data_source.org_id,
Expand Down