Skip to content

Commit

Permalink
Merge pull request #179 from duoi/class-based-views
Browse files Browse the repository at this point in the history
Class-based Login, Logout and Callback views, plus successful_login overridable method
  • Loading branch information
mingchen committed Oct 29, 2018
2 parents c257704 + 7348df3 commit 66bda8b
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 183 deletions.
3 changes: 3 additions & 0 deletions README.rst
Expand Up @@ -16,6 +16,7 @@ Features

- Supports CAS_ versions 1.0, 2.0 and 3.0.
- Support Single Sign Out
- Supports Token auth schemes
- Can fetch Proxy Granting Ticket
- Supports Django 1.5, 1.6, 1.7, 1.8, 1.9, 1.10, 1.11 and 2.0
- Supports using a `User custom model`_
Expand Down Expand Up @@ -155,6 +156,8 @@ your URL mappings:
url(r'^accounts/login$', django_cas_ng.views.login, name='cas_ng_login'),
url(r'^accounts/logout$', django_cas_ng.views.logout, name='cas_ng_logout'),
If you use the middleware, the ``login`` url must given the name ``cas_ng_login`` or it will create redirection issues.

You should also add an URL mapping for the ``CAS_PROXY_CALLBACK`` settings:

.. code-block:: python
Expand Down
19 changes: 16 additions & 3 deletions django_cas_ng/middleware.py
Expand Up @@ -9,7 +9,17 @@
from django.core.exceptions import PermissionDenied
from django.conf import settings
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.contrib.auth.views import login, logout
try:
from django.contrib.auth.views import (
LoginView as login,
LogoutView as logout
)
except ImportError:
from django.contrib.auth.views import (
login,
logout
)

try:
# Django > 1.10 deprecates django.core.urlresolvers
from django.urls import reverse
Expand All @@ -25,7 +35,10 @@

import django

from .views import login as cas_login, logout as cas_logout
from .views import (
LoginView as cas_login,
LogoutView as cas_logout
)

__all__ = ['CASMiddleware']

Expand Down Expand Up @@ -73,4 +86,4 @@ def process_view(self, request, view_func, view_args, view_kwargs):
else:
raise PermissionDenied(_('You do not have staff privileges.'))
params = urllib_parse.urlencode({REDIRECT_FIELD_NAME: request.get_full_path()})
return HttpResponseRedirect(reverse(cas_login) + '?' + params)
return HttpResponseRedirect(reverse('cas_ng_login') + '?' + params)
272 changes: 159 additions & 113 deletions django_cas_ng/views.py
Expand Up @@ -6,12 +6,19 @@
import sys
import types

from django.utils.decorators import method_decorator
from django.utils.six.moves import urllib_parse
from django.conf import settings
from django.http import HttpResponseRedirect
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse
from django.utils import timezone

try:
from django.views import View
except ImportError:
from django.views.generic import View

from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth import (
logout as auth_logout,
Expand All @@ -20,7 +27,6 @@
)
from django.contrib import messages
from django.utils.translation import ugettext_lazy as _
from django.views.decorators.http import require_http_methods

from importlib import import_module

Expand All @@ -34,134 +40,174 @@
get_protocol, get_redirect_url,
get_user_from_session)

__all__ = ['login', 'logout', 'callback']
__all__ = ['LoginView', 'LogoutView', 'CallbackView']


@csrf_exempt
@require_http_methods(["GET", "POST"])
def login(request, next_page=None, required=False):
"""Forwards to CAS login URL or verifies CAS ticket"""
service_url = get_service_url(request, next_page)
client = get_cas_client(service_url=service_url, request=request)
class LoginView(View):
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super(LoginView, self).dispatch(request, *args, **kwargs)

if not next_page and settings.CAS_STORE_NEXT and 'CASNEXT' in request.session:
next_page = request.session['CASNEXT']
del request.session['CASNEXT']
def successful_login(self, request, next_page):
"""
This method is called on successful login. Override this method for
custom post-auth actions (i.e, to add a cookie with a token).
if not next_page:
next_page = get_redirect_url(request)

if request.method == 'POST' and request.POST.get('logoutRequest'):
clean_sessions(client, request)
:param request:
:param next_page:
:return:
"""
return HttpResponseRedirect(next_page)

# backward compability for django < 2.0
is_user_authenticated = False
def post(self, request):
if request.POST.get('logoutRequest'):
next_page = request.POST.get('next', settings.CAS_REDIRECT_URL)
service_url = get_service_url(request, next_page)
client = get_cas_client(service_url=service_url, request=request)

clean_sessions(client, request)
return HttpResponseRedirect(next_page)

if sys.version_info >= (3, 0):
bool_type = bool
else:
bool_type = types.BooleanType
def get(self, request):
"""
Forwards to CAS login URL or verifies CAS ticket
if isinstance(request.user.is_authenticated, bool_type):
is_user_authenticated = request.user.is_authenticated
else:
is_user_authenticated = request.user.is_authenticated()
:param request:
:return:
"""
next_page = request.GET.get('next')
required = request.GET.get('required', False)

if is_user_authenticated:
if settings.CAS_LOGGED_MSG is not None:
message = settings.CAS_LOGGED_MSG % request.user.get_username()
messages.success(request, message)
return HttpResponseRedirect(next_page)
service_url = get_service_url(request, next_page)
client = get_cas_client(service_url=service_url, request=request)

ticket = request.GET.get('ticket')
if ticket:
user = authenticate(ticket=ticket,
service=service_url,
request=request)
pgtiou = request.session.get("pgtiou")
if user is not None:
if not request.session.exists(request.session.session_key):
request.session.create()
auth_login(request, user)
SessionTicket.objects.create(
session_key=request.session.session_key,
ticket=ticket
)
if not next_page and settings.CAS_STORE_NEXT and 'CASNEXT' in request.session:
next_page = request.session['CASNEXT']
del request.session['CASNEXT']

if not next_page:
next_page = get_redirect_url(request)

if pgtiou and settings.CAS_PROXY_CALLBACK:
# Delete old PGT
ProxyGrantingTicket.objects.filter(
user=user,
session_key=request.session.session_key
).delete()
# Set new PGT ticket
try:
pgt = ProxyGrantingTicket.objects.get(pgtiou=pgtiou)
pgt.user = user
pgt.session_key = request.session.session_key
pgt.save()
except ProxyGrantingTicket.DoesNotExist:
pass

if settings.CAS_LOGIN_MSG is not None:
name = user.get_username()
message = settings.CAS_LOGIN_MSG % name
# backward compability for django < 2.0
is_user_authenticated = False

if sys.version_info >= (3, 0):
bool_type = bool
else:
bool_type = types.BooleanType

if isinstance(request.user.is_authenticated, bool_type):
is_user_authenticated = request.user.is_authenticated
else:
is_user_authenticated = request.user.is_authenticated()

if is_user_authenticated:
if settings.CAS_LOGGED_MSG is not None:
message = settings.CAS_LOGGED_MSG % request.user.get_username()
messages.success(request, message)
return HttpResponseRedirect(next_page)
elif settings.CAS_RETRY_LOGIN or required:
return HttpResponseRedirect(client.get_login_url())
return self.successful_login(request=request, next_page=next_page)

ticket = request.GET.get('ticket')
if ticket:
user = authenticate(ticket=ticket,
service=service_url,
request=request)
pgtiou = request.session.get("pgtiou")
if user is not None:
if not request.session.exists(request.session.session_key):
request.session.create()
auth_login(request, user)
SessionTicket.objects.create(
session_key=request.session.session_key,
ticket=ticket
)

if pgtiou and settings.CAS_PROXY_CALLBACK:
# Delete old PGT
ProxyGrantingTicket.objects.filter(
user=user,
session_key=request.session.session_key
).delete()
# Set new PGT ticket
try:
pgt = ProxyGrantingTicket.objects.get(pgtiou=pgtiou)
pgt.user = user
pgt.session_key = request.session.session_key
pgt.save()
except ProxyGrantingTicket.DoesNotExist:
pass

if settings.CAS_LOGIN_MSG is not None:
name = user.get_username()
message = settings.CAS_LOGIN_MSG % name
messages.success(request, message)
return self.successful_login(request=request, next_page=next_page)
elif settings.CAS_RETRY_LOGIN or required:
return HttpResponseRedirect(client.get_login_url())
else:
raise PermissionDenied(_('Login failed.'))
else:
raise PermissionDenied(_('Login failed.'))
else:
if settings.CAS_STORE_NEXT:
request.session['CASNEXT'] = next_page
return HttpResponseRedirect(client.get_login_url())


@require_http_methods(["GET"])
def logout(request, next_page=None):
"""Redirects to CAS logout page"""
# try to find the ticket matching current session for logout signal
try:
st = SessionTicket.objects.get(session_key=request.session.session_key)
ticket = st.ticket
except SessionTicket.DoesNotExist:
ticket = None
# send logout signal
cas_user_logout.send(
sender="manual",
user=request.user,
session=request.session,
ticket=ticket,
)
auth_logout(request)
# clean current session ProxyGrantingTicket and SessionTicket
ProxyGrantingTicket.objects.filter(session_key=request.session.session_key).delete()
SessionTicket.objects.filter(session_key=request.session.session_key).delete()
next_page = next_page or get_redirect_url(request)
if settings.CAS_LOGOUT_COMPLETELY:
protocol = get_protocol(request)
host = request.get_host()
redirect_url = urllib_parse.urlunparse(
(protocol, host, next_page, '', '', ''),
if settings.CAS_STORE_NEXT:
request.session['CASNEXT'] = next_page
return HttpResponseRedirect(client.get_login_url())


class LogoutView(View):
def get(self, request):
"""
Redirects to CAS logout page
:param request:
:return:
"""
next_page = request.GET.get('next')

# try to find the ticket matching current session for logout signal
try:
st = SessionTicket.objects.get(session_key=request.session.session_key)
ticket = st.ticket
except SessionTicket.DoesNotExist:
ticket = None
# send logout signal
cas_user_logout.send(
sender="manual",
user=request.user,
session=request.session,
ticket=ticket,
)
client = get_cas_client(request=request)
return HttpResponseRedirect(client.get_logout_url(redirect_url))
else:
# This is in most cases pointless if not CAS_RENEW is set. The user will
# simply be logged in again on next request requiring authorization.
return HttpResponseRedirect(next_page)
auth_logout(request)
# clean current session ProxyGrantingTicket and SessionTicket
ProxyGrantingTicket.objects.filter(session_key=request.session.session_key).delete()
SessionTicket.objects.filter(session_key=request.session.session_key).delete()
next_page = next_page or get_redirect_url(request)
if settings.CAS_LOGOUT_COMPLETELY:
protocol = get_protocol(request)
host = request.get_host()
redirect_url = urllib_parse.urlunparse(
(protocol, host, next_page, '', '', ''),
)
client = get_cas_client(request=request)
return HttpResponseRedirect(client.get_logout_url(redirect_url))
else:
# This is in most cases pointless if not CAS_RENEW is set. The user will
# simply be logged in again on next request requiring authorization.
return HttpResponseRedirect(next_page)


@csrf_exempt
@require_http_methods(["GET", "POST"])
def callback(request):
"""Read PGT and PGTIOU sent by CAS"""
if request.method == 'POST' and request.POST.get('logoutRequest'):
clean_sessions(get_cas_client(request=request), request)
return HttpResponse("{0}\n".format(_('ok')), content_type="text/plain")
elif request.method == 'GET':
class CallbackView(View):
"""
Read PGT and PGTIOU sent by CAS
"""
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super(CallbackView, self).dispatch(request, *args, **kwargs)

def post(self, request):
if request.POST.get('logoutRequest'):
clean_sessions(get_cas_client(request=request), request)
return HttpResponse("{0}\n".format(_('ok')), content_type="text/plain")

def get(self, request):
pgtid = request.GET.get('pgtId')
pgtiou = request.GET.get('pgtIou')
pgt = ProxyGrantingTicket.objects.create(pgtiou=pgtiou, pgt=pgtid)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_middleware.py
Expand Up @@ -22,7 +22,7 @@ def test_root_as_cas_admin_prefix_with_cas_login(monkeypatch, settings):
lambda func: "/login/")
settings.CAS_ADMIN_PREFIX = "/"
response = _process_view_with_middleware(
CASMiddleware, '/login/', views.login)
CASMiddleware, '/login/', views.LoginView)
assert response is None


Expand All @@ -31,5 +31,5 @@ def test_root_as_cas_admin_prefix_with_cas_logout(monkeypatch, settings):
lambda func: "/login/")
settings.CAS_ADMIN_PREFIX = "/"
response = _process_view_with_middleware(
CASMiddleware, '/logout/', views.logout)
CASMiddleware, '/logout/', views.LogoutView)
assert response is None

0 comments on commit 66bda8b

Please sign in to comment.