Skip to content
This repository has been archived by the owner on Dec 15, 2018. It is now read-only.

Commit

Permalink
decorator to check for project admin
Browse files Browse the repository at this point in the history
Summary: This diff implements a decorator that we can use for requiring project admins. It does NOT contain any user-facing change.

Test Plan: unit tests

Reviewers: anupc

Reviewed By: anupc

Subscribers: changesbot, kylec

Differential Revision: https://tails.corp.dropbox.com/D221863
  • Loading branch information
Naphat Sanguansin committed Aug 24, 2016
1 parent 04176df commit 885ad0c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 5 deletions.
47 changes: 47 additions & 0 deletions changes/api/auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import, print_function

from flask import current_app, session, request
from fnmatch import fnmatch
from functools import wraps
from typing import Optional # NOQA

Expand Down Expand Up @@ -51,6 +52,52 @@ def wrapped(*args, **kwargs):
return wrapped


class ResourceNotFound(Exception):
pass


def requires_project_admin(get_project_slug):
"""
Require an authenticated user with project admin privileges.
Return a 401 Unauthorized if the user is not authenticated, or a
403 Forbidden if the user is lacking permissions.
Args:
get_project_slug: This is a function that will be given the exact
same argument as the wrapped function. This function should
return the project slug (not ID!). Raise `ResourceNotFound`
if the associated resource (plan, step, project, etc.) cannot
be found.
"""
def decorator(method):
@wraps(method)
def wrapped(self, *args, **kwargs):
user = get_current_user()
if user is None:
return self.respond({
'error': 'Not logged in.'
}, status_code=401)
if user.is_admin:
# global admins are automatically project admins
return method(self, *args, **kwargs)
if user.project_permissions is not None:
try:
slug = get_project_slug(self, *args, **kwargs)
except ResourceNotFound as e:
return self.respond({
'error': '{}'.format(e)
}, status_code=404)
for p in user.project_permissions:
if fnmatch(slug, p):
return method(self, *args, **kwargs)
return self.respond({
'error': 'User does not have access to this project.'
}, status_code=403)
return wrapped
return decorator


def get_current_user():
# type: () -> Optional[User]
"""
Expand Down
10 changes: 5 additions & 5 deletions changes/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ class User(db.Model):
is_admin = Column(Boolean, default=False, nullable=False)
date_created = Column(DateTime, default=datetime.utcnow)

# this keeps track of the list of wildcard patterns of project names that
# the user has access to. For example:
# - `foo` matches `foo`
# - `foo.*` matches anything starting with `foo.`, like `foo.staging`, `foo.`
# - `*foo*` matches anything containing `foo`, like `barfoobar`, `foo`
# this keeps track of the list of patterns of project names that
# the user has access to. Patterns will be matched using `fnmatch`,
# see https://docs.python.org/2/library/fnmatch.html for pattern format.
# Note that due to Grouper limitation, we cannot support `?` or `!`
# characters in the pattern.
project_permissions = Column(ARRAY(String(256)), nullable=True)

def __init__(self, **kwargs):
Expand Down
90 changes: 90 additions & 0 deletions tests/changes/api/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import mock
import pytest

from changes.api.auth import (
ResourceNotFound, requires_project_admin,
)
from changes.testutils import TestCase


class ProjectAdminTestCase(TestCase):

_project_slug = 'other:project-a'

class DidExecute(Exception):
pass

def _get_project_slug(self):
return self._project_slug

def _get_project_slug_error(self):
raise ResourceNotFound

@requires_project_admin(_get_project_slug)
def _sample_function(self):
raise self.DidExecute

@requires_project_admin(_get_project_slug_error)
def _sample_function_error(self):
raise self.DidExecute

respond = mock.MagicMock()

def test_global_admin(self):
user = self.create_user(email='user1@example.com', is_admin=True)
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
with pytest.raises(self.DidExecute):
self._sample_function()

def test_authenticated_exact(self):
user = self.create_user(email='user1@example.com', project_permissions=['someproject', 'other:project-a', 'otherproject'])
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
with pytest.raises(self.DidExecute):
self._sample_function()

def test_authenticated_pattern_trailing(self):
user = self.create_user(email='user1@example.com', project_permissions=['someproject', 'other:*', 'otherproject'])
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
with pytest.raises(self.DidExecute):
self._sample_function()

def test_authenticated_pattern_both(self):
user = self.create_user(email='user1@example.com', project_permissions=['someproject', '*other:*', 'otherproject'])
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
with pytest.raises(self.DidExecute):
self._sample_function()

def test_not_authenticated_none(self):
user = self.create_user(email='user1@example.com')
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
self._sample_function()
_, kwargs = self.respond.call_args
assert kwargs['status_code'] == 403

def test_not_authenticated_pattern(self):
user = self.create_user(email='user1@example.com', project_permissions=['someproject*', 'otherproject'])
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
self._sample_function()
_, kwargs = self.respond.call_args
assert kwargs['status_code'] == 403

def test_no_user(self):
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = None
self._sample_function()
_, kwargs = self.respond.call_args
assert kwargs['status_code'] == 401

def test_resource_not_found(self):
user = self.create_user(email='user1@example.com', project_permissions=['someproject', 'other:project-a', 'otherproject'])
with mock.patch('changes.api.auth.get_current_user') as mocked:
mocked.return_value = user
status = self._sample_function_error()
_, kwargs = self.respond.call_args
assert kwargs['status_code'] == 404

0 comments on commit 885ad0c

Please sign in to comment.