Skip to content

Commit

Permalink
Merge pull request #247 from orsinium-forks/fix-ssrf
Browse files Browse the repository at this point in the history
[CVE-2020-27197] Avoid SSRF on parsing XML
  • Loading branch information
emmanvg committed Oct 19, 2020
2 parents c8e9c9b + 587d180 commit 23c6f7b
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ build/
dist/
libtaxii.egg-info/
venv/
.pytest_cache
.tox
output.txt
libtaxii/test/output/
Expand Down
31 changes: 23 additions & 8 deletions libtaxii/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dateutil.parser
from lxml import etree
import six
from six.moves.urllib.parse import urlparse

try:
import simplejson as json
Expand All @@ -21,20 +22,34 @@
_XML_PARSER = None


def parse(s):
def parse(s, allow_file=True, allow_url=False):
"""
Uses the default parser to parse a string or file-like object

:param s: The XML String or File-like object to parse
:param s: The XML String or File-like object to parse.
:param allow_file: Allow `s` to be a file path.
:param allow_url: Allow `s` to be a URL.
:return: an etree._Element
"""
# Do a simple validation that the given string (or URL)
# has no protocol specified. Anything without parseable protocol
# will be interpreted by lxml as string instead or path of external URL.
if not allow_url and isinstance(s, six.string_types):
parsed = urlparse(s)
if parsed.scheme:
raise ValueError('external URLs are not allowed')

parser = get_xml_parser()

# parse from string if no external paths allowed
if not allow_file and not allow_url:
return etree.fromstring(s, parser)

# try to parse from file or string if files are allowed
try:
e = etree.parse(s, get_xml_parser()).getroot()
return etree.parse(s, parser).getroot()
except IOError:
e = etree.XML(s, get_xml_parser())

return e
return etree.XML(s, parser)


def parse_xml_string(xmlstr):
Expand All @@ -56,7 +71,7 @@ def parse_xml_string(xmlstr):
else:
xmlstr = six.StringIO(xmlstr)

return parse(xmlstr)
return parse(xmlstr, allow_file=True)


def get_xml_parser():
Expand Down Expand Up @@ -422,7 +437,7 @@ def stringify_content(content):

if hasattr(content, 'read'): # The content is file-like
try: # Try to parse as XML
xml = parse(content)
xml = parse(content, allow_file=True)
return xml, True
except etree.XMLSyntaxError: # Content is not well-formed XML; just treat as a string
return content.read(), False
Expand Down
6 changes: 3 additions & 3 deletions libtaxii/messages_10.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def validate_xml(xml_string):
etree_xml = parse_xml_string(xml_string)
package_dir, package_filename = os.path.split(__file__)
schema_file = os.path.join(package_dir, "xsd", "TAXII_XMLMessageBinding_Schema.xsd")
taxii_schema_doc = parse(schema_file)
taxii_schema_doc = parse(schema_file, allow_file=True)
xml_schema = etree.XMLSchema(taxii_schema_doc)
valid = xml_schema.validate(etree_xml)
if not valid:
Expand Down Expand Up @@ -479,7 +479,7 @@ def from_dict(cls, d, **kwargs):
extended_headers = {}
for k, v in six.iteritems(d['extended_headers']):
try:
v = parse(v)
v = parse(v, allow_file=False)
except etree.XMLSyntaxError:
pass
extended_headers[k] = v
Expand Down Expand Up @@ -646,7 +646,7 @@ def from_dict(d):
is_xml = d.get('content_is_xml', False)
if is_xml:
#FIXME: to parse or not to parse the content - this should be configurable
kwargs['content'] = parse(d['content'])
kwargs['content'] = parse(d['content'], allow_file=False)
else:
kwargs['content'] = d['content']

Expand Down
6 changes: 3 additions & 3 deletions libtaxii/messages_11.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def validate_xml(xml_string):
etree_xml = parse_xml_string(xml_string)
package_dir, package_filename = os.path.split(__file__)
schema_file = os.path.join(package_dir, "xsd", "TAXII_XMLMessageBinding_Schema_11.xsd")
taxii_schema_doc = parse(schema_file)
taxii_schema_doc = parse(schema_file, allow_file=True)
xml_schema = etree.XMLSchema(taxii_schema_doc)
valid = xml_schema.validate(etree_xml)
# TODO: Additionally, validate the Query stuff
Expand Down Expand Up @@ -818,7 +818,7 @@ def from_dict(d):
kwargs['message'] = d.get('message')
is_xml = d.get('content_is_xml', False)
if is_xml:
kwargs['content'] = parse(d['content'])
kwargs['content'] = parse(d['content'], allow_file=False)
else:
kwargs['content'] = d['content']

Expand Down Expand Up @@ -1109,7 +1109,7 @@ def from_dict(cls, d, **kwargs):
extended_headers = {}
for k, v in six.iteritems(d['extended_headers']):
try:
v = parse(v)
v = parse(v, allow_file=False)
except etree.XMLSyntaxError:
pass
extended_headers[k] = v
Expand Down
36 changes: 36 additions & 0 deletions libtaxii/test/messages_11_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,42 @@ def test_xee_local(self):
except etree.XMLSyntaxError:
raise ValueError("An XML Syntax Error was raised, meaning a real attack would have succeeded!")

def test_ssrf(self):
"""
Tests that external URLs are forbidden by default.
https://github.com/TAXIIProject/libtaxii/issues/246
"""
try:
parse("http://localhost/")
except ValueError:
pass
else:
raise AssertionError("oh no!")

try:
parse("ftp://localhost/")
except ValueError:
pass
else:
raise AssertionError("oh no!")

def test_local_filesystem_access(self):
"""No access to files allowed with allow_file=False
"""
try:
parse("file:///etc/hosts/", allow_file=False, allow_url=True)
except etree.XMLSyntaxError:
pass
else:
raise AssertionError("oh no!")

try:
parse("/etc/hosts/", allow_file=False)
except etree.XMLSyntaxError:
pass
else:
raise AssertionError("oh no!")

def test_dtd_retrieval(self):
pass

Expand Down
6 changes: 3 additions & 3 deletions libtaxii/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def __init__(self, schema_file):
will be used when validate_file/string/etree
is used.
"""
schema_doc = parse(schema_file)
schema_doc = parse(schema_file, allow_file=True)
self.xml_schema = etree.XMLSchema(schema_doc)

def validate_file(self, file_location):
Expand All @@ -155,7 +155,7 @@ def validate_file(self, file_location):
"""

with open(file_location, 'r') as f:
etree_xml = parse(f)
etree_xml = parse(f, allow_file=True)

return self.validate_etree(etree_xml)

Expand All @@ -164,7 +164,7 @@ def validate_string(self, xml_string):
A wrapper for validate_etree. Parses xml_string,
turns it into an etree, then calls validate_etree( ... )
"""
etree_xml = parse(xml_string)
etree_xml = parse(xml_string, allow_file=False)
return self.validate_etree(etree_xml)

def validate_etree(self, etree_xml):
Expand Down

0 comments on commit 23c6f7b

Please sign in to comment.