Skip to content
This repository was archived by the owner on Jun 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions docs/source/contents/conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,9 @@ An example::
"client_secret_post",
"client_secret_basic",
"client_secret_jwt",
"private_key_jwt"
]
"private_key_jwt",
],
"revoke_refresh_on_issue": True
}
},
"userinfo": {
Expand Down Expand Up @@ -754,3 +755,8 @@ allowed_scopes

A list with the scopes that are allowed to be used (defaults to the keys in the
clients scopes_to_claims).

-----------------------
revoke_refresh_on_issue
-----------------------
Configure whether to revoke the refresh token that was used to issue a new refresh token
12 changes: 12 additions & 0 deletions src/oidcop/oauth2/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,17 @@ def process_request(self, req: Union[Message, dict], **kwargs):

token.register_usage()

if ("client_id" in req
and req["client_id"] in _context.cdb
and "revoke_refresh_on_issue" in _context.cdb[req["client_id"]]
):
revoke_refresh = _context.cdb[req["client_id"]].get("revoke_refresh_on_issue")
else:
revoke_refresh = self.endpoint.revoke_refresh_on_issue

if revoke_refresh:
token.revoke()

return _resp

def post_parse_request(
Expand Down Expand Up @@ -365,6 +376,7 @@ def __init__(self, server_get, new_refresh_token=False, **kwargs):
self.allow_refresh = False
self.new_refresh_token = new_refresh_token
self.configure_grant_types(kwargs.get("grant_types_supported"))
self.revoke_refresh_on_issue = kwargs.get("revoke_refresh_on_issue", False)

def configure_grant_types(self, grant_types_supported):
if grant_types_supported is None:
Expand Down
33 changes: 21 additions & 12 deletions src/oidcop/oidc/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,10 @@ def process_request(self, req: Union[Message, dict], **kwargs):

logger.debug("All checks OK")

issue_refresh = False
if "issue_refresh" in kwargs:
issue_refresh = kwargs["issue_refresh"]
else:
if "offline_access" in grant.scope:
issue_refresh = True
issue_refresh = kwargs.get("issue_refresh", None)
# The existence of offline_access scope overwrites issue_refresh
if issue_refresh is None and "offline_access" in grant.scope:
issue_refresh = True

_response = {
"token_type": token_type,
Expand Down Expand Up @@ -242,12 +240,12 @@ def process_request(self, req: Union[Message, dict], **kwargs):
_resp["expires_in"] = access_token.expires_at - utc_time_sans_frac()

_mints = token.usage_rules.get("supports_minting")
issue_refresh = False
if "issue_refresh" in kwargs:
issue_refresh = kwargs["issue_refresh"]
else:
if "offline_access" in scope:
issue_refresh = True

issue_refresh = kwargs.get("issue_refresh", None)
# The existence of offline_access scope overwrites issue_refresh
if issue_refresh is None and "offline_access" in scope:
issue_refresh = True

if "refresh_token" in _mints and issue_refresh:
refresh_token = self._mint_token(
token_class="refresh_token",
Expand Down Expand Up @@ -281,6 +279,17 @@ def process_request(self, req: Union[Message, dict], **kwargs):

token.register_usage()

if ("client_id" in req
and req["client_id"] in _context.cdb
and "revoke_refresh_on_issue" in _context.cdb[req["client_id"]]
):
revoke_refresh = _context.cdb[req["client_id"]].get("revoke_refresh_on_issue")
else:
revoke_refresh = revoke_refresh = self.endpoint.revoke_refresh_on_issue

if revoke_refresh:
token.revoke()

return _resp

def post_parse_request(
Expand Down
82 changes: 79 additions & 3 deletions tests/test_24_oauth2_token_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def test_do_2nd_refresh_access_token(self):
grant = self.endpoint_context.authz(session_id, areq)
code = self._mint_code(grant, areq["client_id"])

self.token_endpoint.revoke_refresh_on_issue = False
_cntx = self.endpoint_context

_token_request = TOKEN_REQ_DICT.copy()
Expand All @@ -423,8 +424,7 @@ def test_do_2nd_refresh_access_token(self):
_2nd_request = REFRESH_TOKEN_REQ.copy()
_2nd_request["refresh_token"] = _resp["response_args"]["refresh_token"]
_2nd_req = self.token_endpoint.parse_request(_request.to_json())
_2nd_resp = self.token_endpoint.process_request(request=_req, issue_refresh=True)

_2nd_resp = self.token_endpoint.process_request(request=_2nd_req, issue_refresh=True)
assert set(_2nd_resp.keys()) == {"cookie", "response_args", "http_headers"}
assert set(_2nd_resp["response_args"].keys()) == {
"access_token",
Expand Down Expand Up @@ -475,6 +475,82 @@ def test_new_refresh_token(self, conf):

assert first_refresh_token != second_refresh_token

def test_revoke_on_issue_refresh_token(self, conf):
self.endpoint_context.cdb["client_1"] = {
"client_secret": "hemligt",
"redirect_uris": [("https://example.com/cb", None)],
"client_salt": "salted",
"endpoint_auth_method": "client_secret_post",
"response_types": ["code", "token", "code id_token", "id_token"],
}

self.token_endpoint.revoke_refresh_on_issue = True
areq = AUTH_REQ.copy()
areq["scope"] = ["email"]

session_id = self._create_session(areq)
grant = self.endpoint_context.authz(session_id, areq)
code = self._mint_code(grant, areq["client_id"])

_token_request = TOKEN_REQ_DICT.copy()
_token_request["code"] = code.value
_req = self.token_endpoint.parse_request(_token_request)
_resp = self.token_endpoint.process_request(request=_req, issue_refresh=True)
assert "refresh_token" in _resp["response_args"]
first_refresh_token = _resp["response_args"]["refresh_token"]

_refresh_request = REFRESH_TOKEN_REQ.copy()
_refresh_request["refresh_token"] = first_refresh_token
_2nd_req = self.token_endpoint.parse_request(_refresh_request.to_json())
_2nd_resp = self.token_endpoint.process_request(request=_2nd_req, issue_refresh=True)
assert "refresh_token" in _2nd_resp["response_args"]
second_refresh_token = _2nd_resp["response_args"]["refresh_token"]

assert first_refresh_token != second_refresh_token
first_refresh_token = grant.get_token(first_refresh_token)
second_refresh_token = grant.get_token(second_refresh_token)
assert first_refresh_token.revoked is True
assert second_refresh_token.revoked is False

def test_revoke_on_issue_refresh_token_per_client(self, conf):
self.endpoint_context.cdb["client_1"] = {
"client_secret": "hemligt",
"redirect_uris": [("https://example.com/cb", None)],
"client_salt": "salted",
"endpoint_auth_method": "client_secret_post",
"response_types": ["code", "token", "code id_token", "id_token"],
}
self.endpoint_context.cdb[AUTH_REQ["client_id"]]["revoke_refresh_on_issue"] = True
areq = AUTH_REQ.copy()
areq["scope"] = ["openid", "offline_access"]

session_id = self._create_session(areq)
grant = self.endpoint_context.authz(session_id, areq)
code = self._mint_code(grant, areq["client_id"])

_token_request = TOKEN_REQ_DICT.copy()
_token_request["code"] = code.value
_req = self.token_endpoint.parse_request(_token_request)
_resp = self.token_endpoint.process_request(request=_req, issue_refresh=True)
assert "refresh_token" in _resp["response_args"]
first_refresh_token = _resp["response_args"]["refresh_token"]

_refresh_request = REFRESH_TOKEN_REQ.copy()
_refresh_request["refresh_token"] = first_refresh_token
_2nd_req = self.token_endpoint.parse_request(_refresh_request.to_json())
_2nd_resp = self.token_endpoint.process_request(request=_2nd_req, issue_refresh=True)
assert "refresh_token" in _2nd_resp["response_args"]
second_refresh_token = _2nd_resp["response_args"]["refresh_token"]

_2d_refresh_request = REFRESH_TOKEN_REQ.copy()
_2d_refresh_request["refresh_token"] = second_refresh_token

assert first_refresh_token != second_refresh_token
first_refresh_token = grant.get_token(first_refresh_token)
second_refresh_token = grant.get_token(second_refresh_token)
assert first_refresh_token.revoked is True
assert second_refresh_token.revoked is False

def test_refresh_scopes(self):
areq = AUTH_REQ.copy()
areq["scope"] = ["email", "profile"]
Expand Down Expand Up @@ -695,4 +771,4 @@ def test_refresh_token_request_other_client(self):
assert isinstance(_resp, TokenErrorResponse)
assert _resp.to_dict() == {
"error": "invalid_grant", "error_description": "Wrong client"
}
}
82 changes: 80 additions & 2 deletions tests/test_35_oidc_token_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def conf():
},
"refresh": {
"class": "oidcop.token.jwt_token.JWTToken",
"kwargs": {"lifetime": 3600, "aud": ["https://example.org/appl"], },
"kwargs": {"lifetime": 3600, "aud": ["https://example.org/appl"]},
},
"id_token": {"class": "oidcop.token.id_token.IDToken", "kwargs": {}},
},
Expand Down Expand Up @@ -390,7 +390,7 @@ def test_do_2nd_refresh_access_token(self):
session_id = self._create_session(areq)
grant = self.endpoint_context.authz(session_id, areq)
code = self._mint_code(grant, areq["client_id"])

self.token_endpoint.revoke_refresh_on_issue = False
_cntx = self.endpoint_context

_token_request = TOKEN_REQ_DICT.copy()
Expand Down Expand Up @@ -761,6 +761,84 @@ def test_new_refresh_token(self, conf):

assert first_refresh_token != second_refresh_token

def test_revoke_on_issue_refresh_token(self, conf):
self.endpoint_context.cdb["client_1"] = {
"client_secret": "hemligt",
"redirect_uris": [("https://example.com/cb", None)],
"client_salt": "salted",
"endpoint_auth_method": "client_secret_post",
"response_types": ["code", "token", "code id_token", "id_token"],
}
self.token_endpoint.revoke_refresh_on_issue = True
areq = AUTH_REQ.copy()
areq["scope"] = ["openid", "offline_access"]

session_id = self._create_session(areq)
grant = self.endpoint_context.authz(session_id, areq)
code = self._mint_code(grant, areq["client_id"])

_token_request = TOKEN_REQ_DICT.copy()
_token_request["code"] = code.value
_req = self.token_endpoint.parse_request(_token_request)
_resp = self.token_endpoint.process_request(request=_req, issue_refresh=True)
assert "refresh_token" in _resp["response_args"]
first_refresh_token = _resp["response_args"]["refresh_token"]

_refresh_request = REFRESH_TOKEN_REQ.copy()
_refresh_request["refresh_token"] = first_refresh_token
_2nd_req = self.token_endpoint.parse_request(_refresh_request.to_json())
_2nd_resp = self.token_endpoint.process_request(request=_2nd_req, issue_refresh=True)
assert "refresh_token" in _2nd_resp["response_args"]
second_refresh_token = _2nd_resp["response_args"]["refresh_token"]

_2d_refresh_request = REFRESH_TOKEN_REQ.copy()
_2d_refresh_request["refresh_token"] = second_refresh_token

assert first_refresh_token != second_refresh_token
first_refresh_token = grant.get_token(first_refresh_token)
second_refresh_token = grant.get_token(second_refresh_token)
assert first_refresh_token.revoked is True
assert second_refresh_token.revoked is False

def test_revoke_on_issue_refresh_token_per_client(self, conf):
self.endpoint_context.cdb["client_1"] = {
"client_secret": "hemligt",
"redirect_uris": [("https://example.com/cb", None)],
"client_salt": "salted",
"endpoint_auth_method": "client_secret_post",
"response_types": ["code", "token", "code id_token", "id_token"],
}
self.endpoint_context.cdb[AUTH_REQ["client_id"]]["revoke_refresh_on_issue"] = True
areq = AUTH_REQ.copy()
areq["scope"] = ["openid", "offline_access"]

session_id = self._create_session(areq)
grant = self.endpoint_context.authz(session_id, areq)
code = self._mint_code(grant, areq["client_id"])

_token_request = TOKEN_REQ_DICT.copy()
_token_request["code"] = code.value
_req = self.token_endpoint.parse_request(_token_request)
_resp = self.token_endpoint.process_request(request=_req, issue_refresh=True)
assert "refresh_token" in _resp["response_args"]
first_refresh_token = _resp["response_args"]["refresh_token"]

_refresh_request = REFRESH_TOKEN_REQ.copy()
_refresh_request["refresh_token"] = first_refresh_token
_2nd_req = self.token_endpoint.parse_request(_refresh_request.to_json())
_2nd_resp = self.token_endpoint.process_request(request=_2nd_req, issue_refresh=True)
assert "refresh_token" in _2nd_resp["response_args"]
second_refresh_token = _2nd_resp["response_args"]["refresh_token"]

_2d_refresh_request = REFRESH_TOKEN_REQ.copy()
_2d_refresh_request["refresh_token"] = second_refresh_token

assert first_refresh_token != second_refresh_token
first_refresh_token = grant.get_token(first_refresh_token)
second_refresh_token = grant.get_token(second_refresh_token)
assert first_refresh_token.revoked is True
assert second_refresh_token.revoked is False

def test_do_refresh_access_token_not_allowed(self):
areq = AUTH_REQ.copy()
areq["scope"] = ["openid", "offline_access"]
Expand Down