Skip to content

Commit

Permalink
Correct permissions for Users (NRV). Adjust tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
tizot committed May 11, 2016
1 parent 008fd4a commit 3eb6c17
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 85 deletions.
15 changes: 14 additions & 1 deletion sigma_core/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ def is_sigma_admin(self):
def is_in_cluster(self, cluster):
return cluster in self.clusters.all()

def is_cluster_admin(self, cluster):
from sigma_core.models.cluster import Cluster
ms = self.get_group_membership(cluster.group_ptr)
return (ms is not None and ms.perm_rank == Cluster.ADMINISTRATOR_RANK)

def is_admin_of_one_cluster(self, clusters):
from functools import reduce
import operator
return reduce(operator.or_, [self.is_cluster_admin(c) for c in clusters])

def has_common_cluster(self, user):
"""
Return True iff self has a cluster in common with user.
Expand All @@ -95,8 +105,11 @@ def has_common_cluster(self, user):
def has_common_group(self, user):
"""
Return True iff self has a group in common with user.
Warning: non symmetric function! u1.has_common_group(u2) may be different of u2.has_common_group(u1)
"""
return len(set(self.memberships.all().values_list('group', flat=True)).intersection(user.memberships.all().values_list('group', flat=True))) > 0
# We filter on self.memberships.perm_rank: we are really in the same group if you ARE really in the group.
# But, on the other hand, you can "see" pending request of other members.
return len(set(self.memberships.filter(perm_rank__gte=1).values_list('group', flat=True)).intersection(user.memberships.all().values_list('group', flat=True))) > 0

def get_group_membership(self, group):
from sigma_core.models.group_member import GroupMember
Expand Down
6 changes: 4 additions & 2 deletions sigma_core/serializers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ class MinimalUserSerializer(serializers.ModelSerializer):
"""
Serialize an User with minimal data.
"""
class Meta(UserSerializerMeta):
pass
class Meta:
model = User
fields = ('id', 'lastname', 'firstname', 'is_active', 'clusters_ids', )
read_only_fields = ('is_active', )

clusters_ids = serializers.PrimaryKeyRelatedField(queryset=Cluster.objects.all(), many=True, source='clusters')

Expand Down
76 changes: 49 additions & 27 deletions sigma_core/tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ def test_get_user_unauthed(self):
response = self.client.get(self.user_url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_get_user_1_req_2(self):
def test_get_user_1_req_5(self):
# Client authenticated: request user in same cluster
self.client.force_authenticate(user=self.users[0])
response = self.client.get(self.user_url + "%d/" % self.users[1].id)
response = self.client.get(self.user_url + "%d/" % self.users[4].id)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['lastname'], self.users[1].lastname)
self.assertEqual(response.data['email'], self.users[1].email)
self.assertEqual(response.data['lastname'], self.users[4].lastname)
self.assertEqual(response.data['email'], self.users[4].email)
self.assertEqual(response.data['clusters_ids'], [c.id for c in self.clusters])

def test_get_user_3_req_7(self):
Expand All @@ -138,17 +138,26 @@ def test_get_user_2_req_6(self):
self.client.force_authenticate(user=self.users[1])
response = self.client.get(self.user_url + "%d/" % self.users[5].id)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['lastname'], self.users[1].lastname)
self.assertEqual(response.data['lastname'], self.users[5].lastname)
self.assertEqual(response.data['clusters_ids'], [self.clusters[1].id])
self.assertNotIn('email', response.data)
self.assertNotIn('photo', response.data)
self.assertIn('email', response.data)

def test_get_user_8_req_3(self):
# Client authenticated: request user in a group he's invited
# Client authenticated: request a user who is in a group I'm invited
self.client.force_authenticate(user=self.users[7])
response = self.client.get(self.user_url + "%d/" % self.users[2].id)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['lastname'], self.users[2].lastname)
self.assertEqual(response.data['clusters_ids'], [self.clusters[0].id])
self.assertNotIn('email', response.data)
self.assertNotIn('photo', response.data)

def test_get_user_3_req_8(self):
# Client authenticated: request a user who is invited in a group whose I'm a member
self.client.force_authenticate(user=self.users[2])
response = self.client.get(self.user_url + "%d/" % self.users[7].id)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['lastname'], self.users[7].lastname)
self.assertEqual(response.data['clusters_ids'], [self.clusters[1].id])
self.assertNotIn('email', response.data)
self.assertNotIn('photo', response.data)
Expand Down Expand Up @@ -229,19 +238,19 @@ def test_create_user_clusteradmin_wrong_cluster(self):
#### Modification requests
def test_edit_email_wrong_permission(self):
# Client wants to change another user's email
self.client.force_authenticate(user=self.users[0])
user_data = UserSerializer(self.users[1]).data
self.client.force_authenticate(user=self.users[1])
user_data = UserSerializer(self.users[0]).data
user_data['email'] = "pi@random.org"
response = self.client.put(self.user_url + "%d/" % self.users[1].id, user_data)
response = self.client.put(self.user_url + "%d/" % self.users[0].id, user_data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_edit_is_superuser_no_permission(self):
# Client can't set himself as administrator !
self.client.force_authenticate(user=self.users[0])
user_data = UserSerializer(self.users[0]).data
self.client.force_authenticate(user=self.users[1])
user_data = UserSerializer(self.users[1]).data
user_data['is_superuser'] = True
response = self.client.put(self.user_url + "%d/" % self.users[0].id, user_data)
self.assertFalse(self.users[0].is_superuser);
response = self.client.put(self.user_url + "%d/" % self.users[1].id, user_data)
self.assertFalse(self.users[1].is_superuser)

def test_edit_email_nonvalid_email(self):
# Client wants to change his email with a non valid value
Expand All @@ -253,24 +262,24 @@ def test_edit_email_nonvalid_email(self):

def test_edit_email_ok(self):
# Client wants to change his email and succeed in
self.client.force_authenticate(user=self.users[0])
user_data = UserSerializer(self.users[0]).data
self.client.force_authenticate(user=self.users[1])
user_data = UserSerializer(self.users[1]).data
old_email = user_data['email']
user_data['email'] = "pi@random.org"
response = self.client.put(self.user_url + "%d/" % self.users[0].id, user_data)
response = self.client.put(self.user_url + "%d/" % self.users[1].id, user_data)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['email'], user_data['email'])
# Guarantee that tests are independant
self.users[0].email = old_email
self.users[0].save()
self.users[1].email = old_email
self.users[1].save()

def test_edit_profile_wrong_permission(self):
# Client wants to change another user's phone number
self.client.force_authenticate(user=self.users[0])
user_data = UserSerializer(self.users[1]).data
self.client.force_authenticate(user=self.users[1])
user_data = UserSerializer(self.users[0]).data
user_data['phone'] = "0123456789"
response = self.client.put(self.user_url + "%d/" % self.users[1].id, user_data)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
response = self.client.put(self.user_url + "%d/" % self.users[0].id, user_data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_edit_profile_ok(self):
# Client wants to change his phone number
Expand All @@ -286,12 +295,25 @@ def test_edit_profile_ok(self):
self.users[0].save()

def test_edit_lastname_oneself(self):
# Client wants to change his lastname
self.client.force_authenticate(user=self.users[1])
user_data = UserSerializer(self.users[1]).data
user_data['lastname'] = "Daudet"
response = self.client.put(self.user_url + "%d/" % self.users[1].id, user_data)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

def test_edit_lastname_oneself_clusteradmin(self):
# Client wants to change his lastname
self.client.force_authenticate(user=self.users[0])
user_data = UserSerializer(self.users[0]).data
old_lastname = user_data['lastname']
user_data['lastname'] = "Daudet"
response = self.client.put(self.user_url + "%d/" % self.users[0].id, user_data)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['lastname'], user_data['lastname'])
# Guarantee that tests are independant
self.users[1].lastname = old_lastname
self.users[1].save()

def test_edit_lastname_cluster_admin(self):
# Cluster admin wants to change the lastname of one cluster's member
Expand All @@ -303,8 +325,8 @@ def test_edit_lastname_cluster_admin(self):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data['lastname'], user_data['lastname'])
# Guarantee that tests are independant
self.users[1].lastname = old_lastname
self.users[1].save()
self.users[0].lastname = old_lastname
self.users[0].save()

def test_edit_lastname_sigma_admin(self):
# Admin wants to change an user's lastname
Expand Down
82 changes: 27 additions & 55 deletions sigma_core/views/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from sigma_core.models.user import User
from sigma_core.models.group_member import GroupMember
from sigma_core.serializers.user import UserSerializer, MyUserSerializer
from sigma_core.serializers.user import UserSerializer, MinimalUserSerializer, MyUserSerializer


reset_mail = {
Expand All @@ -33,8 +33,8 @@

class UserViewSet(mixins.CreateModelMixin, # Only Cluster admins can create users
mixins.ListModelMixin, # Can only see members within same cluster or group
mixins.RetrieveModelMixin, # Idem
mixins.UpdateModelMixin, # Only self
mixins.RetrieveModelMixin, # Can see anybody but with different serializations
mixins.UpdateModelMixin, # Only self or cluster admin
mixins.DestroyModelMixin, # Only self or Sigma admin
viewsets.GenericViewSet):
permission_classes = [IsAuthenticated, ]
Expand All @@ -53,78 +53,50 @@ def perform_create(self, serializer):

def list(self, request, *args, **kwargs):
"""
Get the list of users that you are allowed to see.
Get the list of users that you are allowed to see w.r.t. the Normal Rules of Visibility.
"""
# Sigma admins can list all the users
if request.user.is_sigma_admin():
return super().list(self, request, args, kwargs)

# We get visible users ids, based on their belongings to common clusters/groups (let's anticipate the pagination)
# We get visible users ids w.r.t. the Normal Rules of Visibility, based on their belongings to common clusters/groups (let's anticipate the pagination)
# Since clusters are groups, we only check that condition for groups
groups_ids = request.user.memberships.all().values_list('group_id', flat=True)
users_ids = User.objects.all().prefetch_related('memberships').filter(memberships__group__id__in=groups_ids).distinct().values_list('id', flat=True)

qs = User.objects.filter(id__in=users_ids)
groups_ids = request.user.memberships.filter(perm_rank__gte=1).values_list('group_id', flat=True)
qs = User.objects.prefetch_related('memberships').filter(is_active=True, memberships__group__id__in=groups_ids).distinct()
s = UserSerializer(qs, many=True, context={'request': request})
return Response(s.data, status=status.HTTP_200_OK)

def retrieve(self, request, pk=None):
"""
Retrieve an User according to its id (pk).
"""
# 1. Check if we are allowed to see this user
user = User.objects.only('id').filter(pk=pk).prefetch_related('clusters').get()

# 2. Check what we can see from this User
qs = User.objects.all().select_related('photo').prefetch_related('clusters')
gm = GroupMember.objects.select_related('group')
# Admin: can see everything
if request.user.is_sigma_admin() or user.id == request.user.id:
user = qs.prefetch_related(
Prefetch('memberships', queryset=gm)
).get(pk=pk)
# Same cluster: can see public groups memberships
elif request.user.has_common_cluster(user):
user = qs.prefetch_related(
Prefetch('memberships', queryset=gm.filter(
Q(group__is_private=False) | Q(group__in=request.user.get_groups_with_confirmed_membership()))
)
).get(pk=pk)
# In the general case, we only see common Group memberships
# Also verify that we are on the same Group
else:
user = qs.prefetch_related(
Prefetch('memberships', queryset=gm.filter(group__in=request.user.get_groups_with_confirmed_membership()))
).get(pk=pk)
if not user.memberships.exists(): # No membership visible => User not visible
return Response(status=status.HTTP_404_NOT_FOUND)
# 1. Retrieve user
try:
user = User.objects.all().prefetch_related('clusters').select_related('photo').get(pk=pk)
except User.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)

s =UserSerializer(user, context={'request': request})
return Response(s.data, status=status.HTTP_200_OK)
# 2. Check permissions to choose serializer
# Admin, oneself, common cluster or common group: can see detailed user
if request.user.is_sigma_admin() or user.id == request.user.id or request.user.has_common_cluster(user) or request.user.has_common_group(user):
s = UserSerializer(user, context={'request': request})
else : # Others can only see minimal information
s = MinimalUserSerializer(user, context={'request': request})

# def create(self, request):
# """
# ---
# parameters_strategy:
# form: merge
# parameters:
# - name: clusters_ids
# type: array[integer]
# required: true
# """
# return super(UserViewSet, self).create(request)
return Response(s.data, status=status.HTTP_200_OK)

def update(self, request, pk=None):
if not request.user.is_sigma_admin() and int(pk) != request.user.id:
return Response(status=status.HTTP_404_NOT_FOUND)

try:
user = User.objects.get(pk=pk)
user = User.objects.prefetch_related('clusters').get(pk=pk)
except User.DoesNotExist:
raise Http404()
return Response(status=status.HTTP_404_NOT_FOUND)

# I can update my own profile, or another's profile if I'm a sigma/cluster admin
if not (request.user.is_sigma_admin() or int(pk) == request.user.id or request.user.is_admin_of_one_cluster(user.clusters.all())):
return Response(status=status.HTTP_403_FORBIDDEN)

# Names edition is allowed to Sigma admins only
if ((request.data['lastname'] != user.lastname or request.data['firstname'] != user.firstname)) and not request.user.is_sigma_admin():
# Names edition is allowed to sigma/clusters admins only
if (request.data['lastname'] != user.lastname or request.data['firstname'] != user.firstname) and not request.user.is_sigma_admin() and not request.user.is_admin_of_one_cluster(user.clusters.all()):
return Response('You cannot change your lastname or firstname', status=status.HTTP_400_BAD_REQUEST)

return super(UserViewSet, self).update(request, pk)
Expand Down

0 comments on commit 3eb6c17

Please sign in to comment.