Limit number of celery task executions per second per user #16232
Conversation
I have implemented the core logic. For this to work in practice, a task_user_id keyword parameter needs to be added to the celery task functions.
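The description above suggests each task function gains a task_user_id keyword that a before_start hook can consult. A rough, self-contained sketch of the idea (the class and attribute names here are illustrative, not Galaxy's actual GalaxyTask API):

```python
import time


class UserRateLimitedTask:
    """Illustrative Task-like class; the names task_user_id and
    tasks_per_second mirror this PR's description, not Galaxy's actual API."""

    def __init__(self, tasks_per_second: float):
        self.interval = 1.0 / tasks_per_second
        self._last_scheduled: dict = {}  # user_id -> monotonic timestamp

    def before_start(self, task_id, args, kwargs):
        user_id = kwargs.get("task_user_id")
        if user_id is None:
            return  # tasks that don't carry a user id are not rate limited
        now = time.monotonic()
        last = self._last_scheduled.get(user_id)
        # schedule no earlier than one interval after the previous execution
        next_slot = now if last is None else max(now, last + self.interval)
        self._last_scheduled[user_id] = next_slot
        time.sleep(next_slot - now)
```

In Galaxy the bookkeeping lives in the database rather than in memory, but the scheduling rule is the same: each user's next execution is pushed at least one interval past the previous one.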
This looks great! Just a few comments.

Configuration changes: once you've made a change to the config schema, please regenerate the derived sample config files (e.g. via make config-schema).

Can you, please, move the database migration into a separate commit? We try to keep those separate (see #15663 (comment)). In general, while there's nothing wrong with one large commit (and we sometimes have those), it's often helpful when a large modification is split into several smaller/narrowly focused commits: that way it's easier to debug stuff; it also makes it easier to read the history and understand the evolution of the code base.
...xy/model/migrations/alembic/versions_gxy/987ce9839ecb_create_celery_user_rate_limit_table.py
I made the changes. I agree that behavioral logic does not belong in model classes; model classes should be plain old objects. I only did so because there was no ideal place to put this type of database-specific access logic, and I do not think it belongs in the business logic layer either. In past projects I would create a data access layer to contain database-specific access logic; this layer sits between the business layer and database calls. In the case of Galaxy, the sqlalchemy orm serves the purpose of a data access layer. However, in order to take advantage of database-specific efficiencies available in postgres, the sqlalchemy layer exposes low-level details to the calling business logic layer. Ideally, we should keep database-specific logic in dedicated modules separate from the rest of the business logic. Another option is to write a custom sqlalchemy extension to abstract away this kind of detail from the business logic layer.
I have listed all of the celery tasks below, grouped according to how they are invoked. Marius mentioned in the meeting that we should user-rate-limit all tasks other than the celery beat ones. For tasks that are only invoked individually this makes sense. However, for tasks that are only invoked as part of a chain, this could introduce significant overhead. The rate-limiting logic implemented in the before_task hook makes a database update call for each task invocation (this is true for postgres; for sqlite it is a select followed by an update). If we rate-limit each individual task in a chain, we incur a separate database update call for each task in the chain. For the second chain listed below, which includes 4 tasks, each invocation of the chain would result in 4 database update calls. If we only rate-limited one of the tasks in the chain, there would be only one database update call per chain invocation. If, in practice, there could be thousands of invocations of this chain at a time, this could result in significantly fewer database update statements.

Celery beat tasks:
Invoked individually only asynchronously:
Invoked as part of a chain only:
Invoked as part of a chain and individually:
Invoked individually both synchronously and asynchronously:
Invoked synchronously only:
Can't find any calls:
2 Chains:
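To illustrate the per-invocation cost discussed above, here is a sketch of the "standard sql" select-then-update path against a stand-in sqlite table. Table and column names are assumptions based on this PR's description, not Galaxy's real schema; the single-statement postgres variant is shown in a comment.

```python
import datetime
import sqlite3


def schedule_next_execution_standard(conn, user_id, min_interval_secs):
    """Standard-sql path: a SELECT followed by an UPDATE (two statements).

    Table/column names follow the PR's description of celery_user_rate_limit
    but are otherwise assumptions.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    row = conn.execute(
        "SELECT last_scheduled_time FROM celery_user_rate_limit WHERE user_id = ?",
        (user_id,),
    ).fetchone()
    last = datetime.datetime.fromisoformat(row[0])
    # the next execution may not come sooner than min_interval_secs after the last
    next_time = max(now, last + datetime.timedelta(seconds=min_interval_secs))
    conn.execute(
        "UPDATE celery_user_rate_limit SET last_scheduled_time = ? WHERE user_id = ?",
        (next_time.isoformat(), user_id),
    )
    return next_time


# Under postgres the same bookkeeping fits in a single round trip, e.g.:
#   UPDATE celery_user_rate_limit
#      SET last_scheduled_time = GREATEST(
#              now(), last_scheduled_time + make_interval(secs => :interval))
#    WHERE user_id = :user_id
#    RETURNING last_scheduled_time;
```

This is why rate limiting every task in a 4-task chain costs 4 of these calls per chain invocation, while rate limiting only the first task costs 1.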
Right now there is only one task, recalculate_user_disk_usage, that accepts a user_id as a parameter. It gets the user id from the transaction (ProvidesUserContext) parameter. It was mentioned that the currently logged-in user isn't necessarily the right user for rate limiting. How do I determine where to get the user id from for the other tasks? Do I get it from the transaction as in recalculate_user_disk_usage? If not, from where?
Are there celery task execution statistics available for our production site? This could provide guidance on which tasks would benefit most from rate limiting.
Thank you for addressing the comments! Could you, please, do a rebase instead of a merge? That gives us a clean commit history that's easier to read and debug. (The merge commits will disappear once you do the rebase.)

The tests are broken because of a migration error, which is easily fixable. There have been new migrations added to the dev branch. As a result, Alembic cannot construct a revision script sequence: there are now two revision scripts claiming the same parent revision, so the migration history has branched.
To fix this, you need to change the parent of your revision (its down_revision) to the current head.

Once the error is fixed, could you, please, extract the migration (i.e., the revision script module) into its own commit? (You could squash 76664fe, then split 41cd07c into a commit with the migration and a commit with everything else.)
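The fix described above amounts to editing the revision script's header; as a sketch (the parent revision id below is a placeholder for whatever `alembic heads` reports, not a real value):

```python
# 987ce9839ecb_create_celery_user_rate_limit_table.py (header sketch)
revision = "987ce9839ecb"
# change this from the old parent to the current head of the branch:
down_revision = "<current-head-revision-id>"
```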
These are the steps I took when I started this work:

git remote add upstream git@github.com:galaxyproject/galaxy
Given the above, is it too late to rebase? If not, what should I do? Also, what steps should I have taken to do this the right way?
@claudiofr All your steps look fine; so just a few comments. (You certainly won't have to make any edits from scratch!)
To do a rebase (it is not too late, and there will be no conflicts - I've just tried it):
If a rebase cannot proceed automatically, there'll be a message about conflicts you'll need to resolve. But in this case, there should be no conflicts. (If there are, let me know and I'll help.) That's all for the rebase!

In order to extract the migration into its separate commit, here's what you can do. First, I suggest combining the 2 commits into 1: your first commit where you added the migration script and the commit where you made the change to it; then you can easily extract the migration in the next step. You can do it via squashing:
Replace "pick" with "squash" for the "Replace calls to alembic..." commit:
Then save and exit. You'll be asked to confirm (or change) the commit message. After that you're done. Your commit history looks like this:
Now you can split your first commit into the migration and everything else:
Save and exit. You are currently at the commit you want to change. You need to reset your changes, and then rearrange them as you see fit:
Now you can simply add the files and commit as you like. For example:
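The example that should follow here didn't survive extraction; one plausible shape for it, run in a throwaway repo so it is self-contained (the file paths and commit messages are hypothetical):

```shell
# sketch: commit the migration script on its own, then everything else
set -e
cd "$(mktemp -d)" && git init -q . && git config user.email dev@example.org && git config user.name dev
mkdir -p lib/galaxy/model/migrations
echo "# revision script" > lib/galaxy/model/migrations/987ce9839ecb_create_celery_user_rate_limit_table.py
echo "# rate limit logic" > lib/galaxy/celery_rate_limit.py

git add lib/galaxy/model/migrations/987ce9839ecb_create_celery_user_rate_limit_table.py
git commit -q -m "Add celery_user_rate_limit migration"
git add -A
git commit -q -m "Add celery user rate limit logic"
git log --oneline
```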
One note of caution: a rebase cannot be (easily) undone. So when I have potentially challenging rebases, I create a new branch and do a trial run to be on the safe side. Also, if things go wrong during the rebase, you can always do git rebase --abort.

Also, just in case, here's my go-to for all things git: https://git-scm.com/book/en/v2/Git-Tools-Rewriting-History (this link is specific to what we're doing here). I hope this helps! Please ping me if any of this doesn't make sense.
A rebase will fix this. See my previous comment (first part). When you see the output of the rebase command, run
I made the changes and the prior problem appears to be fixed. Do you know when you might have a chance to address some of the issues I posted above about passing user id to tasks and which tasks to rate limit?
Great! All tests are green! And yes, I'll address the other issues today.
Definitely.
required: false
desc: |
  Applies if celery_user_rate_limit is non-zero. Used for testing against
  a postgres db. Forces use of standard sql code rather than
Is this config option used for tests only? If so, can we keep it in the testing code and not in the config schema? (we try to limit our user-facing configuration to options relevant for Galaxy admins). I don't think an admin would need to select what SQL dialect is being used.
Originally I did not put it in the config schema for the reason you mention. That is why I referenced it in the code using the self.config.get("celery_user_rate_limit_standard_before_start", False) syntax, which you commented on earlier. However, it occurred to me that there is a chance, albeit small, that a developer may in the future introduce a config parameter with the same name in the config schema, so I decided to put it in the config schema. Without a central registry of all parameters, including those used only for testing, there is always a chance of a naming collision. That said, I'm ok with removing it from the config schema and using the self.config.get syntax to get its value.
Ah, I see! I didn't realize at the time that it was only there for the purpose of setting up the right configuration for a test. Given that this option is only relevant in the context of a test, it (ideally) shouldn't be part of the main code base. (I say "ideally" because we still have a couple of settings like that left over from ancient times, but we try to remove those as time permits). However, you don't really need it: Galaxy can determine what path to take based on the value of database_connection: with a non-postgres connection a test will automatically take the standard sql path.
I thought it would be useful to test the standard sql path even with postgres. That is why I introduced this testing parameter. Given that, it would have to be in the main code base.
It'll never take that path: any such alternative sql implementations in galaxy are strictly postgres OR anything else (with sqlite being the default and mysql no longer formally supported, but known to work). So as long as the conn string is identified as postgres, the postgres-specific implementation should be selected.
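The postgres-or-everything-else branching described above can be as simple as a prefix check on the connection string; a minimal sketch (Galaxy's real is_postgres helper may be implemented differently):

```python
def is_postgres(database_connection: str) -> bool:
    """Minimal sketch of a dialect check on a SQLAlchemy-style URL;
    Galaxy's actual helper may inspect the URL more robustly."""
    return database_connection.startswith(
        ("postgres://", "postgresql://", "postgresql+")
    )
```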
found_user_ids = conn.scalars(
    text("select id from galaxy_user where id between 1 and :high"), {"high": num_users}
).all()
if len(expected_user_ids) > len(found_user_ids):
With the default value of num_users (which is what the tests are currently using; EDIT: tests are using 3), len(expected_user_ids) will be 1. Is that correct? So, when num_users == 2, and the database has only one user, this code won't create any additional users, and will execute tasks for one user only. I would expect that the number of users tested would equal the value of the num_users parameter?

EDIT: I think that's indeed a bug: I've just run into an integrity error under postgres: it couldn't find user.id=2. Changing to num_users + 2 in setup_users fixed it. However, I haven't verified this - it could be an error in my code too.
It is a bug, but it never caused a problem because, at the time this code runs, there apparently are no users in the galaxy_user table, so len(found_user_ids) is always 0 and the condition len(expected_user_ids) > len(found_user_ids) is always true. Apparently the integration testing framework creates a brand new empty database every time. The test assumes that there will always be a user id = 1, but apparently it is not present at the time this function runs and it somehow gets populated at some point after this code runs. I will change the code to say:

select id from galaxy_user where id between 2 and :high
Sometimes a new db is created, sometimes it is not - it depends on how the tests are run. I think run_tests.sh will try to reuse the postgres database, whereas a direct execution via pytest will not, but I may be mistaken.

The assumption about user.id=1 holds when the test uses the database that has been created by the testing framework. However, when a separate db is created for a test (like when we need to execute under sqlite regardless of the db env var passed to the test in the CI), the default user is not created. So my suggestion would be to not make any assumptions about existing records in the db.
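Following that suggestion, a setup helper can derive everything from what is actually in the table instead of assuming fixed ids; a sketch against a stand-in sqlite schema (table and column names are assumptions, not Galaxy's real setup_users):

```python
import sqlite3


def setup_users(conn: sqlite3.Connection, num_users: int) -> list:
    """Create however many users are missing and return num_users user ids,
    without assuming any particular id (such as id=1) already exists."""
    existing = [row[0] for row in conn.execute("SELECT id FROM galaxy_user")]
    for i in range(num_users - len(existing)):
        cur = conn.execute(
            "INSERT INTO galaxy_user (email) VALUES (?)",
            (f"user{len(existing)}@example.org",),
        )
        existing.append(cur.lastrowid)  # take whatever id the db assigned
    return existing[:num_users]
```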
lib/galaxy/app.py
""" | ||
if self.config.celery_user_rate_limit: | ||
task_before_start: GalaxyTaskBeforeStart | ||
if ( |
We have a utility method is_postgres that does exactly that, but it is located in the wrong place. My suggestion is to move that method into galaxy.model.database_utils (there's only one other place where it's used - it would need to be adjusted), and use it here. Given that you wouldn't have the celery_user_rate_limit_standard_before_start config option, I think this code would become even simpler:

if is_postgres(self.config.database_connection):
    task_before_start = GalaxyTaskBeforeStartUserRateLimitPostgres(
        self.config.celery_user_rate_limit, self.model.session
    )
else:
    task_before_start = GalaxyTaskBeforeStartUserRateLimitStandard(
        self.config.celery_user_rate_limit, self.model.session
    )
I will make changes to use is_postgres. I would like to keep the celery_user_rate_limit_standard_before_start option, though I admit I can't really see a need for it if we are using postgres as the database.
OK, almost done here!
So, 3 tests. And a few edits: you can use …; add …; finally, move the call to … So, you might have something like this:

class TestRateLimit(TestCeleryUserRateLimitIntegration):
    def setUp(self):
        super().setUp()
        dburl = self._app.config.database_connection  # that's how you get to the galaxy app instance
        setup_users(dburl)

    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        super().handle_galaxy_config_kwds(config)
        config["check_migrate_databases"] = False  # this goes ONLY in this one
        config["database_connection"] = sqlite_url()
        # ... all your sqlite-specific config settings + rate limiting go here

    def test_mock_pass_user_id_task(self):
        self._test_mock_pass_user_id_task([1, 2], 3, 0.1)

@skip_unless_postgres()
class TestRateLimitPostgres(TestCeleryUserRateLimitIntegration):
    # setUp: same as previous class
    # handle_galaxy_config_kwds: same, minus sqlite-related stuff
    # test_mock_pass_user_id_task: same

class TestNoRateLimit(TestCeleryUserRateLimitIntegration):
    # same as above; also no handle_galaxy_config_kwds

Overall, this is great work, really! We're almost done - I'm very much looking forward to merging this. Thank you for all your work on this, and thank you for bearing with me!
I'm adding task_user_id to all of the tasks. Calling functions contain either a user object of type model.User, or a trans object. Can I assume that the model.User object and trans objects are always not None? This would mean I can pass user.id or trans.user.id without first having to verify that either user or trans.user is not None.
I suppose it is safe to assume that if the caller's scope contains an object of type model.User, it is not None.
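Where that assumption is in doubt (e.g. anonymous sessions), a defensive extraction keeps the call sites simple; a sketch with a hypothetical helper name:

```python
def extract_task_user_id(trans):
    """Hypothetical helper: pull a user id from a trans-like object,
    tolerating anonymous sessions where trans.user is None."""
    user = getattr(trans, "user", None)
    return user.id if user is not None else None
```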
Introduce a celery_user_rate_limit config parameter which specifies how many tasks per second a user can execute. Create a custom celery Task class called GalaxyTask with a before_start hook that implements the rate limiting logic. Add a new table, celery_user_rate_limit, that tracks the last scheduled execution time by user id. Add a new integration test, test_celery_user_rate_limit.py.
Added a migration script to create the new table, which tracks the last scheduled execution time for a user; this is used to schedule the next execution time in order to limit the task execution rate.
…dule. Moved behavioral logic from CeleryUserRateLimit model class to base_task module because model classes should be plain old classes. Ran make config-schema to update galaxy_options.rst and galaxy.yml.sample based on changes made to config_schema.yml.
…nality. Remove unnecessary integration tests. Change calls to celery tasks to pass values to new task_user_id parameter which is used for user rate limiting.
…nality. Fix signature of the overridden PurgableManagerMixin.purge method. Rather than having an explicit user parameter, assume it is in the **kwargs param.
…nality. Added # type: ignore[arg-type] for dburl parameter to is_postgres function.
I made the suggested code changes. I also added task_user_id to most of the celery task functions and changed the calling code to pass the user id.
Thank you, @claudiofr, for your contribution and for addressing all the comments! This is great work!
Closes #14411
Introduce a celery_user_rate_limit config parameter which specifies how many tasks per second a user can execute. Create a custom celery Task class called GalaxyTask with a before_start hook that implements the rate limiting logic. Add a new table, celery_user_rate_limit, that tracks the last scheduled execution time by user id. Add a new integration test, test_celery_user_rate_limit.py.
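A minimal sketch of such a table in sqlite DDL (column names are assumptions based on the summary above, not the migration's actual definitions):

```python
import sqlite3

# Hedged sketch of the celery_user_rate_limit table described above:
# one row per user, recording when that user's last task was scheduled.
DDL = """
CREATE TABLE celery_user_rate_limit (
    user_id INTEGER PRIMARY KEY,
    last_scheduled_time TIMESTAMP NOT NULL
)
"""
conn = sqlite3.connect(":memory:")
conn.execute(DDL)
```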
A task function must include the keyword parameter task_user_id for user rate limiting to work.