From 8e0b7dc28568ce9f2e9056ba4cecbbf235f10fbe Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Fri, 28 Feb 2020 02:57:02 -0800 Subject: [PATCH 01/10] Initial commit for ropc --- tests/test_e2e.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 887e1609..f338433d 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -336,9 +336,13 @@ def get_lab_user_secret(cls, lab_name="msidlab4"): def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html resp = cls.session.get("https://msidlab.com/api/user", params=query) result = resp.json()[0] + authority_base = "https://login.microsoftonline.com/" + graph_endpoint = "https://graph.microsoft.com/.default" + if "azureenvironment" in query and query["azureenvironment"] == "azureusgovernment": + authority_base = "https://login.microsoftonline.us/" + graph_endpoint = "https://graph.microsoft.us/.default" return { # Mapping lab API response to our simplified configuration format - "authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format( - result["labName"]), + "authority": authority_base + result["tenantID"], "client_id": result["appId"], "username": result["upn"], "lab_name": result["labName"], @@ -505,6 +509,11 @@ def test_b2c_acquire_token_by_ropc(self): scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"], ) + def test_arlington_acquire_token_by_ropc(self): + config = self.get_lab_user(azureenvironment="azureusgovernment") + self._test_username_password( + password=self.get_lab_user_secret(config["lab_name"]), **config) + if __name__ == "__main__": unittest.main() From 179a2caa68fa20e059769ac94a146600bbfdc902 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Fri, 28 Feb 2020 14:02:16 -0800 Subject: [PATCH 02/10] Adding rest of the flows --- tests/test_e2e.py | 90 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index f338433d..82faf3b4 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -322,6 +322,12 @@ def setUpClass(cls): def tearDownClass(cls): cls.session.close() + @classmethod + def get_lab_app_object(cls, **query ): + url = "https://msidlab.com/api/app" + resp = cls.session.get(url, params=query) + return resp.json()[0] + @classmethod def get_lab_user_secret(cls, lab_name="msidlab4"): lab_name = lab_name.lower() @@ -346,7 +352,7 @@ def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html "client_id": result["appId"], "username": result["upn"], "lab_name": result["labName"], - "scope": ["https://graph.microsoft.com/.default"], + "scope": [graph_endpoint], } def test_aad_managed_user(self): # Pure cloud @@ -514,6 +520,88 @@ def test_arlington_acquire_token_by_ropc(self): self._test_username_password( password=self.get_lab_user_secret(config["lab_name"]), **config) + def test_arlington_acquire_token_by_client_secret(self): + config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + app = msal.ConfidentialClientApplication(config.get("client_id"), + client_credential=self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret"), + authority=config.get("authority"), + ) + result = app.acquire_token_for_client(config.get("scope")) + self.assertNotEqual(None, result.get("access_token"), str(result)) + + def test_arlington_acquire_token_obo(self): + obo_config = self.get_lab_user( + usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + obo_app_object = self.get_lab_app_object( + usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + downstream_scopes = ["https://graph.microsoft.com/.default"] + config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") + + # 1. An app obtains a token representing a user, for our mid-tier service + pca = msal.PublicClientApplication( + client_id=config.get("client_id"), authority=config.get("authority")) + pca_result = pca.acquire_token_by_username_password( + config["username"], + self.get_lab_user_secret(config["lab_name"]), + scopes=[ # The OBO app's scope. Yours might be different. + "{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))], + ) + self.assertIsNotNone( + pca_result.get("access_token"), + "PCA failed to get AT because %s" % json.dumps(pca_result, indent=2)) + + # 2. Our mid-tier service uses OBO to obtain a token for downstream service + cca = msal.ConfidentialClientApplication( + obo_config.get("client_id"), + client_credential=self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret"), + authority=obo_config.get("authority"), + # token_cache= ..., # Default token cache is all-tokens-store-in-memory. + # That's fine if OBO app uses short-lived msal instance per session. + # Otherwise, the OBO app need to implement a one-cache-per-user setup. + ) + cca_result = cca.acquire_token_on_behalf_of( + pca_result['access_token'], downstream_scopes) + self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result)) + + # 3. Now the OBO app can simply store downstream token(s) in same session. + # Alternatively, if you want to persist the downstream AT, and possibly + # the RT (if any) for prolonged access even after your own AT expires, + # now it is the time to persist current cache state for current user. + # Assuming you already did that (which is not shown in this test case), + # the following part shows one of the ways to obtain an AT from cache. + username = cca_result.get("id_token_claims", {}).get("preferred_username") + self.assertEqual(config["username"], username) + if username: # A precaution so that we won't use other user's token + account = cca.get_accounts(username=username)[0] + result = cca.acquire_token_silent(downstream_scopes, account) + self.assertEqual(cca_result["access_token"], result["access_token"]) + + def test_arlington_acquire_token_device_code(self): + config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") + scopes = ["user.read"] + self.app = msal.PublicClientApplication( + config['client_id'], authority=config["authority"]) + flow = self.app.initiate_device_flow(scopes=scopes) + assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( + json.dumps(flow, indent=4)) + logger.info(flow["message"]) + + duration = 60 + logger.info("We will wait up to %d seconds for you to sign in" % duration) + flow["expires_at"] = min( # Shorten the time for quick test + flow["expires_at"], time.time() + duration) + result = self.app.acquire_token_by_device_flow(flow) + self.assertLoosely( # It will skip this test if there is no user interaction + result, + assertion=lambda: self.assertIn('access_token', result), + skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) + if "access_token" not in result: + self.skip("End user did not complete Device Flow in time") + self.assertCacheWorksForUser(result, scopes, username=None) + result["access_token"] = result["refresh_token"] = "************" + logger.info( + "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + if __name__ == "__main__": unittest.main() From e62f34e413fdbf7a127bc9ef864ea2b47f256a87 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Wed, 11 Mar 2020 15:35:14 -0700 Subject: [PATCH 03/10] Refactoring --- tests/test_e2e.py | 320 +++++++++++++++++++++------------------------- 1 file changed, 149 insertions(+), 171 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 82faf3b4..8a65139d 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -323,7 +323,7 @@ def tearDownClass(cls): cls.session.close() @classmethod - def get_lab_app_object(cls, **query ): + def get_lab_app_object(cls, **query ): # https://msidlab.com/swagger/index.html url = "https://msidlab.com/api/app" resp = cls.session.get(url, params=query) return resp.json()[0] @@ -355,58 +355,18 @@ def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html "scope": [graph_endpoint], } - def test_aad_managed_user(self): # Pure cloud - config = self.get_lab_user(usertype="cloud") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs4_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs3_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs2_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_adfs2019_fed_user(self): - config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - def test_ropc_adfs2019_onprem(self): - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") - config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] - config["client_id"] = "PublicClientId" - config["scope"] = self.adfs2019_scopes - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) - - @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") - def test_adfs2019_onprem_acquire_token_by_auth_code(self): - """When prompted, you can manually login using this account: + def _test_username_password_lab(self, config): + self._test_username_password(**config) - # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 - username = "..." # The upn from the link above - password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ - """ - scopes = self.adfs2019_scopes - config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") + def _test_acquire_token_by_auth_code_lab(self, config): (self.app, ac, redirect_uri) = _get_app_and_auth_code( - # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 - "PublicClientId", - authority="https://fs.%s.com/adfs" % config["lab_name"], - port=8080, - scopes=scopes, + config["client_id"], + authority=config["authority"], + port=config["port"], + scopes=config["scope"], ) result = self.app.acquire_token_by_authorization_code( - ac, scopes, redirect_uri=redirect_uri) + ac, config["scopes"], redirect_uri=redirect_uri) logger.debug( "%s: cache = %s, id_token_claims = %s", self.id(), @@ -419,25 +379,16 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self): # Note: No interpolation here, cause error won't always present error=result.get("error"), error_description=result.get("error_description"))) - self.assertCacheWorksForUser(result, scopes, username=None) - - @unittest.skipUnless( - os.getenv("LAB_OBO_CLIENT_SECRET"), - "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56") - def test_acquire_token_obo(self): - # Some hardcoded, pre-defined settings - obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72" - downstream_scopes = ["https://graph.microsoft.com/.default"] - config = self.get_lab_user(usertype="cloud") + self.assertCacheWorksForUser(result, config["scopes"], username=None) + def _test_acquire_token_obo_lab(self, config_pca, config_cca): # 1. An app obtains a token representing a user, for our mid-tier service pca = msal.PublicClientApplication( - "c0485386-1e9a-4663-bc96-7ab30656de7f", authority=config.get("authority")) + config_pca["client_id"], authority=config_pca["authority"]) pca_result = pca.acquire_token_by_username_password( - config["username"], - self.get_lab_user_secret(config["lab_name"]), - scopes=[ # The OBO app's scope. Yours might be different. - "api://%s/read" % obo_client_id], + config_pca["username"], + config_pca["password"], + scopes=config_pca["scope"], ) self.assertIsNotNone( pca_result.get("access_token"), @@ -445,15 +396,15 @@ def test_acquire_token_obo(self): # 2. Our mid-tier service uses OBO to obtain a token for downstream service cca = msal.ConfidentialClientApplication( - obo_client_id, - client_credential=os.getenv("LAB_OBO_CLIENT_SECRET"), - authority=config.get("authority"), + config_cca["client_id"], + client_credential=config_cca["client_secret"], + authority=config_cca["authority"], # token_cache= ..., # Default token cache is all-tokens-store-in-memory. # That's fine if OBO app uses short-lived msal instance per session. # Otherwise, the OBO app need to implement a one-cache-per-user setup. ) cca_result = cca.acquire_token_on_behalf_of( - pca_result['access_token'], downstream_scopes) + pca_result['access_token'], config_cca["scope"]) self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result)) # 3. Now the OBO app can simply store downstream token(s) in same session. @@ -463,12 +414,114 @@ def test_acquire_token_obo(self): # Assuming you already did that (which is not shown in this test case), # the following part shows one of the ways to obtain an AT from cache. username = cca_result.get("id_token_claims", {}).get("preferred_username") - self.assertEqual(config["username"], username) + self.assertEqual(config_cca["username"], username) if username: # A precaution so that we won't use other user's token account = cca.get_accounts(username=username)[0] - result = cca.acquire_token_silent(downstream_scopes, account) + result = cca.acquire_token_silent(config_cca["scope"], account) self.assertEqual(cca_result["access_token"], result["access_token"]) + def _test_acquire_token_by_client_secret_lab(self, config): + app = msal.ConfidentialClientApplication(config["client_id"], + client_credential=config["client_secret"], + authority=config["authority"], + ) + result = app.acquire_token_for_client(config["scope"]) + self.assertNotEqual(None, result.get("access_token"), str(result)) + + def _test_acquire_token_device_code_lab(self, config): + self.app = msal.PublicClientApplication( + config['client_id'], authority=config["authority"]) + flow = self.app.initiate_device_flow(scopes=config["scope"]) + assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( + json.dumps(flow, indent=4)) + logger.info(flow["message"]) + + duration = 60 + logger.info("We will wait up to %d seconds for you to sign in" % duration) + flow["expires_at"] = min( # Shorten the time for quick test + flow["expires_at"], time.time() + duration) + result = self.app.acquire_token_by_device_flow(flow) + self.assertLoosely( # It will skip this test if there is no user interaction + result, + assertion=lambda: self.assertIn('access_token', result), + skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) + if "access_token" not in result: + self.skip("End user did not complete Device Flow in time") + self.assertCacheWorksForUser(result, config["scope"], username=None) + result["access_token"] = result["refresh_token"] = "************" + logger.info( + "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + + +class WorldWideTestCases(LabBasedTestCase): + + def test_aad_managed_user(self): # Pure cloud + config = self.get_lab_user(usertype="cloud") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) + + def test_adfs4_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) + + def test_adfs3_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) + + def test_adfs2_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) + + def test_adfs2019_fed_user(self): + config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) + + def test_ropc_adfs2019_onprem(self): + # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 + config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") + config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] + config["client_id"] = "PublicClientId" + config["scope"] = self.adfs2019_scopes + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) + + @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") + def test_adfs2019_onprem_acquire_token_by_auth_code(self): + """When prompted, you can manually login using this account: + + # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 + username = "..." # The upn from the link above + password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ + """ + config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") + config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] + config["client_id"] = "PublicClientId" + config["scope"] = self.adfs2019_scopes + config["port"] = 8080 + self._test_acquire_token_by_auth_code_lab(config) + + @unittest.skipUnless( + os.getenv("LAB_OBO_CLIENT_SECRET"), + "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56") + def test_acquire_token_obo(self): + config = self.get_lab_user(usertype="cloud") + + config_cca = config + config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72" + config_cca["scope"] = ["https://graph.microsoft.com/.default"] + config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") + + config_pca = config + config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f" + config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) + config_pca["scope"] = "api://%s/read" % config_cca["client_id"] + + self._test_acquire_token_obo_lab(config_pca, config_cca) + def _build_b2c_authority(self, policy): base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com" return base + "/" + policy # We do not support base + "?p=" + policy @@ -482,125 +535,50 @@ def test_b2c_acquire_token_by_auth_code(self): # This won't work https://msidlab.com/api/user?usertype=b2c password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c """ - scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] - (self.app, ac, redirect_uri) = _get_app_and_auth_code( - "b876a048-55a5-4fc5-9403-f5d90cb1c852", - client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"), - authority=self._build_b2c_authority("B2C_1_SignInPolicy"), - port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] - scopes=scopes, - ) - result = self.app.acquire_token_by_authorization_code( - ac, scopes, redirect_uri=redirect_uri) - logger.debug( - "%s: cache = %s, id_token_claims = %s", - self.id(), - json.dumps(self.app.token_cache._cache, indent=4), - json.dumps(result.get("id_token_claims"), indent=4), - ) - self.assertIn( - "access_token", result, - "{error}: {error_description}".format( - # Note: No interpolation here, cause error won't always present - error=result.get("error"), - error_description=result.get("error_description"))) - self.assertCacheWorksForUser(result, scopes, username=None) + config = {"authority": self._build_b2c_authority("B2C_1_SignInPolicy"), + "client_id": "b876a048-55a5-4fc5-9403-f5d90cb1c852", + "scope": ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"], "port": 3843} + self._test_acquire_token_by_auth_code_lab(config) def test_b2c_acquire_token_by_ropc(self): - self._test_username_password( - authority=self._build_b2c_authority("B2C_1_ROPC_Auth"), - client_id="e3b9ad76-9763-4827-b088-80c7a7888f79", - username="b2clocal@msidlabb2c.onmicrosoft.com", - password=self.get_lab_user_secret("msidlabb2c"), - scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"], - ) + config = {"authority": self._build_b2c_authority("B2C_1_ROPC_Auth"), + "client_id": "e3b9ad76-9763-4827-b088-80c7a7888f79", + "username": "b2clocal@msidlabb2c.onmicrosoft.com", "password": self.get_lab_user_secret("msidlabb2c"), + "scope": ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"]} + self._test_username_password_lab(config) + + +class SovereignCloudTestCase(LabBasedTestCase): def test_arlington_acquire_token_by_ropc(self): config = self.get_lab_user(azureenvironment="azureusgovernment") - self._test_username_password( - password=self.get_lab_user_secret(config["lab_name"]), **config) + config["password"] = self.get_lab_user_secret(config["lab_name"]) + self._test_username_password_lab(config) def test_arlington_acquire_token_by_client_secret(self): config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") - app = msal.ConfidentialClientApplication(config.get("client_id"), - client_credential=self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret"), - authority=config.get("authority"), - ) - result = app.acquire_token_for_client(config.get("scope")) - self.assertNotEqual(None, result.get("access_token"), str(result)) + config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") + self._test_acquire_token_by_client_secret_lab(config) def test_arlington_acquire_token_obo(self): - obo_config = self.get_lab_user( + config_cca = self.get_lab_user( usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + config_cca["scope"] = ["https://graph.microsoft.us/.default"] + config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") + + config_pca = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") obo_app_object = self.get_lab_app_object( usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") - downstream_scopes = ["https://graph.microsoft.com/.default"] - config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") - - # 1. An app obtains a token representing a user, for our mid-tier service - pca = msal.PublicClientApplication( - client_id=config.get("client_id"), authority=config.get("authority")) - pca_result = pca.acquire_token_by_username_password( - config["username"], - self.get_lab_user_secret(config["lab_name"]), - scopes=[ # The OBO app's scope. Yours might be different. - "{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))], - ) - self.assertIsNotNone( - pca_result.get("access_token"), - "PCA failed to get AT because %s" % json.dumps(pca_result, indent=2)) - - # 2. Our mid-tier service uses OBO to obtain a token for downstream service - cca = msal.ConfidentialClientApplication( - obo_config.get("client_id"), - client_credential=self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret"), - authority=obo_config.get("authority"), - # token_cache= ..., # Default token cache is all-tokens-store-in-memory. - # That's fine if OBO app uses short-lived msal instance per session. - # Otherwise, the OBO app need to implement a one-cache-per-user setup. - ) - cca_result = cca.acquire_token_on_behalf_of( - pca_result['access_token'], downstream_scopes) - self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result)) + config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) + config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))] - # 3. Now the OBO app can simply store downstream token(s) in same session. - # Alternatively, if you want to persist the downstream AT, and possibly - # the RT (if any) for prolonged access even after your own AT expires, - # now it is the time to persist current cache state for current user. - # Assuming you already did that (which is not shown in this test case), - # the following part shows one of the ways to obtain an AT from cache. - username = cca_result.get("id_token_claims", {}).get("preferred_username") - self.assertEqual(config["username"], username) - if username: # A precaution so that we won't use other user's token - account = cca.get_accounts(username=username)[0] - result = cca.acquire_token_silent(downstream_scopes, account) - self.assertEqual(cca_result["access_token"], result["access_token"]) + self._test_acquire_token_obo_lab(config_pca, config_cca) def test_arlington_acquire_token_device_code(self): config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") - scopes = ["user.read"] - self.app = msal.PublicClientApplication( - config['client_id'], authority=config["authority"]) - flow = self.app.initiate_device_flow(scopes=scopes) - assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( - json.dumps(flow, indent=4)) - logger.info(flow["message"]) + config["scope"] = ["user.read"] + self._test_acquire_token_device_code_lab(config) - duration = 60 - logger.info("We will wait up to %d seconds for you to sign in" % duration) - flow["expires_at"] = min( # Shorten the time for quick test - flow["expires_at"], time.time() + duration) - result = self.app.acquire_token_by_device_flow(flow) - self.assertLoosely( # It will skip this test if there is no user interaction - result, - assertion=lambda: self.assertIn('access_token', result), - skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) - if "access_token" not in result: - self.skip("End user did not complete Device Flow in time") - self.assertCacheWorksForUser(result, scopes, username=None) - result["access_token"] = result["refresh_token"] = "************" - logger.info( - "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) if __name__ == "__main__": unittest.main() From b254f74531082cf8bd7260c9ebb6bc64a9f6f0b8 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Wed, 11 Mar 2020 15:36:32 -0700 Subject: [PATCH 04/10] Changing name for consistency --- tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 8a65139d..ba5a30d8 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -453,7 +453,7 @@ def _test_acquire_token_device_code_lab(self, config): "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) -class WorldWideTestCases(LabBasedTestCase): +class WorldWideTestCase(LabBasedTestCase): def test_aad_managed_user(self): # Pure cloud config = self.get_lab_user(usertype="cloud") From 7039052fb0d3fdfdf1b3e645736852b0beda0136 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Wed, 11 Mar 2020 15:49:17 -0700 Subject: [PATCH 05/10] changing scope to be a list --- tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index ba5a30d8..66e1ec8e 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -518,7 +518,7 @@ def test_acquire_token_obo(self): config_pca = config config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f" config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) - config_pca["scope"] = "api://%s/read" % config_cca["client_id"] + config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] self._test_acquire_token_obo_lab(config_pca, config_cca) From 3c397c3e988d3f1c059c6eae340791311ed3ae01 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Wed, 11 Mar 2020 16:19:30 -0700 Subject: [PATCH 06/10] modifying values to avoid call by reference --- tests/test_e2e.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 66e1ec8e..a5e7fcdf 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -510,12 +510,14 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self): def test_acquire_token_obo(self): config = self.get_lab_user(usertype="cloud") - config_cca = config + config_cca = {} + config_cca.update(config) config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72" config_cca["scope"] = ["https://graph.microsoft.com/.default"] config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") - config_pca = config + config_pca = {} + config_pca.update(config) config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f" config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] From 34781cf537ae575013e63a83994da6b5baa63c0d Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Fri, 3 Apr 2020 03:17:52 -0700 Subject: [PATCH 07/10] Pr review iteration --- tests/test_e2e.py | 168 ++++++++++++++++++++-------------------------- 1 file changed, 74 insertions(+), 94 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index a5e7fcdf..243f6fcc 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -102,6 +102,32 @@ def _test_username_password(self, username=username if ".b2clogin.com" not in authority else None, ) + def _test_device_code( + self, client_id=None, authority=None, scope=None, **ignored): + assert client_id and authority and scope + self.app = msal.PublicClientApplication( + client_id, authority=authority) + flow = self.app.initiate_device_flow(scopes=scope) + assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( + json.dumps(flow, indent=4)) + logger.info(flow["message"]) + + duration = 60 + logger.info("We will wait up to %d seconds for you to sign in" % duration) + flow["expires_at"] = min( # Shorten the time for quick test + flow["expires_at"], time.time() + duration) + result = self.app.acquire_token_by_device_flow(flow) + self.assertLoosely( # It will skip this test if there is no user interaction + result, + assertion=lambda: self.assertIn('access_token', result), + skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) + if "access_token" not in result: + self.skip("End user did not complete Device Flow in time") + self.assertCacheWorksForUser(result, scope, username=None) + result["access_token"] = result["refresh_token"] = "************" + logger.info( + "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + THIS_FOLDER = os.path.dirname(__file__) CONFIG = os.path.join(THIS_FOLDER, "config.json") @@ -244,29 +270,7 @@ def setUpClass(cls): cls.config = json.load(f) def test_device_flow(self): - scopes = self.config["scope"] - self.app = msal.PublicClientApplication( - self.config['client_id'], authority=self.config["authority"]) - flow = self.app.initiate_device_flow(scopes=scopes) - assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( - json.dumps(flow, indent=4)) - logger.info(flow["message"]) - - duration = 60 - logger.info("We will wait up to %d seconds for you to sign in" % duration) - flow["expires_at"] = min( # Shorten the time for quick test - flow["expires_at"], time.time() + duration) - result = self.app.acquire_token_by_device_flow(flow) - self.assertLoosely( # It will skip this test if there is no user interaction - result, - assertion=lambda: self.assertIn('access_token', result), - skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) - if "access_token" not in result: - self.skip("End user did not complete Device Flow in time") - self.assertCacheWorksForUser(result, scopes, username=None) - result["access_token"] = result["refresh_token"] = "************" - logger.info( - "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + self._test_device_code(**self.config) def get_lab_app( @@ -323,7 +327,7 @@ def tearDownClass(cls): cls.session.close() @classmethod - def get_lab_app_object(cls, **query ): # https://msidlab.com/swagger/index.html + def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html url = "https://msidlab.com/api/app" resp = cls.session.get(url, params=query) return resp.json()[0] @@ -342,31 +346,29 @@ def get_lab_user_secret(cls, lab_name="msidlab4"): def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html resp = cls.session.get("https://msidlab.com/api/user", params=query) result = resp.json()[0] - authority_base = "https://login.microsoftonline.com/" - graph_endpoint = "https://graph.microsoft.com/.default" - if "azureenvironment" in query and query["azureenvironment"] == "azureusgovernment": - authority_base = "https://login.microsoftonline.us/" - graph_endpoint = "https://graph.microsoft.us/.default" + _env = query.get("azureenvironment", "").lower() + authority_base = { + "azureusgovernment": "https://login.microsoftonline.us/" + }.get(_env, "https://login.microsoftonline.com/") + scope = { + "azureusgovernment": ["https://graph.microsoft.us/.default"], + }.get(_env, ["https://graph.microsoft.com/.default"]) return { # Mapping lab API response to our simplified configuration format "authority": authority_base + result["tenantID"], "client_id": result["appId"], "username": result["upn"], "lab_name": result["labName"], - "scope": [graph_endpoint], + "scope": scope, } - def _test_username_password_lab(self, config): - self._test_username_password(**config) - - def _test_acquire_token_by_auth_code_lab(self, config): + def _test_acquire_token_by_auth_code( + self, client_id=None, authority=None, port=None, scope=None, + **ignored): + assert client_id and authority and port and scope (self.app, ac, redirect_uri) = _get_app_and_auth_code( - config["client_id"], - authority=config["authority"], - port=config["port"], - scopes=config["scope"], - ) + client_id, authority, port, scope) result = self.app.acquire_token_by_authorization_code( - ac, config["scopes"], redirect_uri=redirect_uri) + ac, scope, redirect_uri=redirect_uri) logger.debug( "%s: cache = %s, id_token_claims = %s", self.id(), @@ -379,9 +381,9 @@ def _test_acquire_token_by_auth_code_lab(self, config): # Note: No interpolation here, cause error won't always present error=result.get("error"), error_description=result.get("error_description"))) - self.assertCacheWorksForUser(result, config["scopes"], username=None) + self.assertCacheWorksForUser(result, scope, username=None) - def _test_acquire_token_obo_lab(self, config_pca, config_cca): + def _test_acquire_token_obo(self, config_pca, config_cca): # 1. An app obtains a token representing a user, for our mid-tier service pca = msal.PublicClientApplication( config_pca["client_id"], authority=config_pca["authority"]) @@ -420,37 +422,14 @@ def _test_acquire_token_obo_lab(self, config_pca, config_cca): result = cca.acquire_token_silent(config_cca["scope"], account) self.assertEqual(cca_result["access_token"], result["access_token"]) - def _test_acquire_token_by_client_secret_lab(self, config): - app = msal.ConfidentialClientApplication(config["client_id"], - client_credential=config["client_secret"], - authority=config["authority"], - ) - result = app.acquire_token_for_client(config["scope"]) - self.assertNotEqual(None, result.get("access_token"), str(result)) - - def _test_acquire_token_device_code_lab(self, config): - self.app = msal.PublicClientApplication( - config['client_id'], authority=config["authority"]) - flow = self.app.initiate_device_flow(scopes=config["scope"]) - assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( - json.dumps(flow, indent=4)) - logger.info(flow["message"]) - - duration = 60 - logger.info("We will wait up to %d seconds for you to sign in" % duration) - flow["expires_at"] = min( # Shorten the time for quick test - flow["expires_at"], time.time() + duration) - result = self.app.acquire_token_by_device_flow(flow) - self.assertLoosely( # It will skip this test if there is no user interaction - result, - assertion=lambda: self.assertIn('access_token', result), - skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) - if "access_token" not in result: - self.skip("End user did not complete Device Flow in time") - self.assertCacheWorksForUser(result, config["scope"], username=None) - result["access_token"] = result["refresh_token"] = "************" - logger.info( - "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) + def _test_acquire_token_by_client_secret( + self, client_id=None, client_secret=None, authority=None, scope=None, + **ignored): + assert client_id and client_secret and authority and scope + app = msal.ConfidentialClientApplication( + client_id, client_credential=client_secret, authority=authority) + result = app.acquire_token_for_client(scope) + self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result) class WorldWideTestCase(LabBasedTestCase): @@ -458,27 +437,27 @@ class WorldWideTestCase(LabBasedTestCase): def test_aad_managed_user(self): # Pure cloud config = self.get_lab_user(usertype="cloud") config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) def test_adfs4_fed_user(self): config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) def test_adfs3_fed_user(self): config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) def test_adfs2_fed_user(self): config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) def test_adfs2019_fed_user(self): config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) def test_ropc_adfs2019_onprem(self): # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 @@ -487,7 +466,7 @@ def test_ropc_adfs2019_onprem(self): config["client_id"] = "PublicClientId" config["scope"] = self.adfs2019_scopes config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") def test_adfs2019_onprem_acquire_token_by_auth_code(self): @@ -502,7 +481,7 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self): config["client_id"] = "PublicClientId" config["scope"] = self.adfs2019_scopes config["port"] = 8080 - self._test_acquire_token_by_auth_code_lab(config) + self._test_acquire_token_by_auth_code(**config) @unittest.skipUnless( os.getenv("LAB_OBO_CLIENT_SECRET"), @@ -522,7 +501,7 @@ def test_acquire_token_obo(self): config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] - self._test_acquire_token_obo_lab(config_pca, config_cca) + self._test_acquire_token_obo(config_pca, config_cca) def _build_b2c_authority(self, policy): base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com" @@ -540,46 +519,47 @@ def test_b2c_acquire_token_by_auth_code(self): config = {"authority": self._build_b2c_authority("B2C_1_SignInPolicy"), "client_id": "b876a048-55a5-4fc5-9403-f5d90cb1c852", "scope": ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"], "port": 3843} - self._test_acquire_token_by_auth_code_lab(config) + self._test_acquire_token_by_auth_code(**config) def test_b2c_acquire_token_by_ropc(self): config = {"authority": self._build_b2c_authority("B2C_1_ROPC_Auth"), "client_id": "e3b9ad76-9763-4827-b088-80c7a7888f79", "username": "b2clocal@msidlabb2c.onmicrosoft.com", "password": self.get_lab_user_secret("msidlabb2c"), "scope": ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"]} - self._test_username_password_lab(config) + self._test_username_password(**config) -class SovereignCloudTestCase(LabBasedTestCase): +class ArlingtonCloudTestCase(LabBasedTestCase): + environment = "azureusgovernment" def test_arlington_acquire_token_by_ropc(self): - config = self.get_lab_user(azureenvironment="azureusgovernment") + config = self.get_lab_user(azureenvironment=self.environment) config["password"] = self.get_lab_user_secret(config["lab_name"]) - self._test_username_password_lab(config) + self._test_username_password(**config) def test_arlington_acquire_token_by_client_secret(self): - config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no") config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") - self._test_acquire_token_by_client_secret_lab(config) + self._test_acquire_token_by_client_secret(**config) def test_arlington_acquire_token_obo(self): config_cca = self.get_lab_user( - usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + usertype="cloud", azureenvironment=self.environment, publicClient="no") config_cca["scope"] = ["https://graph.microsoft.us/.default"] config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") - config_pca = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") + config_pca = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") obo_app_object = self.get_lab_app_object( - usertype="cloud", azureenvironment="azureusgovernment", publicClient="no") + usertype="cloud", azureenvironment=self.environment, publicClient="no") config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))] - self._test_acquire_token_obo_lab(config_pca, config_cca) + self._test_acquire_token_obo(config_pca, config_cca) def test_arlington_acquire_token_device_code(self): - config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes") + config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") config["scope"] = ["user.read"] - self._test_acquire_token_device_code_lab(config) + self._test_device_code(**config) if __name__ == "__main__": From f29954b39da4ddd34b73f2fe5843e4c12939fc69 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Mon, 6 Apr 2020 17:41:16 -0700 Subject: [PATCH 08/10] PR review changes --- tests/test_e2e.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 243f6fcc..bf0bcc71 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -102,7 +102,7 @@ def _test_username_password(self, username=username if ".b2clogin.com" not in authority else None, ) - def _test_device_code( + def _test_device_flow( self, client_id=None, authority=None, scope=None, **ignored): assert client_id and authority and scope self.app = msal.PublicClientApplication( @@ -270,7 +270,7 @@ def setUpClass(cls): cls.config = json.load(f) def test_device_flow(self): - self._test_device_code(**self.config) + self._test_device_flow(**self.config) def get_lab_app( @@ -327,7 +327,7 @@ def tearDownClass(cls): cls.session.close() @classmethod - def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html + def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html url = "https://msidlab.com/api/app" resp = cls.session.get(url, params=query) return resp.json()[0] @@ -349,10 +349,10 @@ def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html _env = query.get("azureenvironment", "").lower() authority_base = { "azureusgovernment": "https://login.microsoftonline.us/" - }.get(_env, "https://login.microsoftonline.com/") + }.get(_env, "https://login.microsoftonline.com/") scope = { "azureusgovernment": ["https://graph.microsoft.us/.default"], - }.get(_env, ["https://graph.microsoft.com/.default"]) + }.get(_env, ["https://graph.microsoft.com/.default"]) return { # Mapping lab API response to our simplified configuration format "authority": authority_base + result["tenantID"], "client_id": result["appId"], @@ -366,7 +366,7 @@ def _test_acquire_token_by_auth_code( **ignored): assert client_id and authority and port and scope (self.app, ac, redirect_uri) = _get_app_and_auth_code( - client_id, authority, port, scope) + client_id, authority=authority, port=port, scopes=scope) result = self.app.acquire_token_by_authorization_code( ac, scope, redirect_uri=redirect_uri) logger.debug( @@ -516,17 +516,20 @@ def test_b2c_acquire_token_by_auth_code(self): # This won't work https://msidlab.com/api/user?usertype=b2c password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c """ - config = {"authority": self._build_b2c_authority("B2C_1_SignInPolicy"), - "client_id": "b876a048-55a5-4fc5-9403-f5d90cb1c852", - "scope": ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"], "port": 3843} - self._test_acquire_token_by_auth_code(**config) + self._test_acquire_token_by_auth_code( + authority=self._build_b2c_authority("B2C_1_SignInPolicy"), + client_id="b876a048-55a5-4fc5-9403-f5d90cb1c852", port=3843, + scope=["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] + ) def test_b2c_acquire_token_by_ropc(self): - config = {"authority": self._build_b2c_authority("B2C_1_ROPC_Auth"), - "client_id": "e3b9ad76-9763-4827-b088-80c7a7888f79", - "username": "b2clocal@msidlabb2c.onmicrosoft.com", "password": self.get_lab_user_secret("msidlabb2c"), - "scope": ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"]} - self._test_username_password(**config) + self._test_username_password( + authority=self._build_b2c_authority("B2C_1_ROPC_Auth"), + client_id="e3b9ad76-9763-4827-b088-80c7a7888f79", + username="b2clocal@msidlabb2c.onmicrosoft.com", + password=self.get_lab_user_secret("msidlabb2c"), + scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"] + ) class ArlingtonCloudTestCase(LabBasedTestCase): @@ -556,10 +559,10 @@ def test_arlington_acquire_token_obo(self): self._test_acquire_token_obo(config_pca, config_cca) - def test_arlington_acquire_token_device_code(self): + def test_arlington_acquire_token_device_flow(self): config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") config["scope"] = ["user.read"] - self._test_device_code(**config) + self._test_device_flow(**config) if __name__ == "__main__": From c14ee3613e0b5a14ff9cb4c464e6699e5e5e05e3 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Mon, 6 Apr 2020 17:44:58 -0700 Subject: [PATCH 09/10] renaming arlington methods --- tests/test_e2e.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index bf0bcc71..7a44802f 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -535,17 +535,17 @@ def test_b2c_acquire_token_by_ropc(self): class ArlingtonCloudTestCase(LabBasedTestCase): environment = "azureusgovernment" - def test_arlington_acquire_token_by_ropc(self): + def test_acquire_token_by_ropc(self): config = self.get_lab_user(azureenvironment=self.environment) config["password"] = self.get_lab_user_secret(config["lab_name"]) self._test_username_password(**config) - def test_arlington_acquire_token_by_client_secret(self): + def test_acquire_token_by_client_secret(self): config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no") config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") self._test_acquire_token_by_client_secret(**config) - def test_arlington_acquire_token_obo(self): + def test_acquire_token_obo(self): config_cca = self.get_lab_user( usertype="cloud", azureenvironment=self.environment, publicClient="no") config_cca["scope"] = ["https://graph.microsoft.us/.default"] @@ -559,7 +559,7 @@ def test_arlington_acquire_token_obo(self): self._test_acquire_token_obo(config_pca, config_cca) - def test_arlington_acquire_token_device_flow(self): + def test_acquire_token_device_flow(self): config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") config["scope"] = ["user.read"] self._test_device_flow(**config) From c3cfbca1c7045630a48165ec5834684451cc17c9 Mon Sep 17 00:00:00 2001 From: Abhidnya Patil Date: Tue, 7 Apr 2020 09:52:54 -0700 Subject: [PATCH 10/10] Review changes --- tests/test_e2e.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 7a44802f..5abd5d61 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -518,7 +518,8 @@ def test_b2c_acquire_token_by_auth_code(self): """ self._test_acquire_token_by_auth_code( authority=self._build_b2c_authority("B2C_1_SignInPolicy"), - client_id="b876a048-55a5-4fc5-9403-f5d90cb1c852", port=3843, + client_id="b876a048-55a5-4fc5-9403-f5d90cb1c852", + port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] scope=["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] ) @@ -528,7 +529,7 @@ def test_b2c_acquire_token_by_ropc(self): client_id="e3b9ad76-9763-4827-b088-80c7a7888f79", username="b2clocal@msidlabb2c.onmicrosoft.com", password=self.get_lab_user_secret("msidlabb2c"), - scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"] + scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"], )