@@ -10,12 +10,14 @@
from jwkest .jwk import SYMKey
from oic .oauth2 .message import MissingSigningKey
from oic .oauth2 .message import WrongSigningAlgorithm
from oic .oic import DEF_SIGN_ALG
from oic .oic import Server
from oic .oic import response_types_to_grant_types
from oic .oic .consumer import IGNORE
from oic .oic .consumer import Consumer
from oic .oic .consumer import clean_response
from oic .oic .message import AccessTokenRequest
from oic .oic .message import AccessTokenResponse
from oic .oic .message import AuthorizationResponse
from oic .oic .message import IdToken
@@ -147,6 +149,9 @@ def setup_consumer(self, session_db_factory):
self .consumer .userinfo_endpoint = "https://example.com/userinfo" # type: ignore
self .consumer .client_secret = "hemlig"
self .consumer .secret_type = "basic"
self .consumer .provider_info = ProviderConfigurationResponse (
issuer = "https://example.com"
) # abs min
def test_backup_keys (self ):
keys = self .consumer .__dict__ .keys ()
@@ -326,6 +331,7 @@ def test_parse_authz(self):
self .consumer ._backup (_state )
part = self .consumer .parse_authz (query = result .headers ["location" ])
assert isinstance (part , tuple )
atr = part [0 ]
assert part [1 ] is None
assert part [2 ] is None
@@ -359,6 +365,7 @@ def test_parse_authz_implicit(self):
)
part = self .consumer .parse_authz (query = result .headers ["location" ])
assert isinstance (part , tuple )
assert part [0 ] is None
atr = part [1 ]
assert part [2 ] is None
@@ -440,6 +447,7 @@ def test_complete_auth_token(self):
parsed = urlparse (result .headers ["location" ])
part = self .consumer .parse_authz (query = parsed .query )
assert isinstance (part , tuple )
auth = part [0 ]
acc = part [1 ]
assert part [2 ] is None
@@ -456,8 +464,67 @@ def test_complete_auth_token_idtoken(self):
_state = "state0"
self .consumer .consumer_config ["response_type" ] = ["id_token" , "token" ]
self .consumer .registration_response = RegistrationResponse (
id_token_signed_response_alg = "RS256"
id_token_signed_response_alg = "HS256"
)
self .consumer .authz_req = {} # Store AuthzReq with state as key
args = {
"client_id" : self .consumer .client_id ,
"response_type" : self .consumer .consumer_config ["response_type" ],
"scope" : ["openid" ],
"nonce" : "nonce" ,
}
token = IdToken (
iss = "https://example.com" ,
aud = "client_1" ,
sub = "some_sub" ,
exp = 1565348600 ,
iat = 1565348300 ,
nonce = "nonce" ,
)
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}" .format (
token .to_jwt (key = [SYMKey (key = "hemlig" )], algorithm = "HS256" )
)
)
with responses .RequestsMock () as rsps :
rsps .add (
responses .GET ,
"https://example.com/authorization" ,
status = 302 ,
headers = {"location" : location },
)
result = self .consumer .do_authorization_request (
state = _state , request_args = args
)
query = parse_qs (urlparse (result .request .url ).query )
assert query ["client_id" ] == ["client_1" ]
assert query ["scope" ] == ["openid" ]
assert query ["response_type" ] == ["id_token token" ]
assert query ["state" ] == ["state0" ]
assert query ["nonce" ] == ["nonce" ]
assert query ["redirect_uri" ] == ["https://example.com/cb" ]
parsed = urlparse (result .headers ["location" ])
with freeze_time ("2019-08-09 11:00:00" ):
part = self .consumer .parse_authz (query = parsed .query )
assert isinstance (part , tuple )
auth = part [0 ]
atr = part [1 ]
idt = part [2 ]
assert auth is None
assert isinstance (atr , AccessTokenResponse )
assert _eq (
atr .keys (), ["access_token" , "id_token" , "token_type" , "state" , "scope" ]
)
assert isinstance (idt , IdToken )
def test_complete_auth_token_idtoken_no_alg_config (self ):
_state = "state0"
self .consumer .consumer_config ["response_type" ] = ["id_token" , "token" ]
self .consumer .provider_info = ProviderConfigurationResponse (
issuer = "https://example.com"
) # abs min
@@ -480,7 +547,7 @@ def test_complete_auth_token_idtoken(self):
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}" .format (
token .to_jwt (key = KC_RSA . keys () , algorithm = "RS256 " )
token .to_jwt (key = [ SYMKey ( key = "hemlig" )] , algorithm = "HS256 " )
)
)
with responses .RequestsMock () as rsps :
@@ -504,23 +571,196 @@ def test_complete_auth_token_idtoken(self):
parsed = urlparse (result .headers ["location" ])
with freeze_time ("2019-08-09 11:00:00" ):
part = self .consumer .parse_authz (
query = parsed .query , algs = self .consumer .sign_enc_algs ("id_token" )
)
part = self .consumer .parse_authz (query = parsed .query , algs = {"sign" : "HS256" })
assert isinstance (part , tuple )
auth = part [0 ]
atr = part [1 ]
assert part [2 ] is None
idt = part [2 ]
assert auth is None
assert isinstance (atr , AccessTokenResponse )
assert _eq (
atr .keys (), ["access_token" , "id_token" , "token_type" , "state" , "scope" ]
)
assert isinstance (idt , IdToken )
def test_complete_auth_token_idtoken_none_cipher_code (self ):
_state = "state0"
self .consumer .consumer_config ["response_type" ] = ["code" ]
self .consumer .registration_response = RegistrationResponse (
id_token_signed_response_alg = "none"
)
self .consumer .provider_info = ProviderConfigurationResponse (
issuer = "https://example.com"
) # abs min
self .consumer .authz_req = {} # Store AuthzReq with state as key
self .consumer .sdb [_state ] = {"redirect_uris" : []}
args = {
"client_id" : self .consumer .client_id ,
"response_type" : self .consumer .consumer_config ["response_type" ],
"scope" : ["openid" ],
"nonce" : "nonce" ,
}
token = IdToken (
iss = "https://example.com" ,
aud = "client_1" ,
sub = "some_sub" ,
exp = 1565348600 ,
iat = 1565348300 ,
nonce = "nonce" ,
at_hash = "aaa" ,
)
# Downgrade the algorithm to `none`
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}" .format (
token .to_jwt (key = KC_RSA .keys (), algorithm = "none" )
)
)
with responses .RequestsMock () as rsps :
rsps .add (
responses .GET ,
"https://example.com/authorization" ,
status = 302 ,
headers = {"location" : location },
)
result = self .consumer .do_authorization_request (
state = _state , request_args = args
)
query = parse_qs (urlparse (result .request .url ).query )
assert query ["client_id" ] == ["client_1" ]
assert query ["scope" ] == ["openid" ]
assert query ["response_type" ] == ["code" ]
assert query ["state" ] == ["state0" ]
assert query ["nonce" ] == ["nonce" ]
assert query ["redirect_uri" ] == ["https://example.com/cb" ]
parsed = urlparse (result .headers ["location" ])
with freeze_time ("2019-08-09 11:00:00" ):
part = self .consumer .parse_authz (query = parsed .query )
assert isinstance (part , tuple )
auth = part [0 ]
atr = part [1 ]
idt = part [2 ]
assert isinstance (auth , AuthorizationResponse )
assert isinstance (atr , AccessTokenResponse )
assert _eq (
atr .keys (), ["access_token" , "id_token" , "token_type" , "state" , "scope" ]
)
assert isinstance (idt , IdToken )
def test_complete_auth_token_idtoken_none_cipher_token (self ):
_state = "state0"
self .consumer .consumer_config ["response_type" ] = ["token" ]
self .consumer .registration_response = RegistrationResponse (
id_token_signed_response_alg = "none"
)
self .consumer .provider_info = ProviderConfigurationResponse (
issuer = "https://example.com"
) # abs min
self .consumer .authz_req = {} # Store AuthzReq with state as key
self .consumer .sdb [_state ] = {"redirect_uris" : []}
args = {
"client_id" : self .consumer .client_id ,
"response_type" : self .consumer .consumer_config ["response_type" ],
"scope" : ["openid" ],
"nonce" : "nonce" ,
}
token = IdToken (
iss = "https://example.com" ,
aud = "client_1" ,
sub = "some_sub" ,
exp = 1565348600 ,
iat = 1565348300 ,
nonce = "nonce" ,
)
# Downgrade the algorithm to `none`
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}" .format (
token .to_jwt (key = KC_RSA .keys (), algorithm = "none" )
)
)
with responses .RequestsMock () as rsps :
rsps .add (
responses .GET ,
"https://example.com/authorization" ,
status = 302 ,
headers = {"location" : location },
)
result = self .consumer .do_authorization_request (
state = _state , request_args = args
)
query = parse_qs (urlparse (result .request .url ).query )
assert query ["client_id" ] == ["client_1" ]
assert query ["scope" ] == ["openid" ]
assert query ["response_type" ] == ["token" ]
assert query ["state" ] == ["state0" ]
assert query ["nonce" ] == ["nonce" ]
assert query ["redirect_uri" ] == ["https://example.com/cb" ]
parsed = urlparse (result .headers ["location" ])
with freeze_time ("2019-08-09 11:00:00" ):
self .consumer .verify_id_token (
atr ["id_token" ], self .consumer .authz_req [atr ["state" ]]
with pytest .raises (WrongSigningAlgorithm ):
self .consumer .parse_authz (query = parsed .query )
def test_complete_auth_token_idtoken_cipher_downgrade (self ):
_state = "state0"
self .consumer .consumer_config ["response_type" ] = ["id_token" , "token" ]
self .consumer .provider_info = ProviderConfigurationResponse (
issuer = "https://example.com"
) # abs min
self .consumer .authz_req = {} # Store AuthzReq with state as key
args = {
"client_id" : self .consumer .client_id ,
"response_type" : self .consumer .consumer_config ["response_type" ],
"scope" : ["openid" ],
"nonce" : "nonce" ,
}
token = IdToken (
iss = "https://example.com" ,
aud = "client_1" ,
sub = "some_sub" ,
exp = 1565348600 ,
iat = 1565348300 ,
nonce = "nonce" ,
)
# Downgrade the algorithm to `none`
location = (
"https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
"scope=openid&id_token={}" .format (
token .to_jwt (key = KC_RSA .keys (), algorithm = "none" )
)
)
with responses .RequestsMock () as rsps :
rsps .add (
responses .GET ,
"https://example.com/authorization" ,
status = 302 ,
headers = {"location" : location },
)
result = self .consumer .do_authorization_request (
state = _state , request_args = args
)
query = parse_qs (urlparse (result .request .url ).query )
assert query ["client_id" ] == ["client_1" ]
assert query ["scope" ] == ["openid" ]
assert query ["response_type" ] == ["id_token token" ]
assert query ["state" ] == ["state0" ]
assert query ["nonce" ] == ["nonce" ]
assert query ["redirect_uri" ] == ["https://example.com/cb" ]
parsed = urlparse (result .headers ["location" ])
with freeze_time ("2019-08-09 11:00:00" ):
with pytest .raises (WrongSigningAlgorithm ):
self .consumer .parse_authz (query = parsed .query )
def test_userinfo (self ):
_state = "state0"
@@ -920,6 +1160,7 @@ def test_get_session_management_id(self):
self .consumer .sdb [_state ] = {"redirect_uris" : ["https://example.org/cb" ]}
resp = AuthorizationResponse (id_token = _signed_jwt , state = _state )
self .consumer .consumer_config ["response_type" ] = ["id_token" ]
self .consumer .authz_req [_state ] = AccessTokenRequest (nonce = "KUEYfRM2VzKDaaKD" )
self .consumer .parse_authz (resp .to_urlencoded ())
assert self .consumer .sso_db ["state" ]["smid" ] == smid
assert session_get (self .consumer .sso_db , "smid" , smid ) == [_state ]