Skip to content
This repository was archived by the owner on Jun 23, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions src/oidcop/authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ def usage_rules(self, client_id: Optional[str] = ""):
return _usage_rules

try:
_per_client = self.server_get("endpoint_context").cdb[client_id][
"token_usage_rules"
]
_per_client = self.server_get("endpoint_context").cdb[client_id]["token_usage_rules"]
except KeyError:
pass
else:
Expand All @@ -59,14 +57,11 @@ def usage_rules_for(self, client_id, token_type):
return {}

def __call__(
self,
session_id: str,
request: Union[dict, Message],
resources: Optional[list] = None,
self, session_id: str, request: Union[dict, Message], resources: Optional[list] = None,
) -> Grant:
session_info = self.server_get(
"endpoint_context"
).session_manager.get_session_info(session_id=session_id, grant=True)
session_info = self.server_get("endpoint_context").session_manager.get_session_info(
session_id=session_id, grant=True
)
grant = session_info["grant"]

args = self.grant_config.copy()
Expand All @@ -87,24 +82,19 @@ def __call__(
# After this is where user consent should be handled
scopes = request.get("scope", [])
grant.scope = scopes
grant.claims = self.server_get(
"endpoint_context"
).claims_interface.get_claims_all_usage(session_id=session_id, scopes=scopes)
grant.claims = self.server_get("endpoint_context").claims_interface.get_claims_all_usage(
session_id=session_id, scopes=scopes
)

return grant


class Implicit(AuthzHandling):
def __call__(
self,
session_id: str,
request: Union[dict, Message],
resources: Optional[list] = None,
self, session_id: str, request: Union[dict, Message], resources: Optional[list] = None,
) -> Grant:
args = self.grant_config.copy()
grant = self.server_get("endpoint_context").session_manager.get_grant(
session_id=session_id
)
grant = self.server_get("endpoint_context").session_manager.get_grant(session_id=session_id)
for arg, val in args:
setattr(grant, arg, val)
return grant
Expand Down
32 changes: 8 additions & 24 deletions src/oidcop/client_authn.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ def is_usable(self, request=None, authorization_token=None):

def verify(self, request, **kwargs):
if (
self.server_get("endpoint_context").cdb[request["client_id"]][
"client_secret"
]
self.server_get("endpoint_context").cdb[request["client_id"]]["client_secret"]
== request["client_secret"]
):
return {"client_id": request["client_id"]}
Expand All @@ -148,9 +146,7 @@ class BearerHeader(ClientSecretBasic):
tag = "bearer_header"

def is_usable(self, request=None, authorization_token=None):
if authorization_token is not None and authorization_token.startswith(
"Bearer "
):
if authorization_token is not None and authorization_token.startswith("Bearer "):
return True
return False

Expand Down Expand Up @@ -203,9 +199,7 @@ def verify(self, request, key_type, **kwargs):
if _sign_alg and _sign_alg.startswith("HS"):
if key_type == "private_key":
raise AttributeError("Wrong key type")
keys = _context.keyjar.get(
"sig", "oct", ca_jwt["iss"], ca_jwt.jws_header.get("kid")
)
keys = _context.keyjar.get("sig", "oct", ca_jwt["iss"], ca_jwt.jws_header.get("kid"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why you brought all the pep8 modifications back to their original state but if you like it more so for me it's ok and totally relative.

Right it occurs to me that this revision will be more challenging;)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I shouldn't have run black on it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not without another commit in between 😜

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah!

_secret = _context.cdb[ca_jwt["iss"]].get("client_secret")
if _secret and keys[0].key != as_bytes(_secret):
raise AttributeError("Oct key used for signing not client_secret")
Expand Down Expand Up @@ -366,14 +360,10 @@ def verify_client(
if _method.is_usable(request, authorization_token):
try:
auth_info = _method.verify(
request=request,
authorization_token=authorization_token,
endpoint=endpoint,
request=request, authorization_token=authorization_token, endpoint=endpoint,
)
except Exception as err:
logger.warning(
"Verifying auth using {} failed: {}".format(_method.tag, err)
)
logger.warning("Verifying auth using {} failed: {}".format(_method.tag, err))
else:
if "method" not in auth_info:
auth_info["method"] = _method.tag
Expand Down Expand Up @@ -403,19 +393,15 @@ def verify_client(
raise UnknownClient("Unknown Client ID")

if not valid_client_info(_cinfo):
logger.warning(
"Client registration has timed out or " "client secret is expired."
)
logger.warning("Client registration has timed out or " "client secret is expired.")
raise InvalidClient("Not valid client")

# store what authn method was used
if auth_info.get("method"):
_request_type = request.__class__.__name__
_used_authn_method = endpoint_context.cdb[client_id].get("auth_method")
if _used_authn_method:
endpoint_context.cdb[client_id]["auth_method"][
_request_type
] = auth_info["method"]
endpoint_context.cdb[client_id]["auth_method"][_request_type] = auth_info["method"]
else:
endpoint_context.cdb[client_id]["auth_method"] = {
_request_type: auth_info["method"]
Expand All @@ -427,9 +413,7 @@ def verify_client(

try:
# get_client_id_from_token is a callback... Do not abuse for code readability.
auth_info["client_id"] = get_client_id_from_token(
endpoint_context, _token, request
)
auth_info["client_id"] = get_client_id_from_token(endpoint_context, _token, request)
except KeyError:
raise ValueError("Unknown token")

Expand Down
Loading