Skip to content

Commit

Permalink
Merge pull request #222 from zaneb/regex-fixes
Browse files Browse the repository at this point in the history
Various regular expression fixes
  • Loading branch information
cdent committed Oct 11, 2017
2 parents 9cfbff2 + e2e95d3 commit 1834ee1
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 35 deletions.
5 changes: 3 additions & 2 deletions docs/source/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ tests:
status: 302

# When evaluating response headers it is possible to use a regular
# expression to not have to test the whole value.
# expression to not have to test the whole value. Regular expressions match
# anywhere in the output, not just at the beginning.

response_headers:
location: /https/
location: /^https/

# By default redirects will not be followed. This can be changed.

Expand Down
5 changes: 3 additions & 2 deletions docs/source/format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ Response Expectations
representing expected response header
names and values. If a header's value
is wrapped in ``/.../``, it will be
treated as a regular expression.
treated as a regular expression to
search for in the response header.

``response_forbidden_headers`` A list of headers which must `not`
be present.
Expand All @@ -161,7 +162,7 @@ Response Expectations

If the value is wrapped in ``/.../``
the result of the JSONPath query
will be compared against the
will be searched for the
value as a regular expression.

``poll`` A dictionary of two keys:
Expand Down
76 changes: 55 additions & 21 deletions gabbi/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,26 @@ def get_content_handler(self, content_type):
return handler
return None

def replace_template(self, message):
def replace_template(self, message, escape_regex=False):
"""Replace magic strings in message."""
if isinstance(message, dict):
for k in message:
message[k] = self.replace_template(message[k])
message[k] = self.replace_template(message[k],
escape_regex=escape_regex)
return message

if isinstance(message, list):
return [self.replace_template(line) for line in message]
return [self.replace_template(line, escape_regex=escape_regex)
for line in message]

for replacer in REPLACERS:
template = '$%s' % replacer
method = '_%s_replace' % replacer.lower()
try:
if template in message:
try:
message = getattr(self, method)(message)
replace = getattr(self, method)
message = replace(message, escape_regex=escape_regex)
except (KeyError, AttributeError, ValueError) as exc:
raise AssertionError(
'unable to replace %s in %s, data unavailable: %s'
Expand Down Expand Up @@ -193,14 +196,28 @@ def _clean_query_value(self, value):
value = value.encode('UTF-8')
return value

def _environ_replace(self, message):
@staticmethod
def _regex_replacer(replacer, escape_regex):
"""Wrap a replacer function to escape return values in a regex."""
if escape_regex:
@functools.wraps(replacer)
def replace(match):
return re.escape(replacer(match))

return replace
else:
return replacer

def _environ_replace(self, message, escape_regex=False):
"""Replace an indicator in a message with the environment value.
If value can be a number, cast it as such. If value is a form of
"null", "true", or "false" cast it to None, True, False.
"""
value = re.sub(self._replacer_regex('ENVIRON'),
self._environ_replacer, message)
self._regex_replacer(self._environ_replacer,
escape_regex),
message)
try:
if '.' in value:
value = float(value)
Expand All @@ -226,13 +243,15 @@ def _environ_replacer(match):
environ_name = match.group('arg')
return os.environ[environ_name]

def _cookie_replace(self, message):
def _cookie_replace(self, message, escape_regex=False):
"""Replace $COOKIE in a message.
With cookie data from set-cookie in the prior request.
"""
return re.sub(self._simple_replacer_regex('COOKIE'),
self._cookie_replacer, message)
self._regex_replacer(self._cookie_replacer,
escape_regex),
message)

def _cookie_replacer(self, match):
"""Replace a regex match with the cookie of a previous response."""
Expand All @@ -247,12 +266,14 @@ def _cookie_replacer(self, match):
cookie_string = cookies.output(attrs=[], header='', sep=',').strip()
return cookie_string

def _headers_replace(self, message):
def _headers_replace(self, message, escape_regex=False):
"""Replace a header indicator in a message with that headers value from
the prior request.
"""
return re.sub(self._replacer_regex('HEADERS'),
self._header_replacer, message)
self._regex_replacer(self._header_replacer,
escape_regex),
message)

def _header_replacer(self, match):
"""Replace a regex match with the value of a prior header."""
Expand All @@ -264,20 +285,25 @@ def _header_replacer(self, match):
referred_case = self.prior
return referred_case.response[header_key.lower()]

def _last_url_replace(self, message):
def _last_url_replace(self, message, escape_regex=False):
"""Replace $LAST_URL in a message.
With the URL used in the prior request.
"""
return message.replace('$LAST_URL', self.prior.url)
last_url = self.prior.url
if escape_regex:
last_url = re.escape(last_url)
return message.replace('$LAST_URL', last_url)

def _url_replace(self, message):
def _url_replace(self, message, escape_regex=False):
"""Replace $URL in a message.
With the URL used in a previous request.
"""
return re.sub(self._simple_replacer_regex('URL'),
self._url_replacer, message)
self._regex_replacer(self._url_replacer,
escape_regex),
message)

def _url_replacer(self, match):
"""Replace a regex match with the value of a previous url."""
Expand All @@ -288,13 +314,15 @@ def _url_replacer(self, match):
referred_case = self.prior
return referred_case.url

def _location_replace(self, message):
def _location_replace(self, message, escape_regex=False):
"""Replace $LOCATION in a message.
With the location header from a previous request.
"""
return re.sub(self._simple_replacer_regex('LOCATION'),
self._location_replacer, message)
self._regex_replacer(self._location_replacer,
escape_regex),
message)

def _location_replacer(self, match):
"""Replace a regex match with the value of a previous location."""
Expand All @@ -317,11 +345,13 @@ def _load_data_file(self, filename):
with open(path, mode='rb') as data_file:
return data_file.read()

def _netloc_replace(self, message):
def _netloc_replace(self, message, escape_regex=False):
"""Replace $NETLOC with the current host and port."""
netloc = self.netloc
if self.prefix:
netloc = '%s%s' % (netloc, self.prefix)
if escape_regex:
netloc = re.escape(netloc)
return message.replace('$NETLOC', netloc)

def _parse_url(self, url):
Expand Down Expand Up @@ -371,7 +401,7 @@ def _simple_replacer_regex(key):
case = HTTPTestCase._history_regex
return r"%s\$%s" % (case, key)

def _response_replace(self, message):
def _response_replace(self, message, escape_regex=False):
"""Replace a content path with the value from a previous response.
If the match would replace the entire message, then don't cast it
Expand All @@ -381,7 +411,10 @@ def _response_replace(self, message):
match = re.match('^%s$' % regex, message)
if match:
return self._response_replacer(match, preserve=True)
return re.sub(regex, self._response_replacer, message)
return re.sub(regex,
self._regex_replacer(self._response_replacer,
escape_regex),
message)

def _response_replacer(self, match, preserve=False):
"""Replace a regex match with the value from a previous response."""
Expand Down Expand Up @@ -498,9 +531,10 @@ def _run_test(self):
redirect=test['redirects'])
self._assert_response()

def _scheme_replace(self, message):
def _scheme_replace(self, message, escape_regex=False):
"""Replace $SCHEME with the current protocol."""
return message.replace('$SCHEME', self.scheme)
scheme = re.escape(self.scheme) if escape_regex else self.scheme
return message.replace('$SCHEME', scheme)

def _test_data_to_string(self, data, content_type):
"""Turn the request data into a string.
Expand Down
13 changes: 9 additions & 4 deletions gabbi/handlers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,14 @@ class HeadersResponseHandler(base.ResponseHandler):

def action(self, test, header, value=None):
header = header.lower() # case-insensitive comparison

response = test.response
header_value = test.replace_template(str(value))

header_value = str(value)
is_regex = (header_value.startswith('/') and
header_value.endswith('/') and
len(header_value) > 1)
header_value = test.replace_template(header_value,
escape_regex=is_regex)

try:
response_value = str(response[header])
Expand All @@ -64,8 +69,8 @@ def action(self, test, header, value=None):
"'%s' header not present in response: %s" % (
header, response.keys()))

if header_value.startswith('/') and header_value.endswith('/'):
header_value = header_value.strip('/').rstrip('/')
if is_regex:
header_value = header_value[1:-1]
test.assertRegex(
response_value, header_value,
'Expect header %s to match /%s/, got %s' %
Expand Down
13 changes: 8 additions & 5 deletions gabbi/handlers/jsonhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ def action(self, test, path, value=None):
info = six.text_type(info, 'UTF-8')
value = self.loads(info)

expected = test.replace_template(value)
# If expected is a string, check to see if it is a regex.
if (hasattr(expected, 'startswith') and expected.startswith('/')
and expected.endswith('/')):
expected = expected.strip('/').rstrip('/')
is_regex = (isinstance(value, six.string_types) and
value.startswith('/') and
value.endswith('/') and
len(value) > 1)
expected = test.replace_template(value, escape_regex=is_regex)
if is_regex:
expected = expected[1:-1]
# match may be a number so stringify
match = str(match)
match = six.text_type(match)
test.assertRegexpMatches(
match, expected,
'Expect jsonpath %s to match /%s/, got %s' %
Expand Down

0 comments on commit 1834ee1

Please sign in to comment.