diff --git a/CHANGES/4068.bugfix b/CHANGES/4068.bugfix new file mode 100644 index 0000000000..0c5ef8aa0b --- /dev/null +++ b/CHANGES/4068.bugfix @@ -0,0 +1 @@ +Taught pulp-import to be able to use a subset of available worker-threads. diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index 9fa500cf1a..8a2327f971 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -296,6 +296,10 @@ DEFAULT_AUTO_FIELD = "django.db.models.AutoField" +# What percentage of available-workers will pulpimport use at a time, max +# By default, use all available workers. +IMPORT_WORKERS_PERCENT = 100 + # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py) # Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html import dynaconf # noqa diff --git a/pulpcore/app/tasks/importer.py b/pulpcore/app/tasks/importer.py index ffb0d08d28..b240698fa1 100644 --- a/pulpcore/app/tasks/importer.py +++ b/pulpcore/app/tasks/importer.py @@ -8,6 +8,7 @@ from gettext import gettext as _ from logging import getLogger +from django.conf import settings from django.core.files.storage import default_storage from django.db.models import F from naya.json import stream_array, tokenize @@ -34,6 +35,7 @@ ContentArtifactResource, ) from pulpcore.constants import TASK_STATES +from pulpcore.tasking.pulpcore_worker import Worker from pulpcore.tasking.tasks import dispatch log = getLogger(__name__) @@ -482,6 +484,18 @@ def validate_and_assemble(toc_filename): default_storage.save(base_path, f) # Now import repositories, in parallel. + + # We want to be able to limit the number of available-workers that import will consume, + # so that pulp can continue to work while doing an import. We accomplish this by creating + # a reserved-resource string for each repo-import-task based on that repo's index in + # the dispatch loop, mod number-of-workers-to-consume. + # + # By default (setting is not-set), import will continue to use 100% of the available + # workers. + import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 100)) + total_workers = Worker.objects.online_workers().count() + import_workers = max(1, int(total_workers * (import_workers_percent / 100.0))) + with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file: data = json.load(repo_data_file) gpr = GroupProgressReport( @@ -493,7 +507,10 @@ def validate_and_assemble(toc_filename): ) gpr.save() - for src_repo in data: + for index, src_repo in enumerate(data): + # pulpcore-worker limiter + worker_rsrc = f"import-worker-{index % import_workers}" + exclusive_resources = [worker_rsrc] try: dest_repo = _destination_repo(importer, src_repo["name"]) except Repository.DoesNotExist: @@ -503,6 +520,8 @@ def validate_and_assemble(toc_filename): ) ) continue + else: + exclusive_resources.append(dest_repo) dispatch( import_repository_version,