-
Notifications
You must be signed in to change notification settings - Fork 108
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for processing OIDC claims
This allows us to handle group/admin assignment, first name/last name synchronizing, etc.
- Loading branch information
1 parent
973e7bd
commit 84d0462
Showing
6 changed files
with
442 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import logging | ||
|
||
from django.conf import settings | ||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth.models import Group, Permission | ||
from django.contrib.contenttypes.models import ContentType | ||
from django.utils.encoding import smart_str | ||
from django.utils.module_loading import import_string | ||
|
||
from mozilla_django_oidc.auth import OIDCAuthenticationBackend | ||
|
||
|
||
User = get_user_model() | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def username_from_email(email, claims=None): | ||
"""Generate a username for new users from their email address | ||
If this function is called, it's already tried to look up an email | ||
address to match a user and failed. But if the user portion of the email | ||
address already exists as a user, this function will try to find an | ||
unused username by appending a counter to the end.""" | ||
email_username = smart_str(email.split("@")[0]) | ||
|
||
username = email_username | ||
counter = 1 | ||
while User.objects.filter(username=username).exists(): | ||
username = email_username + str(counter) | ||
counter += 1 | ||
|
||
return username | ||
|
||
|
||
def process_roles_admin(user, roles): | ||
"""Adjust a user's is_superuser property based on OIDC roles claim""" | ||
user_modified = False | ||
|
||
if settings.OIDC_OP_ADMIN_ROLE is None: | ||
logger.debug("OIDC_OP_ADMIN_ROLE not set, not modifying is_superuser") | ||
return | ||
|
||
is_admin = settings.OIDC_OP_ADMIN_ROLE in roles | ||
|
||
if is_admin and not user.is_superuser: | ||
logger.debug(f"Setting is_superuser for {user.username}.") | ||
user.is_superuser = True | ||
user_modified = True | ||
elif not is_admin and user.is_superuser: | ||
logger.debug(f"Removing is_superuser from {user.username}.") | ||
user.is_superuser = False | ||
user_modified = True | ||
|
||
return user_modified | ||
|
||
|
||
def process_roles_user(user, roles): | ||
"""Adjust user group assignment based on OIDC roles""" | ||
user_modified = False | ||
|
||
if settings.OIDC_OP_USER_ROLE is None: | ||
logger.debug("OIDC_OP_USER_ROLE not set, not modifying user groups") | ||
return | ||
|
||
if settings.SSO_DEFAULT_GROUP is None: | ||
logger.debug("SSO_DEFAULT_GROUP not set, not modifying user groups") | ||
return | ||
|
||
# All SSO users will belong to a group that can log into the admin | ||
group = Group.objects.get_or_create(name=settings.SSO_DEFAULT_GROUP)[0] | ||
|
||
# Make sure this group has permission to log into the admin | ||
permission = Permission.objects.get( | ||
codename="access_admin", | ||
content_type=ContentType.objects.get( | ||
app_label="wagtailadmin", model="admin" | ||
), | ||
) | ||
if not group.permissions.contains(permission): | ||
group.permissions.add(permission) | ||
|
||
# Modify the user's membership in the group based on the roles claim | ||
is_user = settings.OIDC_OP_USER_ROLE in roles | ||
|
||
if is_user and not user.groups.contains(group): | ||
logger.debug(f"Adding {user.username} to {group.name}.") | ||
user.groups.add(group) | ||
user_modified = True | ||
elif not is_user and user.groups.contains(group): | ||
logger.debug(f"Removing {user.username} from {group.name}.") | ||
user.groups.remove(group) | ||
user_modified = True | ||
|
||
return user_modified | ||
|
||
|
||
def process_roles_claim(user, roles): | ||
"""Adjust is_superuser and user group assignment based on OIDC roles""" | ||
user_modified = process_roles_user(user, roles) | ||
user_modified = process_roles_admin(user, roles) or user_modified | ||
return user_modified | ||
|
||
|
||
def process_given_name_claim(user, given_name): | ||
"""Set a Django user's first_name to an OIDC given_name""" | ||
if user.first_name != given_name: | ||
user.first_name = given_name | ||
return True | ||
|
||
|
||
def process_family_name_claim(user, family_name): | ||
"""Set a Django user's last_name to an OIDC family_name""" | ||
if user.last_name != family_name: | ||
user.last_name = family_name | ||
return True | ||
|
||
|
||
class CFPBOIDCAuthenticationBackend(OIDCAuthenticationBackend): | ||
|
||
def get_userinfo(self, access_token, id_token, payload): | ||
# Roles are part of the parent payload here, and not the userinfo. | ||
# Preserve them if we can. | ||
roles = payload.get("roles", []) | ||
userinfo = super().get_userinfo(access_token, id_token, payload) | ||
userinfo["roles"] = roles | ||
return userinfo | ||
|
||
def process_claims(self, user, claims): | ||
"""Run a configured callable for each OIDC claim received | ||
If a claim processor modifies a user it should return True, and the | ||
user will be saved once all claims are processed.""" | ||
user_modified = False | ||
|
||
claim_processors = getattr(settings, "OIDC_CLAIMS_PROCESSORS", {}) | ||
for claim, processor_string in claim_processors.items(): | ||
if claim not in claims: | ||
continue | ||
|
||
processor = import_string(processor_string) | ||
processor_modified_user = processor(user, claims[claim]) | ||
user_modified = user_modified or processor_modified_user | ||
|
||
if user_modified: | ||
user.save() | ||
|
||
return user | ||
|
||
def update_user(self, user, claims): | ||
"""Update user access based on claims""" | ||
user = super().update_user(user, claims) | ||
return self.process_claims(user, claims) | ||
|
||
def create_user(self, claims): | ||
"""Return object for a newly created user account.""" | ||
user = super().create_user(claims) | ||
return self.update_user(user, claims) |
Oops, something went wrong.