Skip to content

Commit

Permalink
Introduce Pulp Resource Name (PRN) into task resource locking
Browse files Browse the repository at this point in the history
fixes: pulp#4315
  • Loading branch information
gerrod3 committed Nov 21, 2023
1 parent 9cab9f4 commit 28a646c
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES/4315.deprecation
@@ -0,0 +1 @@
Task filter ``reserved_resources_record`` has been deprecated and planned for removal in pulpcore 3.55.
2 changes: 2 additions & 0 deletions CHANGES/4315.misc
@@ -0,0 +1,2 @@
Added new task resource locking method ``get_prn`` that does not change with different settings.
``get_url`` is planned to be phased out of resource locking by pulpcore 3.55.
41 changes: 40 additions & 1 deletion pulpcore/app/util.py
Expand Up @@ -13,6 +13,7 @@
import gnupg

from django.conf import settings
from django.db.models import Model
from django.urls import Resolver404, resolve, reverse
from rest_framework.serializers import ValidationError

Expand All @@ -28,7 +29,7 @@
def get_url(model, domain=None):
"""
Get a resource url for the specified model instance or class. This returns the path component of
the resource URI. This is used in our resource locking/reservation code to identify resources.
the resource URI.
Args:
model (django.models.Model): A model instance or class.
Expand All @@ -55,6 +56,44 @@ def get_url(model, domain=None):
return reverse(get_view_name_for_model(model, view_action), kwargs=kwargs)


def get_prn(instance=None, uri=None):
"""
Get a Pulp Resource Name (PRN) for the specified model instance. It is similar to a HREF
url in that it uniquely identifies a resource, but it also has the guarantee that it will not
change regardless of API_ROOT or DOMAIN_ENABLED. This is used in our resource locking/
reservation code to identify resources.
The format for the PRN is as follows:
```
prn:model-label-lower:id
```
Examples:
instance=FileRepository(pk=123) -> prn:file.filerepository:123
instance=Artifact(pk=abc) -> prn:core.artifact:abc
uri=/rerouted/api/v3/repositories/rpm/rpm/123/versions/2/ -> prn:core.repositoryversion:abc
uri=/pulp/foodomain/api/v3/content/ansible/role/123/ -> prn:ansible.role:123
Args:
instance Optional(django.models.Model): A model instance.
uri Optional(str): A resource URI
Returns:
prn (str): The PRN of the passed in resource
"""
if uri:
from pulpcore.app.viewsets import NamedModelViewSet
instance = NamedModelViewSet.get_resource(uri)

if not isinstance(instance, Model):
raise ValidationError(_("instance({}) must be a Model").format(instance))

if isinstance(instance, models.MasterModel):
instance = instance.cast()

return f"prn:{instance._meta.label_lower}:{instance.pk}"


def extract_pk(uri):
"""
Resolve a resource URI to a simple PK value.
Expand Down
4 changes: 4 additions & 0 deletions pulpcore/app/viewsets/custom_filters.py
Expand Up @@ -16,6 +16,7 @@

from pulpcore.app.models import ContentArtifact, RepositoryVersion, Publication
from pulpcore.app.viewsets import NamedModelViewSet
from pulpcore.app.loggers import deprecation_logger


class ReservedResourcesFilter(Filter):
Expand Down Expand Up @@ -83,6 +84,9 @@ def filter(self, qs, value):
Returns:
django.db.models.query.QuerySet: Queryset filtered by the reserved resource
"""
deprecation_logger.warning(
"This filter is deprecated. Please use reserved_resources(__in) instead."
)

if value is None:
# a value was not supplied by a user
Expand Down
4 changes: 3 additions & 1 deletion pulpcore/app/viewsets/task.py
Expand Up @@ -46,7 +46,9 @@
class TaskFilter(BaseFilterSet):
# This filter is deprecated and badly documented, but we need to keep it for compatibility
# reasons
reserved_resources_record = ReservedResourcesRecordFilter()
reserved_resources_record = ReservedResourcesRecordFilter(
help_text=_("Deprecated, will be removed in pulpcore 3.55. Use reserved_resources instead.")
)
created_resources = CreatedResourcesFilter()
# Non model field filters
reserved_resources = ReservedResourcesFilter(exclusive=True, shared=True)
Expand Down
11 changes: 7 additions & 4 deletions pulpcore/tasking/tasks.py
Expand Up @@ -12,7 +12,7 @@
from django_guid import get_guid
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
from pulpcore.app.models import Task
from pulpcore.app.util import current_task, get_domain, get_url
from pulpcore.app.util import current_task, get_domain, get_prn, get_url
from pulpcore.constants import TASK_FINAL_STATES, TASK_INCOMPLETE_STATES, TASK_STATES

_logger = logging.getLogger(__name__)
Expand All @@ -24,7 +24,9 @@ def _validate_and_get_resources(resources):
if isinstance(r, str):
resource_set.add(r)
elif isinstance(r, Model):
# TODO: Remove get_url in pulpcore 3.55
resource_set.add(get_url(r))
resource_set.add(get_prn(r))
elif r is None:
# Silently drop None values
pass
Expand Down Expand Up @@ -140,9 +142,10 @@ def dispatch(
shared_resources = _validate_and_get_resources(shared_resources)

# A task that is exclusive on a domain will block all tasks within that domain
domain_url = get_url(get_domain())
if domain_url not in exclusive_resources:
shared_resources.append(domain_url)
domain_prn = get_prn(get_domain())
if domain_prn not in exclusive_resources:
shared_resources.append(domain_prn)
shared_resources.append(get_url(get_domain()))
resources = exclusive_resources + [f"shared:{resource}" for resource in shared_resources]

notify_workers = False
Expand Down
62 changes: 60 additions & 2 deletions pulpcore/tests/functional/api/using_plugin/test_tasks.py
@@ -1,5 +1,6 @@
"""Test that operations can be performed over tasks."""
import json
import subprocess
from urllib.parse import urljoin
from uuid import uuid4

Expand Down Expand Up @@ -59,12 +60,12 @@ def setup_filter_fixture(
)
repo_update_task = tasks_api_client.read(repo_update_action.task)

return (repo_sync_task, repo_update_task)
return repo_sync_task, repo_update_task, file_repo, remote


def test_filter_tasks_by_reserved_resources(setup_filter_fixture, tasks_api_client):
"""Filter all tasks by a particular reserved resource."""
repo_sync_task, repo_update_task = setup_filter_fixture
repo_sync_task, repo_update_task, _, _ = setup_filter_fixture
reserved_resources_record = repo_update_task.reserved_resources_record[0]

results = tasks_api_client.list(reserved_resources_record=[reserved_resources_record]).results
Expand Down Expand Up @@ -95,3 +96,60 @@ def test_filter_tasks_by_reserved_resources(setup_filter_fixture, tasks_api_clie
tasks_api_client.list(created_resources=created_resources)

assert ctx.value.status == 404


def get_prn(uri):
commands = ["from pulpcore.app.util import get_prn;", f"print(get_prn(uri={uri}));"]
process = subprocess.run(
["pulpcore-manager", "shell", "-c", commands], capture_output=True
)

assert process.returncode == 0
prn = process.stdout.decode().strip()
return prn


def test_reserved_resources_filter(setup_filter_fixture, tasks_api_client):
"""Filter tasks using the ReservedResourcesFilter type filters."""
repo_sync_task, repo_update_task, repo, remote = setup_filter_fixture

repo_prn = get_prn(repo.pulp_href)
remote_prn = get_prn(remote.pulp_href)

# Sanity check, TODO: remove pulp_href from filter checks in pulpcore 3.55
assert repo_prn in repo_sync_task.reserved_resources_record
assert f"shared:{remote_prn}" in repo_sync_task.reserved_resources_record
assert repo_prn in repo_update_task.reserved_resources_record
assert remote_prn not in repo_update_task.reserved_resources_record

# reserved_resources filter
href_results = tasks_api_client.filter(reserved_resources=repo.pulp_href)
assert href_results.count == 2
assert set(href_results.results) == {repo_sync_task, repo_update_task}
prn_results = tasks_api_client.filter(reserved_resources=repo_prn)
mixed_results = tasks_api_client.filter(reserved_resources_in=[repo.pulp_href, remote_prn])
assert mixed_results == href_results == prn_results

# shared_resources filter
href_results = tasks_api_client.filter(shared_resources=repo.pulp_href)
assert href_results.count == 0
href_results = tasks_api_client.filter(shared_resources=remote.pulp_href)
assert href_results.count == 1
assert href_results.results[0] == repo_sync_task
prn_results = tasks_api_client.filter(shared_resources=repo_prn)
assert prn_results.count == 0
prn_results = tasks_api_client.filter(shared_resources=remote_prn)
mixed_results = tasks_api_client.filter(shared_resources__in=[repo_prn, remote.pulp_href])
assert prn_results == href_results == mixed_results

# exclusive_resources filter
href_results = tasks_api_client.filter(exclusive_resources=remote.pulp_href)
assert href_results.count == 0
href_results = tasks_api_client.filter(exclusive_resources=repo.pulp_href)
assert href_results.count == 2
assert set(href_results.results) == {repo_sync_task, repo_update_task}
prn_results = tasks_api_client.filter(exclusive_resources=remote_prn)
assert prn_results.count == 0
prn_results = tasks_api_client.filter(exclusive_resources=repo_prn)
mixed_results = tasks_api_client.filter(exclusive_resources__in=[repo_prn, remote_prn])
assert prn_results == href_results == mixed_results

0 comments on commit 28a646c

Please sign in to comment.