-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] Add environment variable for query retry #5791
[Feature] Add environment variable for query retry #5791
Conversation
@@ -82,6 +90,12 @@ def enqueue_query( | |||
time_limit = settings.dynamic_settings.query_time_limit( | |||
scheduled_query, user_id, data_source.org_id | |||
) | |||
intervals = settings.dynamic_settings.query_retry_intervals(scheduled_query) | |||
if intervals: | |||
retry = Retry(max=len(intervals), intervals=intervals) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
description=description, depends_on=depends_on, job_id=job_id,
at_front=at_front, meta=meta, retry=retry
)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
at_front=False, meta=None, retry=None):
"""Creates a job to represent the delayed function call and enqueues
it.
nd
It is much like `.enqueue()`, except that it takes the function's args
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.
"""
job = self.create_job(
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, depends_on=depends_on,
job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
retry=retry
)
def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED, retry=None):
"""Creates a job based on parameters given."""
timeout = parse_timeout(timeout)
if timeout is None:
timeout = self._default_timeout
elif timeout == 0:
raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')
result_ttl = parse_timeout(result_ttl)
failure_ttl = parse_timeout(failure_ttl)
ttl = parse_timeout(ttl)
if ttl is not None and ttl <= 0:
raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
status=status, description=description,
depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta, serializer=self.serializer
)
if retry:
job.retries_left = retry.max
job.retry_intervals = retry.intervals
return job
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def handle_job_failure(self, job, queue, started_job_registry=None,
exc_string=''):
"""Handles the failure or an executing job by:
1. Setting the job status to failed
2. Removing the job from StartedJobRegistry
3. Setting the workers current job to None
4. Add the job to FailedJobRegistry
"""
self.log.debug('Handling failed execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(
job.origin,
self.connection,
job_class=self.job_class
)
# Requeue/reschedule if retry is configured
if job.retries_left and job.retries_left > 0:
retry = True
retry_interval = job.get_retry_interval()
job.retries_left = job.retries_left - 1
else:
retry = False
job.set_status(JobStatus.FAILED, pipeline=pipeline)
Thanks for this effort. Please complete the Description and How is this tested? segments of the pull request template. This way reviewers can have context on this change. |
Hi @surgachsurgach , thanks for your contribution! Would you mind describing the PR further + running some tests for it? We also need to rebase off of Additionally, would you mind updating the documentation here with the new env var behavior? |
What type of PR is this?
Description
How is this tested?
Related Tickets & Documents
Mobile & Desktop Screenshots/Recordings (if there are UI changes)