diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 887e1609..5abd5d61 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -102,6 +102,32 @@ def _test_username_password(self, username=username if ".b2clogin.com" not in authority else None, ) + def _test_device_flow( + self, client_id=None, authority=None, scope=None, **ignored): + assert client_id and authority and scope + self.app = msal.PublicClientApplication( + client_id, authority=authority) + flow = self.app.initiate_device_flow(scopes=scope) + assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( + json.dumps(flow, indent=4)) + logger.info(flow["message"]) + + duration = 60 + logger.info("We will wait up to %d seconds for you to sign in" % duration) + flow["expires_at"] = min( # Shorten the time for quick test + flow["expires_at"], time.time() + duration) + result = self.app.acquire_token_by_device_flow(flow) + self.assertLoosely( # It will skip this test if there is no user interaction + result, + assertion=lambda: self.assertIn('access_token', result), + skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) + if "access_token" not in result: + self.skip("End user did not complete Device Flow in time") + self.assertCacheWorksForUser(result, scope, username=None) + result["access_token"] = result["refresh_token"] = "************" + logger.info( + "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + THIS_FOLDER = os.path.dirname(__file__) CONFIG = os.path.join(THIS_FOLDER, "config.json") @@ -244,29 +270,7 @@ def setUpClass(cls): cls.config = json.load(f) def test_device_flow(self): - scopes = self.config["scope"] - self.app = msal.PublicClientApplication( - self.config['client_id'], authority=self.config["authority"]) - flow = self.app.initiate_device_flow(scopes=scopes) - assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( - json.dumps(flow, indent=4)) - logger.info(flow["message"]) - - duration = 60 - logger.info("We will wait up to %d seconds for you to sign in" % duration) - flow["expires_at"] = min( # Shorten the time for quick test - flow["expires_at"], time.time() + duration) - result = self.app.acquire_token_by_device_flow(flow) - self.assertLoosely( # It will skip this test if there is no user interaction - result, - assertion=lambda: self.assertIn('access_token', result), - skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) - if "access_token" not in result: - self.skip("End user did not complete Device Flow in time") - self.assertCacheWorksForUser(result, scopes, username=None) - result["access_token"] = result["refresh_token"] = "************" - logger.info( - "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + self._test_device_flow(**self.config) def get_lab_app( @@ -322,6 +326,12 @@ def setUpClass(cls): def tearDownClass(cls): cls.session.close() + @classmethod + def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html + url = "https://msidlab.com/api/app" + resp = cls.session.get(url, params=query) + return resp.json()[0] + @classmethod def get_lab_user_secret(cls, lab_name="msidlab4"): lab_name = lab_name.lower() @@ -336,67 +346,29 @@ def get_lab_user_secret(cls, lab_name="msidlab4"): def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html resp = cls.session.get("https://msidlab.com/api/user", params=query) result = resp.json()[0] + _env = query.get("azureenvironment", "").lower() + authority_base = { + "azureusgovernment": "https://login.microsoftonline.us/" + }.get(_env, "https://login.microsoftonline.com/") + scope = { + "azureusgovernment": ["https://graph.microsoft.us/.default"], + }.get(_env, ["https://graph.microsoft.com/.default"]) return { # Mapping lab API response to our simplified configuration format - "authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format( - result["labName"]), + "authority": authority_base + result["tenantID"], "client_id": result["appId"], "username": result["upn"], "lab_name": result["labName"], - "scope": ["https://graph.microsoft.com/.default"], + "scope": scope, } - def test_aad_managed_user(self): # Pure cloud - config = self.get_lab_user(usertype="cloud") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs4_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs3_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs2_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs2019_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_ropc_adfs2019_onprem(self): - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") - config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] - config["client_id"] = "PublicClientId" - config["scope"] = self.adfs2019_scopes - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") - def test_adfs2019_onprem_acquire_token_by_auth_code(self): - """When prompted, you can manually login using this account: - - # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 - username = "..." # The upn from the link above - password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ - """ - scopes = self.adfs2019_scopes - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") + def _test_acquire_token_by_auth_code( + self, client_id=None, authority=None, port=None, scope=None, + **ignored): + assert client_id and authority and port and scope (self.app, ac, redirect_uri) = _get_app_and_auth_code( - # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 - "PublicClientId", - authority="https://fs.%s.com/adfs" % config["lab_name"], - port=8080, - scopes=scopes, - ) + client_id, authority=authority, port=port, scopes=scope) result = self.app.acquire_token_by_authorization_code( - ac, scopes, redirect_uri=redirect_uri) + ac, scope, redirect_uri=redirect_uri) logger.debug( "%s: cache = %s, id_token_claims = %s", self.id(), @@ -409,25 +381,16 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self): # Note: No interpolation here, cause error won't always present error=result.get("error"), error_description=result.get("error_description"))) - self.assertCacheWorksForUser(result, scopes, username=None) - - @unittest.skipUnless( - os.getenv("LAB_OBO_CLIENT_SECRET"), - "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56") - def test_acquire_token_obo(self): - # Some hardcoded, pre-defined settings - obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72" - downstream_scopes = ["https://graph.microsoft.com/.default"] - config = self.get_lab_user(usertype="cloud") + self.assertCacheWorksForUser(result, scope, username=None) + def _test_acquire_token_obo(self, config_pca, config_cca): # 1. An app obtains a token representing a user, for our mid-tier service pca = msal.PublicClientApplication( - "c0485386-1e9a-4663-bc96-7ab30656de7f", authority=config.get("authority")) + config_pca["client_id"], authority=config_pca["authority"]) pca_result = pca.acquire_token_by_username_password( - config["username"], - self.get_lab_user_secret(config["lab_name"]), - scopes=[ # The OBO app's scope. Yours might be different. - "api://%s/read" % obo_client_id], + config_pca["username"], + config_pca["password"], + scopes=config_pca["scope"], ) self.assertIsNotNone( pca_result.get("access_token"), @@ -435,15 +398,15 @@ def test_acquire_token_obo(self): # 2. Our mid-tier service uses OBO to obtain a token for downstream service cca = msal.ConfidentialClientApplication( - obo_client_id, - client_credential=os.getenv("LAB_OBO_CLIENT_SECRET"), - authority=config.get("authority"), + config_cca["client_id"], + client_credential=config_cca["client_secret"], + authority=config_cca["authority"], # token_cache= ..., # Default token cache is all-tokens-store-in-memory. # That's fine if OBO app uses short-lived msal instance per session. # Otherwise, the OBO app need to implement a one-cache-per-user setup. ) cca_result = cca.acquire_token_on_behalf_of( - pca_result['access_token'], downstream_scopes) + pca_result['access_token'], config_cca["scope"]) self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result)) # 3. Now the OBO app can simply store downstream token(s) in same session. @@ -453,12 +416,93 @@ def test_acquire_token_obo(self): # Assuming you already did that (which is not shown in this test case), # the following part shows one of the ways to obtain an AT from cache. username = cca_result.get("id_token_claims", {}).get("preferred_username") - self.assertEqual(config["username"], username) + self.assertEqual(config_cca["username"], username) if username: # A precaution so that we won't use other user's token account = cca.get_accounts(username=username)[0] - result = cca.acquire_token_silent(downstream_scopes, account) + result = cca.acquire_token_silent(config_cca["scope"], account) self.assertEqual(cca_result["access_token"], result["access_token"]) + def _test_acquire_token_by_client_secret( + self, client_id=None, client_secret=None, authority=None, scope=None, + **ignored): + assert client_id and client_secret and authority and scope + app = msal.ConfidentialClientApplication( + client_id, client_credential=client_secret, authority=authority) + result = app.acquire_token_for_client(scope) + self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result) + + +class WorldWideTestCase(LabBasedTestCase): + + def test_aad_managed_user(self): # Pure cloud + config = self.get_lab_user(usertype="cloud") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + def test_adfs4_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + def test_adfs3_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + def test_adfs2_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + def test_adfs2019_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + def test_ropc_adfs2019_onprem(self): + # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 + config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") + config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] + config["client_id"] = "PublicClientId" + config["scope"] = self.adfs2019_scopes + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") + def test_adfs2019_onprem_acquire_token_by_auth_code(self): + """When prompted, you can manually login using this account: + + # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 + username = "..." # The upn from the link above + password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ + """ + config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") + config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] + config["client_id"] = "PublicClientId" + config["scope"] = self.adfs2019_scopes + config["port"] = 8080 + self._test_acquire_token_by_auth_code(**config) + + @unittest.skipUnless( + os.getenv("LAB_OBO_CLIENT_SECRET"), + "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56") + def test_acquire_token_obo(self): + config = self.get_lab_user(usertype="cloud") + + config_cca = {} + config_cca.update(config) + config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72" + config_cca["scope"] = ["https://graph.microsoft.com/.default"] + config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") + + config_pca = {} + config_pca.update(config) + config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f" + config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) + config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] + + self._test_acquire_token_obo(config_pca, config_cca) + def _build_b2c_authority(self, policy): base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com" return base + "/" + policy # We do not support base + "?p=" + policy @@ -472,29 +516,12 @@ def test_b2c_acquire_token_by_auth_code(self): # This won't work https://msidlab.com/api/user?usertype=b2c password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c """ - scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] - (self.app, ac, redirect_uri) = _get_app_and_auth_code( - "b876a048-55a5-4fc5-9403-f5d90cb1c852", - client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"), + self._test_acquire_token_by_auth_code( authority=self._build_b2c_authority("B2C_1_SignInPolicy"), + client_id="b876a048-55a5-4fc5-9403-f5d90cb1c852", port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] - scopes=scopes, + scope=["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] ) - result = self.app.acquire_token_by_authorization_code( - ac, scopes, redirect_uri=redirect_uri) - logger.debug( - "%s: cache = %s, id_token_claims = %s", - self.id(), - json.dumps(self.app.token_cache._cache, indent=4), - json.dumps(result.get("id_token_claims"), indent=4), - ) - self.assertIn( - "access_token", result, - "{error}: {error_description}".format( - # Note: No interpolation here, cause error won't always present - error=result.get("error"), - error_description=result.get("error_description"))) - self.assertCacheWorksForUser(result, scopes, username=None) def test_b2c_acquire_token_by_ropc(self): self._test_username_password( @@ -505,6 +532,40 @@ def test_b2c_acquire_token_by_ropc(self): scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"], ) + +class ArlingtonCloudTestCase(LabBasedTestCase): + environment = "azureusgovernment" + + def test_acquire_token_by_ropc(self): + config = self.get_lab_user(azureenvironment=self.environment) + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password(**config) + + def test_acquire_token_by_client_secret(self): + config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no") + config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") + self._test_acquire_token_by_client_secret(**config) + + def test_acquire_token_obo(self): + config_cca = self.get_lab_user( + usertype="cloud", azureenvironment=self.environment, publicClient="no") + config_cca["scope"] = ["https://graph.microsoft.us/.default"] + config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") + + config_pca = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") + obo_app_object = self.get_lab_app_object( + usertype="cloud", azureenvironment=self.environment, publicClient="no") + config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) + config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))] + + self._test_acquire_token_obo(config_pca, config_cca) + + def test_acquire_token_device_flow(self): + config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") + config["scope"] = ["user.read"] + self._test_device_flow(**config) + + if __name__ == "__main__": unittest.main()