Skip to content

Commit

Permalink
feat: include UnwillingToPerformError as possible exception
Browse files Browse the repository at this point in the history
  • Loading branch information
Samir Musali committed Jun 21, 2023
1 parent 17fbba2 commit 1a75cb6
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -166,6 +166,7 @@ dmypy.json
# SageMath parsed files

# Environments
**/.DS_Store

# Spyder project settings

Expand Down
6 changes: 3 additions & 3 deletions authentik/sources/ldap/password.py
Expand Up @@ -4,7 +4,7 @@
from typing import Optional

from ldap3 import BASE
from ldap3.core.exceptions import LDAPAttributeError
from ldap3.core.exceptions import LDAPAttributeError, LDAPUnwillingToPerformResult
from structlog.stdlib import get_logger

from authentik.core.models import User
Expand Down Expand Up @@ -69,7 +69,7 @@ def check_ad_password_complexity_enabled(self) -> bool:
attributes=["pwdProperties"],
)
root_attrs = list(root_attrs)[0]
except (LDAPAttributeError, KeyError, IndexError):
except (LDAPAttributeError, LDAPUnwillingToPerformResult, KeyError, IndexError):

Check warning on line 72 in authentik/sources/ldap/password.py

View check run for this annotation

Codecov / codecov/patch

authentik/sources/ldap/password.py#L72

Added line #L72 was not covered by tests
return False
raw_pwd_properties = root_attrs.get("attributes", {}).get("pwdProperties", None)
if not raw_pwd_properties:
Expand All @@ -92,7 +92,7 @@ def change_password(self, user: User, password: str):
return
try:
self._connection.extend.microsoft.modify_password(user_dn, password)
except LDAPAttributeError:
except (LDAPAttributeError, LDAPUnwillingToPerformResult):

Check warning on line 95 in authentik/sources/ldap/password.py

View check run for this annotation

Codecov / codecov/patch

authentik/sources/ldap/password.py#L95

Added line #L95 was not covered by tests
self._connection.extend.standard.modify_password(user_dn, new_password=password)

def _ad_check_password_existing(self, password: str, user_dn: str) -> bool:
Expand Down
82 changes: 81 additions & 1 deletion tests/e2e/test_source_ldap_samba.py
@@ -1,8 +1,9 @@
"""test LDAP Source"""
from typing import Any, Optional
from unittest.mock import patch

from django.db.models import Q
from ldap3.core.exceptions import LDAPSessionTerminatedByServerError
from ldap3.core.exceptions import LDAPAttributeError, LDAPSessionTerminatedByServerError, LDAPUnwillingToPerformResult

from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import Group, User
Expand Down Expand Up @@ -164,3 +165,82 @@ def test_sync_password(self):
user.refresh_from_db()
# Since password in samba was checked, it should be invalidated here too
self.assertFalse(user.has_usable_password())

@retry(exceptions=[LDAPSessionTerminatedByServerError])
@apply_blueprint("system/sources-ldap.yaml")
def test_change_password_nonexistent_user(self):
"""Test that an LDAPAttributeError is raised when trying to change the password of a nonexistent user"""
# Setup similar to the previous test
source = LDAPSource.objects.create(
name=generate_id(),
slug=generate_id(),
server_uri="ldap://localhost",
bind_cn="administrator@test.goauthentik.io",
bind_password=self.admin_password,
base_dn="dc=test,dc=goauthentik,dc=io",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
source.property_mappings.set(
LDAPPropertyMapping.objects.filter(
Q(managed__startswith="goauthentik.io/sources/ldap/default-")
| Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
)
)
source.property_mappings_group.set(
LDAPPropertyMapping.objects.filter(name="goauthentik.io/sources/ldap/default-name")
)

# Attempt to change the password of a nonexistent user
password_changer = LDAPPasswordChanger(source)
user = User(username="nonexistent-user")
password = generate_id()

# Expect an LDAPAttributeError to be raised
with self.assertRaises(LDAPAttributeError):
password_changer.change_password(user, password)

@retry(exceptions=[LDAPSessionTerminatedByServerError])
@apply_blueprint(
"system/sources-ldap.yaml",
)
def test_change_password_unwilling_to_perform_result(self):
"""Test password change with an LDAPUnwillingToPerformResult exception"""
source = LDAPSource.objects.create(
name=generate_id(),
slug=generate_id(),
server_uri="ldap://localhost",
bind_cn="administrator@test.goauthentik.io",
bind_password=self.admin_password,
base_dn="dc=test,dc=goauthentik,dc=io",
additional_user_dn="ou=users",
additional_group_dn="ou=groups",
)
source.property_mappings.set(
LDAPPropertyMapping.objects.filter(
Q(managed__startswith="goauthentik.io/sources/ldap/default-")
| Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
)
)
source.property_mappings_group.set(
LDAPPropertyMapping.objects.filter(name="goauthentik.io/sources/ldap/default-name")
)
UserLDAPSynchronizer(source).sync()
username = "bob"
password = generate_id()
result = self.container.exec_run(
["samba-tool", "user", "setpassword", username, "--newpassword", password]
)
self.assertEqual(result.exit_code, 0)
user: User = User.objects.get(username=username)

# Mock the connection's extend.microsoft.modify_password to raise LDAPUnwillingToPerformResult
with patch.object(source.connection().extend.microsoft, "modify_password", side_effect=LDAPUnwillingToPerformResult("Test exception")):
# Mock the connection's extend.standard.modify_password to track if it is called
with patch.object(source.connection().extend.standard, "modify_password") as mock_standard_modify:
# Attempt password change
changer = LDAPPasswordChanger(source)
changer.change_password(user, password)

# Check if standard modify_password method was called
mock_standard_modify.assert_called_once_with(user.attributes.get(LDAP_DISTINGUISHED_NAME, None), new_password=password)

0 comments on commit 1a75cb6

Please sign in to comment.