Skip to content

Commit

Permalink
Enforce alg verification in id_token
Browse files Browse the repository at this point in the history
  • Loading branch information
tpazderka committed Dec 1, 2020
1 parent b195d09 commit 637c148
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/oic/oic/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ def parse_authz(

_log_info("response: %s" % sanitize(query))

if "algs" not in kwargs:
kwargs["algs"] = self.sign_enc_algs("id_token")
if "code" in self.consumer_config["response_type"]:
aresp, _state = self._parse_authz(query, **kwargs)

Expand Down
135 changes: 131 additions & 4 deletions tests/test_oic_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from jwkest.jwk import SYMKey

from oic.oauth2.message import MissingSigningKey
from oic.oauth2.message import WrongSigningAlgorithm
from oic.oic import DEF_SIGN_ALG
from oic.oic import Server
from oic.oic import response_types_to_grant_types
Expand Down Expand Up @@ -463,7 +464,7 @@ def test_complete_auth_token_idtoken(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["id_token", "token"]
self.consumer.registration_response = RegistrationResponse(
id_token_signed_response_alg="RS256"
id_token_signed_response_alg="HS256"
)
self.consumer.authz_req = {} # Store AuthzReq with state as key

Expand All @@ -484,7 +485,7 @@ def test_complete_auth_token_idtoken(self):
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}".format(
token.to_jwt(key=KC_RSA.keys(), algorithm="RS256")
token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256")
)
)
with responses.RequestsMock() as rsps:
Expand All @@ -508,10 +509,83 @@ def test_complete_auth_token_idtoken(self):
parsed = urlparse(result.headers["location"])

with freeze_time("2019-08-09 11:00:00"):
part = self.consumer.parse_authz(
query=parsed.query, algs=self.consumer.sign_enc_algs("id_token")
part = self.consumer.parse_authz(query=parsed.query)
auth = part[0]
atr = part[1]
assert part[2] is None

assert auth is None
assert isinstance(atr, AccessTokenResponse)
assert _eq(
atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
)

with freeze_time("2019-08-09 11:00:00"):
self.consumer.verify_id_token(
atr["id_token"], self.consumer.authz_req[atr["state"]]
)
auth = part[0]
atr = part[1]
idt = part[2]

assert isinstance(part, tuple)
assert auth is None
assert isinstance(atr, AccessTokenResponse)
assert _eq(
atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
)
assert isinstance(idt, IdToken)

def test_complete_auth_token_idtoken_no_alg_config(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["id_token", "token"]
self.consumer.provider_info = ProviderConfigurationResponse(
issuer="https://example.com"
) # abs min
self.consumer.authz_req = {} # Store AuthzReq with state as key

args = {
"client_id": self.consumer.client_id,
"response_type": self.consumer.consumer_config["response_type"],
"scope": ["openid"],
"nonce": "nonce",
}
token = IdToken(
iss="https://example.com",
aud="client_1",
sub="some_sub",
exp=1565348600,
iat=1565348300,
nonce="nonce",
)
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}".format(
token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256")
)
)
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
"https://example.com/authorization",
status=302,
headers={"location": location},
)
result = self.consumer.do_authorization_request(
state=_state, request_args=args
)
query = parse_qs(urlparse(result.request.url).query)
assert query["client_id"] == ["client_1"]
assert query["scope"] == ["openid"]
assert query["response_type"] == ["id_token token"]
assert query["state"] == ["state0"]
assert query["nonce"] == ["nonce"]
assert query["redirect_uri"] == ["https://example.com/cb"]

parsed = urlparse(result.headers["location"])

with freeze_time("2019-08-09 11:00:00"):
part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"})
auth = part[0]
atr = part[1]
idt = part[2]
Expand All @@ -524,6 +598,59 @@ def test_complete_auth_token_idtoken(self):
)
assert isinstance(idt, IdToken)

def test_complete_auth_token_idtoken_cipher_downgrade(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["id_token", "token"]
self.consumer.provider_info = ProviderConfigurationResponse(
issuer="https://example.com"
) # abs min
self.consumer.authz_req = {} # Store AuthzReq with state as key

args = {
"client_id": self.consumer.client_id,
"response_type": self.consumer.consumer_config["response_type"],
"scope": ["openid"],
"nonce": "nonce",
}
token = IdToken(
iss="https://example.com",
aud="client_1",
sub="some_sub",
exp=1565348600,
iat=1565348300,
nonce="nonce",
)
# Downgrade the algorithm to `none`
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}".format(
token.to_jwt(key=KC_RSA.keys(), algorithm="none")
)
)
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
"https://example.com/authorization",
status=302,
headers={"location": location},
)
result = self.consumer.do_authorization_request(
state=_state, request_args=args
)
query = parse_qs(urlparse(result.request.url).query)
assert query["client_id"] == ["client_1"]
assert query["scope"] == ["openid"]
assert query["response_type"] == ["id_token token"]
assert query["state"] == ["state0"]
assert query["nonce"] == ["nonce"]
assert query["redirect_uri"] == ["https://example.com/cb"]

parsed = urlparse(result.headers["location"])

with freeze_time("2019-08-09 11:00:00"):
with pytest.raises(WrongSigningAlgorithm):
part = self.consumer.parse_authz(query=parsed.query)

def test_userinfo(self):
_state = "state0"

Expand Down

0 comments on commit 637c148

Please sign in to comment.