diff --git a/CHANGES/4315.deprecation b/CHANGES/4315.deprecation new file mode 100644 index 0000000000..eeab6c2e38 --- /dev/null +++ b/CHANGES/4315.deprecation @@ -0,0 +1 @@ +Task filter ``reserved_resources_record`` has been deprecated and planned for removal in pulpcore 3.55. diff --git a/CHANGES/4315.misc b/CHANGES/4315.misc new file mode 100644 index 0000000000..48e347ff07 --- /dev/null +++ b/CHANGES/4315.misc @@ -0,0 +1,2 @@ +Added new task resource locking method ``get_prn`` that does not change with different settings. +``get_url`` is planned to be phased out of resource locking by pulpcore 3.55. diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index ab3dc3d0c4..d0e0f0e3c1 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -60,7 +60,7 @@ def __init__(self, pulp_ctx, task_group, tls_settings): # TODO check and compare this to distribution locking on the distribution viewset. if settings.DOMAIN_ENABLED: uri = f"/{self.domain.name}{uri}" - self.distros_uri = uri + self.distros_uris = [uri, f"pdrn:{self.domain.pulp_id}:distributions"] @staticmethod def needs_update(fields_dict, model_instance): @@ -176,7 +176,7 @@ def create_or_update_distribution(self, repository, upstream_distribution): general_update, task_group=self.task_group, shared_resources=[repository], - exclusive_resources=[self.distros_uri], + exclusive_resources=self.distros_uris, args=(distro.pk, self.app_label, self.distribution_serializer_name), kwargs={ "data": distribution_data, @@ -190,7 +190,7 @@ def create_or_update_distribution(self, repository, upstream_distribution): general_create, task_group=self.task_group, shared_resources=[repository], - exclusive_resources=[self.distros_uri], + exclusive_resources=self.distros_uris, args=(self.app_label, self.distribution_serializer_name), kwargs={"data": distribution_data}, ) @@ -221,7 +221,7 @@ def remove_missing(self, names): dispatch( general_multi_delete, task_group=self.task_group, - exclusive_resources=[self.distros_uri], + exclusive_resources=self.distros_uris, args=(distribution_ids,), ) diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py index 17d5568008..00729a6f71 100644 --- a/pulpcore/app/util.py +++ b/pulpcore/app/util.py @@ -13,7 +13,7 @@ import gnupg from django.conf import settings -from django.db.models import Sum +from django.db.models import Model, Sum from django.urls import Resolver404, resolve, reverse from opentelemetry import metrics @@ -32,7 +32,7 @@ def get_url(model, domain=None): """ Get a resource url for the specified model instance or class. This returns the path component of - the resource URI. This is used in our resource locking/reservation code to identify resources. + the resource URI. Args: model (django.models.Model): A model instance or class. @@ -59,6 +59,45 @@ def get_url(model, domain=None): return reverse(get_view_name_for_model(model, view_action), kwargs=kwargs) +def get_prn(instance=None, uri=None): + """ + Get a Pulp Resource Name (PRN) for the specified model instance. It is similar to a HREF + url in that it uniquely identifies a resource, but it also has the guarantee that it will not + change regardless of API_ROOT or DOMAIN_ENABLED. This is used in our resource locking/ + reservation code to identify resources. + + The format for the PRN is as follows: + ``` + prn:model-label-lower:pk + ``` + + Examples: + instance=FileRepository(pk=123) -> prn:file.filerepository:123 + instance=Artifact(pk=abc) -> prn:core.artifact:abc + uri=/rerouted/api/v3/repositories/rpm/rpm/123/versions/2/ -> prn:core.repositoryversion:abc + uri=/pulp/foodomain/api/v3/content/ansible/role/123/ -> prn:ansible.role:123 + + Args: + instance Optional(django.models.Model): A model instance. + uri Optional(str): A resource URI + + Returns: + prn (str): The PRN of the passed in resource + """ + if uri: + from pulpcore.app.viewsets import NamedModelViewSet + + instance = NamedModelViewSet.get_resource(uri) + + if not isinstance(instance, Model): + raise ValidationError(_("instance({}) must be a Model").format(instance)) + + if isinstance(instance, models.MasterModel): + instance = instance.cast() + + return f"prn:{instance._meta.label_lower}:{instance.pk}" + + def extract_pk(uri): """ Resolve a resource URI to a simple PK value. diff --git a/pulpcore/app/views/orphans.py b/pulpcore/app/views/orphans.py index 30086360c6..ba9583fa78 100644 --- a/pulpcore/app/views/orphans.py +++ b/pulpcore/app/views/orphans.py @@ -23,6 +23,7 @@ def delete(self, request, format=None): uri = "/api/v3/orphans/cleanup/" if settings.DOMAIN_ENABLED: uri = f"/{request.pulp_domain.name}{uri}" - task = dispatch(orphan_cleanup, exclusive_resources=[uri]) + exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:orphans"] + task = dispatch(orphan_cleanup, exclusive_resources=exclusive_resources) return OperationPostponedResponse(task, request) diff --git a/pulpcore/app/views/repair.py b/pulpcore/app/views/repair.py index 4e743136e7..681b1c556a 100644 --- a/pulpcore/app/views/repair.py +++ b/pulpcore/app/views/repair.py @@ -30,6 +30,9 @@ def post(self, request): uri = "/api/v3/repair/" if settings.DOMAIN_ENABLED: uri = f"/{request.pulp_domain.name}{uri}" - task = dispatch(repair_all_artifacts, exclusive_resources=[uri], args=[verify_checksums]) + exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:repair"] + task = dispatch( + repair_all_artifacts, exclusive_resources=exclusive_resources, args=[verify_checksums] + ) return OperationPostponedResponse(task, request) diff --git a/pulpcore/app/viewsets/custom_filters.py b/pulpcore/app/viewsets/custom_filters.py index 70b1a76b36..53deab03ad 100644 --- a/pulpcore/app/viewsets/custom_filters.py +++ b/pulpcore/app/viewsets/custom_filters.py @@ -16,6 +16,7 @@ from pulpcore.app.models import ContentArtifact, RepositoryVersion, Publication from pulpcore.app.viewsets import NamedModelViewSet +from pulpcore.app.loggers import deprecation_logger class ReservedResourcesFilter(Filter): @@ -83,6 +84,9 @@ def filter(self, qs, value): Returns: django.db.models.query.QuerySet: Queryset filtered by the reserved resource """ + deprecation_logger.warning( + "This filter is deprecated. Please use reserved_resources(__in) instead." + ) if value is None: # a value was not supplied by a user diff --git a/pulpcore/app/viewsets/orphans.py b/pulpcore/app/viewsets/orphans.py index 60b4bfe8d4..a799b91a26 100644 --- a/pulpcore/app/viewsets/orphans.py +++ b/pulpcore/app/viewsets/orphans.py @@ -29,10 +29,11 @@ def cleanup(self, request): uri = "/api/v3/orphans/cleanup/" if settings.DOMAIN_ENABLED: uri = f"/{request.pulp_domain.name}{uri}" + exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:orphans"] task = dispatch( orphan_cleanup, - exclusive_resources=[uri], + exclusive_resources=exclusive_resources, kwargs={"content_pks": content_pks, "orphan_protection_time": orphan_protection_time}, ) diff --git a/pulpcore/app/viewsets/publication.py b/pulpcore/app/viewsets/publication.py index 763e3b9dd4..3e72c0c055 100644 --- a/pulpcore/app/viewsets/publication.py +++ b/pulpcore/app/viewsets/publication.py @@ -41,6 +41,7 @@ LabelFilter, RepositoryVersionFilter, ) +from pulpcore.app.util import get_domain class PublicationContentFilter(Filter): @@ -543,7 +544,7 @@ def get_queryset(self): def async_reserved_resources(self, instance): """Return resource that locks all Distributions.""" - return ["/api/v3/distributions/"] + return ["/api/v3/distributions/", f"pdrn:{get_domain().pulp_id}:distributions"] class ListDistributionViewSet(BaseDistributionViewSet, mixins.ListModelMixin): diff --git a/pulpcore/app/viewsets/reclaim.py b/pulpcore/app/viewsets/reclaim.py index 7dadc8ca0b..a885698ef2 100644 --- a/pulpcore/app/viewsets/reclaim.py +++ b/pulpcore/app/viewsets/reclaim.py @@ -42,7 +42,7 @@ def reclaim(self, request): uri = "/api/v3/repositories/reclaim_space/" if settings.DOMAIN_ENABLED: uri = f"/{request.pulp_domain.name}{uri}" - exclusive_resources = [uri] + exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:reclaim_space"] task = dispatch( reclaim_space, diff --git a/pulpcore/app/viewsets/replica.py b/pulpcore/app/viewsets/replica.py index f7298b43cb..afaccc6ad0 100644 --- a/pulpcore/app/viewsets/replica.py +++ b/pulpcore/app/viewsets/replica.py @@ -126,10 +126,11 @@ def replicate(self, request, pk): uri = "/api/v3/servers/" if settings.DOMAIN_ENABLED: uri = f"/{request.pulp_domain.name}{uri}" + exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:servers"] dispatch( replicate_distributions, - exclusive_resources=[uri], + exclusive_resources=exclusive_resources, kwargs={"server_pk": pk}, task_group=task_group, ) diff --git a/pulpcore/app/viewsets/task.py b/pulpcore/app/viewsets/task.py index 1d837e026f..441f2fe306 100644 --- a/pulpcore/app/viewsets/task.py +++ b/pulpcore/app/viewsets/task.py @@ -46,7 +46,9 @@ class TaskFilter(BaseFilterSet): # This filter is deprecated and badly documented, but we need to keep it for compatibility # reasons - reserved_resources_record = ReservedResourcesRecordFilter() + reserved_resources_record = ReservedResourcesRecordFilter( + help_text=_("Deprecated, will be removed in pulpcore 3.55. Use reserved_resources instead.") + ) created_resources = CreatedResourcesFilter() # Non model field filters reserved_resources = ReservedResourcesFilter(exclusive=True, shared=True) diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index 1c89125f19..ca19becf9c 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -13,7 +13,7 @@ from django_guid import get_guid from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS from pulpcore.app.models import Task -from pulpcore.app.util import current_task, get_domain, get_url +from pulpcore.app.util import current_task, get_domain, get_prn, get_url from pulpcore.constants import ( TASK_FINAL_STATES, TASK_INCOMPLETE_STATES, @@ -30,7 +30,11 @@ def _validate_and_get_resources(resources): if isinstance(r, str): resource_set.add(r) elif isinstance(r, Model): + # TODO: In pulpcore 3.55 remove get_url from this list + # If 3.55 requires downtime then this line can be removed with no further changes + # Else, we must update the scheduling logic to account for prior tasks using get_url resource_set.add(get_url(r)) + resource_set.add(get_prn(r)) elif r is None: # Silently drop None values pass @@ -146,8 +150,10 @@ def dispatch( shared_resources = _validate_and_get_resources(shared_resources) # A task that is exclusive on a domain will block all tasks within that domain + domain_prn = get_prn(get_domain()) domain_url = get_url(get_domain()) - if domain_url not in exclusive_resources: + if not (domain_prn in exclusive_resources or domain_url in exclusive_resources): + shared_resources.append(domain_prn) shared_resources.append(domain_url) resources = exclusive_resources + [f"shared:{resource}" for resource in shared_resources] diff --git a/pulpcore/tests/functional/api/using_plugin/test_tasks.py b/pulpcore/tests/functional/api/using_plugin/test_tasks.py index f179bcbb29..cfe6fc3c59 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_tasks.py +++ b/pulpcore/tests/functional/api/using_plugin/test_tasks.py @@ -1,5 +1,6 @@ """Test that operations can be performed over tasks.""" import json +import subprocess from urllib.parse import urljoin from uuid import uuid4 @@ -46,7 +47,6 @@ def setup_filter_fixture( file_repo, file_remote_ssl_factory, basic_manifest_path, - tasks_api_client, monitor_task, ): remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="on_demand") @@ -59,29 +59,31 @@ def setup_filter_fixture( repo_update_action = file_bindings.RepositoriesFileApi.partial_update( file_repo.pulp_href, {"description": str(uuid4())} ) - repo_update_task = tasks_api_client.read(repo_update_action.task) + repo_update_task = monitor_task(repo_update_action.task) - return (repo_sync_task, repo_update_task) + return repo_sync_task, repo_update_task, file_repo, remote def test_filter_tasks_by_reserved_resources(setup_filter_fixture, tasks_api_client): """Filter all tasks by a particular reserved resource.""" - repo_sync_task, repo_update_task = setup_filter_fixture - reserved_resources_record = repo_update_task.reserved_resources_record[0] + repo_sync_task, repo_update_task, _, _ = setup_filter_fixture + for resource in repo_update_task.reserved_resources_record: + if "/api/v3/repositories/file/file/" in resource: + reserved_resources_record = resource + break + else: + assert False, "File repository not found in reserved_resources_record" - results = tasks_api_client.list(reserved_resources_record=[reserved_resources_record]).results - # Why reserved_resources_record parameter needs to be a list here? ^ + results = tasks_api_client.list(reserved_resources=reserved_resources_record).results assert results[0].pulp_href == repo_update_task.pulp_href assert len(results) == 2 # Filter all tasks by a non-existing reserved resource. - with pytest.raises(ApiException) as ctx: - tasks_api_client.list( - reserved_resources_record=["a_resource_should_be_never_named_like_this"] - ) - - assert ctx.value.status == 400 + results = tasks_api_client.list( + reserved_resources="a_resource_should_be_never_named_like_this" + ).results + assert len(results) == 0 # Filter all tasks by a particular created resource. created_resources = repo_sync_task.created_resources[0] @@ -97,3 +99,63 @@ def test_filter_tasks_by_reserved_resources(setup_filter_fixture, tasks_api_clie tasks_api_client.list(created_resources=created_resources) assert ctx.value.status == 404 + + +def get_prn(uri): + commands = f"from pulpcore.app.util import get_prn; print(get_prn(uri='{uri}'));" + process = subprocess.run(["pulpcore-manager", "shell", "-c", commands], capture_output=True) + + assert process.returncode == 0 + prn = process.stdout.decode().strip() + return prn + + +def test_reserved_resources_filter(setup_filter_fixture, tasks_api_client): + """Filter tasks using the ReservedResourcesFilter type filters.""" + repo_sync_task, repo_update_task, repo, remote = setup_filter_fixture + task_hrefs = {repo_sync_task.pulp_href, repo_update_task.pulp_href} + + repo_prn = get_prn(repo.pulp_href) + remote_prn = get_prn(remote.pulp_href) + + # Sanity check, TODO: remove pulp_href from filter checks in pulpcore 3.55 + assert repo_prn in repo_sync_task.reserved_resources_record + assert f"shared:{remote_prn}" in repo_sync_task.reserved_resources_record + assert repo_prn in repo_update_task.reserved_resources_record + assert remote_prn not in repo_update_task.reserved_resources_record + + # reserved_resources filter + href_results = tasks_api_client.list(reserved_resources=repo.pulp_href) + assert href_results.count == 2 + assert set(h.pulp_href for h in href_results.results) == task_hrefs + prn_results = tasks_api_client.list(reserved_resources=repo_prn) + assert prn_results == href_results + mixed_results = tasks_api_client.list(reserved_resources__in=[repo.pulp_href, remote_prn]) + assert mixed_results.count == 1 + assert mixed_results.results[0].pulp_href == repo_sync_task.pulp_href + + # shared_resources filter + href_results = tasks_api_client.list(shared_resources=repo.pulp_href) + assert href_results.count == 0 + href_results = tasks_api_client.list(shared_resources=remote.pulp_href) + assert href_results.count == 1 + assert href_results.results[0].pulp_href == repo_sync_task.pulp_href + prn_results = tasks_api_client.list(shared_resources=repo_prn) + assert prn_results.count == 0 + prn_results = tasks_api_client.list(shared_resources=remote_prn) + assert prn_results == href_results + mixed_results = tasks_api_client.list(shared_resources__in=[repo_prn, remote.pulp_href]) + assert mixed_results.count == 0 + + # exclusive_resources filter + href_results = tasks_api_client.list(exclusive_resources=remote.pulp_href) + assert href_results.count == 0 + href_results = tasks_api_client.list(exclusive_resources=repo.pulp_href) + assert href_results.count == 2 + assert set(h.pulp_href for h in href_results.results) == task_hrefs + prn_results = tasks_api_client.list(exclusive_resources=remote_prn) + assert prn_results.count == 0 + prn_results = tasks_api_client.list(exclusive_resources=repo_prn) + assert prn_results == href_results + mixed_results = tasks_api_client.list(exclusive_resources__in=[repo_prn, remote_prn]) + assert mixed_results.count == 0