Skip to content
Permalink
Browse files

Tweaked header tests to allow grabbing header values for later tests.

  • Loading branch information...
cyrusdaboo committed Nov 4, 2008
1 parent a697941 commit 640cb976a50013784d9861ccba63549b07d0042b
Showing with 59 additions and 40 deletions.
  1. +2 −2 scripts/tests/caldavtest.dtd
  2. +15 −16 src/caldavtest.py
  3. +25 −7 src/request.py
  4. +1 −1 src/xmlDefs.py
  5. +16 −14 verifiers/header.py
@@ -22,7 +22,7 @@
<!ELEMENT start (request*)>
<!ELEMENT end (request*)>

<!ELEMENT request (method, header*, ruri?, data?, verify*, grablocation?, grabproperty?)>
<!ELEMENT request (method, header*, ruri*, data?, verify*, grabheader?, grabproperty?)>
<!ATTLIST request auth (yes|no) "yes"
user CDATA ""
pswd CDATA ""
@@ -44,7 +44,7 @@
<!ELEMENT callback (#PCDATA)>
<!ELEMENT arg (name, value+)>

<!ELEMENT grablocation EMPTY>
<!ELEMENT grabheader (name, variable)>

<!ELEMENT grabproperty (property, variable)>
<!ELEMENT property (#PCDATA)>
@@ -165,6 +165,7 @@ def dofindall( self, collection):
hrefs = []
req = request(self.manager)
req.method = "PROPFIND"
req.ruris.append(collection[0])
req.ruri = collection[0]
req.headers["Depth"] = "1"
if len(collection[1]):
@@ -211,6 +212,7 @@ def dodeleteall( self, deletes ):
for deleter in deletes:
req = request(self.manager)
req.method = "DELETE"
req.ruris.append(deleter[0])
req.ruri = deleter[0]
if len(deleter[1]):
req.user = deleter[1]
@@ -222,6 +224,7 @@ def dofindnew( self, collection):
hresult = ""
req = request(self.manager)
req.method = "PROPFIND"
req.ruris.append(collection[0])
req.ruri = collection[0]
req.headers["Depth"] = "1"
if len(collection[1]):
@@ -301,6 +304,7 @@ def doenddelete( self, description ):
for deleter in self.end_deletes:
req = request(self.manager)
req.method = "DELETE"
req.ruris.append(deleter[0])
req.ruri = deleter[0]
if len(deleter[1]):
req.user = deleter[1]
@@ -369,13 +373,14 @@ def dorequest( self, req, details=False, doverify = True, forceverify = False, s

# Special check for DELETEALL
if req.method == "DELETEALL":
collection = (req.ruri, req.user, req.pswd)
hrefs = self.dofindall(collection)
self.dodeleteall(hrefs)
for ruri in req.ruris:
collection = (ruri, req.user, req.pswd)
hrefs = self.dofindall(collection)
self.dodeleteall(hrefs)
return True, "", None, None

# Special check for ACCESS-DISABLE
if req.method == "ACCESS-DISABLE":
elif req.method == "ACCESS-DISABLE":
if self.doaccess(req.ruri, False):
return True, "", None, None
else:
@@ -387,7 +392,7 @@ def dorequest( self, req, details=False, doverify = True, forceverify = False, s
return False, "Could not remove caldav-access-disabled xattr on file", None, None

# Special check for QUOTA
if req.method == "QUOTA-DISABLE":
elif req.method == "QUOTA-DISABLE":
if self.doquota(req.ruri, None):
return True, "", None, None
else:
@@ -399,22 +404,16 @@ def dorequest( self, req, details=False, doverify = True, forceverify = False, s
return False, "Could not set quota-root xattr on file", None, None

# Special for delay
if req.method == "DELAY":
elif req.method == "DELAY":
# self.ruri contains a numeric delay in seconds
delay = int(req.ruri)
starttime = time.time()
while (time.time() < starttime + delay):
pass
return True, "", None, None

# Special for LISTNEW
if req.method == "LISTNEW":
collection = (req.ruri, req.user, req.pswd)
self.grabbedlocation = self.dofindnew(collection)
return True, "", None, None

# Special for GETNEW
if req.method == "GETNEW":
elif req.method == "GETNEW":
collection = (req.ruri, req.user, req.pswd)
self.grabbedlocation = self.dofindnew(collection)
req.method = "GET"
@@ -480,10 +479,10 @@ def dorequest( self, req, details=False, doverify = True, forceverify = False, s
resulttxt += str(response.msg) + "\n" + respdata
resulttxt += "\n--------END:RESPONSE--------\n"

if req.grablocation:
hdrs = response.msg.getheaders("Location")
if req.grabheader:
hdrs = response.msg.getheaders(req.grabheader[0])
if hdrs:
self.grabbedlocation = hdrs[0]
self.manager.server_info.addextrasubs({req.grabheader[1]: hdrs[0].encode("utf-8")})

if req.grabproperty:
if response.status == 207:
@@ -127,8 +127,8 @@ class request( object ):
be used to determine a satisfactory output or not.
"""
__slots__ = ['manager', 'auth', 'user', 'pswd', 'end_delete', 'print_response',
'method', 'headers', 'ruri', 'data', 'datasubs', 'verifiers',
'grablocation', 'grabproperty']
'method', 'headers', 'ruris', 'ruri', 'data', 'datasubs', 'verifiers',
'grabheader', 'grabproperty']

def __init__( self, manager ):
self.manager = manager
@@ -139,15 +139,16 @@ def __init__( self, manager ):
self.print_response = False
self.method = ""
self.headers = {}
self.ruris = []
self.ruri = ""
self.data = None
self.datasubs = True
self.verifiers = []
self.grablocation = False
self.grabheader = None
self.grabproperty = None

def __str__(self):
return "Method: %s; uri: %s" % (self.method, self.ruri)
return "Method: %s; uris: %s" % (self.method, self.ruris if len(self.ruris) > 1 else self.ruri,)

def getURI( self, si ):
if self.ruri == "$":
@@ -161,6 +162,8 @@ def getURI( self, si ):

def getHeaders( self, si ):
hdrs = self.headers
for key, value in hdrs.items():
hdrs[key] = si.extrasubs(value)

# Content type
if self.data != None:
@@ -279,15 +282,17 @@ def parseXML( self, node ):
elif child._get_localName() == src.xmlDefs.ELEMENT_HEADER:
self.parseHeader(child)
elif child._get_localName() == src.xmlDefs.ELEMENT_RURI:
self.ruri = self.manager.server_info.subs(child.firstChild.data.encode("utf-8"))
self.ruris.append(self.manager.server_info.subs(child.firstChild.data.encode("utf-8")))
if len(self.ruris) == 1:
self.ruri = self.ruris[0]
elif child._get_localName() == src.xmlDefs.ELEMENT_DATA:
self.data = data()
self.datasubs = self.data.parseXML( child )
elif child._get_localName() == src.xmlDefs.ELEMENT_VERIFY:
self.verifiers.append(verify(self.manager))
self.verifiers[-1].parseXML( child )
elif child._get_localName() == src.xmlDefs.ELEMENT_GRABLOCATION:
self.grablocation = True
elif child._get_localName() == src.xmlDefs.ELEMENT_GRABHEADER:
self.parseGrabHeader(child)
elif child._get_localName() == src.xmlDefs.ELEMENT_GRABPROPERTY:
self.parseGrabProperty(child)

@@ -315,6 +320,19 @@ def parseList( manager, node ):

parseList = staticmethod( parseList )

def parseGrabHeader(self, node):

header = None
variable = None
for child in node._get_childNodes():
if child._get_localName() == src.xmlDefs.ELEMENT_NAME:
header = child.firstChild.data.encode("utf-8")
elif child._get_localName() == src.xmlDefs.ELEMENT_VARIABLE:
variable = self.manager.server_info.subs(child.firstChild.data.encode("utf-8"))

if (header is not None) and (variable is not None):
self.grabheader = (header, variable)

def parseGrabProperty(self, node):

property = None
@@ -30,7 +30,7 @@
ELEMENT_DESCRIPTION = "description"
ELEMENT_END = "end"
ELEMENT_FILEPATH = "filepath"
ELEMENT_GRABLOCATION = "grablocation"
ELEMENT_GRABHEADER = "grabheader"
ELEMENT_GRABPROPERTY = "grabproperty"
ELEMENT_HEADER = "header"
ELEMENT_HOST = "host"
@@ -35,50 +35,52 @@ def verify(self, manager, uri, response, respdata, args): #@UnusedVariable
p = p[1:]
present = "multiple"
if p.find("$") != -1:
testheader[i] = (p.split("$", 1)[0], p.split("$", 1)[1], present,)
testheader[i] = (p.split("$", 1)[0], p.split("$", 1)[1], present, True,)
elif p.find("!") != -1:
testheader[i] = (p.split("!", 1)[0], p.split("!", 1)[1], present, False,)
else:
testheader[i] = (p, None, present,)
testheader[i] = (p, None, present, True,)

result = True
resulttxt = ""
for test in testheader:
hdrs = response.msg.getheaders(test[0])
for hdrname, hdrvalue, presence, matchvalue in testheader:
hdrs = response.msg.getheaders(hdrname)
if (hdrs is None or (len(hdrs) == 0)):
if test[2] != "none":
if presence != "none":
result = False
if len(resulttxt):
resulttxt += "\n"
resulttxt += " Missing Response Header: %s" % (test[0],)
resulttxt += " Missing Response Header: %s" % (hdrname,)
continue
else:
continue

if (hdrs is not None) and (len(hdrs) != 0) and (test[2] == "none"):
if (hdrs is not None) and (len(hdrs) != 0) and (presence == "none"):
result = False
if len(resulttxt):
resulttxt += "\n"
resulttxt += " Response Header was present one or more times: %s" % (test[0],)
resulttxt += " Response Header was present one or more times: %s" % (hdrname,)
continue

if (len(hdrs) != 1) and (test[2] == "single"):
if (len(hdrs) != 1) and (presence == "single"):
result = False
if len(resulttxt):
resulttxt += "\n"
resulttxt += " Multiple Response Headers: %s" % (test[0],)
resulttxt += " Multiple Response Headers: %s" % (hdrname,)
continue

if (test[1] is not None):
if (hdrvalue is not None):
matched = False
for hdr in hdrs:
if (re.match(test[1], hdr) is not None):
if (re.match(hdrvalue, hdr) is not None):
matched = True
break

if not matched:
if matchvalue and not matched or not matchvalue and matched:
result = False
if len(resulttxt):
resulttxt += "\n"
resulttxt += " Wrong Response Header Value: %s: %s" % (test[0], str(hdrs))
resulttxt += " Wrong Response Header Value: %s: %s" % (hdrname, str(hdrs))

return result, resulttxt

0 comments on commit 640cb97

Please sign in to comment.
You can’t perform that action at this time.