Add scheduled jobs #2743
Conversation
@tkaemming @mattrobenolt thoughts?
src/sentry/tasks/scheduler.py
Outdated
with Lock(lock_key, nowait=True, timeout=60):
    queryset = list(ScheduledJob.objects.filter(
        date_scheduled__lte=timezone.now(),
    )[:100])
If we had a ton of scheduled jobs, it's possible this would start backing up, since it only consumes 100 at a time and then waits 1 minute between runs.
In practice, I don't think this is an issue since it'll be pretty lightly utilized.
And to be pedantic, this isn't a queryset. ;)
I explicitly chose to limit it to make sure we don't have an issue similar to the one in the resolution cleanup (that it's unbounded). If it gets backlogged, that's fine. We could obviously be smarter and have it requeue itself, but I don't foresee this being an issue.
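For illustration only, a minimal sketch of the "have it requeue itself" idea; the task name, model import path, and decorator arguments below are assumptions, not code from this PR:

from django.utils import timezone

from sentry.models import ScheduledJob            # import path assumed
from sentry.tasks.base import instrumented_task   # decorator used elsewhere in this diff

@instrumented_task(name='sentry.tasks.enqueue_scheduled_jobs')  # hypothetical name
def enqueue_scheduled_jobs():
    # Consume at most 100 pending jobs per run, as in the PR.
    job_list = list(ScheduledJob.objects.filter(
        date_scheduled__lte=timezone.now(),
    )[:100])
    # ... dispatch each job here ...
    if len(job_list) == 100:
        # A full batch suggests a backlog: kick off another run immediately
        # instead of waiting a minute for the next scheduled beat.
        enqueue_scheduled_jobs.apply_async()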
Overall, this is what I had in my head (you just beat me to it). +1
Seems reasonable — I don't have any background on what this is for, though.
Force-pushed from 03928d0 to 59cc451
Force-pushed from 3c507a7 to 1736236
Going to merge once green as it's needed in getsentry.
I'm going to take this over.
src/sentry/tasks/scheduler.py
Outdated
lock_key = 'scheduler:process'
try:
    locks = LockManager(RedisLockBackend(redis.clusters.get('default')))
Nah. Just use:

from sentry.app import locks
from sentry.utils import TimedRetryPolicy
...
lock = locks.get('scheduler:process', duration=60)
with TimedRetryPolicy(5)(lock.acquire):
    ...

You don't wanna do all the stuff manually.
What @mattrobenolt said.
If you want to maintain the nowait=True behavior from the previous change, you don't have to wrap the lock.acquire in the retry policy and can instead do this:
with locks.get('scheduler:process', duration=60).acquire():
Ok fixed. I'm sticking with Matt's behavior, as it looks like it's relatively common in the codebase, and some of the scheduled tasks include sending email, so I assume there's potential for 3rd-party glitches that I'd want to retry.
nvm, going with Ted's behavior.
Force-pushed from 856007b to 50f7783
tkaemming left a comment
I'm sticking with Matt's behavior, as it looks like it's relatively common in the codebase, and some of the scheduled tasks include sending email, so I assume there's potential for 3rd-party glitches that I'd want to retry.
I'm not sure I understand your rationale here, since this is going to retry every 60 seconds anyway, but whatever.
src/sentry/tasks/scheduler.py
Outdated
@@ -0,0 +1,33 @@
Extra newline
Fixed
src/sentry/tasks/scheduler.py
Outdated
lock = locks.get('scheduler:process', duration=60)
with TimedRetryPolicy(5)(lock.acquire):
    queryset = list(ScheduledJob.objects.filter(
Not a queryset, like @mattrobenolt said.
I couldn't find @mattrobenolt's comment? Just cmd-F'd over this page...
I tried...something? Not sure if it's what you wanted fixed though.
Not sure where it went (victim of a force push at some point, maybe), but the gist of the comment is that the variable name doesn't actually represent what the value is, since casting it to a list means it's no longer a QuerySet.
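A quick illustration of the distinction being drawn here (model and field names as used elsewhere in this diff):

qs = ScheduledJob.objects.filter(date_scheduled__lte=timezone.now())
type(qs)               # django.db.models.query.QuerySet -- lazy, not yet evaluated
jobs = list(qs[:100])
type(jobs)             # list -- already evaluated, so the name "queryset" is misleading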
Ah, that's what I thought. I moved some things around (as part of the warning when there are >100 jobs), so queryset should be appropriate now.
src/sentry/tasks/scheduler.py
Outdated
)[:100])

for job in queryset:
    logger.info('Sending scheduled job %s with payload %r',
This probably shouldn't be info; I'll defer to @JTCunning's judgement here if he's got opinions.
This is a debug statement.
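Concretely, the per-job line would become something like the following; the job.name and job.payload attributes are assumptions, since the diff excerpt is truncated:

logger.debug('Sending scheduled job %s with payload %r', job.name, job.payload)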
Fixed.
src/sentry/tasks/scheduler.py
Outdated
with TimedRetryPolicy(5)(lock.acquire):
    queryset = list(ScheduledJob.objects.filter(
        date_scheduled__lte=timezone.now(),
    )[:100])
Might be worth logging something if the size of this is > 100 since that could get out of control.
Added.
src/sentry/tasks/scheduler.py
Outdated
from sentry.tasks.base import instrumented_task
from sentry.utils.retries import TimedRetryPolicy

logger = logging.getLogger('sentry')
logger = logging.getLogger('sentry.scheduler')
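For context, a child logger like this still propagates records up to whatever handlers are configured on the 'sentry' logger, while tagging them with the more specific name:

import logging

logger = logging.getLogger('sentry.scheduler')
# Handled by the existing 'sentry' configuration, but filterable by the
# 'sentry.scheduler' name.
logger.debug('processing scheduled jobs')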
Fixed
src/sentry/tasks/scheduler.py
Outdated
)[:100])

for job in queryset:
    logger.info('Sending scheduled job %s with payload %r',
This is a debug statement.
src/sentry/tasks/scheduler.py
Outdated
ScheduledJob.objects.filter(
    id__in=[o.id for o in queryset],
).delete()
We should probably be deleting objects one by one after a successful enqueuing. If for some reason we error out on send_task, the next time this runs we will reschedule all jobs that were enqueued before the error.
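A hedged sketch of that ordering; the job.name/job.payload fields and the use of app.send_task are assumptions based on the truncated excerpts above:

for job in job_list:
    app.send_task(job.name, kwargs=job.payload)
    # Delete only after the job has been handed off to the broker, so an error
    # partway through the loop doesn't cause already-enqueued jobs to be
    # re-sent on the next run.
    job.delete()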
👌
Fixed.
JTCunning left a comment
Logging stuff is good. Deferring rest of this to someone else.
tkaemming left a comment
More nitpicky stuff, but not going to block on it.
src/sentry/tasks/scheduler.py
Outdated
    date_scheduled__lte=timezone.now(),
)
job_count = queryset.count()
if job_count > 100:
Probably better to just select 101 and see if the size of the result exceeds 100, to avoid making the database run the query twice. I'm not sure the exact number matters; we just want to be able to figure out whether we're lagging behind or not.
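A minimal sketch of that single-query approach, which is roughly what the later revision does; the trimming back to 100 at the end is illustrative only:

job_list = list(ScheduledJob.objects.filter(
    date_scheduled__lte=timezone.now(),
)[:101])
if len(job_list) > 100:
    # One query both fetches the batch and tells us we're lagging behind.
    logger.debug('Scheduled job backlog exceeds 100 pending jobs')
    job_list = job_list[:100]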
Good idea. Done.
src/sentry/tasks/scheduler.py
Outdated
)
job_count = queryset.count()
if job_count > 100:
    logger.debug('More than 100 ScheduledJob\'s: %d jobs found.' % job_count)
This should use parameterized logging instead of string formatting like the log statement below.
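Concretely, the difference being asked for; formatting is deferred to the logging framework rather than done eagerly with %:

logger.debug('More than 100 scheduled jobs found: %d', job_count)    # parameterized (preferred)
logger.debug('More than 100 scheduled jobs found: %d' % job_count)   # eager string formatting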
Ah, I didn't realize it worked like that. Done.
src/sentry/tasks/scheduler.py
Outdated
from sentry.celery import app

with locks.get('scheduler.process', duration=60).acquire():
    job_list = ScheduledJob.objects.filter(
I forget exactly how the query cache works, but it'd be safer at this point to just coerce this result to a list, so it's explicit that you don't intend for it to run twice.
src/sentry/tasks/scheduler.py
Outdated
)[:101]

if len(job_list) > 100:
    logger.debug('More than 100 ScheduledJob\'s: %r jobs found.', len(job_list))
This will always be 101 if it logs now.
Also ScheduledJob's is a possessive, which this is not.
Migration Checklist
Generated by 🚫 danger
WIP
Basic wire-up of scheduled jobs.
Refs GH-2730