
Pull up auth caching: r2079 r2102 r2105

wsanchez committed Jan 11, 2008
1 parent f873f3e commit e96b849c52d5dfd27d80945a7f54c22ec9465888
Showing with 72 additions and 36 deletions.
  1. +32 −5 twistedcaldav/directory/appleopendirectory.py
  2. +40 −31 twistedcaldav/directory/test/util.py
@@ -764,12 +764,25 @@ def proxyFor(self):

def verifyCredentials(self, credentials):
if isinstance(credentials, UsernamePassword):
# Check cached password
try:
return opendirectory.authenticateUserBasic(self.service.directory, self._nodename, self.shortName, credentials.password)
if credentials.password == self.password:
return True
except AttributeError:
pass

# Check with directory services
try:
if opendirectory.authenticateUserBasic(self.service.directory, self._nodename, self.shortName, credentials.password):
# Cache the password to avoid future DS queries
self.password = credentials.password
return True
except opendirectory.ODError, e:
logging.err("Open Directory (node=%s) error while performing basic authentication for user %s: %s"
% (self.service.realmName, self.shortName, e), system="OpenDirectoryService")
return False
% (self.service.realmName, self.shortName, e), system="OpenDirectoryService")

return False

elif isinstance(credentials, DigestedCredentials):
try:
# We need a special format for the "challenge" and "response" strings passed into open directory, as it is
@@ -788,14 +801,28 @@ def verifyCredentials(self, credentials):
% (self.service.realmName, self.shortName, e, credentials.fields), system="OpenDirectoryService")
return False

return opendirectory.authenticateUserDigest(
if self.digestcache[credentials.fields["uri"]] == response:
return True
except (AttributeError, KeyError):
pass

try:
if opendirectory.authenticateUserDigest(
self.service.directory,
self._nodename,
self.shortName,
challenge,
response,
credentials.method
)
):
try:
cache = self.digestcache
except AttributeError:
cache = self.digestcache = {}

cache[credentials.fields["uri"]] = response

return True
except opendirectory.ODError, e:
logging.err("Open Directory (node=%s) error while performing digest authentication for user %s: %s"
% (self.service.realmName, self.shortName, e), system="OpenDirectoryService")
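Review bot: The Basic branch of verifyCredentials now keeps the user's cleartext password on the record (`self.password = credentials.password`) and returns True on any later `==` match without asking Open Directory, so a password that has since been changed in the directory would keep authenticating for as long as this record object lives. Nothing visible in the hunk expires or clears the cached value; I would store a timestamp next to it and fall through to `authenticateUserBasic` once the entry is older than a short TTL, and keep a salted hash instead of the password itself. The digest cache in the second hunk has the same lifetime problem, and its key is only `credentials.fields["uri"]` while the directory call also receives `credentials.method`, so a cache hit does not bind the method; it is worth confirming that nonce and method are validated before this point. verifyCredentials continues past the end of the second hunk.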
@@ -297,39 +297,48 @@ def test_verifyCredentials_digest(self):

service = self.service()
for user in self.users:
userRecord = service.recordWithShortName(DirectoryService.recordType_users, user)

# I'm glad this is so simple...
response = calcResponse(
calcHA1(
for good in (True, True, False, False, True):
userRecord = service.recordWithShortName(DirectoryService.recordType_users, user)

# I'm glad this is so simple...
response = calcResponse(
calcHA1(
"md5",
user,
service.realmName,
self.users[user]["password"],
"booger",
"phlegm",
),
"md5",
user,
service.realmName,
self.users[user]["password"],
"booger",
None,
"phlegm",
),
"md5",
"booger",
None,
"phlegm",
"auth",
"GET",
"/",
None,
)
"auth",
"GET",
"/",
None,
)

credentials = DigestedCredentials(
user,
"GET",
service.realmName,
{
"response": response,
"uri": "/",
"nonce": "booger",
"cnonce": "phlegm",
"nc": None,
},
)
if good:
noise = ""
else:
noise = "blah"

credentials = DigestedCredentials(
user,
"GET",
service.realmName,
{
"response": response,
"uri": "/",
"nonce": "booger" + noise,
"cnonce": "phlegm",
"nc": None,
},
)

self.failUnless(userRecord.verifyCredentials(credentials))
if good:
self.failUnless(userRecord.verifyCredentials(credentials))
else:
self.failIf(userRecord.verifyCredentials(credentials))
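Review bot: The tuple in `for good in (True, True, False, False, True):` carries the whole intent of the test, but nothing says why the order matters; presumably the repeated True is meant to hit the new digest cache and the final True to show that failed attempts did not disturb it. A comment above the loop would make that explicit, for example `# good, good (cached), bad, bad, good again: failures must neither hit nor poison the cache`. The only negative case is a changed nonce, so a case that keeps the same `"uri"` but sends a wrong `"response"` would cover the cache comparison more directly. test_verifyCredentials_digest starts before this hunk.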
