diff --git a/src/sentry/api/endpoints/broadcast_details.py b/src/sentry/api/endpoints/broadcast_details.py index bc33ad255b9491..12f1a565df4e40 100644 --- a/src/sentry/api/endpoints/broadcast_details.py +++ b/src/sentry/api/endpoints/broadcast_details.py @@ -9,7 +9,7 @@ from sentry.api.exceptions import ResourceDoesNotExist from sentry.api.serializers import AdminBroadcastSerializer, BroadcastSerializer, serialize from sentry.api.validators import AdminBroadcastValidator, BroadcastValidator -from sentry.auth.superuser import is_active_superuser +from sentry.auth.superuser import has_superuser_permission from sentry.models import Broadcast, BroadcastSeen logger = logging.getLogger("sentry") @@ -19,7 +19,7 @@ class BroadcastDetailsEndpoint(Endpoint): permission_classes = (IsAuthenticated,) def _get_broadcast(self, request, broadcast_id): - if is_active_superuser(request) and request.access.has_permission("broadcasts.admin"): + if has_superuser_permission(request, "broadcasts.admin"): queryset = Broadcast.objects.all() else: queryset = Broadcast.objects.filter( @@ -31,12 +31,12 @@ def _get_broadcast(self, request, broadcast_id): raise ResourceDoesNotExist def _get_validator(self, request): - if is_active_superuser(request): + if has_superuser_permission(request, "broadcasts.admin"): return AdminBroadcastValidator return BroadcastValidator def _get_serializer(self, request): - if is_active_superuser(request): + if has_superuser_permission(request, "broadcasts.admin"): return AdminBroadcastSerializer return BroadcastSerializer diff --git a/src/sentry/api/endpoints/broadcast_index.py b/src/sentry/api/endpoints/broadcast_index.py index af53ab6d728ea7..a0402512d0766c 100644 --- a/src/sentry/api/endpoints/broadcast_index.py +++ b/src/sentry/api/endpoints/broadcast_index.py @@ -10,7 +10,7 @@ from sentry.api.paginator import DateTimePaginator from sentry.api.serializers import AdminBroadcastSerializer, BroadcastSerializer, serialize from sentry.api.validators import AdminBroadcastValidator, BroadcastValidator -from sentry.auth.superuser import is_active_superuser +from sentry.auth.superuser import has_superuser_permission from sentry.db.models.query import in_icontains from sentry.models import Broadcast, BroadcastSeen from sentry.search.utils import tokenize_query @@ -22,7 +22,7 @@ class BroadcastIndexEndpoint(OrganizationEndpoint): permission_classes = (OrganizationPermission,) def _get_serializer(self, request): - if is_active_superuser(request): + if has_superuser_permission(request, "broadcasts.admin"): return AdminBroadcastSerializer return BroadcastSerializer @@ -41,10 +41,8 @@ def convert_args(self, request, organization_slug=None, *args, **kwargs): return (args, kwargs) def get(self, request, organization=None): - if ( - request.GET.get("show") == "all" - and is_active_superuser(request) - and request.access.has_permission("broadcasts.admin") + if request.GET.get("show") == "all" and has_superuser_permission( + request, "broadcasts.admin" ): # superusers can slice and dice queryset = Broadcast.objects.all().order_by("-date_added") @@ -141,7 +139,7 @@ def put(self, request): return self.respond(result) def post(self, request): - if not (is_active_superuser(request) and request.access.has_permission("broadcasts.admin")): + if not has_superuser_permission(request, "broadcasts.admin"): return self.respond(status=401) validator = AdminBroadcastValidator(data=request.data) diff --git a/src/sentry/api/endpoints/user_details.py b/src/sentry/api/endpoints/user_details.py index cf11dc9e321161..dc7f74fa1e4eba 100644 --- a/src/sentry/api/endpoints/user_details.py +++ b/src/sentry/api/endpoints/user_details.py @@ -15,7 +15,7 @@ from sentry.api.serializers import serialize from sentry.api.serializers.models.user import DetailedUserSerializer from sentry.api.serializers.rest_framework import ListField -from sentry.auth.superuser import is_active_superuser +from sentry.auth.superuser import has_superuser_permission from sentry.constants import LANGUAGES from sentry.models import Organization, OrganizationMember, OrganizationStatus, User, UserOption @@ -140,7 +140,7 @@ def put(self, request, user): :auth: required """ - if is_active_superuser(request) and request.access.has_permission("users.admin"): + if has_superuser_permission(request, "users.admin"): serializer_cls = PrivilegedUserSerializer else: serializer_cls = UserSerializer @@ -233,9 +233,9 @@ def delete(self, request, user): hard_delete = serializer.validated_data.get("hardDelete", False) # Only active superusers can hard delete accounts - if hard_delete and not is_active_superuser(request): + if hard_delete and not has_superuser_permission(request, "users.admin"): return Response( - {"detail": "Only superusers may hard delete a user account"}, + {"detail": "Missing required permission to hard delete account."}, status=status.HTTP_403_FORBIDDEN, ) diff --git a/src/sentry/auth/superuser.py b/src/sentry/auth/superuser.py index 8779543b6cb3e1..b3423353ce7d31 100644 --- a/src/sentry/auth/superuser.py +++ b/src/sentry/auth/superuser.py @@ -58,6 +58,10 @@ def is_active_superuser(request): return su.is_active +def has_superuser_permission(request, permission): + return is_active_superuser(request) and request.access.has_permission(permission) + + class Superuser: allowed_ips = [ipaddress.ip_network(str(v), strict=False) for v in ALLOWED_IPS] diff --git a/tests/sentry/api/endpoints/test_user_details.py b/tests/sentry/api/endpoints/test_user_details.py index 35f9b7af4c8891..8a0c4ad907754e 100644 --- a/tests/sentry/api/endpoints/test_user_details.py +++ b/tests/sentry/api/endpoints/test_user_details.py @@ -280,14 +280,29 @@ def test_cannot_hard_delete_self(self): # Cannot hard delete your own account self.get_valid_response(self.user.id, hardDelete=True, organizations=[], status_code=403) - def test_hard_delete_account(self): + def test_hard_delete_account_without_permission(self): + self.user.update(is_superuser=True) user2 = self.create_user(email="user2@example.com") # failed authorization, user does not have permissions to delete another user self.get_valid_response(user2.id, hardDelete=True, organizations=[], status_code=403) # Reauthenticate as super user to hard delete an account + self.login_as(user=self.user, superuser=True) + + self.get_valid_response(user2.id, hardDelete=True, organizations=[], status_code=403) + + assert User.objects.filter(id=user2.id).exists() + + def test_hard_delete_account_with_permission(self): self.user.update(is_superuser=True) + user2 = self.create_user(email="user2@example.com") + + # failed authorization, user does not have permissions to delete another user + self.get_valid_response(user2.id, hardDelete=True, organizations=[], status_code=403) + + # Reauthenticate as super user to hard delete an account + UserPermission.objects.create(user=self.user, permission="users.admin") self.login_as(user=self.user, superuser=True) self.get_valid_response(user2.id, hardDelete=True, organizations=[], status_code=204)