Skip to content

Commit

Permalink
status.web.authz.Authz now has actionAllowed return a deferred
Browse files Browse the repository at this point in the history
This is verified by test_status_web_authz_Authz and needed to
happen for possible deferreds in authenticate methods.
  • Loading branch information
dzhurley committed Aug 22, 2011
1 parent a897508 commit 391d54c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 25 deletions.
21 changes: 14 additions & 7 deletions master/buildbot/status/web/authz.py
Expand Up @@ -13,6 +13,7 @@
#
# Copyright Buildbot Team Members

from twisted.internet import defer
from buildbot.status.web.auth import IAuth

class Authz(object):
Expand Down Expand Up @@ -75,15 +76,21 @@ def actionAllowed(self, action, request, *args):
if cfg:
if cfg == 'auth' or callable(cfg):
if not self.auth:
return False
return defer.succeed(False)
user = request.args.get("username", ["<unknown>"])[0]
passwd = request.args.get("passwd", ["<no-password>"])[0]
if user == "<unknown>" or passwd == "<no-password>":
return defer.succeed(False)

d = defer.maybeDeferred(self.auth.authenticate, user, passwd)
def check_authenticate(res, cfg, user, *args):
if res:
if callable(cfg) and not cfg(user, *args):
return False
return True
return False
if self.auth.authenticate(user, passwd):
if callable(cfg) and not cfg(user, *args):
return False
return True
return False
d.addCallback(check_authenticate, cfg, user, *args)
return d
else:
return True # anyone can do this..
return defer.succeed(True) # anyone can do this..
return defer.succeed(False)
65 changes: 47 additions & 18 deletions master/buildbot/test/unit/test_status_web_authz_Authz.py
Expand Up @@ -15,6 +15,7 @@

from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer

from buildbot.status.web.authz import Authz
from buildbot.status.web.auth import IAuth
Expand All @@ -40,56 +41,84 @@ class TestAuthz(unittest.TestCase):
def test_actionAllowed_Defaults(self):
"by default, nothing is allowed"
z = Authz()
failedActions = []
self.failedActions = []
self.dl = []
for a in Authz.knownActions:
if z.actionAllowed(a, StubRequest('foo', 'bar')):
failedActions.append(a)
if failedActions:
raise unittest.FailTest("action(s) %s do not default to False"
% (failedActions,))
md = z.actionAllowed(a, StubRequest('foo', 'bar'))
def check(res):
if res:
self.failedActions.append(a)
return
md.addCallback(check)
self.dl.append(md)
d = defer.DeferredList(self.dl)
def check_failed(_):
if self.failedActions:
raise unittest.FailTest("action(s) %s do not default to False"
% (self.failedActions,))
d.addCallback(check_failed)
return d

def test_actionAllowed_Positive(self):
"'True' should always permit access"
z = Authz(forceBuild=True)
assert z.actionAllowed('forceBuild',
StubRequest('foo', 'bar'))
d = z.actionAllowed('forceBuild', StubRequest('foo', 'bar'))
def check(res):
self.assertEqual(res, True)
d.addCallback(check)
return d

def test_actionAllowed_AuthPositive(self):
z = Authz(auth=StubAuth('jrobinson'),
stopBuild='auth')
assert z.actionAllowed('stopBuild',
StubRequest('jrobinson', 'bar'))
d = z.actionAllowed('stopBuild', StubRequest('jrobinson', 'bar'))
def check(res):
self.assertEqual(res, True)
d.addCallback(check)
return d

def test_actionAllowed_AuthNegative(self):
z = Authz(auth=StubAuth('jrobinson'),
stopBuild='auth')
assert not z.actionAllowed('stopBuild',
StubRequest('apeterson', 'bar'))
d = z.actionAllowed('stopBuild', StubRequest('apeterson', 'bar'))
def check(res):
self.assertEqual(res, False)
d.addCallback(check)
return d

def test_actionAllowed_AuthCallable(self):
myargs = []
def myAuthzFn(*args):
myargs.extend(args)
z = Authz(auth=StubAuth('uu'),
stopBuild=myAuthzFn)
z.actionAllowed('stopBuild', StubRequest('uu', 'shh'), 'arg', 'arg2')
self.assertEqual(myargs, ['uu', 'arg', 'arg2'])
d = z.actionAllowed('stopBuild', StubRequest('uu', 'shh'), 'arg', 'arg2')
def check(res):
self.assertEqual(myargs, ['uu', 'arg', 'arg2'])
d.addCallback(check)
return d

def test_actionAllowed_AuthCallableTrue(self):
def myAuthzFn(*args):
return True
z = Authz(auth=StubAuth('uu'),
stopBuild=myAuthzFn)
self.assertTrue(z.actionAllowed('stopBuild',
StubRequest('uu', 'shh')))
d = z.actionAllowed('stopBuild', StubRequest('uu', 'shh'))
def check(res):
self.assertEqual(res, True)
d.addCallback(check)
return d

def test_actionAllowed_AuthCallableFalse(self):
def myAuthzFn(*args):
return False
z = Authz(auth=StubAuth('uu'),
stopBuild=myAuthzFn)
self.assertFalse(z.actionAllowed('stopBuild',
StubRequest('uu', 'shh')))
d = z.actionAllowed('stopBuild', StubRequest('uu', 'shh'))
def check(res):
self.assertEqual(res, False)
d.addCallback(check)
return d

def test_advertiseAction_False(self):
z = Authz(forceBuild = False)
Expand Down

0 comments on commit 391d54c

Please sign in to comment.