-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* async support * sanic support
- Loading branch information
Showing
28 changed files
with
1,130 additions
and
98 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,8 @@ | ||
__version__ = "0.1.0" | ||
|
||
from ._async.ability import Ability | ||
from ._async.policy import Policy, authorize | ||
from .action import Action | ||
from .permission import AutoPermission, Permission | ||
|
||
__all__ = ["Ability", "Action", "Policy", "authorize", "Permission", "AutoPermission"] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from typing import Any, Optional | ||
|
||
from denied.action import Action | ||
from denied.errors import UnauthorizedError, UndefinedPermission | ||
from denied.permission import Permission | ||
|
||
from .policy import Policy | ||
|
||
|
||
class Ability: | ||
def __init__( | ||
self, policy: Optional[Policy] = None, default_action: Action = Action.DENY | ||
): | ||
self._policy = policy or Policy() | ||
self._default_action = default_action | ||
|
||
async def authorize( | ||
self, permission: Permission, *args: Any, **kwargs: Any | ||
) -> None: | ||
"""Raises an UnauthorizedError if policy does not grant permission. | ||
Args: | ||
permission (Permission): a permission | ||
args (Any): arguments passed to the policy access method | ||
kwargs (Any): keyword argumentss passed to the policy access method | ||
Returns: | ||
None: | ||
""" | ||
if not await self.can(permission, *args, **kwargs): | ||
raise UnauthorizedError(permission) | ||
|
||
async def can(self, permission: Permission, *args: Any, **kwargs: Any) -> bool: | ||
"""Returns the result of the policy access method defined for the permission. | ||
If no access method is found the default_action is used. | ||
If permission was not defined and default_action is RAISE then an | ||
UndefinedPermission is raised. | ||
Args: | ||
permission (Permission): a permission | ||
args (Any): arguments passed to the policy access method | ||
kwargs (Any): keyword argumentss passed to the policy access method | ||
Returns: | ||
bool: True if permission is granted, False otherwise | ||
""" | ||
try: | ||
access_method = self._policy.get_access_method(permission) | ||
except UndefinedPermission as error: | ||
if self._default_action == Action.RAISE: | ||
raise error | ||
else: | ||
return True if self._default_action == Action.ALLOW else False | ||
|
||
return await access_method(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from typing import Any, Optional | ||
|
||
from denied.action import Action | ||
from denied.errors import UnauthorizedError, UndefinedPermission | ||
from denied.permission import Permission | ||
|
||
from .policy import Policy | ||
|
||
|
||
class Ability: | ||
def __init__( | ||
self, policy: Optional[Policy] = None, default_action: Action = Action.DENY | ||
): | ||
self._policy = policy or Policy() | ||
self._default_action = default_action | ||
|
||
def authorize(self, permission: Permission, *args: Any, **kwargs: Any) -> None: | ||
"""Raises an UnauthorizedError if policy does not grant permission. | ||
Args: | ||
permission (Permission): a permission | ||
args (Any): arguments passed to the policy access method | ||
kwargs (Any): keyword argumentss passed to the policy access method | ||
Returns: | ||
None: | ||
""" | ||
if not self.can(permission, *args, **kwargs): | ||
raise UnauthorizedError(permission) | ||
|
||
def can(self, permission: Permission, *args: Any, **kwargs: Any) -> bool: | ||
"""Returns the result of the policy access method defined for the permission. | ||
If no access method is found the default_action is used. | ||
If permission was not defined and default_action is RAISE then an | ||
UndefinedPermission is raised. | ||
Args: | ||
permission (Permission): a permission | ||
args (Any): arguments passed to the policy access method | ||
kwargs (Any): keyword argumentss passed to the policy access method | ||
Returns: | ||
bool: True if permission is granted, False otherwise | ||
""" | ||
try: | ||
access_method = self._policy.get_access_method(permission) | ||
except UndefinedPermission as error: | ||
if self._default_action == Action.RAISE: | ||
raise error | ||
else: | ||
return True if self._default_action == Action.ALLOW else False | ||
|
||
return access_method(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
from typing import Any, Callable, Dict, Optional, Tuple | ||
|
||
from denied.errors import PermissionAlreadyDefined, UndefinedPermission | ||
from denied.permission import Permission | ||
from denied.utils import SyncAccessMethod | ||
|
||
|
||
class PolicyMetaclass(type): | ||
"""Metaclass used by the Policy class. | ||
It's used to register all the access methods defined by the @authorize() decorator. | ||
""" | ||
|
||
def __new__(cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]) -> type: | ||
"""Callback called when a Policy class is created. | ||
Loop over all the attributes and check if a `_authorized_permission` has | ||
been defined on it. | ||
If `_authorized_permission` is found we register the method | ||
in a dictionary in order to access it later using the | ||
permission being authorized. | ||
Args: | ||
cls: a Policy class | ||
name (str): class name | ||
bases (Tuple[type, ...]): base classes | ||
attrs (Dict[str, Any]): class attributes | ||
""" | ||
access_methods: Dict[Permission, str] = {} | ||
for name, value in attrs.items(): | ||
permission: Optional[Permission] = getattr( | ||
value, "_authorized_permission", None | ||
) | ||
if not permission: | ||
continue | ||
|
||
if permission in access_methods: | ||
raise PermissionAlreadyDefined(permission) | ||
access_methods[permission] = name | ||
|
||
attrs["_access_methods"] = access_methods | ||
return super().__new__(cls, name, bases, attrs) | ||
|
||
|
||
def authorize(permission: Permission) -> Callable[[SyncAccessMethod], SyncAccessMethod]: | ||
def decorator(func: SyncAccessMethod) -> SyncAccessMethod: | ||
"""Add an `_authorized_permission` attribute to the method | ||
in order for the metaclass to recognize it as an AccessMethod. | ||
Args: | ||
func (AccessMethod): method used to grant access | ||
Returns: | ||
AccessMethod: access method received as input | ||
""" | ||
setattr(func, "_authorized_permission", permission) | ||
return func | ||
|
||
return decorator | ||
|
||
|
||
class Policy(metaclass=PolicyMetaclass): | ||
_access_methods: Dict[Permission, str] | ||
|
||
def get_access_method(self, permission: Permission) -> SyncAccessMethod: | ||
"""Returns the AccessMethod that was registered for the permission | ||
received as input. | ||
If no AccessMethod is found it raises a UndefinedPermission error. | ||
Args: | ||
permission (Permission): a permission | ||
Returns: | ||
AccessMethod: access method registered for permission | ||
""" | ||
try: | ||
return getattr(self, self._access_methods[permission]) | ||
except KeyError: | ||
raise UndefinedPermission(permission) |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from enum import Enum | ||
|
||
|
||
class Action(Enum): | ||
DENY = "deny" | ||
ALLOW = "allow" | ||
RAISE = "raise" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.