Skip to content
Permalink
Browse files

FOCI Single Sign On

  • Loading branch information
rayluo committed Mar 28, 2019
1 parent 56d8dde commit afa37b1e3e4baab8ab6bf0a0ed5ad470cbb51f85
Showing with 139 additions and 9 deletions.
  1. +54 −9 msal/application.py
  2. +1 −0 requirements.txt
  3. +84 −0 tests/test_application.py
@@ -305,26 +305,71 @@ def acquire_token_silent(
"token_type": "Bearer",
"expires_in": int(expires_in), # OAuth2 specs defines it as int
}
return self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
the_authority, decorate_scope(scopes, self.client_id), account,
**kwargs)

def _acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
self, authority, scopes, account, **kwargs):
query = {
"environment": authority.instance,
"home_account_id": (account or {}).get("home_account_id"),
# "realm": authority.tenant, # AAD RTs are tenant-independent
}
apps = self.token_cache.find( # Use find(), rather than token_cache.get(...)
TokenCache.CredentialType.APP_METADATA, query={
"environment": authority.instance, "client_id": self.client_id})
app_metadata = apps[0] if apps else {}
if not app_metadata: # Meaning this app is now used for the first time.
# When/if we have a way to directly detect current app's family,
# we'll rewrite this block, to support multiple families.
# For now, we try existing RTs (*). If it works, we are in that family.
# (*) RTs of a different app/family are not supposed to be
# shared with or accessible by us in the first place.
at = self._acquire_token_silent_by_finding_specific_refresh_token(
authority, scopes,
dict(query, family_id="1"), # A hack, we have only 1 family for now
rt_remover=lambda rt_item: None, # NO-OP b/c RTs are likely not mine
break_condition=lambda response: # Break loop when app not in family
# Based on an AAD-only behavior mentioned in internal doc here
# https://msazure.visualstudio.com/One/_git/ESTS-Docs/pullrequest/1138595
"client_mismatch" in response.get("error_additional_info", []),
**kwargs)
if at:
return at
if app_metadata.get("family_id"): # Meaning this app belongs to this family
at = self._acquire_token_silent_by_finding_specific_refresh_token(
authority, scopes, dict(query, family_id=app_metadata["family_id"]),
**kwargs)
if at:
return at
# Either this app is an orphan, so we will naturally use its own RT;
# or all attempts above have failed, so we fall back to non-foci behavior.
return self._acquire_token_silent_by_finding_specific_refresh_token(
authority, scopes, dict(query, client_id=self.client_id), **kwargs)

def _acquire_token_silent_by_finding_specific_refresh_token(
self, authority, scopes, query,
rt_remover=None, break_condition=lambda response: False, **kwargs):
matches = self.token_cache.find(
self.token_cache.CredentialType.REFRESH_TOKEN,
# target=scopes, # AAD RTs are scope-independent
query={
"client_id": self.client_id,
"environment": the_authority.instance,
"home_account_id": (account or {}).get("home_account_id"),
# "realm": the_authority.tenant, # AAD RTs are tenant-independent
})
client = self._build_client(self.client_credential, the_authority)
query=query)
logger.debug("Found %d RTs matching %s", len(matches), query)
client = self._build_client(self.client_credential, authority)
for entry in matches:
logger.debug("Cache hit an RT")
logger.debug("Cache attempts an RT")
response = client.obtain_token_by_refresh_token(
entry, rt_getter=lambda token_item: token_item["secret"],
scope=decorate_scope(scopes, self.client_id))
on_removing_rt=rt_remover or self.token_cache.remove_rt,
scope=scopes,
**kwargs)
if "error" not in response:
return response
logger.debug(
"Refresh failed. {error}: {error_description}".format(**response))
if break_condition(response):
break


class PublicClientApplication(ClientApplication): # browser app or mobile app
@@ -1 +1,2 @@
.
mock; python_version < '3.3'
@@ -2,8 +2,15 @@
import json
import logging

try:
from unittest.mock import * # Python 3
except:
from mock import * # Need an external mock package

from msal.application import *
import msal
from tests import unittest
from tests.test_token_cache import TokenCacheTestCase


THIS_FOLDER = os.path.dirname(__file__)
@@ -155,3 +162,80 @@ def test_auth_code(self):
error_description=result.get("error_description")))
self.assertCacheWorks(result)


class TestClientApplicationAcquireTokenSilentFociBehaviors(unittest.TestCase):

def setUp(self):
self.authority_url = "https://login.microsoftonline.com/common"
self.authority = msal.authority.Authority(self.authority_url)
self.scopes = ["s1", "s2"]
self.uid = "my_uid"
self.utid = "my_utid"
self.account = {"home_account_id": "{}.{}".format(self.uid, self.utid)}
self.frt = "what the frt"
self.cache = msal.SerializableTokenCache()
self.cache.add({ # Pre-populate a FRT
"client_id": "preexisting_family_app",
"scope": self.scopes,
"token_endpoint": "{}/oauth2/v2.0/token".format(self.authority_url),
"response": TokenCacheTestCase.build_response(
uid=self.uid, utid=self.utid, refresh_token=self.frt, foci="1"),
}) # The add(...) helper populates correct home_account_id for future searching

def test_unknown_orphan_app_will_attempt_frt_and_not_remove_it(self):
app = ClientApplication(
"unknown_orphan", authority=self.authority_url, token_cache=self.cache)
logger.debug("%s.cache = %s", self.id(), self.cache.serialize())
def tester(url, data=None, **kwargs):
self.assertEqual(self.frt, data.get("refresh_token"), "Should attempt the FRT")
return Mock(status_code=200, json=Mock(return_value={
"error": "invalid_grant",
"error_description": "Was issued to another client"}))
app._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
self.authority, self.scopes, self.account, post=tester)
self.assertNotEqual([], app.token_cache.find(
msal.TokenCache.CredentialType.REFRESH_TOKEN, query={"secret": self.frt}),
"The FRT should not be removed from the cache")

def test_known_orphan_app_will_skip_frt_and_only_use_its_own_rt(self):
app = ClientApplication(
"known_orphan", authority=self.authority_url, token_cache=self.cache)
rt = "RT for this orphan app. We will check it being used by this test case."
self.cache.add({ # Populate its RT and AppMetadata, so it becomes a known orphan app
"client_id": app.client_id,
"scope": self.scopes,
"token_endpoint": "{}/oauth2/v2.0/token".format(self.authority_url),
"response": TokenCacheTestCase.build_response(
uid=self.uid, utid=self.utid, refresh_token=rt),
})
logger.debug("%s.cache = %s", self.id(), self.cache.serialize())
def tester(url, data=None, **kwargs):
self.assertEqual(rt, data.get("refresh_token"), "Should attempt the RT")
return Mock(status_code=200, json=Mock(return_value={}))
app._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
self.authority, self.scopes, self.account, post=tester)

def test_unknown_family_app_will_attempt_frt_and_join_family(self):
def tester(url, data=None, **kwargs):
self.assertEqual(
self.frt, data.get("refresh_token"), "Should attempt the FRT")
return Mock(
status_code=200,
json=Mock(return_value=TokenCacheTestCase.build_response(
uid=self.uid, utid=self.utid, foci="1", access_token="at")))
app = ClientApplication(
"unknown_family_app", authority=self.authority_url, token_cache=self.cache)
at = app._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family(
self.authority, self.scopes, self.account, post=tester)
logger.debug("%s.cache = %s", self.id(), self.cache.serialize())
self.assertEqual("at", at.get("access_token"), "New app should get a new AT")
app_metadata = app.token_cache.find(
msal.TokenCache.CredentialType.APP_METADATA,
query={"client_id": app.client_id})
self.assertNotEqual([], app_metadata, "Should record new app's metadata")
self.assertEqual("1", app_metadata[0].get("family_id"),
"The new family app should be recorded as in the same family")
# Known family app will simply use FRT, which is largely the same as this one

# Will not test scenario of app leaving family. Per specs, it won't happen.

0 comments on commit afa37b1

Please sign in to comment.
You can’t perform that action at this time.