Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updated cas version from https://github.com/kstateome/django-cas

Replied admin only hack
  • Loading branch information...
commit e964e115d227c0ceb979dd03950a3eed02d546cb 1 parent 9afb000
David authored
View
4 ecwsp/django_cas/__init__.py → cas/__init__.py 100755 → 100644
@@ -13,6 +13,10 @@
'CAS_RETRY_LOGIN': False,
'CAS_SERVER_URL': None,
'CAS_VERSION': '2',
+ 'CAS_GATEWAY': False,
+ 'CAS_PROXY_CALLBACK': None,
+ 'CAS_RESPONSE_CALLBACKS': None,
+ 'CAS_CUSTOM_FORBIDDEN':None
}
for key, value in _DEFAULTS.iteritems():
View
61 ecwsp/django_cas/backends.py → cas/backends.py 100755 → 100644
@@ -4,8 +4,9 @@
from urlparse import urljoin
from django.conf import settings
-
-from django_cas.models import User
+from django.core.exceptions import ObjectDoesNotExist
+from cas.models import User, Tgt, PgtIOU
+from cas.utils import cas_response_callbacks
__all__ = ['CASBackend']
@@ -40,14 +41,28 @@ def _verify_cas2(ticket, service):
except ImportError:
from elementtree import ElementTree
- params = {'ticket': ticket, 'service': service}
+ if settings.CAS_PROXY_CALLBACK:
+ params = {'ticket': ticket, 'service': service, 'pgtUrl': settings.CAS_PROXY_CALLBACK}
+ else:
+ params = {'ticket': ticket, 'service': service}
+
url = (urljoin(settings.CAS_SERVER_URL, 'proxyValidate') + '?' +
urlencode(params))
+
page = urlopen(url)
try:
response = page.read()
tree = ElementTree.fromstring(response)
+
+ #Useful for debugging
+ #from xml.dom.minidom import parseString
+ #from xml.etree import ElementTree
+ #txt = ElementTree.tostring(tree)
+ #print parseString(txt).toprettyxml()
+
if tree[0].tag.endswith('authenticationSuccess'):
+ if settings.CAS_RESPONSE_CALLBACKS:
+ cas_response_callbacks(tree)
return tree[0][0].text
else:
return None
@@ -55,6 +70,39 @@ def _verify_cas2(ticket, service):
page.close()
+def verify_proxy_ticket(ticket, service):
+ """Verifies CAS 2.0+ XML-based proxy ticket.
+
+ Returns username on success and None on failure.
+ """
+
+ try:
+ from xml.etree import ElementTree
+ except ImportError:
+ from elementtree import ElementTree
+
+ params = {'ticket': ticket, 'service': service}
+
+ url = (urljoin(settings.CAS_SERVER_URL, 'proxyValidate') + '?' +
+ urlencode(params))
+
+ page = urlopen(url)
+
+ try:
+ response = page.read()
+ tree = ElementTree.fromstring(response)
+ if tree[0].tag.endswith('authenticationSuccess'):
+ username = tree[0][0].text
+ proxies = []
+ if len(tree[0]) > 1:
+ for element in tree[0][1]:
+ proxies.append(element.text)
+ return {"username": username, "proxies": proxies}
+ else:
+ return None
+ finally:
+ page.close()
+
_PROTOCOLS = {'1': _verify_cas1, '2': _verify_cas2}
if settings.CAS_VERSION not in _PROTOCOLS:
@@ -66,8 +114,13 @@ def _verify_cas2(ticket, service):
class CASBackend(object):
"""CAS authentication backend"""
+ supports_object_permissions = False
+ supports_inactive_user = False
+
def authenticate(self, ticket, service):
- """Verifies CAS ticket and gets or creates User object"""
+ """Verifies CAS ticket and gets or creates User object
+ NB: Use of PT to identify proxy
+ """
username = _verify(ticket, service)
if not username:
View
39 ecwsp/django_cas/decorators.py → cas/decorators.py 100755 → 100644
@@ -5,6 +5,7 @@
except ImportError:
from django.utils.functional import wraps
+from urllib import urlencode
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.contrib.auth.decorators import login_required
from django.http import HttpResponseForbidden, HttpResponseRedirect
@@ -43,3 +44,41 @@ def permission_required(perm, login_url=None):
"""
return user_passes_test(lambda u: u.has_perm(perm), login_url=login_url)
+
+
+from django.conf import settings
+from django.core.exceptions import ImproperlyConfigured
+
+def gateway():
+ """Authenticates single sign on session if ticket is available,
+ but doesn't redirect to sign in url otherwise.
+ """
+ if settings.CAS_GATEWAY == False:
+ raise ImproperlyConfigured('CAS_GATEWAY must be set to True')
+ def wrap(func):
+ def wrapped_f(*args):
+
+ from cas.views import login
+ request = args[0]
+
+ if request.user.is_authenticated():
+ #Is Authed, fine
+ pass
+ else:
+ path_with_params = request.path + '?' + urlencode(request.GET.copy())
+ if request.GET.get('ticket'):
+ #Not Authed, but have a ticket!
+ #Try to authenticate
+ return login(request, path_with_params, False, True)
+ else:
+ #Not Authed, but no ticket
+ gatewayed = request.GET.get('gatewayed')
+ if gatewayed == 'true':
+ pass
+ else:
+ #Not Authed, try to authenticate
+ return login(request, path_with_params, False, True)
+
+ return func(*args)
+ return wrapped_f
+ return wrap
View
9 cas/exceptions.py
@@ -0,0 +1,9 @@
+"CasTicketException, CasConfigException"
+from django.core.exceptions import ValidationError
+
+class CasTicketException(ValidationError):
+ """The ticket fails to validate"""
+
+class CasConfigException(ValidationError):
+ """The config is wrong"""
+
View
37 ecwsp/django_cas/middleware.py → cas/middleware.py 100755 → 100644
@@ -2,13 +2,16 @@
from urllib import urlencode
-from django.http import HttpResponseRedirect, HttpResponseForbidden
from django.conf import settings
from django.contrib.auth import REDIRECT_FIELD_NAME
+from django.contrib.auth import logout as do_logout
from django.contrib.auth.views import login, logout
from django.core.urlresolvers import reverse
+from django.http import HttpResponseRedirect, HttpResponseForbidden
+from django.core.exceptions import ImproperlyConfigured
-from django_cas.views import login as cas_login, logout as cas_logout
+from cas.exceptions import CasTicketException
+from cas.views import login as cas_login, logout as cas_logout
__all__ = ['CASMiddleware']
@@ -29,13 +32,15 @@ def process_view(self, request, view_func, view_args, view_kwargs):
login URL, as well as calls to django.contrib.auth.views.login and
logout.
"""
+
try:
- next = request.GET['next'][:6]
+ next = request.GET['next'][:6]
except:
- next = False
+ next = False
+
if view_func == login and next == "/admin":
return cas_login(request, *view_args, **view_kwargs)
- elif str(view_func)[:16] == str(logout)[:16]:
+ elif view_func == logout:
return cas_logout(request, *view_args, **view_kwargs)
if settings.CAS_ADMIN_PREFIX:
@@ -49,7 +54,27 @@ def process_view(self, request, view_func, view_args, view_kwargs):
return None
else:
error = ('<h1>Forbidden</h1><p>You do not have staff '
- 'privileges. Click <a href="/accounts/logout"> here to log out.<a/></p>')
+ 'privileges.</p>')
return HttpResponseForbidden(error)
params = urlencode({REDIRECT_FIELD_NAME: request.get_full_path()})
return HttpResponseRedirect(reverse(cas_login) + '?' + params)
+
+ def process_exception(self, request, exception):
+ """When we get a CasTicketException, that is probably caused by the ticket timing out.
+ So logout/login and get the same page again."""
+ if isinstance(exception, CasTicketException):
+ do_logout(request)
+ # This assumes that request.path requires authentication.
+ return HttpResponseRedirect(request.path)
+ else:
+ return None
+
+class ProxyMiddleware(object):
+
+ # Middleware used to "fake" the django app that it lives at the Proxy Domain
+ def process_request(self, request):
+ proxy = getattr(settings, 'PROXY_DOMAIN', None)
+ if not proxy:
+ raise ImproperlyConfigured('To use Proxy Middleware you must set a PROXY_DOMAIN setting.')
+ else:
+ request.META['HTTP_HOST'] = proxy
View
71 cas/models.py
@@ -0,0 +1,71 @@
+from urlparse import urljoin
+from urllib import urlencode, urlopen
+from django.db import models
+from django.conf import settings
+from django.contrib.auth.models import User
+from django.core.exceptions import ObjectDoesNotExist
+from cas.exceptions import CasTicketException, CasConfigException
+# Ed Crewe - add in signals to delete old tickets
+from django.db.models.signals import post_save
+from datetime import datetime
+
+class Tgt(models.Model):
+ username = models.CharField(max_length = 255, unique = True)
+ tgt = models.CharField(max_length = 255)
+
+ def get_proxy_ticket_for(self, service):
+ """Verifies CAS 2.0+ XML-based authentication ticket.
+
+ Returns username on success and None on failure.
+ """
+ if not settings.CAS_PROXY_CALLBACK:
+ raise CasConfigException("No proxy callback set in settings")
+
+ try:
+ from xml.etree import ElementTree
+ except ImportError:
+ from elementtree import ElementTree
+
+ params = {'pgt': self.tgt, 'targetService': service}
+
+ url = (urljoin(settings.CAS_SERVER_URL, 'proxy') + '?' +
+ urlencode(params))
+
+ page = urlopen(url)
+
+ try:
+ response = page.read()
+ tree = ElementTree.fromstring(response)
+ if tree[0].tag.endswith('proxySuccess'):
+ return tree[0][0].text
+ else:
+ raise CasTicketException("Failed to get proxy ticket")
+ finally:
+ page.close()
+
+class PgtIOU(models.Model):
+ """ Proxy granting ticket and IOU """
+ pgtIou = models.CharField(max_length = 255, unique = True)
+ tgt = models.CharField(max_length = 255)
+ created = models.DateTimeField(auto_now = True)
+
+def get_tgt_for(user):
+ if not settings.CAS_PROXY_CALLBACK:
+ raise CasConfigException("No proxy callback set in settings")
+
+ try:
+ return Tgt.objects.get(username = user.username)
+ except ObjectDoesNotExist:
+ raise CasTicketException("no ticket found for user " + user.username)
+
+def delete_old_tickets(**kwargs):
+ """ Delete tickets if they are over 2 days old
+ kwargs = ['raw', 'signal', 'instance', 'sender', 'created']
+ """
+ sender = kwargs.get('sender', None)
+ now = datetime.now()
+ expire = datetime(now.year, now.month, now.day - 2)
+ sender.objects.filter(created__lt=expire).delete()
+
+post_save.connect(delete_old_tickets, sender=PgtIOU)
+#post_save.connect(delete_old_tickets, sender=Tgt)
View
334 cas/tests/cas_tests.py
@@ -0,0 +1,334 @@
+# Shell python script to test a live SSO set up - Ed Crewe 26 Nov 2010
+# It can be really fiddly testing out SSO proxy auth via typing in URLs etc
+# see Dave Spencer's guide at https://wiki.jasig.org/display/CAS/Proxy+CAS+Walkthrough
+# This does script does it for you against the deployed servers
+
+# Run via python 2.4 or above ...
+# python cas_tests.py [username]
+# You will need to edit the constants below to match your setup ...
+
+import unittest
+import sys
+import commands
+import getpass
+import urllib2
+import urllib
+from urlparse import urljoin
+import cookielib
+from xml.dom import minidom
+
+# Add in a separate test_config file if you wish of the following format
+try:
+ from test_config import *
+except:
+ # Please edit these urls to match your cas server, proxy and app server urls
+ CAS_SERVER_URL = 'https://signin.k-state.edu/WebISO/login'
+ APP_URL = 'http://webdev.labs.ome.ksu.edu/'
+ APP_RESTRICTED = 'connect'
+ PROXY_URL = 'https://webdev.labs.ome.ksu.edu/accounts/login/casProxyCallback/'
+ # Depending on your cas login form you may need to adjust these field name keys
+ TOKEN = '_eventID' # CSRF token field name
+ CAS_SUCCESS = 'Login successful' # CAS server successful login flag (find string in html page)
+ AUTH = {'username' : 'garrett', # user field name
+ 'password' : 'password', # password field name
+ 'submit' : 'submit' # login submit button
+ }
+ SCRIPT = 'manage.py shell --plain < get_pgt.py' # A script to extract the PGT from your proxying server
+
+class TestCAS(unittest.TestCase):
+ """ A class for testing a CAS setup both for standard and proxy authentication """
+
+ opener = None
+ auth = {}
+ urls = {}
+
+ def setUp(self):
+ self.cj = cookielib.CookieJar()
+ opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self.cj))
+ urllib2.install_opener(opener)
+ self.opener = opener
+ self.get_auth()
+ self.set_url('cas', CAS_SERVER_URL)
+ self.set_url('app', APP_URL)
+ self.set_url('proxy', PROXY_URL)
+
+ def set_url(self, name, url):
+ """ Make sure valid url with query string appended """
+ for end in ['/','.html','.htm']:
+ if url.endswith(end):
+ self.urls[name] = url
+ return
+ self.urls[name] = '%s/' % url
+
+ def test_cas(self):
+ """ Test ordinary and proxy CAS login
+ NB cant put these into separate tests since tickets
+ are required to be passed between tests
+ """
+ print 'Testing with following URLs'
+ print '---------------------------'
+ print 'CAS server = %s' % self.urls['cas']
+ print 'Application server = %s' % self.urls['app']
+ print 'Proxy CAS server = %s' % self.urls['proxy']
+ print ''
+ print 'Test ordinary CAS login'
+ print '-----------------------'
+ self.ticket = self.login()
+ self.get_restricted(opener=self.opener)
+ self.logout()
+
+ print ''
+ print 'Test get proxy ticket'
+ print '---------------------'
+ self.ticket = self.login()
+ iou = self.proxy1_iou()
+ if iou.startswith('PGT'):
+ print 'PASS: Got IOU - %s for %s' % (iou, self.urls['proxy'])
+ else:
+ print iou
+
+ pgt = self.proxy2_pgt(iou)
+ if pgt.startswith('PGT'):
+ print 'PASS: Got PGT - %s' % pgt
+ else:
+ print pgt
+
+ pt = self.proxy3_pt(pgt)
+ if pt.startswith('PT'):
+ print 'PASS: Got PT - %s' % pt
+ else:
+ print pt
+
+ # NB: Dont logout proxy app, but test proxy auth with new openers
+ # for the tests to be valid...
+
+ print ''
+ print 'Test SSO server login with proxy ticket'
+ print '---------------------------------------'
+ proxy = self.proxy4_login(pt)
+ if proxy:
+ print 'PASS: Got Success response for app %s using proxy %s' % (self.urls['app'], proxy)
+ else:
+ print 'FAIL: The proxy login to %s via %s has failed' % (self.urls['app'], self.urls['proxy'])
+
+ print ''
+ print 'Test direct proxy login'
+ print '-----------------------'
+ new_pt = self.proxy3_pt(pgt)
+ self.proxy5_login(new_pt)
+ return
+
+
+ def get_auth(self):
+ """ Get authentication by passing to this script on the command line """
+ if len(sys.argv) > 1:
+ self.auth['username'] = sys.argv[1]
+ else:
+ self.auth['username'] = getpass.getuser()
+ self.auth['password'] = getpass.getpass('CAS Password for user %s:' % AUTH['username'])
+ return
+
+ def get_token(self, url=None, token=TOKEN, page=''):
+ """ Get CSRF token """
+ if url:
+ try:
+ r = self.opener.open(url)
+ except:
+ return 'FAIL: URL not found %s' % url
+ page = r.read()
+ if not page:
+ return 'FAIL: Page is empty'
+ starts = ['<input type="hidden" name="%s"' % token,
+ 'value="']
+ return self.find_in_page(page, starts, '"')
+
+
+ def get_ticket(self, page, app_url):
+ """ Get CSRF token """
+ starts = [app_url,'?ticket=']
+ return self.find_in_page(page, starts, '"')
+
+ def find_in_dom(self, page, nesting=['body','form']):
+ """ Use dom to get values from XML or page """
+ dom = minidom.parseString(page)
+ for level in nesting:
+ try:
+ dom = dom.getElementsByTagName(level)[0]
+ except:
+ break
+ return dom.childNodes[0].nodeValue.strip()
+
+ def find_in_page(self, page, starts, stop):
+ """ Replace this with find_in_dom ?
+ Although without knowing the CAS login page this
+ is probably more generic.
+ Starts is a list to allow a series of marker points
+ in case a single start point marker is not unique
+ """
+ pagepart = page
+ start = 0
+ for part in starts:
+ point = pagepart.find(part)
+ if point>-1:
+ start += point
+ else:
+ return "FAIL: Couldnt find '%s' in page" % part
+ pagepart = pagepart[start:]
+ start = start + len(part)
+ end = page[start:].find(stop)
+ if end == -1:
+ end = len(page[start:])
+ end = start + end
+ found = page[start:end]
+ return found.strip()
+
+ def login(self):
+ """ Login to CAS server """
+ url = '%slogin?service=%s' % (self.urls['cas'], self.urls['app'])
+ ticket = ''
+ token = self.get_token(url)
+ if token:
+ if token.startswith('FAIL'):
+ print token
+ return ticket
+ else:
+ self.auth[TOKEN] = token
+ else:
+ print 'FAIL: CSRF Token could not be found on page'
+ return ticket
+ self.auth['service'] = self.urls['app']
+ data = urllib.urlencode(self.auth)
+ sso_resp = self.opener.open(url, data)
+ sso_page = sso_resp.read()
+ found = sso_page.find(CAS_SUCCESS) > -1
+ sso_resp.close()
+ if found:
+ ticket = self.get_ticket(sso_page, self.urls['app'])
+ print 'PASS: CAS logged in to %s' % url
+ else:
+ print 'FAIL: Couldnt login to %s' % url
+ return ticket
+
+ def logout(self):
+ """ Logout inbetween tests """
+ url = '%slogout' % self.urls['cas']
+ app_resp = self.opener.open(url)
+ app_resp.close()
+ self.cj.clear()
+ print 'Logged out'
+ return
+
+ def get_restricted(self, ticket='', opener=None, print_page=False):
+ """ Access a restricted URL and see if its accessible
+ Use token to check if this page has redirected to SSO login
+ ie. success for get_token is a fail for get restricted
+ """
+ url = '%s%s' % (self.urls['app'], APP_RESTRICTED)
+ if ticket:
+ url = '%s?ticket=%s' % (url, ticket)
+ try:
+ app_resp = opener.open(url)
+ ok = app_resp.code == 200
+ except:
+ print 'FAIL: couldnt log in to restricted app at %s' % url
+ return
+ page = app_resp.read()
+ if ok:
+ token = self.get_token(page=page)
+ if token and not token.startswith('FAIL'):
+ print 'FAIL: couldnt log in to restricted app at %s' % url
+ else:
+ print 'PASS: logged in to restricted app at %s' % url
+ else:
+ print 'FAIL: couldnt log in to restricted app at %s' % url
+ if print_page:
+ print page
+ app_resp.close()
+
+ def proxy1_iou(self):
+ """ Use login ticket to get proxy iou
+ NB: SSO server installation may require self.urls['proxy']/?pgtIou be called at the root
+ """
+ url_args = (self.urls['cas'], self.ticket, self.urls['app'], self.urls['proxy'])
+ url = '%sserviceValidate?ticket=%s&service=%s&pgtUrl=%s' % url_args
+ try:
+ iou = self.opener.open(url)
+ except:
+ return 'FAIL: service validate url=%s not found' % url
+ page = iou.read()
+ if page.find('cas:authenticationSuccess') > -1:
+ iou_ticket = self.find_in_dom(page,['cas:serviceResponse',
+ 'cas:authenticationSuccess',
+ 'cas:proxyGrantingTicket'])
+ if iou_ticket:
+ return iou_ticket
+ else:
+ if page:
+ return "FAIL: NO PGIOU\n\n%s" % page
+ else:
+ return 'FAIL: PGIOU Empty response from %s' % url
+ else:
+ return 'FAIL: PGIOU Response failed authentication'
+ return None
+
+ def proxy2_pgt(self, iou):
+ """ Dig out the proxy granting ticket using shell script so this test class
+ is independent of CAS implementation - eg. can substitute this function
+ to get proxy ticket from Java CAS instead of django-cas for example
+
+ For a django-cas implementation this can be read from the ORM
+ by calling the django shell environment
+ """
+ out = commands.getoutput(SCRIPT)
+ pgt = self.find_in_page(out, ['PGT',], ' ')
+ return 'PGT%s' % pgt
+
+ def proxy3_pt(self, pgt):
+ """ Use granting ticket to get proxy """
+ url_args = (self.urls['cas'], self.urls['app'], pgt)
+ url = '%sproxy?targetService=%s&pgt=%s' % url_args
+ try:
+ pt = self.opener.open(url)
+ except:
+ return 'FAIL: PTURL=%s not found' % url
+ page = pt.read()
+ if page.find('cas:serviceResponse') > -1:
+ try:
+ pt_ticket = self.find_in_dom(page,['cas:proxySuccess',
+ 'cas:proxyTicket'])
+ return pt_ticket
+ except:
+ print url
+ print page
+ return ''
+ return None
+
+
+ def proxy4_login(self, pt):
+ """ Check proxy ticket for service
+ Use a new opener so its not got any cookies / auth already
+ """
+ opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookielib.CookieJar()))
+ url_args = (self.urls['cas'], self.urls['app'], pt)
+ url = '%sproxyValidate?service=%s&ticket=%s' % url_args
+ try:
+ login = opener.open(url)
+ except:
+ return 'FAIL: PTURL=%s not found' % url
+ page = login.read()
+ print page
+ if page.find('cas:authenticationSuccess') > -1:
+ proxy = self.find_in_dom(page,['cas:proxies',
+ 'cas:proxy'])
+ return proxy
+ return None
+
+ def proxy5_login(self, pt):
+ """ Use proxy ticket to login directly to app
+ Use a new opener so its not got any cookies / auth already
+ """
+ opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookielib.CookieJar()))
+ return self.get_restricted(ticket=pt, opener=opener)
+
+if __name__ == '__main__':
+ unittest.main()
View
16 cas/tests/get_pgt.py
@@ -0,0 +1,16 @@
+# Run via bin/django shell --plain < get_pgt.py
+# to pick up all the django environment
+# Allows main test class to be independent of CAS implementation platform
+# TODO: pass in iou - if cant take args write to file and read here
+import atexit
+from cas.models import PgtIOU
+
+@atexit.register
+def lookup_pgt():
+ pgt = PgtIOU.objects.latest('created')
+ if pgt:
+ print pgt.tgt
+ else:
+ print 'FAIL'
+
+
View
16 cas/utils.py
@@ -0,0 +1,16 @@
+def cas_response_callbacks(tree):
+ from django.conf import settings
+ callbacks = []
+ callbacks.extend(settings.CAS_RESPONSE_CALLBACKS)
+ for path in callbacks:
+ i = path.rfind('.')
+ module, callback = path[:i], path[i+1:]
+ try:
+ mod = __import__(module, fromlist=[''])
+ except ImportError as e:
+ print "Import Error: %s" % e
+ try:
+ func = getattr(mod, callback)
+ except AttributeError, e:
+ print "Attribute Error: %s" % e
+ func(tree)
View
178 cas/views.py
@@ -0,0 +1,178 @@
+"""CAS login/logout replacement views"""
+from datetime import datetime
+from urllib import urlencode
+import urlparse
+from operator import itemgetter
+
+from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse
+from django.conf import settings
+from django.contrib.auth import REDIRECT_FIELD_NAME
+from cas.models import PgtIOU
+from django.contrib import messages
+
+__all__ = ['login', 'logout']
+
+
+def _service_url(request, redirect_to=None, gateway=False):
+ """Generates application service URL for CAS"""
+
+ protocol = ('http://', 'https://')[request.is_secure()]
+ host = request.get_host()
+ prefix = (('http://', 'https://')[request.is_secure()] + host)
+ service = protocol + host + request.path
+ if redirect_to:
+ if '?' in service:
+ service += '&'
+ else:
+ service += '?'
+ if gateway:
+ """ If gateway, capture params and reencode them before returning a url """
+ gateway_params = [ (REDIRECT_FIELD_NAME, redirect_to), ('gatewayed','true') ]
+ query_dict = request.GET.copy()
+ try:
+ del query_dict['ticket']
+ except:
+ pass
+ query_list = query_dict.items()
+
+ #remove duplicate params
+ for item in query_list:
+ for index, item2 in enumerate(gateway_params):
+ if item[0] == item2[0]:
+ gateway_params.pop(index)
+ extra_params = gateway_params + query_list
+
+ #Sort params by key name so they are always in the same order.
+ sorted_params = sorted(extra_params, key=itemgetter(0))
+ service += urlencode(sorted_params)
+ else:
+ service += urlencode({REDIRECT_FIELD_NAME: redirect_to})
+ return service
+
+
+def _redirect_url(request):
+ """Redirects to referring page, or CAS_REDIRECT_URL if no referrer is
+ set.
+ """
+
+ next = request.GET.get(REDIRECT_FIELD_NAME)
+ if not next:
+ if settings.CAS_IGNORE_REFERER:
+ next = settings.CAS_REDIRECT_URL
+ else:
+ next = request.META.get('HTTP_REFERER', settings.CAS_REDIRECT_URL)
+
+ host = request.get_host()
+ prefix = (('http://', 'https://')[request.is_secure()] + host)
+ if next.startswith(prefix):
+ next = next[len(prefix):]
+ return next
+
+
+def _login_url(service, ticket='ST', gateway=False):
+ """Generates CAS login URL"""
+ LOGINS = {'ST':'login',
+ 'PT':'proxyValidate'}
+ if gateway:
+ params = {'service': service, 'gateway':True}
+ else:
+ params = {'service': service}
+ if settings.CAS_EXTRA_LOGIN_PARAMS:
+ params.update(settings.CAS_EXTRA_LOGIN_PARAMS)
+ if not ticket:
+ ticket = 'ST'
+ login = LOGINS.get(ticket[:2],'login')
+
+ return urlparse.urljoin(settings.CAS_SERVER_URL, login) + '?' + urlencode(params)
+
+def _logout_url(request, next_page=None):
+ """Generates CAS logout URL"""
+
+ url = urlparse.urljoin(settings.CAS_SERVER_URL, 'logout')
+ if next_page:
+ protocol = ('http://', 'https://')[request.is_secure()]
+ host = request.get_host()
+ url += '?' + urlencode({'url': protocol + host + next_page})
+ return url
+
+
+def login(request, next_page=None, required=False, gateway=False):
+ """Forwards to CAS login URL or verifies CAS ticket"""
+
+ if not next_page:
+ next_page = _redirect_url(request)
+ if request.user.is_authenticated():
+ return HttpResponseRedirect(next_page)
+ ticket = request.GET.get('ticket')
+
+ if gateway:
+ service = _service_url(request, next_page, True)
+ else:
+ service = _service_url(request, next_page, False)
+
+ if ticket:
+ from django.contrib import auth
+ user = auth.authenticate(ticket=ticket, service=service)
+
+ if user is not None:
+ #Has ticket, logs in fine
+ auth.login(request, user)
+ if settings.CAS_PROXY_CALLBACK:
+ proxy_callback(request)
+ #keep urls
+ return HttpResponseRedirect(next_page)
+ elif settings.CAS_RETRY_LOGIN or required:
+ #Has ticket,
+ if gateway:
+ return HttpResponseRedirect(_login_url(service, ticket, True))
+ else:
+ return HttpResponseRedirect(_login_url(service, ticket, False))
+ else:
+ #Has ticket, not session
+ if getattr(settings, 'CAS_CUSTOM_FORBIDDEN'):
+ from django.core.urlresolvers import reverse
+ return HttpResponseRedirect(reverse(settings.CAS_CUSTOM_FORBIDDEN) + "?" + request.META['QUERY_STRING'])
+ else:
+ error = "<h1>Forbidden</h1><p>Login failed.</p>"
+ return HttpResponseForbidden(error)
+ else:
+ if gateway:
+ return HttpResponseRedirect(_login_url(service, ticket, True))
+ else:
+ return HttpResponseRedirect(_login_url(service, ticket, False))
+
+
+def logout(request, next_page=None):
+ """Redirects to CAS logout page"""
+
+ from django.contrib.auth import logout
+ logout(request)
+ if not next_page:
+ next_page = _redirect_url(request)
+ if settings.CAS_LOGOUT_COMPLETELY:
+ return HttpResponseRedirect(_logout_url(request, next_page))
+ else:
+ return HttpResponseRedirect(next_page)
+
+def proxy_callback(request):
+ """Handles CAS 2.0+ XML-based proxy callback call.
+ Stores the proxy granting ticket in the database for
+ future use.
+
+ NB: Use created and set it in python in case database
+ has issues with setting up the default timestamp value
+ """
+ pgtIou = request.GET.get('pgtIou')
+ tgt = request.GET.get('pgtId')
+
+ if not (pgtIou and tgt):
+ return HttpResponse('No pgtIOO', mimetype="text/plain")
+ try:
+ PgtIOU.objects.create(tgt=tgt, pgtIou=pgtIou, created=datetime.now())
+ request.session['pgt-TICKET'] = ticket
+ return HttpResponse('PGT ticket is: %s' % str(ticket, mimetype="text/plain"))
+ except:
+ return HttpResponse('PGT storage failed for %s' % str(request.GET), mimetype="text/plain")
+
+ return HttpResponse('Success', mimetype="text/plain")
+
View
4 django_sis/settings.py
@@ -185,9 +185,9 @@
CAS = False
if CAS:
CAS_SERVER_URL = ""
- AUTHENTICATION_BACKENDS = ('ldap_groups.accounts.backends.ActiveDirectoryGroupMembershipSSLBackend','django.contrib.auth.backends.ModelBackend','django_cas.backends.CASBackend',)
+ AUTHENTICATION_BACKENDS = ('ldap_groups.accounts.backends.ActiveDirectoryGroupMembershipSSLBackend','django.contrib.auth.backends.ModelBackend','cas.backends.CASBackend',)
MIDDLEWARE_CLASSES += (
- 'django_cas.middleware.CASMiddleware',
+ 'cas.middleware.CASMiddleware',
'django.middleware.doc.XViewMiddleware',
)
elif LDAP:
View
4 django_sis/urls.py
@@ -88,8 +88,8 @@
if settings.CAS:
urlpatterns += patterns('',
- (r'^accounts/login/$', 'django_cas.views.login'),
- (r'^accounts/logout/$', 'django_cas.views.logout'),
+ (r'^accounts/login/$', 'cas.views.login'),
+ (r'^accounts/logout/$', 'cas.views.logout'),
)
def handler500(request):
View
2  ecwsp/django_cas/models.py
@@ -1,2 +0,0 @@
-from django.db import models
-from django.contrib.auth.models import User
View
104 ecwsp/django_cas/views.py
@@ -1,104 +0,0 @@
-"""CAS login/logout replacement views"""
-
-from urllib import urlencode
-from urlparse import urljoin
-
-from django.http import get_host, HttpResponseRedirect, HttpResponseForbidden
-from django.conf import settings
-from django.contrib.auth import REDIRECT_FIELD_NAME
-
-__all__ = ['login', 'logout']
-
-def _service_url(request, redirect_to=None):
- """Generates application service URL for CAS"""
-
- protocol = ('http://', 'https://')[request.is_secure()]
- host = get_host(request)
- service = protocol + host + request.path
- if redirect_to:
- if '?' in service:
- service += '&'
- else:
- service += '?'
- service += urlencode({REDIRECT_FIELD_NAME: redirect_to})
- return service
-
-
-def _redirect_url(request):
- """Redirects to referring page, or CAS_REDIRECT_URL if no referrer is
- set.
- """
-
- next = request.GET.get(REDIRECT_FIELD_NAME)
- if not next:
- if settings.CAS_IGNORE_REFERER:
- next = settings.CAS_REDIRECT_URL
- else:
- next = request.META.get('HTTP_REFERER', settings.CAS_REDIRECT_URL)
- prefix = (('http://', 'https://')[request.is_secure()] +
- get_host(request))
- if next.startswith(prefix):
- next = next[len(prefix):]
- return next
-
-
-def _login_url(service):
- """Generates CAS login URL"""
-
- params = {'service': service}
- if settings.CAS_EXTRA_LOGIN_PARAMS:
- params.update(settings.CAS_EXTRA_LOGIN_PARAMS)
- return urljoin(settings.CAS_SERVER_URL, 'login') + '?' + urlencode(params)
-
-
-def _logout_url(request, next_page=None):
- """Generates CAS logout URL"""
-
- url = urljoin(settings.CAS_SERVER_URL, 'logout')
- if next_page:
- protocol = ('http://', 'https://')[request.is_secure()]
- host = get_host(request)
- url += '?' + urlencode({'service': protocol + host})
- return url
-
-
-def login(request, next_page=None, required=False):
- """Forwards to CAS login URL or verifies CAS ticket"""
-
- if not next_page:
- next_page = _redirect_url(request)
- if request.user.is_authenticated():
- message = "You are logged in as %s." % request.user.username
- request.user.message_set.create(message=message)
- return HttpResponseRedirect(next_page)
- ticket = request.GET.get('ticket')
- service = _service_url(request, next_page)
- if ticket:
- from django.contrib import auth
- user = auth.authenticate(ticket=ticket, service=service)
- if user is not None:
- auth.login(request, user)
- name = user.first_name or user.username
- message = "Login succeeded. Welcome, %s." % name
- user.message_set.create(message=message)
- return HttpResponseRedirect(next_page)
- elif settings.CAS_RETRY_LOGIN or required:
- return HttpResponseRedirect(_login_url(service))
- else:
- error = "<h1>Forbidden</h1><p>Login failed.</p>"
- return HttpResponseForbidden(error)
- else:
- return HttpResponseRedirect(_login_url(service))
-
-
-def logout(request, next_page=None):
- """Redirects to CAS logout page"""
-
- from django.contrib.auth import logout
- logout(request)
- if not next_page:
- next_page = _redirect_url(request)
- if settings.CAS_LOGOUT_COMPLETELY:
- return HttpResponseRedirect(_logout_url(request, next_page))
- else:
- return HttpResponseRedirect(next_page)

0 comments on commit e964e11

Please sign in to comment.
Something went wrong with that request. Please try again.