-
-
Notifications
You must be signed in to change notification settings - Fork 562
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
- Loading branch information
Showing
6 changed files
with
166 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
"""Test Applications API""" | ||
from unittest.mock import MagicMock, patch | ||
|
||
from django.urls import reverse | ||
|
||
from authentik.core.models import Application | ||
from authentik.core.tests.utils import create_test_admin_user, create_test_tenant | ||
from authentik.flows.models import Flow, FlowDesignation | ||
from authentik.flows.tests import FlowTestCase | ||
from authentik.tenants.models import Tenant | ||
|
||
|
||
class TestApplicationsViews(FlowTestCase): | ||
"""Test applications Views""" | ||
|
||
def setUp(self) -> None: | ||
self.user = create_test_admin_user() | ||
self.allowed = Application.objects.create( | ||
name="allowed", slug="allowed", meta_launch_url="https://goauthentik.io/%(username)s" | ||
) | ||
|
||
def test_check_redirect(self): | ||
"""Test redirect""" | ||
empty_flow = Flow.objects.create( | ||
name="foo", | ||
slug="foo", | ||
designation=FlowDesignation.AUTHENTICATION, | ||
) | ||
tenant: Tenant = create_test_tenant() | ||
tenant.flow_authentication = empty_flow | ||
tenant.save() | ||
response = self.client.get( | ||
reverse( | ||
"authentik_core:application-launch", | ||
kwargs={"application_slug": self.allowed.slug}, | ||
), | ||
follow=True, | ||
) | ||
self.assertEqual(response.status_code, 200) | ||
with patch( | ||
"authentik.flows.stage.StageView.get_pending_user", MagicMock(return_value=self.user) | ||
): | ||
response = self.client.post( | ||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": empty_flow.slug}) | ||
) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertStageRedirects(response, f"https://goauthentik.io/{self.user.username}") | ||
|
||
def test_check_redirect_auth(self): | ||
"""Test redirect""" | ||
self.client.force_login(self.user) | ||
empty_flow = Flow.objects.create( | ||
name="foo", | ||
slug="foo", | ||
designation=FlowDesignation.AUTHENTICATION, | ||
) | ||
tenant: Tenant = create_test_tenant() | ||
tenant.flow_authentication = empty_flow | ||
tenant.save() | ||
response = self.client.get( | ||
reverse( | ||
"authentik_core:application-launch", | ||
kwargs={"application_slug": self.allowed.slug}, | ||
), | ||
) | ||
self.assertEqual(response.status_code, 302) | ||
self.assertEqual(response.url, f"https://goauthentik.io/{self.user.username}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
"""app views""" | ||
from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect | ||
from django.shortcuts import get_object_or_404 | ||
from django.utils.translation import gettext_lazy as _ | ||
from django.views import View | ||
|
||
from authentik.core.models import Application | ||
from authentik.flows.challenge import ( | ||
ChallengeResponse, | ||
ChallengeTypes, | ||
HttpChallengeResponse, | ||
RedirectChallenge, | ||
) | ||
from authentik.flows.models import in_memory_stage | ||
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlanner | ||
from authentik.flows.stage import ChallengeStageView | ||
from authentik.flows.views.executor import SESSION_KEY_PLAN | ||
from authentik.lib.utils.urls import redirect_with_qs | ||
from authentik.stages.consent.stage import ( | ||
PLAN_CONTEXT_CONSENT_HEADER, | ||
PLAN_CONTEXT_CONSENT_PERMISSIONS, | ||
) | ||
from authentik.tenants.models import Tenant | ||
|
||
|
||
class RedirectToAppLaunch(View): | ||
"""Application launch view, redirect to the launch URL""" | ||
|
||
def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse: | ||
app = get_object_or_404(Application, slug=application_slug) | ||
# Check here if the application has any launch URL set, if not 404 | ||
launch = app.get_launch_url() | ||
if not launch: | ||
raise Http404 | ||
# Check if we're authenticated already, saves us the flow run | ||
if request.user.is_authenticated: | ||
return HttpResponseRedirect(app.get_launch_url(request.user)) | ||
# otherwise, do a custom flow plan that includes the application that's | ||
# being accessed, to improve usability | ||
tenant: Tenant = request.tenant | ||
flow = tenant.flow_authentication | ||
planner = FlowPlanner(flow) | ||
planner.allow_empty_flows = True | ||
plan = planner.plan( | ||
request, | ||
{ | ||
PLAN_CONTEXT_APPLICATION: app, | ||
PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.") | ||
% {"application": app.name}, | ||
PLAN_CONTEXT_CONSENT_PERMISSIONS: [], | ||
}, | ||
) | ||
plan.insert_stage(in_memory_stage(RedirectToAppStage)) | ||
request.session[SESSION_KEY_PLAN] = plan | ||
return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug) | ||
|
||
|
||
class RedirectToAppStage(ChallengeStageView): | ||
"""Final stage to be inserted after the user logs in""" | ||
|
||
def get_challenge(self, *args, **kwargs) -> RedirectChallenge: | ||
app = self.executor.plan.context[PLAN_CONTEXT_APPLICATION] | ||
launch = app.get_launch_url(self.get_pending_user()) | ||
# sanity check to ensure launch is still set | ||
if not launch: | ||
raise Http404 | ||
return RedirectChallenge( | ||
instance={ | ||
"type": ChallengeTypes.REDIRECT.value, | ||
"to": launch, | ||
} | ||
) | ||
|
||
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: | ||
return HttpChallengeResponse(self.get_challenge()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters