View
@@ -17,19 +17,14 @@
import threading
import time
from waitress.buffers import ReadOnlyFileBasedBuffer
from waitress.compat import (
tobytes,
Queue,
Empty,
reraise,
)
from waitress.utilities import (
from .buffers import ReadOnlyFileBasedBuffer
from .compat import Empty, Queue, reraise, tobytes
from .utilities import (
Forwarded,
build_http_date,
logger,
queue_logger,
undquote,
)
rename_headers = { # or keep them without the HTTP_ prefix added
@@ -48,6 +43,16 @@
'upgrade'
))
PROXY_HEADERS = frozenset({
'X_FORWARDED_FOR',
'X_FORWARDED_HOST',
'X_FORWARDED_PROTO',
'X_FORWARDED_PORT',
'X_FORWARDED_BY',
'FORWARDED',
})
class JustTesting(Exception):
pass
@@ -508,6 +513,193 @@ def start_response(status, headers, exc_info=None):
if hasattr(app_iter, 'close'):
app_iter.close()
def parse_proxy_headers(
self,
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=False
):
forwarded_for = None
client_addr = None
forwarded_for = None
if 'X_FORWARDED_FOR' in headers:
forwarded_for = []
for forward_hop in headers['X_FORWARDED_FOR'].split(','):
forward_hop = forward_hop.strip()
forward_hop = undquote(forward_hop)
# Make sure that all IPv6 addresses are surrounded by brackets
if ':' in forward_hop and forward_hop[-1] != ']':
forwarded_for.append('[{}]'.format(forward_hop))
else:
forwarded_for.append(forward_hop)
forwarded_for = forwarded_for[-trusted_proxy_count:]
client_addr = forwarded_for[0]
forwarded_host = undquote(headers.get('X_FORWARDED_HOST', ''))
forwarded_proto = undquote(headers.get('X_FORWARDED_PROTO', ''))
forwarded_port = undquote(headers.get('X_FORWARDED_PORT', ''))
forwarded = headers.get('FORWARDED', None)
# If the Forwarded header exists, it gets priority if the setting is
# turned on, and will warn if the other headers were not None and
# discard them.
if forwarded and trusted_proxy_forwarded:
if (
forwarded_for or
forwarded_host or
forwarded_proto or
forwarded_port
):
self.logger.warning(
'The Forwarded header was found to exist alongside '
'one or more of the older X-Forwarded-Host, '
'X-Forwarded-Proto, X-Forwarded-For, '
'X-Forwarded-Port, X-Forwarded-By headers. Waitress will '
'ignore the older style headers, but this could be a '
'security issue. Please make sure to remove the invalid '
'headers before passing the request to Waitress.')
# Remove all other proxy headers other than Forwarded
for header in PROXY_HEADERS - {'FORWARDED'}:
headers.pop(header, None)
proxies = []
for forwarded_element in forwarded.split(','):
# Remove whitespace that may have been introduced when
# appending a new entry
forwarded_element = forwarded_element.strip()
forwarded_for = forwarded_host = forwarded_proto = None
forwarded_port = forwarded_by = None
for pair in forwarded_element.split(';'):
pair = pair.lower()
if not pair:
continue
token, equals, value = pair.partition('=')
if equals != "=":
raise ValueError(
'Invalid forwarded-pair in Forwarded element')
if token.strip() != token:
raise ValueError(
'token may not be surrounded by whitespace')
if value.strip() != value:
raise ValueError(
'value may not be surrounded by whitespace')
if token == 'by':
forwarded_by = undquote(value)
elif token == 'for':
forwarded_for = undquote(value)
elif token == 'host':
forwarded_host = undquote(value)
elif token == 'proto':
forwarded_proto = undquote(value)
else:
self.logger.warning(
'Unknown Forwarded token: %s' % token)
proxies.append(
Forwarded(
forwarded_by, forwarded_for,
forwarded_host, forwarded_proto))
proxies = proxies[-trusted_proxy_count:]
# Iterate backwards and fill in some values, the oldest entry that
# contains the information we expect is the one we use. We expect
# that intermediate proxies may re-write the host header or proto,
# but the oldest entry is the one that contains the information the
# client expects when generating URL's
#
# Forwarded: for="[2001:db8::1]";host="example.com:8443";proto="https"
# Forwarded: for=192.0.2.1;host="example.internal:8080"
#
# (After HTTPS header folding) should mean that we use as values:
#
# Host: example.com
# Protocol: https
# Port: 8443
for proxy in proxies[::-1]:
client_addr = proxy.for_ or client_addr
forwarded_host = proxy.host or forwarded_host
forwarded_proto = proxy.proto or forwarded_proto
elif forwarded:
self.logger.warning(
'The Forwarded header was present, but is not considered '
'trusted. Waitress will remove the Forwarded header value.')
headers.pop('FORWARDED')
if forwarded_proto:
forwarded_proto = forwarded_proto.lower()
if forwarded_proto not in {'http', 'https'}:
raise ValueError(
'Invalid "Forwarded Proto=" or "X-Forwarded-Proto" value.')
# Set the URL scheme to the proxy provided proto
environ['wsgi.url_scheme'] = forwarded_proto
if not forwarded_port:
if forwarded_proto == 'http':
forwarded_port = '80'
if forwarded_proto == 'https':
forwarded_port = '443'
if forwarded_host:
forwarded_host = forwarded_host.strip()
if ':' in forwarded_host and forwarded_host[-1] != ']':
host, port = forwarded_host.rsplit(':', 1)
host, port = host.strip(), str(port)
# We trust the port in the Forwarded Host/X-Forwarded-Host over
# X-Forwarded-Port, or whatever we got from Forwarded
# Proto/X-Forwarded-Proto.
if forwarded_port != port:
forwarded_port = port
# We trust the proxy server's forwarded Host
environ['SERVER_NAME'] = host
environ['HTTP_HOST'] = forwarded_host
else:
# We trust the proxy server's forwarded Host
environ['SERVER_NAME'] = forwarded_host
environ['HTTP_HOST'] = forwarded_host
if forwarded_port and forwarded_port not in {'443', '80'}:
environ['HTTP_HOST'] = '{}:{}'.format(
forwarded_host, forwarded_port)
if forwarded_port:
environ['SERVER_PORT'] = str(forwarded_port)
if client_addr:
if ':' in client_addr and client_addr[-1] != ']':
addr, port = client_addr.rsplit(':', 1)
environ['REMOTE_ADDR'] = addr.strip()
environ['REMOTE_PORT'] = port.strip()
else:
environ['REMOTE_ADDR'] = client_addr.strip()
def get_environment(self):
"""Returns a WSGI environment."""
environ = self.environ
@@ -551,16 +743,28 @@ def get_environment(self):
environ['SCRIPT_NAME'] = url_prefix
environ['PATH_INFO'] = path
environ['QUERY_STRING'] = request.query
host = environ['REMOTE_ADDR'] = channel.addr[0]
remote_peer = environ['REMOTE_ADDR'] = channel.addr[0]
headers = dict(request.headers)
if host == server.adj.trusted_proxy:
wsgi_url_scheme = headers.pop('X_FORWARDED_PROTO',
request.url_scheme)
if remote_peer == server.adj.trusted_proxy or server.trusted_proxy:
self.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=server.adj.trusted_proxy_count,
trusted_proxy_forwarded=server.adj.trusted_proxy_forwarded
)
else:
wsgi_url_scheme = request.url_scheme
if wsgi_url_scheme not in ('http', 'https'):
raise ValueError('Invalid X_FORWARDED_PROTO value')
# If we are not relying on a proxy, we still want to try and set
# the REMOTE_PORT to something useful, maybe None though.
environ['REMOTE_PORT'] = str(channel.addr[1])
# Nah, we aren't actually going to look up the reverse DNS for
# REMOTE_ADDR, but we will happily set this environment variable for
# the WSGI application. Spec says we can just set this to REMOTE_ADDR,
# so we do.
environ['REMOTE_HOST'] = environ['REMOTE_ADDR']
for key, value in headers.items():
value = value.strip()
mykey = rename_headers.get(key, None)
@@ -571,14 +775,19 @@ def get_environment(self):
# the following environment variables are required by the WSGI spec
environ['wsgi.version'] = (1, 0)
environ['wsgi.url_scheme'] = wsgi_url_scheme
environ['wsgi.errors'] = sys.stderr # apps should use the logging module
# May have already been set by the proxy
if 'wsgi.url_scheme' not in environ:
environ['wsgi.url_scheme'] = request.url_scheme
# apps should use the logging module
environ['wsgi.errors'] = sys.stderr
environ['wsgi.multithread'] = True
environ['wsgi.multiprocess'] = False
environ['wsgi.run_once'] = False
environ['wsgi.input'] = request.get_body_stream()
environ['wsgi.file_wrapper'] = ReadOnlyFileBasedBuffer
environ['wsgi.input_terminated'] = True # wsgi.input is EOF terminated
environ['wsgi.input_terminated'] = True # wsgi.input is EOF terminated
self.environ = environ
return environ
View
@@ -653,6 +653,7 @@ class DummyAdjustments(object):
max_request_header_size = 10000
class DummyServer(object):
trusted_proxy = False
trigger_pulled = False
adj = DummyAdjustments()
View
@@ -752,11 +752,13 @@ def test_get_environment_values(self):
# nail the keys of environ
self.assertEqual(sorted(environ.keys()), [
'CONTENT_LENGTH', 'CONTENT_TYPE', 'HTTP_CONNECTION', 'HTTP_X_FOO',
'PATH_INFO', 'QUERY_STRING', 'REMOTE_ADDR', 'REQUEST_METHOD',
'SCRIPT_NAME', 'SERVER_NAME', 'SERVER_PORT', 'SERVER_PROTOCOL',
'SERVER_SOFTWARE', 'wsgi.errors', 'wsgi.file_wrapper', 'wsgi.input',
'wsgi.input_terminated', 'wsgi.multiprocess', 'wsgi.multithread',
'wsgi.run_once', 'wsgi.url_scheme', 'wsgi.version'])
'PATH_INFO', 'QUERY_STRING', 'REMOTE_ADDR', 'REMOTE_HOST',
'REMOTE_PORT', 'REQUEST_METHOD', 'SCRIPT_NAME', 'SERVER_NAME',
'SERVER_PORT', 'SERVER_PROTOCOL', 'SERVER_SOFTWARE', 'wsgi.errors',
'wsgi.file_wrapper', 'wsgi.input', 'wsgi.input_terminated',
'wsgi.multiprocess', 'wsgi.multithread', 'wsgi.run_once',
'wsgi.url_scheme', 'wsgi.version'
])
self.assertEqual(environ['REQUEST_METHOD'], 'GET')
self.assertEqual(environ['SERVER_PORT'], '80')
@@ -768,6 +770,8 @@ def test_get_environment_values(self):
self.assertEqual(environ['PATH_INFO'], '/')
self.assertEqual(environ['QUERY_STRING'], 'abc')
self.assertEqual(environ['REMOTE_ADDR'], '127.0.0.1')
self.assertEqual(environ['REMOTE_HOST'], '127.0.0.1')
self.assertEqual(environ['REMOTE_PORT'], '39830')
self.assertEqual(environ['CONTENT_TYPE'], 'abc')
self.assertEqual(environ['CONTENT_LENGTH'], '10')
self.assertEqual(environ['HTTP_X_FOO'], 'BAR')
@@ -799,7 +803,7 @@ def test_get_environment_values_w_scheme_override_untrusted(self):
def test_get_environment_values_w_scheme_override_trusted(self):
import sys
inst = self._makeOne()
inst.channel.addr = ['192.168.1.1']
inst.channel.addr = ['192.168.1.1', 8080]
inst.channel.server.adj.trusted_proxy = '192.168.1.1'
request = DummyParser()
request.headers = {
@@ -816,14 +820,16 @@ def test_get_environment_values_w_scheme_override_trusted(self):
# nail the keys of environ
self.assertEqual(sorted(environ.keys()), [
'CONTENT_LENGTH', 'CONTENT_TYPE', 'HTTP_CONNECTION', 'HTTP_X_FOO',
'PATH_INFO', 'QUERY_STRING', 'REMOTE_ADDR', 'REQUEST_METHOD',
'SCRIPT_NAME', 'SERVER_NAME', 'SERVER_PORT', 'SERVER_PROTOCOL',
'SERVER_SOFTWARE', 'wsgi.errors', 'wsgi.file_wrapper', 'wsgi.input',
'HTTP_X_FORWARDED_PROTO', 'PATH_INFO', 'QUERY_STRING',
'REMOTE_ADDR', 'REMOTE_HOST', 'REQUEST_METHOD', 'SCRIPT_NAME',
'SERVER_NAME', 'SERVER_PORT', 'SERVER_PROTOCOL', 'SERVER_SOFTWARE',
'wsgi.errors', 'wsgi.file_wrapper', 'wsgi.input',
'wsgi.input_terminated', 'wsgi.multiprocess', 'wsgi.multithread',
'wsgi.run_once', 'wsgi.url_scheme', 'wsgi.version'])
'wsgi.run_once', 'wsgi.url_scheme', 'wsgi.version'
])
self.assertEqual(environ['REQUEST_METHOD'], 'GET')
self.assertEqual(environ['SERVER_PORT'], '80')
self.assertEqual(environ['SERVER_PORT'], '443')
self.assertEqual(environ['SERVER_NAME'], 'localhost')
self.assertEqual(environ['SERVER_SOFTWARE'], 'waitress')
self.assertEqual(environ['SERVER_PROTOCOL'], 'HTTP/1.0')
@@ -861,6 +867,289 @@ def test_get_environment_values_w_bogus_scheme_override(self):
inst.request = request
self.assertRaises(ValueError, inst.get_environment)
def test_parse_proxy_headers_forwarded_for(self):
inst = self._makeOne()
headers = {
'X_FORWARDED_FOR': '192.0.2.1'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=False
)
self.assertEqual(environ['REMOTE_ADDR'], '192.0.2.1')
def test_parse_proxy_headers_forwarded_for_v6_missing_brackets(self):
inst = self._makeOne()
headers = {
'X_FORWARDED_FOR': '2001:db8::0'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=False
)
self.assertEqual(environ['REMOTE_ADDR'], '[2001:db8::0]')
def test_parse_proxy_headers_forwared_for_multiple(self):
inst = self._makeOne()
headers = {
'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=2
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
def test_parse_forwarded_multiple_proxies(self):
inst = self._makeOne()
headers = {
'FORWARDED': 'For=192.0.2.1;host=fake.com, For=198.51.100.2;host=example.com:8080, For=203.0.113.1'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=2,
trusted_proxy_forwarded=True
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com:8080')
self.assertEqual(environ['SERVER_PORT'], '8080')
def test_parse_proxy_headers_forwarded_host_with_port(self):
inst = self._makeOne()
headers = {
'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1',
'X_FORWARDED_PROTO': 'http',
'X_FORWARDED_HOST': 'example.com:8080',
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=2,
trusted_proxy_forwarded=False,
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com:8080')
self.assertEqual(environ['SERVER_PORT'], '8080')
def test_parse_proxy_headers_forwarded_host_without_port(self):
inst = self._makeOne()
headers = {
'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1',
'X_FORWARDED_PROTO': 'http',
'X_FORWARDED_HOST': 'example.com',
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=2,
trusted_proxy_forwarded=False,
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com')
self.assertEqual(environ['SERVER_PORT'], '80')
def test_parse_proxy_headers_forwarded_host_with_forwarded_port(self):
inst = self._makeOne()
headers = {
'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1',
'X_FORWARDED_PROTO': 'http',
'X_FORWARDED_HOST': 'example.com',
'X_FORWARDED_PORT': '8080'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=2,
trusted_proxy_forwarded=False,
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com:8080')
self.assertEqual(environ['SERVER_PORT'], '8080')
def test_parse_forwarded(self):
inst = self._makeOne()
headers = {
'FORWARDED': 'For=198.51.100.2:5858;host=example.com:8080;proto=https'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['REMOTE_PORT'], '5858')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com:8080')
self.assertEqual(environ['SERVER_PORT'], '8080')
self.assertEqual(environ['wsgi.url_scheme'], 'https')
def test_parse_forwarded_warning_other_proxy_headers(self):
inst = self._makeOne()
inst.logger = DummyLogger()
headers = {
'X_FORWARDED_FOR': '[2001:db8::1]',
'FORWARDED': 'For=198.51.100.2;host=example.com:8080;proto=https'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
self.assertEqual(len(inst.logger.logged), 1)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com:8080')
self.assertEqual(environ['SERVER_PORT'], '8080')
self.assertEqual(environ['wsgi.url_scheme'], 'https')
def test_parse_forwarded_empty_pair(self):
inst = self._makeOne()
headers = {
'FORWARDED': 'For=198.51.100.2;;proto=https;by=_unused'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
def test_parse_forwarded_pair_token_whitespace(self):
inst = self._makeOne()
headers = {
'FORWARDED': 'For=198.51.100.2; proto =https'
}
environ = {}
with self.assertRaises(ValueError):
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
def test_parse_forwarded_pair_value_whitespace(self):
inst = self._makeOne()
headers = {
'FORWARDED': 'For= "198.51.100.2"; proto =https'
}
environ = {}
with self.assertRaises(ValueError):
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
def test_parse_forwarded_pair_no_equals(self):
inst = self._makeOne()
headers = {
'FORWARDED': 'For'
}
environ = {}
with self.assertRaises(ValueError):
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
def test_parse_forwarded_warning_unknown_token(self):
inst = self._makeOne()
inst.logger = DummyLogger()
headers = {
'FORWARDED': 'For=198.51.100.2;host=example.com:8080;proto=https;unknown="yolo"'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=True
)
self.assertEqual(len(inst.logger.logged), 1)
self.assertIn('Unknown Forwarded token', inst.logger.logged[0])
self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2')
self.assertEqual(environ['SERVER_NAME'], 'example.com')
self.assertEqual(environ['HTTP_HOST'], 'example.com:8080')
self.assertEqual(environ['SERVER_PORT'], '8080')
self.assertEqual(environ['wsgi.url_scheme'], 'https')
def test_parse_forwarded_warning_forwarded_exists(self):
inst = self._makeOne()
inst.logger = DummyLogger()
headers = {
'FORWARDED': 'For=198.51.100.2'
}
environ = {}
inst.parse_proxy_headers(
environ,
headers,
trusted_proxy_count=1,
trusted_proxy_forwarded=False
)
self.assertEqual(len(inst.logger.logged), 1)
self.assertIn('Forwarded header was present', inst.logger.logged[0])
self.assertNotIn('FORWARDED', headers)
class TestErrorTask(unittest.TestCase):
def _makeOne(self, channel=None, request=None):
@@ -969,8 +1258,11 @@ class DummyAdj(object):
port = 80
url_prefix = ''
trusted_proxy = None
trusted_proxy_count = 1
trusted_proxy_forwarded = False
class DummyServer(object):
trusted_proxy = False
server_name = 'localhost'
effective_port = 80
@@ -981,7 +1273,7 @@ class DummyChannel(object):
closed_when_done = False
adj = DummyAdj()
creation_time = 0
addr = ['127.0.0.1']
addr = ('127.0.0.1', 39830)
def __init__(self, server=None):
if server is None:
View
@@ -99,3 +99,36 @@ def test_it(self):
inst = self._makeOne()
self.assertEqual(inst.body, 1)
class Test_undquote(unittest.TestCase):
def _callFUT(self, value):
from waitress.utilities import undquote
return undquote(value)
def test_empty(self):
self.assertEqual(self._callFUT(''), '')
def test_quoted(self):
self.assertEqual(self._callFUT('"test"'), 'test')
def test_unquoted(self):
self.assertEqual(self._callFUT('test'), 'test')
def test_quoted_backslash_quote(self):
self.assertEqual(self._callFUT('"\\""'), '"')
def test_quoted_htab(self):
self.assertEqual(self._callFUT("\"\t\""), "\t")
def test_quoted_backslash_htab(self):
self.assertEqual(self._callFUT("\"\\\t\""), "\t")
def test_quoted_backslash_invalid(self):
self.assertRaises(ValueError, self._callFUT, '"\\"')
def test_invalid_quoting(self):
self.assertRaises(ValueError, self._callFUT, '"test')
def test_invalid_quoting_single_quote(self):
self.assertRaises(ValueError, self._callFUT, '"')
View
@@ -14,13 +14,14 @@
"""Utility functions
"""
import calendar
import errno
import logging
import os
import re
import stat
import time
import calendar
from collections import namedtuple
logger = logging.getLogger('waitress')
queue_logger = logging.getLogger('waitress.queue')
@@ -204,3 +205,49 @@ class RequestEntityTooLarge(BadRequest):
class InternalServerError(Error):
code = 500
reason = 'Internal Server Error'
# RFC 5234 Appendix B.1 "Core Rules":
# VCHAR = %x21-7E
# ; visible (printing) characters
vchar_re = '\x21-\x7e'
# RFC 7230 Section 3.2.6 "Field Value Components":
# quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE
# qdtext = HTAB / SP /%x21 / %x23-5B / %x5D-7E / obs-text
# obs-text = %x80-FF
# quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text )
obs_text_re = '\x80-\xff'
# The '\\' between \x5b and \x5d is needed to escape \x5d (']')
qdtext_re = '[\t \x21\x23-\x5b\\\x5d-\x7e' + obs_text_re + ']'
quoted_pair_re = r'\\' + '([\t ' + vchar_re + obs_text_re + '])'
quoted_string_re = \
'"(?:(?:' + qdtext_re + ')|(?:' + quoted_pair_re + '))*"'
quoted_string = re.compile(quoted_string_re)
quoted_pair = re.compile(quoted_pair_re)
def undquote(value):
if value.startswith('"') and value.endswith('"'):
# So it claims to be DQUOTE'ed, let's validate that
matches = quoted_string.match(value)
if matches and matches.end() == len(value):
# Remove the DQUOTE's from the value
value = value[1:-1]
# Remove all backslashes that are followed by a valid vchar or
# obs-text
value = quoted_pair.sub(r'\1', value)
return value
elif not value.startswith('"') and not value.endswith('"'):
return value
raise ValueError('Invalid quoting in value')
Forwarded = namedtuple('Forwarded', ['by', 'for_', 'host', 'proto'])