Skip to content

Commit

Permalink
Disallow none alg in flows other than Authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
tpazderka committed Dec 1, 2020
1 parent 637c148 commit fe0fbda
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 20 deletions.
16 changes: 14 additions & 2 deletions src/oic/oic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from oic.oauth2.message import ErrorResponse
from oic.oauth2.message import Message
from oic.oauth2.message import MessageFactory
from oic.oauth2.message import WrongSigningAlgorithm
from oic.oauth2.util import get_or_post
from oic.oic.message import SCOPE2CLAIMS
from oic.oic.message import AccessTokenResponse
Expand Down Expand Up @@ -1399,7 +1400,13 @@ def sign_enc_algs(self, typ):
return resp

def _verify_id_token(
self, id_token, nonce="", acr_values=None, auth_time=0, max_age=0
self,
id_token,
nonce="",
acr_values=None,
auth_time=0,
max_age=0,
response_type="",
):
"""
Verify IdToken.
Expand Down Expand Up @@ -1432,6 +1439,11 @@ def _verify_id_token(
if _now > id_token["exp"]:
raise OtherError("Passed best before date")

if response_type != ["code"] and id_token.jws_header["alg"] == "none":
raise WrongSigningAlgorithm(
"none is not allowed outside Authorization Flow."
)

if (
self.id_token_max_age
and _now > int(id_token["iat"]) + self.id_token_max_age
Expand All @@ -1458,7 +1470,7 @@ def verify_id_token(self, id_token, authn_req):
except KeyError:
pass

for param in ["acr_values", "max_age"]:
for param in ["acr_values", "max_age", "response_type"]:
try:
kwa[param] = authn_req[param]
except KeyError:
Expand Down
12 changes: 7 additions & 5 deletions src/oic/oic/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,19 @@ def verify_id_token(instance, check_hash=False, **kwargs):

if check_hash:
_alg = idt.jws_header["alg"]
# What if _alg == 'none'

hfunc = "HS" + _alg[-3:]
if _alg != "none":
hfunc = "HS" + _alg[-3:]
else:
# This is allowed only for `code` and it needs to be checked by a Client
hfunc = None

if "access_token" in instance:
if "access_token" in instance and hfunc is not None:
if "at_hash" not in idt:
raise MissingRequiredAttribute("Missing at_hash property", idt)
if idt["at_hash"] != jws.left_hash(instance["access_token"], hfunc):
raise AtHashError("Failed to verify access_token hash", idt)

if "code" in instance:
if "code" in instance and hfunc is not None:
if "c_hash" not in idt:
raise MissingRequiredAttribute("Missing c_hash property", idt)
if idt["c_hash"] != jws.left_hash(instance["code"], hfunc):
Expand Down
137 changes: 124 additions & 13 deletions tests/test_oic_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,39 +510,91 @@ def test_complete_auth_token_idtoken(self):

with freeze_time("2019-08-09 11:00:00"):
part = self.consumer.parse_authz(query=parsed.query)
assert isinstance(part, tuple)
auth = part[0]
atr = part[1]
assert part[2] is None
idt = part[2]

assert auth is None
assert isinstance(atr, AccessTokenResponse)
assert _eq(
atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
)
assert isinstance(idt, IdToken)

with freeze_time("2019-08-09 11:00:00"):
self.consumer.verify_id_token(
atr["id_token"], self.consumer.authz_req[atr["state"]]
def test_complete_auth_token_idtoken_no_alg_config(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["id_token", "token"]
self.consumer.provider_info = ProviderConfigurationResponse(
issuer="https://example.com"
) # abs min
self.consumer.authz_req = {} # Store AuthzReq with state as key

args = {
"client_id": self.consumer.client_id,
"response_type": self.consumer.consumer_config["response_type"],
"scope": ["openid"],
"nonce": "nonce",
}
token = IdToken(
iss="https://example.com",
aud="client_1",
sub="some_sub",
exp=1565348600,
iat=1565348300,
nonce="nonce",
)
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}".format(
token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256")
)
)
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
"https://example.com/authorization",
status=302,
headers={"location": location},
)
result = self.consumer.do_authorization_request(
state=_state, request_args=args
)
query = parse_qs(urlparse(result.request.url).query)
assert query["client_id"] == ["client_1"]
assert query["scope"] == ["openid"]
assert query["response_type"] == ["id_token token"]
assert query["state"] == ["state0"]
assert query["nonce"] == ["nonce"]
assert query["redirect_uri"] == ["https://example.com/cb"]

parsed = urlparse(result.headers["location"])

with freeze_time("2019-08-09 11:00:00"):
part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"})
assert isinstance(part, tuple)
auth = part[0]
atr = part[1]
idt = part[2]

assert isinstance(part, tuple)
assert auth is None
assert isinstance(atr, AccessTokenResponse)
assert _eq(
atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
)
assert isinstance(idt, IdToken)

def test_complete_auth_token_idtoken_no_alg_config(self):
def test_complete_auth_token_idtoken_none_cipher_code(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["id_token", "token"]
self.consumer.consumer_config["response_type"] = ["code"]
self.consumer.registration_response = RegistrationResponse(
id_token_signed_response_alg="none"
)
self.consumer.provider_info = ProviderConfigurationResponse(
issuer="https://example.com"
) # abs min
self.consumer.authz_req = {} # Store AuthzReq with state as key
self.consumer.sdb[_state] = {"redirect_uris": []}

args = {
"client_id": self.consumer.client_id,
Expand All @@ -557,11 +609,13 @@ def test_complete_auth_token_idtoken_no_alg_config(self):
exp=1565348600,
iat=1565348300,
nonce="nonce",
at_hash="aaa",
)
# Downgrade the algorithm to `none`
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}".format(
token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256")
token.to_jwt(key=KC_RSA.keys(), algorithm="none")
)
)
with responses.RequestsMock() as rsps:
Expand All @@ -577,27 +631,84 @@ def test_complete_auth_token_idtoken_no_alg_config(self):
query = parse_qs(urlparse(result.request.url).query)
assert query["client_id"] == ["client_1"]
assert query["scope"] == ["openid"]
assert query["response_type"] == ["id_token token"]
assert query["response_type"] == ["code"]
assert query["state"] == ["state0"]
assert query["nonce"] == ["nonce"]
assert query["redirect_uri"] == ["https://example.com/cb"]

parsed = urlparse(result.headers["location"])

with freeze_time("2019-08-09 11:00:00"):
part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"})
part = self.consumer.parse_authz(query=parsed.query)
assert isinstance(part, tuple)
auth = part[0]
atr = part[1]
idt = part[2]

assert isinstance(part, tuple)
assert auth is None
assert isinstance(auth, AuthorizationResponse)
assert isinstance(atr, AccessTokenResponse)
assert _eq(
atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
)
assert isinstance(idt, IdToken)

def test_complete_auth_token_idtoken_none_cipher_token(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["token"]
self.consumer.registration_response = RegistrationResponse(
id_token_signed_response_alg="none"
)
self.consumer.provider_info = ProviderConfigurationResponse(
issuer="https://example.com"
) # abs min
self.consumer.authz_req = {} # Store AuthzReq with state as key
self.consumer.sdb[_state] = {"redirect_uris": []}

args = {
"client_id": self.consumer.client_id,
"response_type": self.consumer.consumer_config["response_type"],
"scope": ["openid"],
"nonce": "nonce",
}
token = IdToken(
iss="https://example.com",
aud="client_1",
sub="some_sub",
exp=1565348600,
iat=1565348300,
nonce="nonce",
)
# Downgrade the algorithm to `none`
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}".format(
token.to_jwt(key=KC_RSA.keys(), algorithm="none")
)
)
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
"https://example.com/authorization",
status=302,
headers={"location": location},
)
result = self.consumer.do_authorization_request(
state=_state, request_args=args
)
query = parse_qs(urlparse(result.request.url).query)
assert query["client_id"] == ["client_1"]
assert query["scope"] == ["openid"]
assert query["response_type"] == ["token"]
assert query["state"] == ["state0"]
assert query["nonce"] == ["nonce"]
assert query["redirect_uri"] == ["https://example.com/cb"]

parsed = urlparse(result.headers["location"])

with freeze_time("2019-08-09 11:00:00"):
with pytest.raises(WrongSigningAlgorithm):
self.consumer.parse_authz(query=parsed.query)

def test_complete_auth_token_idtoken_cipher_downgrade(self):
_state = "state0"
self.consumer.consumer_config["response_type"] = ["id_token", "token"]
Expand Down Expand Up @@ -649,7 +760,7 @@ def test_complete_auth_token_idtoken_cipher_downgrade(self):

with freeze_time("2019-08-09 11:00:00"):
with pytest.raises(WrongSigningAlgorithm):
part = self.consumer.parse_authz(query=parsed.query)
self.consumer.parse_authz(query=parsed.query)

def test_userinfo(self):
_state = "state0"
Expand Down

0 comments on commit fe0fbda

Please sign in to comment.