Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

B2c implementation #104

Merged
merged 5 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 53 additions & 34 deletions msal/authority.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import re
try:
from urllib.parse import urlparse
except ImportError: # Fall back to Python 2
from urlparse import urlparse
import logging

import requests
Expand All @@ -23,6 +26,8 @@ class Authority(object):
Once constructed, it contains members named "*_endpoint" for this instance.
TODO: It will also cache the previously-validated authority instances.
"""
_domains_without_user_realm_discovery = set([])

def __init__(self, authority_url, validate_authority=True,
verify=True, proxies=None, timeout=None,
):
Expand All @@ -37,18 +42,30 @@ def __init__(self, authority_url, validate_authority=True,
self.verify = verify
self.proxies = proxies
self.timeout = timeout
canonicalized, self.instance, tenant = canonicalize(authority_url)
tenant_discovery_endpoint = (
'https://{}/{}{}/.well-known/openid-configuration'.format(
self.instance,
tenant,
"" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
))
if (tenant != "adfs" and validate_authority
authority, self.instance, tenant = canonicalize(authority_url)
is_b2c = self.instance.endswith(".b2clogin.com")
rayluo marked this conversation as resolved.
Show resolved Hide resolved
if (tenant != "adfs" and (not is_b2c) and validate_authority
and self.instance not in WELL_KNOWN_AUTHORITY_HOSTS):
tenant_discovery_endpoint = instance_discovery(
canonicalized + "/oauth2/v2.0/authorize",
payload = instance_discovery(
"https://{}{}/oauth2/v2.0/authorize".format(
self.instance, authority.path),
verify=verify, proxies=proxies, timeout=timeout)
if payload.get("error") == "invalid_instance":
raise ValueError(
"invalid_instance: "
"The authority you provided, %s, is not whitelisted. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe use "on the allow list" instead...idk.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I thought about several options when I drafted that sentence. To me, "allow" does not sound right, because customer can of course choose a customized domain Contoso.com and we MSFT is in no position to disallow that. The precise cause of this error message is that the Contoso.com is not a so-called "well known" domain to MSFT Identity platform. And then again, it feels a bit weird to comment on other's name as "not well known", so I think "not whitelisted" sounds more neutral in tone. What do you think?

"If it is indeed your legit customized domain name, "
"you can turn off this check by passing in "
"validate_authority=False"
% authority_url)
tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
else:
tenant_discovery_endpoint = (
'https://{}{}{}/.well-known/openid-configuration'.format(
self.instance,
authority.path, # In B2C scenario, it is "/tenant/policy"
"" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
))
openid_config = tenant_discovery(
tenant_discovery_endpoint,
verify=verify, proxies=proxies, timeout=timeout)
Expand All @@ -58,42 +75,44 @@ def __init__(self, authority_url, validate_authority=True,
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
self.is_adfs = self.tenant.lower() == 'adfs'

def user_realm_discovery(self, username):
resp = requests.get(
"https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
netloc=self.instance, username=username),
headers={'Accept':'application/json'},
verify=self.verify, proxies=self.proxies, timeout=self.timeout)
resp.raise_for_status()
return resp.json()
# It will typically contain "ver", "account_type",
def user_realm_discovery(self, username, response=None):
# It will typically return a dict containing "ver", "account_type",
# "federation_protocol", "cloud_audience_urn",
# "federation_metadata_url", "federation_active_auth_url", etc.
if self.instance not in self.__class__._domains_without_user_realm_discovery:
resp = response or requests.get(
"https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
netloc=self.instance, username=username),
headers={'Accept':'application/json'},
verify=self.verify, proxies=self.proxies, timeout=self.timeout)
if resp.status_code != 404:
resp.raise_for_status()
return resp.json()
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow


def canonicalize(url):
# Returns (canonicalized_url, netloc, tenant). Raises ValueError on errors.
match_object = re.match(r'https://([^/]+)/([^/?#]+)', url.lower())
if not match_object:
def canonicalize(authority_url):
authority = urlparse(authority_url)
parts = authority.path.split("/")
if authority.scheme != "https" or len(parts) < 2 or not parts[1]:
raise ValueError(
"Your given address (%s) should consist of "
"an https url with a minimum of one segment in a path: e.g. "
"https://login.microsoftonline.com/<tenant_name>" % url)
return match_object.group(0), match_object.group(1), match_object.group(2)
"https://login.microsoftonline.com/<tenant> "
"or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
% authority_url)
return authority, authority.netloc, parts[1]

def instance_discovery(url, response=None, **kwargs):
# Returns tenant discovery endpoint
resp = requests.get( # Note: This URL seemingly returns V1 endpoint only
def instance_discovery(url, **kwargs):
return requests.get( # Note: This URL seemingly returns V1 endpoint only
'https://{}/common/discovery/instance'.format(
WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
# See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
# and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
),
params={'authorization_endpoint': url, 'api-version': '1.0'},
**kwargs)
payload = response or resp.json()
if 'tenant_discovery_endpoint' not in payload:
raise MsalServiceError(status_code=resp.status_code, **payload)
return payload['tenant_discovery_endpoint']
**kwargs).json()

def tenant_discovery(tenant_discovery_endpoint, **kwargs):
# Returns Openid Configuration
Expand Down
57 changes: 31 additions & 26 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os

from msal.authority import *
from msal.exceptions import MsalServiceError
from tests import unittest


@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip network io during tagged release")
class TestAuthority(unittest.TestCase):

def test_wellknown_host_and_tenant(self):
Expand All @@ -26,7 +29,7 @@ def test_lessknown_host_will_return_a_set_of_v1_endpoints(self):
self.assertNotIn('v2.0', a.token_endpoint)

def test_unknown_host_wont_pass_instance_discovery(self):
with self.assertRaisesRegexp(MsalServiceError, "invalid_instance"):
with self.assertRaisesRegexp(ValueError, "invalid_instance"):
Authority('https://unknown.host/tenant_doesnt_matter_in_this_case')

def test_invalid_host_skipping_validation_meets_connection_error_down_the_road(self):
Expand All @@ -37,19 +40,19 @@ def test_invalid_host_skipping_validation_meets_connection_error_down_the_road(s
class TestAuthorityInternalHelperCanonicalize(unittest.TestCase):

def test_canonicalize_tenant_followed_by_extra_paths(self):
self.assertEqual(
canonicalize("https://example.com/tenant/subpath?foo=bar#fragment"),
("https://example.com/tenant", "example.com", "tenant"))
_, i, t = canonicalize("https://example.com/tenant/subpath?foo=bar#fragment")
self.assertEqual("example.com", i)
self.assertEqual("tenant", t)

def test_canonicalize_tenant_followed_by_extra_query(self):
self.assertEqual(
canonicalize("https://example.com/tenant?foo=bar#fragment"),
("https://example.com/tenant", "example.com", "tenant"))
_, i, t = canonicalize("https://example.com/tenant?foo=bar#fragment")
self.assertEqual("example.com", i)
self.assertEqual("tenant", t)

def test_canonicalize_tenant_followed_by_extra_fragment(self):
self.assertEqual(
canonicalize("https://example.com/tenant#fragment"),
("https://example.com/tenant", "example.com", "tenant"))
_, i, t = canonicalize("https://example.com/tenant#fragment")
self.assertEqual("example.com", i)
self.assertEqual("tenant", t)

def test_canonicalize_rejects_non_https(self):
with self.assertRaises(ValueError):
Expand All @@ -64,20 +67,22 @@ def test_canonicalize_rejects_tenantless_host_with_trailing_slash(self):
canonicalize("https://no.tenant.example.com/")


class TestAuthorityInternalHelperInstanceDiscovery(unittest.TestCase):

def test_instance_discovery_happy_case(self):
self.assertEqual(
instance_discovery("https://login.windows.net/tenant"),
"https://login.windows.net/tenant/.well-known/openid-configuration")

def test_instance_discovery_with_unknown_instance(self):
with self.assertRaisesRegexp(MsalServiceError, "invalid_instance"):
instance_discovery('https://unknown.host/tenant_doesnt_matter_here')

def test_instance_discovery_with_mocked_response(self):
mock_response = {'tenant_discovery_endpoint': 'http://a.com/t/openid'}
endpoint = instance_discovery(
"https://login.microsoftonline.in/tenant.com", response=mock_response)
self.assertEqual(endpoint, mock_response['tenant_discovery_endpoint'])
@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip network io during tagged release")
class TestAuthorityInternalHelperUserRealmDiscovery(unittest.TestCase):
def test_memorize(self):
# We use a real authority so the constructor can finish tenant discovery
authority = "https://login.microsoftonline.com/common"
self.assertNotIn(authority, Authority._domains_without_user_realm_discovery)
a = Authority(authority, validate_authority=False)

# We now pretend this authority supports no User Realm Discovery
class MockResponse(object):
status_code = 404
a.user_realm_discovery("john.doe@example.com", response=MockResponse())
self.assertIn(
"login.microsoftonline.com",
Authority._domains_without_user_realm_discovery,
"user_realm_discovery() should memorize domains not supporting URD")
a.user_realm_discovery("john.doe@example.com",
response="This would cause exception if memorization did not work")

98 changes: 81 additions & 17 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@
logging.basicConfig(level=logging.INFO)


def _get_app_and_auth_code(
client_id,
client_secret=None,
authority="https://login.microsoftonline.com/common",
port=44331,
scopes=["https://graph.windows.net/.default"],
rayluo marked this conversation as resolved.
Show resolved Hide resolved
):
from msal.oauth2cli.authcode import obtain_auth_code
app = msal.ClientApplication(client_id, client_secret, authority=authority)
redirect_uri = "http://localhost:%d" % port
ac = obtain_auth_code(port, auth_uri=app.get_authorization_request_url(
scopes, redirect_uri=redirect_uri))
assert ac is not None
return (app, ac, redirect_uri)

@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip e2e tests during tagged release")
class E2eTestCase(unittest.TestCase):

Expand Down Expand Up @@ -49,9 +64,15 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
result_from_cache = self.app.acquire_token_silent(scope, account=account)
self.assertIsNotNone(result_from_cache,
"We should get a result from acquire_token_silent(...) call")
self.assertNotEqual(
result_from_wire['access_token'], result_from_cache['access_token'],
"We should get a fresh AT (via RT)")
self.assertIsNotNone(
# We used to assert it this way:
# result_from_wire['access_token'] != result_from_cache['access_token']
# but ROPC in B2C tends to return the same AT we obtained seconds ago.
# Now looking back, "refresh_token grant would return a brand new AT"
# was just an empirical observation but never a committment in specs,
# so we adjust our way to assert here.
(result_from_cache or {}).get("access_token"),
"We should get an AT from acquire_token_silent(...) call")

def assertCacheWorksForApp(self, result_from_wire, scope):
# Going to test acquire_token_silent(...) to locate an AT from cache
Expand All @@ -70,7 +91,10 @@ def _test_username_password(self,
username, password, scopes=scope)
self.assertLoosely(result)
# self.assertEqual(None, result.get("error"), str(result))
self.assertCacheWorksForUser(result, scope, username=username)
self.assertCacheWorksForUser(
result, scope,
username=username if ".b2clogin.com" not in authority else None,
)


THIS_FOLDER = os.path.dirname(__file__)
Expand All @@ -95,23 +119,17 @@ def test_username_password(self):
self._test_username_password(**self.config)

def _get_app_and_auth_code(self):
from msal.oauth2cli.authcode import obtain_auth_code
app = msal.ClientApplication(
return _get_app_and_auth_code(
self.config["client_id"],
client_credential=self.config.get("client_secret"),
authority=self.config.get("authority"))
port = self.config.get("listen_port", 44331)
redirect_uri = "http://localhost:%s" % port
auth_request_uri = app.get_authorization_request_url(
self.config["scope"], redirect_uri=redirect_uri)
ac = obtain_auth_code(port, auth_uri=auth_request_uri)
self.assertNotEqual(ac, None)
return (app, ac, redirect_uri)
client_secret=self.config.get("client_secret"),
authority=self.config.get("authority"),
port=self.config.get("listen_port", 44331),
scopes=self.config["scope"],
)

def test_auth_code(self):
self.skipUnlessWithConfig(["client_id", "scope"])
(self.app, ac, redirect_uri) = self._get_app_and_auth_code()

result = self.app.acquire_token_by_authorization_code(
ac, self.config["scope"], redirect_uri=redirect_uri)
logger.debug("%s.cache = %s",
Expand Down Expand Up @@ -314,7 +332,7 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
lab_name = lab_name.lower()
if lab_name not in cls._secrets:
logger.info("Querying lab user password for %s", lab_name)
# Note: Short link won't work "https://aka.ms/GetLabUserSecret?Secret=%s"
# Short link only works in browser "https://aka.ms/GetLabUserSecret?Secret=%s"
# So we use the official link written in here
# https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API%27s.aspx
url = ("https://request.msidlab.com/api/GetLabUserSecret?code=KpY5uCcoKo0aW8VOL/CUO3wnu9UF2XbSnLFGk56BDnmQiwD80MQ7HA==&Secret=%s"
Expand Down Expand Up @@ -417,3 +435,49 @@ def test_acquire_token_obo(self):
result = cca.acquire_token_silent(downstream_scopes, account)
self.assertEqual(cca_result["access_token"], result["access_token"])

def _build_b2c_authority(self, policy):
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
return base + "/" + policy # We do not support base + "?p=" + policy

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's probably fine as supporting B2C in MSAL.Python it's a new feature.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, that inline comment is inside an internal test case, so it is more like a remind-to-ourselves, but did not mean it as a limitation or deficiency. In fact, currently all the public materials we can find, uses the "base/policy" format, rather than "base?p=policy" format. Such as:

That being said, I did heard from a previous internal presentation that the "base?p=policy" format would also work. We (the entire MSAL fleet) might actually want to switch to that format at a later time, because the current way - putting policy inside authority - has some intrinsic issues, but that is a different topic.


@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_b2c_acquire_token_by_auth_code(self):
"""
When prompted, you can manually login using this account:

username="b2clocal@msidlabb2c.onmicrosoft.com"
# This won't work https://msidlab.com/api/user?usertype=b2c
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
"""
scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
"b876a048-55a5-4fc5-9403-f5d90cb1c852",
client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"),
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
scopes=scopes,
)
result = self.app.acquire_token_by_authorization_code(
ac, scopes, redirect_uri=redirect_uri)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
json.dumps(self.app.token_cache._cache, indent=4),
json.dumps(result.get("id_token_claims"), indent=4),
)
self.assertIn(
"access_token", result,
"{error}: {error_description}".format(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scopes, username=None)

def test_b2c_acquire_token_by_ropc(self):
self._test_username_password(
authority=self._build_b2c_authority("B2C_1_ROPC_Auth"),
client_id="e3b9ad76-9763-4827-b088-80c7a7888f79",
username="b2clocal@msidlabb2c.onmicrosoft.com",
password=self.get_lab_user_secret("msidlabb2c"),
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"],
)