diff --git a/CHANGES/4068.bugfix b/CHANGES/4068.bugfix
new file mode 100644
index 0000000000..0c5ef8aa0b
--- /dev/null
+++ b/CHANGES/4068.bugfix
@@ -0,0 +1 @@
+Taught pulp-import to be able to use a subset of available worker-threads.
diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py
index 34571cade6..e2148dde00
--- a/pulpcore/app/settings.py
+++ b/pulpcore/app/settings.py
@@ -295,6 +295,10 @@
 
 DOMAIN_ENABLED = False
 
+# What percentage of available-workers will pulpimport use at a time, max
+# By default, use all available workers.
+IMPORT_WORKERS_PERCENT = 100
+
 # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
 # Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html
 from dynaconf import DjangoDynaconf, Validator  # noqa
diff --git a/pulpcore/app/tasks/importer.py b/pulpcore/app/tasks/importer.py
index 344be114f8..9d0b117f21
--- a/pulpcore/app/tasks/importer.py
+++ b/pulpcore/app/tasks/importer.py
@@ -8,6 +8,7 @@
 from gettext import gettext as _
 from logging import getLogger
 
+from django.conf import settings
 from django.core.files.storage import default_storage
 from django.db.models import F
 from naya.json import stream_array, tokenize
@@ -35,6 +36,7 @@
     RepositoryResource,
 )
 from pulpcore.constants import TASK_STATES
+from pulpcore.tasking.pulpcore_worker import Worker
 from pulpcore.tasking.tasks import dispatch
 
 from pulpcore.plugin.importexport import BaseContentResource
@@ -490,6 +492,18 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
                     default_storage.save(base_path, f)
 
     # Now import repositories, in parallel.
+
+    # We want to be able to limit the number of available-workers that import will consume,
+    # so that pulp can continue to work while doing an import. We accomplish this by creating
+    # a reserved-resource string for each repo-import-task based on that repo's index in
+    # the dispatch loop, mod number-of-workers-to-consume.
+    #
+    # By default (setting is not-set), import will continue to use 100% of the available
+    # workers.
+    import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 100))
+    total_workers = Worker.objects.online_workers().count()
+    import_workers = max(1, int(total_workers * (import_workers_percent / 100.0)))
+
     with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file:
         data = json.load(repo_data_file)
         gpr = GroupProgressReport(
@@ -501,14 +515,16 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
         )
         gpr.save()
 
-        for src_repo in data:
+        for index, src_repo in enumerate(data):
+            # Lock the repo we're importing-into
             dest_repo_name = _get_destination_repo_name(importer, src_repo["name"])
-
+            # pulpcore-worker limiter
+            worker_rsrc = f"import-worker-{index % import_workers}"
+            exclusive_resources = [worker_rsrc]
             try:
                 dest_repo = Repository.objects.get(name=dest_repo_name)
             except Repository.DoesNotExist:
                 if create_repositories:
-                    exclusive_resources = []
                     dest_repo_pk = ""
                 else:
                     log.warning(
@@ -518,7 +534,7 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
                     )
                     continue
             else:
-                exclusive_resources.append(dest_repo)
+                exclusive_resources.append(dest_repo)
                 dest_repo_pk = dest_repo.pk
 
             dispatch(
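
Note on the throttling scheme: the hunks above cap import concurrency by deriving a number of "slots" from the configured percentage of online workers and having each dispatched repo-import task claim an exclusive-resource string chosen by its loop index modulo that slot count; tasks that share a string serialize behind one another, so the import never occupies more than that many workers. The standalone sketch below illustrates just that calculation. It is illustrative only; the function and variable names are hypothetical and are not part of the pulpcore API.

# Illustrative sketch only -- names here are hypothetical, not pulpcore APIs.

def import_worker_resource(index, total_workers, workers_percent=100):
    """Return the exclusive-resource string for the repo at `index`.

    At most max(1, total_workers * workers_percent / 100) distinct strings are
    produced, so at most that many repo-import tasks can run concurrently.
    """
    import_workers = max(1, int(total_workers * (workers_percent / 100.0)))
    return f"import-worker-{index % import_workers}"


# Example: 10 online workers capped at 30% -> 3 slots, so the resource strings
# cycle through 3 values and the 4th repo queues behind the 1st.
if __name__ == "__main__":
    for index in range(6):
        print(index, import_worker_resource(index, total_workers=10, workers_percent=30))
    # 0 import-worker-0
    # 1 import-worker-1
    # 2 import-worker-2
    # 3 import-worker-0
    # 4 import-worker-1
    # 5 import-worker-2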