diff --git a/src/oidcop/endpoint.py b/src/oidcop/endpoint.py index 52d13491..3f52bf40 100755 --- a/src/oidcop/endpoint.py +++ b/src/oidcop/endpoint.py @@ -201,7 +201,8 @@ def parse_request( LOGGER.info("Parsed and verified request: %s" % sanitize(req)) # Do any endpoint specific parsing - return self.do_post_parse_request(request=req, client_id=_client_id, **kwargs) + return self.do_post_parse_request(request=req, client_id=_client_id, http_info=http_info, + **kwargs) def get_client_id_from_token( self, diff --git a/src/oidcop/oauth2/add_on/device_authorization.py b/src/oidcop/oauth2/add_on/device_authorization.py deleted file mode 100644 index 54fc08e3..00000000 --- a/src/oidcop/oauth2/add_on/device_authorization.py +++ /dev/null @@ -1,6 +0,0 @@ -def add_support(endpoint, **kwargs): - _context = endpoint["token"].endpoint_context - - _db = kwargs.get("db") - if not _db: - _context.dev_auth_db = {} diff --git a/src/oidcop/oauth2/add_on/dpop.py b/src/oidcop/oauth2/add_on/dpop.py index 7e69c735..27dab666 100644 --- a/src/oidcop/oauth2/add_on/dpop.py +++ b/src/oidcop/oauth2/add_on/dpop.py @@ -1,6 +1,7 @@ from typing import Optional from cryptojwt import JWS +from cryptojwt import as_unicode from cryptojwt.jwk.jwk import key_from_jwk_dict from cryptojwt.jws.jws import factory from oidcmsg.message import SINGLE_REQUIRED_INT @@ -58,6 +59,7 @@ def create_header(self) -> str: payload = {k: self[k] for k in self.body_params} _jws = JWS(payload, alg=self["alg"]) _headers = {k: self[k] for k in self.header_params} + self.key.kid = "" _sjwt = _jws.sign_compact(keys=[self.key], **_headers) return _sjwt @@ -112,28 +114,27 @@ def post_parse_request(request, client_id, endpoint_context, **kwargs): if not _dpop.key: _dpop.key = key_from_jwk_dict(_dpop["jwk"]) - _jkt = str(_dpop.key.thumbprint("SHA-256")) - try: - endpoint_context.cdb[client_id]["dpop_jkt"][_jkt] = _dpop.key - except KeyError: - endpoint_context.cdb[client_id]["dpop_jkt"] = {_jkt: _dpop.key} - # Need something I can add as a reference when minting tokens - request["dpop_jkt"] = _jkt + request["dpop_jkt"] = as_unicode(_dpop.key.thumbprint("SHA-256")) return request def token_args(endpoint_context, client_id, token_args: Optional[dict] = None): - if "dpop.jkt" in endpoint_context.cdb[client_id]: - token_args.update({"cnf": {"jkt": endpoint_context.cdb[client_id]["dpop_jkt"]}}) + dpop_jkt = endpoint_context.cdb[client_id]["dpop_jkt"] + _jkt = list(dpop_jkt.keys())[0] + if "dpop_jkt" in endpoint_context.cdb[client_id]: + if token_args is None: + token_args = {"cnf": {"jkt": _jkt}} + else: + token_args.update({"cnf": {"jkt": endpoint_context.cdb[client_id]["dpop_jkt"]}}) return token_args def add_support(endpoint, **kwargs): # - _endp = endpoint["token"] - _endp.post_parse_request.append(post_parse_request) + _token_endp = endpoint["token"] + _token_endp.post_parse_request.append(post_parse_request) # Endpoint Context stuff # _endp.endpoint_context.token_args_methods.append(token_args) @@ -141,7 +142,7 @@ def add_support(endpoint, **kwargs): if not _algs_supported: _algs_supported = ["RS256"] - _endp.server_get("endpoint_context").provider_info[ + _token_endp.server_get("endpoint_context").provider_info[ "dpop_signing_alg_values_supported" ] = _algs_supported diff --git a/src/oidcop/oauth2/add_on/dpop_token.py b/src/oidcop/oauth2/add_on/dpop_token.py deleted file mode 100644 index 359bc25a..00000000 --- a/src/oidcop/oauth2/add_on/dpop_token.py +++ /dev/null @@ -1,196 +0,0 @@ -import logging -from typing import Union - -from cryptojwt.jwe.exception import JWEException -from cryptojwt.jws.exception import NoSuitableSigningKeys -from cryptojwt.jwt import utc_time_sans_frac -from oidcmsg.message import Message - -from oidcop.oidc.token import AccessTokenHelper -from oidcop.oidc.token import RefreshTokenHelper - -logger = logging.getLogger(__name__) - - -class DPOPAccessTokenHelper(AccessTokenHelper): - def process_request(self, req: Union[Message, dict], **kwargs): - """ - - :param req: - :param kwargs: - :return: - """ - _context = self.endpoint.server_get("endpoint_context") - - _mngr = _context.session_manager - _log_debug = logger.debug - - if req["grant_type"] != "authorization_code": - return self.error_cls( - error="invalid_request", error_description="Unknown grant_type" - ) - - try: - _access_code = req["code"].replace(" ", "+") - except KeyError: # Missing code parameter - absolutely fatal - return self.error_cls( - error="invalid_request", error_description="Missing code" - ) - - _session_info = _mngr.get_session_info_by_token(_access_code, grant=True) - grant = _session_info["grant"] - - code = grant.get_token(_access_code) - _authn_req = grant.authorization_request - - # If redirect_uri was in the initial authorization request - # verify that the one given here is the correct one. - if "redirect_uri" in _authn_req: - if req["redirect_uri"] != _authn_req["redirect_uri"]: - return self.error_cls( - error="invalid_request", error_description="redirect_uri mismatch" - ) - - _log_debug("All checks OK") - - issue_refresh = False - if "issue_refresh" in kwargs: - issue_refresh = kwargs["issue_refresh"] - else: - if "offline_access" in grant.scope: - issue_refresh = True - - _response = { - "token_type": "Bearer", - "scope": grant.scope, - } - - if "dpop_jkt" in req: - token_args = {"cnf": {"jkt": req["dpop_jkt"]}} - else: - token_args = {} - - token = self._mint_token( - type="access_token", - grant=grant, - session_id=_session_info["session_id"], - client_id=_session_info["client_id"], - based_on=code, - token_args=token_args, - ) - if "dpop_jkt" in req: - if token.extension is None: - token.extension = {"dpop_jkt": req["dpop_jkt"]} - else: - token.extension["dpop_jkt"] = req["dpop_jkt"] - - _response["access_token"] = token.value - _response["expires_in"] = token.expires_at - utc_time_sans_frac() - - if issue_refresh: - refresh_token = self._mint_token( - type="refresh_token", - grant=grant, - session_id=_session_info["session_id"], - client_id=_session_info["client_id"], - based_on=code, - ) - if "dpop_jkt" in req: - if refresh_token.extension is None: - refresh_token.extension = {"dpop_jkt": req["dpop_jkt"]} - else: - refresh_token.extension["dpop_jkt"] = req["dpop_jkt"] - - _response["refresh_token"] = refresh_token.value - - code.register_usage() - - # since the grant content has changed. Make sure it's stored - _mngr[_session_info["session_id"]] = grant - - if "openid" in _authn_req["scope"]: - try: - _idtoken = _context.idtoken.make(_session_info["session_id"]) - except (JWEException, NoSuitableSigningKeys) as err: - logger.warning(str(err)) - resp = self.error_cls( - error="invalid_request", - error_description="Could not sign/encrypt id_token", - ) - return resp - - _response["id_token"] = _idtoken - - return _response - - -class DPOPRefreshTokenHelper(RefreshTokenHelper): - def process_request(self, req: Union[Message, dict], **kwargs): - _context = self.endpoint.server_get("endpoint_context") - _mngr = _context.session_manager - - if req["grant_type"] != "refresh_token": - return self.error_cls( - error="invalid_request", error_description="Wrong grant_type" - ) - - token_value = req["refresh_token"] - _session_info = _mngr.get_session_info_by_token(token_value, grant=True) - token = _mngr.find_token(_session_info["session_id"], token_value) - - _grant = _session_info["grant"] - access_token = self._mint_token( - type="access_token", - grant=_grant, - session_id=_session_info["session_id"], - client_id=_session_info["client_id"], - based_on=token, - ) - - if "dpop_jkt" in req: - if access_token.extension is None: - access_token.extension = {"dpop_jkt": req["dpop_jkt"]} - else: - access_token.extension["dpop_jkt"] = req["dpop_jkt"] - - _resp = { - "access_token": access_token.value, - "token_type": access_token.token_type, - "scope": _grant.scope, - } - - if access_token.expires_at: - _resp["expires_in"] = access_token.expires_at - utc_time_sans_frac() - - _mints = token.usage_rules.get("supports_minting") - if "refresh_token" in _mints: - refresh_token = self._mint_token( - type="refresh_token", - grant=_grant, - session_id=_session_info["session_id"], - client_id=_session_info["client_id"], - based_on=token, - ) - refresh_token.usage_rules = token.usage_rules.copy() - if "dpop_jkt" in req: - if refresh_token.extension is None: - refresh_token.extension = {"dpop_jkt": req["dpop_jkt"]} - else: - refresh_token.extension["dpop_jkt"] = req["dpop_jkt"] - - _resp["refresh_token"] = refresh_token.value - - if "id_token" in _mints: - try: - _idtoken = _context.idtoken.make(_session_info["session_id"]) - except (JWEException, NoSuitableSigningKeys) as err: - logger.warning(str(err)) - resp = self.error_cls( - error="invalid_request", - error_description="Could not sign/encrypt id_token", - ) - return resp - - _resp["id_token"] = _idtoken - - return _resp diff --git a/src/oidcop/oauth2/device_authorization.py b/src/oidcop/oauth2/device_authorization.py deleted file mode 100644 index 5b2833f0..00000000 --- a/src/oidcop/oauth2/device_authorization.py +++ /dev/null @@ -1,62 +0,0 @@ -from oidcmsg.oauth2.device_authorization import AuthorizationRequest -from oidcmsg.oauth2.device_authorization import AuthorizationResponse -from oidcmsg.time_util import utc_time_sans_frac - -from oidcop import rndstr -from oidcop.endpoint import Endpoint - - -class AuthorizationEndpoint(Endpoint): - request_cls = AuthorizationRequest - response_cls = AuthorizationResponse - request_format = "urlencoded" - response_format = "json" - response_placement = "body" - endpoint_name = "device_authorization_endpoint" - name = "device_authorization" - - def __init__(self, server_get, **kwargs): - Endpoint.__init__(self, server_get, **kwargs) - self.verification_uri = kwargs.get("verification_uri") - self.expires_in = kwargs.get("expires_in", 300) - self.interval = kwargs.get("interval", 5) - - def process_request(self, request=None, **kwargs): - """ - Produces a device code and an end-user - code and provides the end-user verification URI. - - :param request: - :param kwargs: - :return: - """ - _device_code = rndstr(32) - _user_code = rndstr(8) - - _response = { - "device_code": _device_code, - "user_code": _user_code, - "verification_uri": self.verification_uri, - "expires_in": self.expires_in, - "interval": self.interval, - } - _info = { - "device_code": _device_code, - "user_code": _user_code, - "exp": utc_time_sans_frac() + self.expires_in, - "interval": self.interval, - } - - self.server_get("endpoint_context").dev_auth_db.set(_user_code, _info) - return {"response_args": _response} - - def verification_endpoint(self, query): - """ - Where the device's pull query is handled. - - :param query: - :return: - """ - _response = {} - - return _response diff --git a/src/oidcop/oidc/token.py b/src/oidcop/oidc/token.py index a528d7ad..93d80afa 100755 --- a/src/oidcop/oidc/token.py +++ b/src/oidcop/oidc/token.py @@ -123,6 +123,15 @@ def process_request(self, req: Union[Message, dict], **kwargs): _session_info = _mngr.get_session_info_by_token(_access_code, grant=True) grant = _session_info["grant"] + token_type = "Bearer" + + # Is DPOP supported + if "dpop_signing_alg_values_supported" in _context.provider_info: + _dpop_jkt = req.get("dpop_jkt") + if _dpop_jkt: + grant.extra["dpop_jkt"] = _dpop_jkt + token_type = "DPoP" + _based_on = grant.get_token(_access_code) _supports_minting = _based_on.usage_rules.get("supports_minting", []) @@ -146,7 +155,7 @@ def process_request(self, req: Union[Message, dict], **kwargs): issue_refresh = True _response = { - "token_type": "Bearer", + "token_type": token_type, "scope": grant.scope, } @@ -260,9 +269,17 @@ def process_request(self, req: Union[Message, dict], **kwargs): ) token_value = req["refresh_token"] - _session_info = _mngr.get_session_info_by_token( - token_value, grant=True - ) + _session_info = _mngr.get_session_info_by_token(token_value, grant=True) + grant = _session_info["grant"] + + token_type = "Bearer" + + # Is DPOP supported + if "dpop_signing_alg_values_supported" in _context.provider_info: + _dpop_jkt = req.get("dpop_jkt") + if _dpop_jkt: + grant.extra["dpop_jkt"] = _dpop_jkt + token_type = "DPoP" _grant = _session_info["grant"] token = _grant.get_token(token_value) @@ -276,7 +293,7 @@ def process_request(self, req: Union[Message, dict], **kwargs): _resp = { "access_token": access_token.value, - "token_type": access_token.type, + "token_type": token_type, "scope": _grant.scope, } diff --git a/src/oidcop/session/grant.py b/src/oidcop/session/grant.py index 6591c722..0faf1f10 100644 --- a/src/oidcop/session/grant.py +++ b/src/oidcop/session/grant.py @@ -1,3 +1,4 @@ +from typing import Dict from typing import List from typing import Optional from uuid import uuid1 @@ -27,11 +28,11 @@ class GrantMessage(ImpExp): } def __init__( - self, - scope: Optional[str] = "", - authorization_details: Optional[dict] = None, - claims: Optional[list] = None, - resources: Optional[list] = None, + self, + scope: Optional[str] = "", + authorization_details: Optional[dict] = None, + claims: Optional[list] = None, + resources: Optional[list] = None, ): ImpExp.__init__(self) self.scope = scope @@ -110,6 +111,7 @@ class Grant(Item): "authorization_details": {}, "authorization_request": AuthorizationRequest, "claims": {}, + "extra": {}, "issued_token": [SessionToken], "resources": [], "scope": [], @@ -124,21 +126,22 @@ class Grant(Item): } def __init__( - self, - scope: Optional[list] = None, - claims: Optional[dict] = None, - resources: Optional[list] = None, - authorization_details: Optional[dict] = None, - authorization_request: Optional[Message] = None, - authentication_event: Optional[AuthnEvent] = None, - issued_token: Optional[list] = None, - usage_rules: Optional[dict] = None, - issued_at: int = 0, - expires_in: int = 0, - expires_at: int = 0, - revoked: bool = False, - token_map: Optional[dict] = None, - sub: Optional[str] = "", + self, + scope: Optional[list] = None, + claims: Optional[dict] = None, + resources: Optional[list] = None, + authorization_details: Optional[dict] = None, + authorization_request: Optional[Message] = None, + authentication_event: Optional[AuthnEvent] = None, + issued_token: Optional[list] = None, + usage_rules: Optional[dict] = None, + issued_at: int = 0, + expires_in: int = 0, + expires_at: int = 0, + revoked: bool = False, + token_map: Optional[dict] = None, + sub: Optional[str] = "", + extra: Optional[Dict[str, str]] = None ): Item.__init__( self, @@ -157,6 +160,7 @@ def __init__( self.issued_token = issued_token or [] self.id = uuid1().hex self.sub = sub + self.extra = {} if token_map is None: self.token_map = TOKEN_MAP @@ -172,12 +176,12 @@ def get(self) -> object: ) def payload_arguments( - self, - session_id: str, - endpoint_context, - token_type: str, - scope: Optional[dict] = None, - extra_payload: Optional[dict] = None, + self, + session_id: str, + endpoint_context, + token_type: str, + scope: Optional[dict] = None, + extra_payload: Optional[dict] = None, ) -> dict: """ @@ -195,6 +199,10 @@ def payload_arguments( if extra_payload: payload.update(extra_payload) + _jkt = self.extra.get("dpop_jkt") + if _jkt: + payload["cnf"] = {"jkt": _jkt} + if self.authorization_request: client_id = self.authorization_request.get('client_id') if client_id: @@ -215,15 +223,15 @@ def payload_arguments( return payload def mint_token( - self, - session_id: str, - endpoint_context: object, - token_type: str, - token_handler: TokenHandler = None, - based_on: Optional[SessionToken] = None, - usage_rules: Optional[dict] = None, - scope: Optional[list] = None, - **kwargs, + self, + session_id: str, + endpoint_context: object, + token_type: str, + token_handler: TokenHandler = None, + based_on: Optional[SessionToken] = None, + usage_rules: Optional[dict] = None, + scope: Optional[list] = None, + **kwargs, ) -> Optional[SessionToken]: """ @@ -298,10 +306,10 @@ def get_token(self, value: str) -> Optional[SessionToken]: return None def revoke_token( - self, - value: Optional[str] = "", - based_on: Optional[str] = "", - recursive: bool = True, + self, + value: Optional[str] = "", + based_on: Optional[str] = "", + recursive: bool = True, ): for t in self.issued_token: if not value and not based_on: @@ -376,19 +384,19 @@ class ExchangeGrant(Grant): type = "exchange_grant" def __init__( - self, - scope: Optional[list] = None, - claims: Optional[dict] = None, - resources: Optional[list] = None, - authorization_details: Optional[dict] = None, - issued_token: Optional[list] = None, - usage_rules: Optional[dict] = None, - issued_at: int = 0, - expires_in: int = 0, - expires_at: int = 0, - revoked: bool = False, - token_map: Optional[dict] = None, - users: list = None, + self, + scope: Optional[list] = None, + claims: Optional[dict] = None, + resources: Optional[list] = None, + authorization_details: Optional[dict] = None, + issued_token: Optional[list] = None, + usage_rules: Optional[dict] = None, + issued_at: int = 0, + expires_in: int = 0, + expires_at: int = 0, + revoked: bool = False, + token_map: Optional[dict] = None, + users: list = None, ): Grant.__init__( self, diff --git a/tests/test_60_dpop.py b/tests/test_60_dpop.py new file mode 100644 index 00000000..62af2b79 --- /dev/null +++ b/tests/test_60_dpop.py @@ -0,0 +1,266 @@ +import os + +from cryptojwt.jwk.ec import ECKey +from cryptojwt.jwk.ec import new_ec_key +from cryptojwt.jws.jws import factory +from cryptojwt.key_jar import init_key_jar +from oidcmsg.oauth2 import AccessTokenRequest +from oidcmsg.oauth2 import AuthorizationRequest +from oidcmsg.time_util import utc_time_sans_frac + +from oidcop.authn_event import create_authn_event +import pytest + +from oidcop import user_info +from oidcop.client_authn import verify_client +from oidcop.configure import OPConfiguration +from oidcop.oauth2.add_on.dpop import DPoPProof +from oidcop.oauth2.add_on.dpop import post_parse_request +from oidcop.oauth2.authorization import Authorization +from oidcop.oidc.token import Token +from oidcop.server import Server +from oidcop.user_authn.authn_context import INTERNETPROTOCOLPASSWORD + +DPOP_HEADER = ( + "eyJ0eXAiOiJkcG9wK2p3dCIsImFsZyI6IkVTMjU2IiwiandrIjp7Imt0eSI6IkVDIiwieCI6Imw4dEZyaHgtMz" + "R0VjNoUklDUkRZOXpDa0RscEJoRjQyVVFVZldWQVdCRnMiLCJ5IjoiOVZFNGpmX09rX282NHpiVFRsY3VOSmFq" + "SG10NnY5VERWclUwQ2R2R1JEQSIsImNydiI6IlAtMjU2In19.eyJqdGkiOiItQndDM0VTYzZhY2MybFRjIiwia" + "HRtIjoiUE9TVCIsImh0dSI6Imh0dHBzOi8vc2VydmVyLmV4YW1wbGUuY29tL3Rva2VuIiwiaWF0IjoxNTYyMjY" + "yNjE2fQ.2-GxA6T8lP4vfrg8v-FdWP0A0zdrj8igiMLvqRMUvwnQg4PtFLbdLXiOSsX0x7NVY-FNyJK70nfbV37xRZT3Lg" +) + + +def test_verify_header(): + _dpop = DPoPProof() + assert _dpop.verify_header(DPOP_HEADER) + assert set(_dpop.keys()) == {'typ', 'alg', 'jwk', 'jti', 'htm', 'htu', 'iat'} + assert _dpop.verify() is None + + _dpop_dict = _dpop.to_dict() + _dpop2 = DPoPProof().from_dict(_dpop_dict) + assert isinstance(_dpop2.key, ECKey) + + ec_key = new_ec_key(crv="P-256") + _dpop2.key = ec_key + _dpop2["jwk"] = ec_key.to_dict() + + _header = _dpop2.create_header() + + _dpop3 = DPoPProof() + assert _dpop3.verify_header(_header) + # should have the same content as _dpop only the key is different + + assert _dpop["htm"] == _dpop3["htm"] + + +KEYDEFS = [ + {"type": "RSA", "key": "", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, +] + +ISSUER = "https://example.com/" + +KEYJAR = init_key_jar(key_defs=KEYDEFS, issuer_id=ISSUER) +KEYJAR.import_jwks(KEYJAR.export_jwks(True, ISSUER), "") + +RESPONSE_TYPES_SUPPORTED = [ + ["code"], + ["token"], + ["id_token"], + ["code", "token"], + ["code", "id_token"], + ["id_token", "token"], + ["code", "token", "id_token"], + ["none"], +] + +CAPABILITIES = { + "response_types_supported": [" ".join(x) for x in RESPONSE_TYPES_SUPPORTED], + "token_endpoint_auth_methods_supported": [ + "client_secret_post", + "client_secret_basic", + "client_secret_jwt", + "private_key_jwt", + ], + "response_modes_supported": ["query", "fragment", "form_post"], + "subject_types_supported": ["public", "pairwise", "ephemeral"], + "grant_types_supported": [ + "authorization_code", + "implicit", + "urn:ietf:params:oauth:grant-type:jwt-bearer", + ], + "claim_types_supported": ["normal", "aggregated", "distributed"], + "claims_parameter_supported": True, + "request_parameter_supported": True, + "request_uri_parameter_supported": True, +} + +AUTH_REQ = AuthorizationRequest( + client_id="client_1", + redirect_uri="https://example.com/cb", + scope=["openid"], + state="STATE", + response_type="code", +) + +TOKEN_REQ = AccessTokenRequest( + client_id="client_1", + redirect_uri="https://example.com/cb", + state="STATE", + grant_type="authorization_code", + client_secret="hemligt", +) + +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +class TestEndpoint(object): + @pytest.fixture(autouse=True) + def create_endpoint(self): + conf = { + "issuer": ISSUER, + "password": "mycket hemligt", + "verify_ssl": False, + "capabilities": CAPABILITIES, + "add_on": { + "dpop": { + "function": "oidcop.oauth2.add_on.dpop.add_support", + "kwargs": { + "dpop_signing_alg_values_supported": ["ES256"] + } + }, + }, + "keys": {"uri_path": "jwks.json", "key_defs": KEYDEFS}, + "token_handler_args": { + "jwks_file": "private/token_jwks.json", + "code": {"lifetime": 600}, + "token": { + "class": "oidcop.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "base_claims": {"eduperson_scoped_affiliation": None}, + "add_claims_by_scope": True, + "aud": ["https://example.org/appl"], + }, + }, + "refresh": { + "class": "oidcop.token.jwt_token.JWTToken", + "kwargs": {"lifetime": 3600, "aud": ["https://example.org/appl"], }, + }, + "id_token": { + "class": "oidcop.token.id_token.IDToken", + "kwargs": { + "base_claims": { + "email": {"essential": True}, + "email_verified": {"essential": True}, + } + }, + }, + }, + "endpoint": { + "authorization": { + "path": "{}/authorization", + "class": Authorization, + "kwargs": {}, + }, + "token": { + "path": "{}/token", + "class": Token, + "kwargs": {}}, + }, + "client_authn": verify_client, + "authentication": { + "anon": { + "acr": INTERNETPROTOCOLPASSWORD, + "class": "oidcop.user_authn.user.NoAuthn", + "kwargs": {"user": "diana"}, + } + }, + "template_dir": "template", + "userinfo": { + "class": user_info.UserInfo, + "kwargs": {"db_file": "users.json"}, + }, + } + server = Server(OPConfiguration(conf, base_path=BASEDIR), keyjar=KEYJAR) + self.endpoint_context = server.endpoint_context + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "token_endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token", "id_token"], + } + self.user_id = "diana" + self.token_endpoint = server.server_get("endpoint", "token") + self.session_manager = self.endpoint_context.session_manager + + def _create_session(self, auth_req, sub_type="public", sector_identifier=""): + if sector_identifier: + authz_req = auth_req.copy() + authz_req["sector_identifier_uri"] = sector_identifier + else: + authz_req = auth_req + client_id = authz_req["client_id"] + ae = create_authn_event(self.user_id) + return self.session_manager.create_session( + ae, authz_req, self.user_id, client_id=client_id, sub_type=sub_type + ) + + def _mint_code(self, grant, client_id): + session_id = self.session_manager.encrypted_session_id( + self.user_id, client_id, grant.id + ) + usage_rules = grant.usage_rules.get("authorization_code", {}) + _exp_in = usage_rules.get("expires_in") + + # Constructing an authorization code is now done + _code = grant.mint_token( + session_id=session_id, + endpoint_context=self.endpoint_context, + token_type="authorization_code", + token_handler=self.session_manager.token_handler["code"], + usage_rules=usage_rules, + ) + + if _exp_in: + if isinstance(_exp_in, str): + _exp_in = int(_exp_in) + if _exp_in: + _code.expires_at = utc_time_sans_frac() + _exp_in + return _code + + def test_post_parse_request(self): + auth_req = post_parse_request(AUTH_REQ, AUTH_REQ["client_id"], self.endpoint_context, + http_info={ + "headers": {"dpop": DPOP_HEADER}, + "url": 'https://server.example.com/token', + "method": "POST" + }) + assert auth_req + assert "dpop_jkt" in auth_req + + def test_process_request(self): + session_id = self._create_session(AUTH_REQ) + grant = self.session_manager[session_id] + code = self._mint_code(grant, AUTH_REQ["client_id"]) + + _token_request = TOKEN_REQ.to_dict() + _context = self.endpoint_context + _token_request["code"] = code.value + _req = self.token_endpoint.parse_request(_token_request, http_info={ + "headers": {"dpop": DPOP_HEADER}, + "url": 'https://server.example.com/token', + "method": "POST" + }) + + assert "dpop_jkt" in _req + + _resp = self.token_endpoint.process_request(request=_req) + assert _resp["response_args"]["token_type"] == "DPoP" + + access_token = _resp["response_args"]["access_token"] + jws = factory(access_token) + _payload = jws.jwt.payload() + assert "cnf" in _payload + assert _payload["cnf"]["jkt"] == _req["dpop_jkt"]