Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion cl_sii/libs/xml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@


"""
import io
import logging
import os
from typing import IO
from typing import IO, Tuple, Union

import defusedxml
import defusedxml.lxml
import lxml.etree
import signxml
import signxml.exceptions
import xml.parsers.expat
import xml.parsers.expat.errors
from lxml.etree import ElementBase as XmlElement # noqa: F401
# note: 'lxml.etree.ElementTree' is a **function**, not a class.
from lxml.etree import _ElementTree as XmlElementTree # noqa: F401
from lxml.etree import XMLSchema as XmlSchema # noqa: F401

from . import crypto_utils


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -111,6 +116,29 @@ class XmlSchemaDocValidationError(Exception):
"""


class XmlSignatureInvalid(Exception):

"""
XML signature is invalid, for any reason.
"""


class XmlSignatureUnverified(XmlSignatureInvalid):

"""
XML signature verification (i.e. digest validation) failed.

This means the signature is not to be trusted.
"""


class XmlSignatureInvalidCertificate(XmlSignatureInvalid):

"""
Certificate validation failed on XML signature processing.
"""


###############################################################################
# functions
###############################################################################
Expand Down Expand Up @@ -323,3 +351,111 @@ def write_xml_doc(xml_doc: XmlElement, output: IO[bytes]) -> None:
# default: True.
with_tail=True,
)


def verify_xml_signature(
xml_doc: XmlElement,
trusted_x509_cert: Union[crypto_utils.X509Cert, crypto_utils._X509CertOpenSsl] = None,
) -> Tuple[bytes, XmlElementTree, XmlElementTree]:
"""
Verify the XML signature in ``xml_doc``.

.. note::
XML document with more than one signature is not supported.

If the inputs are ok but the XML signature does not verify,
raises :class:`XmlSignatureUnverified`.

If ``trusted_x509_cert`` is None, it requires that the signature in
``xml_doc`` includes a a valid X.509 **certificate chain** that
validates against the *known certificate authorities*.

If ``trusted_x509_cert`` is given, it must be a **trusted** external
X.509 certificate, and the verification will be of whether the XML
signature in ``xml_doc`` was signed by ``trusted_x509_cert`` or not;
thus **it overrides** any X.509 certificate information included
in the signature.

.. note::
It is strongly recommended to validate ``xml_doc`` beforehand
(against the corresponding XML schema, using :func:`validate_xml_doc`).

:param xml_doc:
:param trusted_x509_cert: a trusted external X.509 certificate, or None
:raises :class:`XmlSignatureInvalidCertificate`:
certificate validation failed
:raises :class:`XmlSignatureInvalid`:
signature is invalid
:raises :class:`XmlSchemaDocValidationError`:
XML doc is not valid
:raises :class:`ValueError`:

"""
if not isinstance(xml_doc, XmlElement):
raise TypeError("'xml_doc' must be an XML document/element.")

n_signatures = (
len(xml_doc.findall('.//ds:Signature', namespaces=XML_DSIG_NS_MAP))
+ len(xml_doc.findall('.//dsig11:Signature', namespaces=XML_DSIG_NS_MAP))
+ len(xml_doc.findall('.//dsig2:Signature', namespaces=XML_DSIG_NS_MAP)))

if n_signatures > 1:
raise NotImplementedError("XML document with more than one signature is not supported.")

xml_verifier = signxml.XMLVerifier()

if isinstance(trusted_x509_cert, crypto_utils._X509CertOpenSsl):
trusted_x509_cert_open_ssl = trusted_x509_cert
elif isinstance(trusted_x509_cert, crypto_utils.X509Cert):
trusted_x509_cert_open_ssl = crypto_utils._X509CertOpenSsl.from_cryptography(
trusted_x509_cert)
elif trusted_x509_cert is None:
trusted_x509_cert_open_ssl = None
else:
# A 'crypto_utils._X509CertOpenSsl' is ok but we prefer 'crypto_utils.X509Cert'.
raise TypeError("'trusted_x509_cert' must be a 'crypto_utils.X509Cert' instance, or None.")

# warning: performance issue.
# note: 'signxml.XMLVerifier.verify()' calls 'signxml.util.XMLProcessor.get_root()',
# which converts the data to string, and then reparses it using the same function we use
# in 'parse_untrusted_xml()' ('defusedxml.lxml.fromstring'), but without all the precautions
# we have there. See:
# https://github.com/XML-Security/signxml/blob/v2.6.0/signxml/util/__init__.py#L141-L151
# Considering that, we'd rather write to bytes ourselves and control the process.
f = io.BytesIO()
write_xml_doc(xml_doc, f)
tmp_bytes = f.getvalue()

try:
# note: by passing 'x509_cert' we override any X.509 certificate information supplied
# by the signature itself.
result: signxml.VerifyResult = xml_verifier.verify(
data=tmp_bytes, require_x509=True, x509_cert=trusted_x509_cert_open_ssl)

except signxml.exceptions.InvalidDigest as exc:
# warning: catch before 'InvalidSignature' (it is the parent of 'InvalidDigest').
raise XmlSignatureUnverified(str(exc)) from exc

except signxml.exceptions.InvalidCertificate as exc:
# warning: catch before 'InvalidSignature' (it is the parent of 'InvalidCertificate').
raise XmlSignatureInvalidCertificate(str(exc)) from exc

except signxml.exceptions.InvalidSignature as exc:
logger.exception(
"Unexpected exception (it should have been an instance of subclass of "
"'InvalidSignature'). Error: %s",
str(exc))
raise XmlSignatureInvalid(str(exc)) from exc

except signxml.exceptions.InvalidInput as exc:
raise ValueError("Invalid input.", str(exc)) from exc

except lxml.etree.DocumentInvalid as exc:
# Simplest and safest way to get the error message (see 'validate_xml_doc()').
# Error example:
# "Element '{http://www.w3.org/2000/09/xmldsig#}X509Certificate': '\nabc\n' is not a
# valid value of the atomic type 'xs:base64Binary'., line 30"
validation_error_msg = str(exc)
raise XmlSchemaDocValidationError(validation_error_msg) from exc

return result.signed_data, result.signed_xml, result.signature_xml
24 changes: 24 additions & 0 deletions tests/test_data/xml/trivial-doc.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0"?>
<data>
<!-- This comment is very important! -->
<country name="Liechtenstein">
<rank>1</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank>4</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank>68</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>
173 changes: 171 additions & 2 deletions tests/test_libs_xml_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import io
import unittest

import lxml.etree

from cl_sii.libs.crypto_utils import load_pem_x509_cert

from cl_sii.libs.xml_utils import XmlElement
from cl_sii.libs.xml_utils import ( # noqa: F401
XmlSyntaxError, XmlFeatureForbidden,
parse_untrusted_xml, read_xml_schema, validate_xml_doc, write_xml_doc,
XmlSyntaxError, XmlFeatureForbidden, XmlSchemaDocValidationError,
XmlSignatureInvalid, XmlSignatureInvalidCertificate, XmlSignatureUnverified,
parse_untrusted_xml, read_xml_schema, validate_xml_doc, verify_xml_signature, write_xml_doc,
)

from .utils import read_test_file_bytes
Expand Down Expand Up @@ -106,3 +110,168 @@ class FunctionWriteXmlDocTest(unittest.TestCase):

# TODO: implement for function 'write_xml_doc'. Consider each of the "observations".
pass


class FunctionVerifyXmlSignatureTest(unittest.TestCase):

@classmethod
def setUpClass(cls) -> None:
super().setUpClass()

cls.any_x509_cert_pem_file = read_test_file_bytes(
'test_data/crypto/wildcard-google-com-cert.pem')

cls.xml_doc_cert_pem_bytes = read_test_file_bytes(
'test_data/sii-crypto/DTE--76354771-K--33--170-cert.pem')

cls.with_valid_signature = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned.xml')
cls.with_valid_signature_signed_data = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-signed_data.xml')
cls.with_valid_signature_signed_xml = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-signed_xml.xml')
cls.with_valid_signature_signature_xml = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-signature_xml.xml')

cls.trivial_without_signature = read_test_file_bytes(
'test_data/xml/trivial-doc.xml')
cls.with_too_many_signatures = read_test_file_bytes(
'test_data/sii-rtc/AEC--76354771-K--33--170--SEQ-2.xml')
cls.without_signature = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-removed-signature.xml')
cls.with_bad_cert = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-bad-cert.xml')
cls.with_bad_cert_no_base64 = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-bad-cert-no-base64.xml')
cls.with_signature_and_modified = read_test_file_bytes(
'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-changed-monto.xml')

def test_ok_external_trusted_cert(self) -> None:
xml_doc = parse_untrusted_xml(self.with_valid_signature)
cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)

signed_data, signed_xml, signature_xml = verify_xml_signature(
xml_doc, trusted_x509_cert=cert)

self.assertEqual(signed_data, self.with_valid_signature_signed_data)

f = io.BytesIO()
write_xml_doc(signed_xml, f)
signed_xml_bytes = f.getvalue()
self.assertEqual(signed_xml_bytes, self.with_valid_signature_signed_xml)

f = io.BytesIO()
write_xml_doc(signature_xml, f)
signature_xml_bytes = f.getvalue()
self.assertEqual(signature_xml_bytes, self.with_valid_signature_signature_xml)

def test_ok_cert_in_signature(self) -> None:
# TODO: implement!

# xml_doc = parse_untrusted_xml(...)
# verify_xml_signature(xml_doc, trusted_x509_cert=None)
pass

def test_fail_cert_type_error(self) -> None:
xml_doc = parse_untrusted_xml(self.with_valid_signature)
cert = self.any_x509_cert_pem_file

with self.assertRaises(TypeError) as cm:
_ = verify_xml_signature(xml_doc, trusted_x509_cert=cert)
self.assertEqual(
cm.exception.args,
("'trusted_x509_cert' must be a 'crypto_utils.X509Cert' instance, or None.", ))

def test_fail_xml_doc_type_error(self) -> None:
cert = self.any_x509_cert_pem_file

with self.assertRaises(TypeError) as cm:
_ = verify_xml_signature(xml_doc=object(), trusted_x509_cert=cert)
self.assertEqual(
cm.exception.args,
("'xml_doc' must be an XML document/element.", ))

def test_bad_cert_included(self) -> None:
# If the included certificate is bad, it does not matter, as long as it does not break XML.
xml_doc_with_bad_cert = parse_untrusted_xml(self.with_bad_cert)
xml_doc_with_bad_cert_no_base64 = parse_untrusted_xml(self.with_bad_cert_no_base64)

cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)

verify_xml_signature(xml_doc_with_bad_cert, trusted_x509_cert=cert)

with self.assertRaises(XmlSchemaDocValidationError) as cm:
verify_xml_signature(xml_doc_with_bad_cert_no_base64, trusted_x509_cert=cert)
self.assertEqual(
cm.exception.args,
("Element '{http://www.w3.org/2000/09/xmldsig#}X509Certificate': '\nabc\n"
"' is not a valid value of the atomic type 'xs:base64Binary'., line 30", ))

def test_fail_included_cert_not_from_a_known_ca(self) -> None:
xml_doc = parse_untrusted_xml(self.with_valid_signature)

# Without cert: fails because the issuer of the cert in the signature is not a known CA.
with self.assertRaises(XmlSignatureInvalidCertificate) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=None)
self.assertEqual(
cm.exception.args,
("[20, 0, 'unable to get local issuer certificate']", ))

def test_fail_signed_data_modified(self) -> None:
xml_doc = parse_untrusted_xml(self.with_signature_and_modified)
cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)

with self.assertRaises(XmlSignatureUnverified) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=cert)
self.assertEqual(cm.exception.args, ("Digest mismatch for reference 0", ))

def test_xml_doc_without_signature_1(self) -> None:
xml_doc = parse_untrusted_xml(self.without_signature)

expected_exc_args = (
'Invalid input.',
'Expected to find XML element Signature in {http://www.sii.cl/SiiDte}DTE')

# Without cert:
with self.assertRaises(ValueError) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=None)
self.assertEqual(cm.exception.args, expected_exc_args)

# With cert:
cert = load_pem_x509_cert(self.any_x509_cert_pem_file)
with self.assertRaises(ValueError) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=cert)
self.assertEqual(cm.exception.args, expected_exc_args)

def test_fail_xml_doc_without_signature_2(self) -> None:
xml_doc = parse_untrusted_xml(self.trivial_without_signature)

expected_exc_args = (
'Invalid input.', 'Expected to find XML element Signature in data')

# Without cert:
with self.assertRaises(ValueError) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=None)
self.assertEqual(cm.exception.args, expected_exc_args)

# With cert:
cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)
with self.assertRaises(ValueError) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=cert)
self.assertEqual(cm.exception.args, expected_exc_args)

def test_fail_xml_doc_with_too_many_signatures(self) -> None:
xml_doc = parse_untrusted_xml(self.with_too_many_signatures)

expected_exc_args = ("XML document with more than one signature is not supported.", )

# Without cert:
with self.assertRaises(NotImplementedError) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=None)
self.assertEqual(cm.exception.args, expected_exc_args)

# With cert:
cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)
with self.assertRaises(NotImplementedError) as cm:
verify_xml_signature(xml_doc, trusted_x509_cert=cert)
self.assertEqual(cm.exception.args, expected_exc_args)