Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 82 additions & 21 deletions ami/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,12 @@ def update_calculated_fields(self, *args, **kwargs):

def _get_object_perms(self, user):
"""
Get the object-level permissions for the user on this instance.
This method retrieves permissions like `update_modelname`, `create_modelname`, etc.
Retrieve project-scoped object permission codenames for the given user.

If the instance is linked to a Project, returns permission strings on that project that target this model (those ending with "_<model_name>"). If the instance has no related Project, returns an empty list.

Returns:
list[str]: Permission codenames for this instance (e.g., "update_modelname", "delete_modelname"), or an empty list if no project is associated.
"""
project = self.get_project()
if not project:
Expand All @@ -181,6 +185,16 @@ def _get_object_perms(self, user):
return object_perms

def check_model_level_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool:
"""
Determine whether the user has the specified model-level permission for this model. Always allows the "retrieve" (view) action for all users.

Parameters:
user (AbstractUser | AnonymousUser): The user whose permissions are being checked.
action (str): The action name (e.g., "create", "update", "destroy", "retrieve", or a custom action).

Returns:
bool: `True` if the user has the permission for the action on this model, `False` otherwise.
"""
model = self._meta.model_name
app_label = "main" # Assume all model level permissions are in 'main' app

Expand All @@ -199,8 +213,10 @@ def check_model_level_permission(self, user: AbstractUser | AnonymousUser, actio

def check_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool:
"""
Entry point for all permission checks.
Decides whether to perform model-level or object-level permission check.
Choose and perform the appropriate permission check (model-level or object-level) for the given action.

Returns:
`true` if the user is permitted to perform the action, `false` otherwise.
"""
# Get related project accessor
accessor = self.get_project_accessor()
Expand All @@ -213,10 +229,16 @@ def check_permission(self, user: AbstractUser | AnonymousUser, action: str) -> b

def check_object_level_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool:
"""
Check if the user has permission to perform the action
on this instance.
This method is used to determine if the user can perform
CRUD operations or custom actions on the model instance.
Determine whether the given user is permitted to perform the specified action on this model instance.

If the instance is not associated with a specific Project, falls back to model-level permission checks. For a "retrieve" action, allows access for non-draft projects; for draft projects, allows access only to project members, the project owner, or superusers. Maps CRUD actions ("create", "update", "partial_update", "destroy") to their corresponding project-scoped permissions and checks them against the related Project. Any other action is delegated to the instance's custom object-level permission check.

Parameters:
user (AbstractUser | AnonymousUser): The user for whom the permission is evaluated.
action (str): The action name to check (e.g., "retrieve", "create", "update", "partial_update", "destroy", or a custom action).

Returns:
bool: `True` if the user is allowed to perform the action on this instance, `False` otherwise.
"""
from ami.users.roles import BasicMember

Expand Down Expand Up @@ -245,8 +267,15 @@ def check_object_level_permission(self, user: AbstractUser | AnonymousUser, acti
return self.check_custom_object_level_permission(user, action)

def check_custom_object_level_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool:
"""Check custom object level permissions for the user on this instance.
This is used for actions that are not standard CRUD operations.
"""
Check a non-CRUD, object-scoped permission for the given user on this instance.

Parameters:
user (AbstractUser | AnonymousUser): The user whose permission is being checked.
action (str): Permission action prefix to check (e.g., "approve" -> checks "approve_<modelname>").

Returns:
bool: `True` if the user has the `<action>_<modelname>` permission for this instance's project (or for `None` if the instance has no project), `False` otherwise.
"""
assert self._meta.model_name is not None, "Model must have a model_name defined in Meta class."
model_name = self._meta.model_name.lower()
Expand All @@ -257,8 +286,12 @@ def check_custom_object_level_permission(self, user: AbstractUser | AnonymousUse

def get_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:
"""
Entry point for retrieving user permissions on this instance.
Decides whether to return model-level or object-level permissions.
Return the allowed action names the user may perform on this instance.

Determines whether permissions should be evaluated at the model (collection) level or the object (instance) level and returns the corresponding set of actions.

Returns:
allowed_actions (list[str]): A list of permission action names (e.g., "view", "update", "delete", custom actions) that the user is allowed to perform on this instance.
"""
accessor = self.get_project_accessor()

Expand All @@ -271,8 +304,10 @@ def get_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:

def get_model_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:
"""
Retrieve model-level permissions for the given user.
Returns a list of allowed actions such as ["create", "update", "delete"].
Return the model-level actions the given user is allowed to perform on this model.

Returns:
allowed_actions (list[str]): List of action names (e.g. "update", "delete", "view") the user has permission for on this model, plus any custom model-level actions.
"""
if user.is_superuser:
# Superusers get all possible actions
Expand All @@ -295,9 +330,15 @@ def get_model_level_permissions(self, user: AbstractUser | AnonymousUser) -> lis

def get_custom_model_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:
"""
Retrieve custom (non-CRUD) model-level permissions for the given user.
Custom permissions follow the pattern: <app_label>.<custom_action>_<model_name>
Example: "main.register_pipelines_processingservice"
Collect custom model-level permission actions the user has for this model.

Custom permissions are expected in the form "<app_label>.<action>_<model_name>" (for example, "main.register_pipelines_processingservice"). This method returns the distinct set of custom action names granted to the user for this model, excluding the standard CRUD actions.

Parameters:
user (AbstractUser | AnonymousUser): The user whose permissions will be inspected.

Returns:
list[str]: List of custom permission action names the user has for this model (e.g., ["register", "approve"]).
"""
model = self._meta.model_name
app_label = "main"
Expand All @@ -319,7 +360,13 @@ def get_custom_model_level_permissions(self, user: AbstractUser | AnonymousUser)

def get_object_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:
"""
Retrieve object-level permissions (including custom ones) for this instance.
Return the object-level action names the given user is allowed to perform on this instance.

Parameters:
user (AbstractUser | AnonymousUser): The user whose permissions will be evaluated.

Returns:
list[str]: A list of allowed object-level action names (e.g. "update", "delete") plus any custom object-level actions. If the instance is not associated with a project, this falls back to the model-level permissions for the user. Superusers always get "update" and "delete" in addition to custom object-level permissions.
"""

if user.is_superuser:
Expand All @@ -340,7 +387,14 @@ def get_object_level_permissions(self, user: AbstractUser | AnonymousUser) -> li

def get_custom_object_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]:
"""
Retrieve custom (non-CRUD) permissions for this instance.
List custom object-level permission actions the user has for this instance.

Parameters:
user (AbstractUser | AnonymousUser): The user whose object-level permissions will be evaluated.

Returns:
list[str]: A list of custom permission action names (permission prefixes) the user has for this instance,
excluding the standard actions "view", "create", "update", and "delete".
"""
object_perms = self._get_object_perms(user)
custom_perms = {
Expand All @@ -353,7 +407,14 @@ def get_custom_object_level_permissions(self, user: AbstractUser | AnonymousUser
@classmethod
def get_collection_level_permissions(cls, user: AbstractUser | AnonymousUser, project) -> list[str]:
"""
Retrieve collection-level permissions for the given user.
Determine collection-level permissions for a user on this model within an optional project context.

Parameters:
user (AbstractUser | AnonymousUser): The user whose permissions are being evaluated.
project (Project | None): Optional Project instance to use for project-scoped permission checks.

Returns:
list[str]: `["create"]` if the user is allowed to create instances in this collection (either globally or on the given project), otherwise an empty list.
"""
app_label = "main"
if user.is_superuser:
Expand All @@ -370,4 +431,4 @@ def get_collection_level_permissions(cls, user: AbstractUser | AnonymousUser, pr
return []

class Meta:
abstract = True
abstract = True
35 changes: 25 additions & 10 deletions ami/base/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,17 @@ def add_object_level_permissions(
user: AbstractBaseUser | AnonymousUser, instance: BaseModel, response_data: dict
) -> dict:
"""
Adds object-level permissions to the response data for a given user and instance.
This function updates the `response_data` dictionary with the permissions that the
specified `user` has on the given `instance`'s project.
Add object-level permissions for `user` on `instance` into `response_data`.

Retrieves existing `user_permissions` from `response_data` (defaults to an empty set), adds permissions returned by `instance.get_permissions(user)` when `instance` is a BaseModel, stores the resulting permissions as a list under `user_permissions`, and returns the updated `response_data`.

Parameters:
user (AbstractBaseUser | AnonymousUser): The user for whom permissions are collected.
instance (BaseModel): The model instance whose object-level permissions are queried.
response_data (dict): Response dictionary to be augmented; may already contain a `user_permissions` entry.

Returns:
dict: The updated `response_data` with `user_permissions` set to a list of permission strings.
"""

permissions = response_data.get("user_permissions", set())
Expand All @@ -58,12 +66,19 @@ def add_object_level_permissions(


def add_collection_level_permissions(user: User | None, response_data: dict, model, project) -> dict:
"""Add collection-level permissions to the response data for a list view.

This function modifies the `response_data` dictionary to include user permissions
for creating new objects of the specified model type. If the user is logged in and
is an active staff member, or if the user has create_model permission, the
"create" permission is added to the `user_permissions` set in the `response_data`.
"""
Add collection-level permissions for a model to the response payload for a list view.

Queries the model for collection-level permissions for the given user and project, merges them into the response_data's "user_permissions" entry, and returns the updated response_data.

Parameters:
user (User | None): The requesting user or None for anonymous requests.
response_data (dict): The response payload to augment; its "user_permissions" key will be updated.
model: The model class providing collection-level permissions via a `get_collection_level_permissions(user, project)` method.
project: The project context passed to the model's permission lookup.

Returns:
dict: The updated response_data with "user_permissions" set to a list of permission strings.
"""

logger.info(f"add_collection_level_permissions model {model.__name__}, {type(model)} ")
Expand All @@ -83,4 +98,4 @@ def has_permission(self, request, view):
return True # Always allow — object-level handles actual checks

def has_object_permission(self, request, view, obj: BaseModel):
return obj.check_permission(request.user, view.action)
return obj.check_permission(request.user, view.action)
23 changes: 19 additions & 4 deletions ami/base/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,30 @@ def get_permissions(self, instance, instance_data):
)

def to_representation(self, instance):
"""
Serialize the given object and augment its representation with object-level permission information.

Parameters:
instance: The model instance (or object) to serialize.

Returns:
dict: Serialized representation of the instance with permission-related fields merged into the output.
"""
instance_data = super().to_representation(instance)
instance_data = self.get_permissions(instance=instance, instance_data=instance_data)
return instance_data

def get_instance_for_permission_check(self):
"""
Returns an unsaved model instance built from validated_data,
excluding ManyToMany fields and any non-model fields (like 'project').
Safe to use for permission checking before saving.
Builds an unsaved model instance from the serializer's validated_data for permission checks.

Only fields that correspond to the model's fields are included; many-to-many relations and any keys not defined on the model are excluded.

Returns:
An unsaved instance of self.Meta.model populated with the filtered validated data.

Raises:
ValueError: If the serializer has not been validated (no validated_data present).
"""
validated_data = getattr(self, "validated_data", {})
if not validated_data:
Expand Down Expand Up @@ -204,4 +219,4 @@ class FilterParamsSerializer(serializers.Serializer):
def clean(self) -> dict[str, typing.Any]:
if self.is_valid(raise_exception=True):
return self.validated_data
raise api_exceptions.ValidationError("Invalid filter parameters")
raise api_exceptions.ValidationError("Invalid filter parameters")
25 changes: 23 additions & 2 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,12 @@ def duration(self) -> datetime.timedelta | None:

def save(self, update_progress=True, *args, **kwargs):
"""
Create the job stages if they don't exist.
Ensure job progress/stages are initialized or refreshed and persist the Job model.

If the job exists and has stages and update_progress is True, recompute overall progress from stages before saving; otherwise ensure default stages are created. Persist the model and emit a warning when the progress summary's status does not match the model's status.

Parameters:
update_progress (bool): If True, update the JobProgress summary from current stages before saving. If False, skip recomputation (but still create stages if missing).
"""
if self.pk and self.progress.stages and update_progress:
self.update_progress(save=False)
Expand All @@ -950,6 +955,16 @@ def save(self, update_progress=True, *args, **kwargs):
logger.warning(f"Job {self} status mismatches progress: {self.progress.summary.status} != {self.status}")

def check_custom_object_level_permission(self, user, action: str) -> bool:
"""
Determine whether a given user has the custom object-level permission to perform an action on this job.

Parameters:
user: The user whose permissions are checked.
action (str): The action name (e.g., "run", "cancel", "retry", or other custom action). If this job references a single source image, the action is treated as "run_single_image".

Returns:
bool: `true` if the user has the computed permission for the job's project (or globally if no project), `false` otherwise.
"""
job_type = self.job_type_key.lower()
if self.source_image_single:
action = "run_single_image"
Expand All @@ -962,6 +977,12 @@ def check_custom_object_level_permission(self, user, action: str) -> bool:
return user.has_perm(permission_codename, project)

def get_custom_object_level_permissions(self, user) -> list[str]:
"""
Collect the custom object-level permission action names the given user has for this job's project and job type.

Returns:
list[str]: A list of permission action names (for example, "export" or "run") that the user has for this job on its project. Returns an empty list if the job has no associated project or no matching custom permissions.
"""
project = self.get_project()
if not project:
return []
Expand Down Expand Up @@ -1003,4 +1024,4 @@ class Meta:
ordering = ["-created_at"]
# permissions = [
# ("run_job", "Can run a job"),
# ("cancel_job", "Can cancel a job"),
# ("cancel_job", "Can cancel a job"),
8 changes: 6 additions & 2 deletions ami/jobs/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ def cancel(self, request, pk=None):

def perform_create(self, serializer):
"""
If the ``start_now`` parameter is passed, enqueue the job immediately.
Create and save a Job instance, optionally enqueuing it immediately when requested.

Constructs a Job from the provided serializer data and checks object-level permissions before saving.
If the request includes the "start_now" parameter and the user has permission to run the job, the job is enqueued.
If "start_now" is requested but the user lacks run permission, raises PermissionDenied.
"""
# All jobs created from the Jobs UI are ML jobs.
# @TODO Remove this when the UI is updated pass a job type
Expand Down Expand Up @@ -155,4 +159,4 @@ def get_queryset(self) -> QuerySet:

@extend_schema(parameters=[project_id_doc_param])
def list(self, request, *args, **kwargs):
return super().list(request, *args, **kwargs)
return super().list(request, *args, **kwargs)
Loading