Skip to content

Commit

Permalink
assign users to groups from OIDC claims
Browse files Browse the repository at this point in the history
  • Loading branch information
jeriox committed Oct 23, 2023
1 parent 2f1731c commit 768acea
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Generated by Django 4.2.6 on 2023-10-23 20:14

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("core", "0023_remove_userprofile_first_name_and_more"),
]

operations = [
migrations.AddField(
model_name="identityprovider",
name="create_missing_groups",
field=models.BooleanField(
default=False,
help_text="If enabled, groups from the claim defined above that do not exist yet will be created automatically.",
verbose_name="create missing groups",
),
),
migrations.AddField(
model_name="identityprovider",
name="group_claim",
field=models.CharField(
blank=True,
help_text="The name of the claim that contains the user's groups. Leave empty if your provider does not support this. You can use dot notation to access nested claims.",
max_length=254,
null=True,
verbose_name="group claim",
),
),
]
16 changes: 16 additions & 0 deletions ephios/core/models/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,22 @@ class IdentityProvider(Model):
verbose_name=_("default groups"),
help_text=_("The groups that users logging in with this provider will be added to."),
)
group_claim = models.CharField(
max_length=254,
blank=True,
null=True,
verbose_name=_("group claim"),
help_text=_(
"The name of the claim that contains the user's groups. Leave empty if your provider does not support this. You can use dot notation to access nested claims."
),
)
create_missing_groups = models.BooleanField(
default=False,
verbose_name=_("create missing groups"),
help_text=_(
"If enabled, groups from the claim defined above that do not exist yet will be created automatically."
),
)

def __str__(self):
return _("Identity provider {label}").format(label=self.label)
5 changes: 5 additions & 0 deletions ephios/core/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class IdentityProviderCreateView(StaffRequiredMixin, SuccessMessageMixin, Create
"client_secret",
"scopes",
"default_groups",
"group_claim",
"create_missing_groups",
"authorization_endpoint",
"token_endpoint",
"userinfo_endpoint",
Expand Down Expand Up @@ -155,13 +157,16 @@ class IdentityProviderUpdateView(StaffRequiredMixin, UpdateView):
"client_secret",
"scopes",
"default_groups",
"group_claim",
"create_missing_groups",
"authorization_endpoint",
"token_endpoint",
"userinfo_endpoint",
"end_session_endpoint",
"jwks_uri",
]
success_url = reverse_lazy("core:settings_idp_list")
success_message = _("Identity provider saved.")


class IdentityProviderDeleteView(StaffRequiredMixin, DeleteView):
Expand Down
21 changes: 20 additions & 1 deletion ephios/extra/auth.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from datetime import date
from functools import reduce
from typing import Any, Dict
from urllib.parse import urljoin

import jwt
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import Group
from django.core.exceptions import SuspiciousOperation
from django.urls import reverse
from jwt import InvalidTokenError
Expand Down Expand Up @@ -42,7 +44,24 @@ def update_user(self, user, claims):
except ValueError:
pass
user.save()
if hasattr(self, "provider") and self.provider.default_groups.exists():
if self.provider.group_claim:
groups = []
groups_in_claims = (
reduce(
lambda d, key: d.get(key, None) if isinstance(d, dict) else None,
self.provider.group_claim.split("."),
claims,
)
or []
)
for group_name in groups_in_claims:
try:
groups.append(Group.objects.get(name__iexact=group_name))
except Group.DoesNotExist:
if self.provider.create_missing_groups:
groups.append(Group.objects.create(name=group_name))
user.groups.set(groups)
elif self.provider.default_groups.exists():
user.groups.add(*self.provider.default_groups.all())
return user

Expand Down
53 changes: 53 additions & 0 deletions tests/core/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest.mock import patch

from django.conf import settings
from django.contrib.auth.models import Group
from django.http import HttpRequest
from django.urls import reverse

Expand Down Expand Up @@ -64,3 +65,55 @@ def test_oidc_callback(MockEphiosOIDCAB, django_app, oidc_client, volunteer):
MockEphiosOIDCAB().authenticate.return_value = volunteer
response = django_app.get(reverse("core:oidc_callback"), params={"code": "123", "state": "123"})
assert response.status_code == 302


def test_assign_default_groups(oidc_client, groups, volunteer):
from ephios.extra.auth import EphiosOIDCAB

managers, planners, volunteers = groups
claims = {"email": volunteer.email}
oidc_client.default_groups.set([managers])
assert not volunteer.groups.filter(pk=managers.pk).exists()
backend = EphiosOIDCAB()
backend.provider = oidc_client
volunteer = backend.update_user(volunteer, claims)
assert volunteer.groups.filter(pk=managers.pk).exists()


def test_assign_groups_from_oidc_simple(oidc_client, groups, volunteer):
from ephios.extra.auth import EphiosOIDCAB

managers, planners, volunteers = groups
claims = {"email": volunteer.email, "roles": [managers.name]}
oidc_client.group_claim = "roles"
assert not volunteer.groups.filter(pk=managers.pk).exists()
backend = EphiosOIDCAB()
backend.provider = oidc_client
volunteer = backend.update_user(volunteer, claims)
assert set(volunteer.groups.all()) == {managers}


def test_assign_groups_from_oidc_nested(oidc_client, groups, volunteer):
from ephios.extra.auth import EphiosOIDCAB

managers, planners, volunteers = groups
claims = {"email": volunteer.email, "nested": {"roles": {"inside": [managers.name]}}}
oidc_client.group_claim = "nested.roles.inside"
assert not volunteer.groups.filter(pk=managers.pk).exists()
backend = EphiosOIDCAB()
backend.provider = oidc_client
volunteer = backend.update_user(volunteer, claims)
assert set(volunteer.groups.all()) == {managers}


def test_create_groups_from_oidc(oidc_client, volunteer):
from ephios.extra.auth import EphiosOIDCAB

claims = {"email": volunteer.email, "roles": ["test"]}
oidc_client.group_claim = "roles"
oidc_client.create_missing_groups = True
assert not Group.objects.filter(name="test").exists()
backend = EphiosOIDCAB()
backend.provider = oidc_client
volunteer = backend.update_user(volunteer, claims)
assert volunteer.groups.filter(name="test").exists()

0 comments on commit 768acea

Please sign in to comment.