Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 178 additions & 117 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ def _test_username_password(self,
username=username if ".b2clogin.com" not in authority else None,
)

def _test_device_flow(
self, client_id=None, authority=None, scope=None, **ignored):
assert client_id and authority and scope
self.app = msal.PublicClientApplication(
client_id, authority=authority)
flow = self.app.initiate_device_flow(scopes=scope)
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
json.dumps(flow, indent=4))
logger.info(flow["message"])

duration = 60
logger.info("We will wait up to %d seconds for you to sign in" % duration)
flow["expires_at"] = min( # Shorten the time for quick test
flow["expires_at"], time.time() + duration)
result = self.app.acquire_token_by_device_flow(flow)
self.assertLoosely( # It will skip this test if there is no user interaction
result,
assertion=lambda: self.assertIn('access_token', result),
skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS)
if "access_token" not in result:
self.skip("End user did not complete Device Flow in time")
self.assertCacheWorksForUser(result, scope, username=None)
result["access_token"] = result["refresh_token"] = "************"
logger.info(
"%s obtained tokens: %s", self.id(), json.dumps(result, indent=4))


THIS_FOLDER = os.path.dirname(__file__)
CONFIG = os.path.join(THIS_FOLDER, "config.json")
Expand Down Expand Up @@ -244,29 +270,7 @@ def setUpClass(cls):
cls.config = json.load(f)

def test_device_flow(self):
scopes = self.config["scope"]
self.app = msal.PublicClientApplication(
self.config['client_id'], authority=self.config["authority"])
flow = self.app.initiate_device_flow(scopes=scopes)
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
json.dumps(flow, indent=4))
logger.info(flow["message"])

duration = 60
logger.info("We will wait up to %d seconds for you to sign in" % duration)
flow["expires_at"] = min( # Shorten the time for quick test
flow["expires_at"], time.time() + duration)
result = self.app.acquire_token_by_device_flow(flow)
self.assertLoosely( # It will skip this test if there is no user interaction
result,
assertion=lambda: self.assertIn('access_token', result),
skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS)
if "access_token" not in result:
self.skip("End user did not complete Device Flow in time")
self.assertCacheWorksForUser(result, scopes, username=None)
result["access_token"] = result["refresh_token"] = "************"
logger.info(
"%s obtained tokens: %s", self.id(), json.dumps(result, indent=4))
self._test_device_flow(**self.config)


def get_lab_app(
Expand Down Expand Up @@ -322,6 +326,12 @@ def setUpClass(cls):
def tearDownClass(cls):
cls.session.close()

@classmethod
def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html
url = "https://msidlab.com/api/app"
resp = cls.session.get(url, params=query)
return resp.json()[0]

@classmethod
def get_lab_user_secret(cls, lab_name="msidlab4"):
lab_name = lab_name.lower()
Expand All @@ -336,67 +346,29 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html
resp = cls.session.get("https://msidlab.com/api/user", params=query)
result = resp.json()[0]
_env = query.get("azureenvironment", "").lower()
authority_base = {
"azureusgovernment": "https://login.microsoftonline.us/"
}.get(_env, "https://login.microsoftonline.com/")
scope = {
"azureusgovernment": ["https://graph.microsoft.us/.default"],
}.get(_env, ["https://graph.microsoft.com/.default"])
return { # Mapping lab API response to our simplified configuration format
"authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format(
result["labName"]),
"authority": authority_base + result["tenantID"],
"client_id": result["appId"],
"username": result["upn"],
"lab_name": result["labName"],
"scope": ["https://graph.microsoft.com/.default"],
"scope": scope,
}

def test_aad_managed_user(self): # Pure cloud
config = self.get_lab_user(usertype="cloud")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs4_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs3_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs2_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs2019_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_ropc_adfs2019_onprem(self):
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["client_id"] = "PublicClientId"
config["scope"] = self.adfs2019_scopes
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_by_auth_code(self):
"""When prompted, you can manually login using this account:

# https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
username = "..." # The upn from the link above
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
"""
scopes = self.adfs2019_scopes
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
def _test_acquire_token_by_auth_code(
self, client_id=None, authority=None, port=None, scope=None,
**ignored):
assert client_id and authority and port and scope
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
"PublicClientId",
authority="https://fs.%s.com/adfs" % config["lab_name"],
port=8080,
scopes=scopes,
)
client_id, authority=authority, port=port, scopes=scope)
result = self.app.acquire_token_by_authorization_code(
ac, scopes, redirect_uri=redirect_uri)
ac, scope, redirect_uri=redirect_uri)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
Expand All @@ -409,41 +381,32 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self):
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scopes, username=None)

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
def test_acquire_token_obo(self):
# Some hardcoded, pre-defined settings
obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
downstream_scopes = ["https://graph.microsoft.com/.default"]
config = self.get_lab_user(usertype="cloud")
self.assertCacheWorksForUser(result, scope, username=None)

def _test_acquire_token_obo(self, config_pca, config_cca):
# 1. An app obtains a token representing a user, for our mid-tier service
pca = msal.PublicClientApplication(
"c0485386-1e9a-4663-bc96-7ab30656de7f", authority=config.get("authority"))
config_pca["client_id"], authority=config_pca["authority"])
pca_result = pca.acquire_token_by_username_password(
config["username"],
self.get_lab_user_secret(config["lab_name"]),
scopes=[ # The OBO app's scope. Yours might be different.
"api://%s/read" % obo_client_id],
config_pca["username"],
config_pca["password"],
scopes=config_pca["scope"],
)
self.assertIsNotNone(
pca_result.get("access_token"),
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))

# 2. Our mid-tier service uses OBO to obtain a token for downstream service
cca = msal.ConfidentialClientApplication(
obo_client_id,
client_credential=os.getenv("LAB_OBO_CLIENT_SECRET"),
authority=config.get("authority"),
config_cca["client_id"],
client_credential=config_cca["client_secret"],
authority=config_cca["authority"],
# token_cache= ..., # Default token cache is all-tokens-store-in-memory.
# That's fine if OBO app uses short-lived msal instance per session.
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
)
cca_result = cca.acquire_token_on_behalf_of(
pca_result['access_token'], downstream_scopes)
pca_result['access_token'], config_cca["scope"])
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))

# 3. Now the OBO app can simply store downstream token(s) in same session.
Expand All @@ -453,12 +416,93 @@ def test_acquire_token_obo(self):
# Assuming you already did that (which is not shown in this test case),
# the following part shows one of the ways to obtain an AT from cache.
username = cca_result.get("id_token_claims", {}).get("preferred_username")
self.assertEqual(config["username"], username)
self.assertEqual(config_cca["username"], username)
if username: # A precaution so that we won't use other user's token
account = cca.get_accounts(username=username)[0]
result = cca.acquire_token_silent(downstream_scopes, account)
result = cca.acquire_token_silent(config_cca["scope"], account)
self.assertEqual(cca_result["access_token"], result["access_token"])

def _test_acquire_token_by_client_secret(
self, client_id=None, client_secret=None, authority=None, scope=None,
**ignored):
assert client_id and client_secret and authority and scope
app = msal.ConfidentialClientApplication(
client_id, client_credential=client_secret, authority=authority)
result = app.acquire_token_for_client(scope)
self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result)


class WorldWideTestCase(LabBasedTestCase):

def test_aad_managed_user(self): # Pure cloud
config = self.get_lab_user(usertype="cloud")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

def test_adfs4_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

def test_adfs3_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

def test_adfs2_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

def test_adfs2019_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

def test_ropc_adfs2019_onprem(self):
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["client_id"] = "PublicClientId"
config["scope"] = self.adfs2019_scopes
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_by_auth_code(self):
"""When prompted, you can manually login using this account:

# https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
username = "..." # The upn from the link above
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
"""
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["client_id"] = "PublicClientId"
config["scope"] = self.adfs2019_scopes
config["port"] = 8080
self._test_acquire_token_by_auth_code(**config)

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
def test_acquire_token_obo(self):
config = self.get_lab_user(usertype="cloud")

config_cca = {}
config_cca.update(config)
config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
config_cca["scope"] = ["https://graph.microsoft.com/.default"]
config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET")

config_pca = {}
config_pca.update(config)
config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f"
config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"])
config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]]

self._test_acquire_token_obo(config_pca, config_cca)

def _build_b2c_authority(self, policy):
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
return base + "/" + policy # We do not support base + "?p=" + policy
Expand All @@ -472,29 +516,12 @@ def test_b2c_acquire_token_by_auth_code(self):
# This won't work https://msidlab.com/api/user?usertype=b2c
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
"""
scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
"b876a048-55a5-4fc5-9403-f5d90cb1c852",
client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"),
self._test_acquire_token_by_auth_code(
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
client_id="b876a048-55a5-4fc5-9403-f5d90cb1c852",
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
scopes=scopes,
scope=["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
)
result = self.app.acquire_token_by_authorization_code(
ac, scopes, redirect_uri=redirect_uri)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
json.dumps(self.app.token_cache._cache, indent=4),
json.dumps(result.get("id_token_claims"), indent=4),
)
self.assertIn(
"access_token", result,
"{error}: {error_description}".format(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scopes, username=None)

def test_b2c_acquire_token_by_ropc(self):
self._test_username_password(
Expand All @@ -505,6 +532,40 @@ def test_b2c_acquire_token_by_ropc(self):
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"],
)


class ArlingtonCloudTestCase(LabBasedTestCase):
environment = "azureusgovernment"

def test_acquire_token_by_ropc(self):
config = self.get_lab_user(azureenvironment=self.environment)
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password(**config)

def test_acquire_token_by_client_secret(self):
config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no")
config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret")
self._test_acquire_token_by_client_secret(**config)

def test_acquire_token_obo(self):
config_cca = self.get_lab_user(
usertype="cloud", azureenvironment=self.environment, publicClient="no")
config_cca["scope"] = ["https://graph.microsoft.us/.default"]
config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret")

config_pca = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes")
obo_app_object = self.get_lab_app_object(
usertype="cloud", azureenvironment=self.environment, publicClient="no")
config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"])
config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))]

self._test_acquire_token_obo(config_pca, config_cca)

def test_acquire_token_device_flow(self):
config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes")
config["scope"] = ["user.read"]
self._test_device_flow(**config)


if __name__ == "__main__":
unittest.main()