class XmlSignatureInvalid(Exception):

    """
    XML signature is invalid, for any reason.
    """


class XmlSignatureUnverified(XmlSignatureInvalid):

    """
    XML signature verification (i.e. digest validation) failed.

    This means the signature is not to be trusted.
    """


class XmlSignatureInvalidCertificate(XmlSignatureInvalid):

    """
    Certificate validation failed on XML signature processing.
    """


def verify_xml_signature(
    xml_doc: XmlElement,
    trusted_x509_cert: Union[
        crypto_utils.X509Cert, crypto_utils._X509CertOpenSsl, None,
    ] = None,
) -> Tuple[bytes, XmlElementTree, XmlElementTree]:
    """
    Verify the XML signature in ``xml_doc``.

    .. note::
        XML document with more than one signature is not supported.

    If the inputs are ok but the XML signature does not verify,
    raises :class:`XmlSignatureUnverified`.

    If ``trusted_x509_cert`` is None, it requires that the signature in
    ``xml_doc`` includes a valid X.509 **certificate chain** that
    validates against the *known certificate authorities*.

    If ``trusted_x509_cert`` is given, it must be a **trusted** external
    X.509 certificate, and the verification will be of whether the XML
    signature in ``xml_doc`` was signed by ``trusted_x509_cert`` or not;
    thus **it overrides** any X.509 certificate information included
    in the signature.

    .. note::
        It is strongly recommended to validate ``xml_doc`` beforehand
        (against the corresponding XML schema, using :func:`validate_xml_doc`).

    :param xml_doc: XML document/element that contains the signature
    :param trusted_x509_cert: a trusted external X.509 certificate, or None
    :return: tuple ``(signed_data, signed_xml, signature_xml)``, taken from
        the ``signxml.VerifyResult`` returned by the verifier
    :raises :class:`TypeError`:
        ``xml_doc`` is not an XML document/element, or ``trusted_x509_cert``
        is neither an X.509 certificate object nor None
    :raises :class:`NotImplementedError`:
        ``xml_doc`` contains more than one signature
    :raises :class:`XmlSignatureUnverified`:
        signature did not verify (digest validation failed)
    :raises :class:`XmlSignatureInvalidCertificate`:
        certificate validation failed
    :raises :class:`XmlSignatureInvalid`:
        signature is invalid
    :raises :class:`XmlSchemaDocValidationError`:
        XML doc is not valid
    :raises :class:`ValueError`:
        the verifier rejected the input (``signxml.exceptions.InvalidInput``)

    """
    if not isinstance(xml_doc, XmlElement):
        raise TypeError("'xml_doc' must be an XML document/element.")

    # Count signature elements under every XML-DSig namespace prefix we know about.
    n_signatures = sum(
        len(xml_doc.findall(f'.//{ns_prefix}:Signature', namespaces=XML_DSIG_NS_MAP))
        for ns_prefix in ('ds', 'dsig11', 'dsig2')
    )

    if n_signatures > 1:
        raise NotImplementedError("XML document with more than one signature is not supported.")

    xml_verifier = signxml.XMLVerifier()

    # The verifier is given a 'crypto_utils._X509CertOpenSsl' (or None), so normalize the input.
    if isinstance(trusted_x509_cert, crypto_utils._X509CertOpenSsl):
        trusted_x509_cert_open_ssl = trusted_x509_cert
    elif isinstance(trusted_x509_cert, crypto_utils.X509Cert):
        trusted_x509_cert_open_ssl = crypto_utils._X509CertOpenSsl.from_cryptography(
            trusted_x509_cert)
    elif trusted_x509_cert is None:
        trusted_x509_cert_open_ssl = None
    else:
        # A 'crypto_utils._X509CertOpenSsl' is ok but we prefer 'crypto_utils.X509Cert'.
        raise TypeError("'trusted_x509_cert' must be a 'crypto_utils.X509Cert' instance, or None.")

    # warning: performance issue.
    # note: 'signxml.XMLVerifier.verify()' calls 'signxml.util.XMLProcessor.get_root()',
    #   which converts the data to string, and then reparses it using the same function we use
    #   in 'parse_untrusted_xml()' ('defusedxml.lxml.fromstring'), but without all the precautions
    #   we have there. See:
    #   https://github.com/XML-Security/signxml/blob/v2.6.0/signxml/util/__init__.py#L141-L151
    #   Considering that, we'd rather write to bytes ourselves and control the process.
    with io.BytesIO() as buffer:
        write_xml_doc(xml_doc, buffer)
        xml_doc_bytes = buffer.getvalue()

    try:
        # note: by passing 'x509_cert' we override any X.509 certificate information supplied
        #   by the signature itself.
        result: signxml.VerifyResult = xml_verifier.verify(
            data=xml_doc_bytes, require_x509=True, x509_cert=trusted_x509_cert_open_ssl)

    except signxml.exceptions.InvalidDigest as exc:
        # warning: catch before 'InvalidSignature' (it is the parent of 'InvalidDigest').
        raise XmlSignatureUnverified(str(exc)) from exc

    except signxml.exceptions.InvalidCertificate as exc:
        # warning: catch before 'InvalidSignature' (it is the parent of 'InvalidCertificate').
        raise XmlSignatureInvalidCertificate(str(exc)) from exc

    except signxml.exceptions.InvalidSignature as exc:
        logger.exception(
            "Unexpected exception (it should have been an instance of subclass of "
            "'InvalidSignature'). Error: %s",
            exc)
        raise XmlSignatureInvalid(str(exc)) from exc

    except signxml.exceptions.InvalidInput as exc:
        raise ValueError("Invalid input.", str(exc)) from exc

    except lxml.etree.DocumentInvalid as exc:
        # Simplest and safest way to get the error message (see 'validate_xml_doc()').
        # Error example:
        #   "Element '{http://www.w3.org/2000/09/xmldsig#}X509Certificate': '\nabc\n' is not a
        #   valid value of the atomic type 'xs:base64Binary'., line 30"
        validation_error_msg = str(exc)
        raise XmlSchemaDocValidationError(validation_error_msg) from exc

    # NOTE(review): the 2nd and 3rd items are whatever 'signxml.VerifyResult' holds; they look
    #   like XML elements rather than element trees (callers pass them to 'write_xml_doc()') --
    #   confirm against the signxml docs and adjust the return annotation if needed.
    return result.signed_data, result.signed_xml, result.signature_xml
class FunctionVerifyXmlSignatureTest(unittest.TestCase):

    """
    Tests for function :func:`verify_xml_signature`.
    """

    @classmethod
    def setUpClass(cls) -> None:
        super().setUpClass()

        # PEM bytes of a certificate that is unrelated to the signed test documents.
        cls.any_x509_cert_pem_file = read_test_file_bytes(
            'test_data/crypto/wildcard-google-com-cert.pem')

        # PEM bytes of the certificate used as the trusted one for the signed DTE documents.
        cls.xml_doc_cert_pem_bytes = read_test_file_bytes(
            'test_data/sii-crypto/DTE--76354771-K--33--170-cert.pem')

        # Document with a valid signature, plus the expected pieces of the verification result.
        cls.with_valid_signature = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned.xml')
        cls.with_valid_signature_signed_data = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-signed_data.xml')
        cls.with_valid_signature_signed_xml = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-signed_xml.xml')
        cls.with_valid_signature_signature_xml = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-signature_xml.xml')

        # Documents used by the failure scenarios.
        cls.trivial_without_signature = read_test_file_bytes(
            'test_data/xml/trivial-doc.xml')
        cls.with_too_many_signatures = read_test_file_bytes(
            'test_data/sii-rtc/AEC--76354771-K--33--170--SEQ-2.xml')
        cls.without_signature = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-removed-signature.xml')
        cls.with_bad_cert = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-bad-cert.xml')
        cls.with_bad_cert_no_base64 = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-bad-cert-no-base64.xml')
        cls.with_signature_and_modified = read_test_file_bytes(
            'test_data/sii-dte/DTE--76354771-K--33--170--cleaned-mod-changed-monto.xml')

    @staticmethod
    def _write_xml_doc_to_bytes(xml_doc: XmlElement) -> bytes:
        # Serialize 'xml_doc' with 'write_xml_doc()' and return the resulting bytes.
        with io.BytesIO() as f:
            write_xml_doc(xml_doc, f)
            return f.getvalue()

    def test_ok_external_trusted_cert(self) -> None:
        xml_doc = parse_untrusted_xml(self.with_valid_signature)
        cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)

        signed_data, signed_xml, signature_xml = verify_xml_signature(
            xml_doc, trusted_x509_cert=cert)

        self.assertEqual(signed_data, self.with_valid_signature_signed_data)
        self.assertEqual(
            self._write_xml_doc_to_bytes(signed_xml),
            self.with_valid_signature_signed_xml)
        self.assertEqual(
            self._write_xml_doc_to_bytes(signature_xml),
            self.with_valid_signature_signature_xml)

    def test_ok_cert_in_signature(self) -> None:
        # TODO: implement! Expected shape:
        #   xml_doc = parse_untrusted_xml(...)
        #   verify_xml_signature(xml_doc, trusted_x509_cert=None)
        # Skip explicitly so that the missing coverage is visible in test reports, instead of
        # being reported as a passing test.
        self.skipTest("TODO: not implemented yet.")

    def test_fail_cert_type_error(self) -> None:
        xml_doc = parse_untrusted_xml(self.with_valid_signature)
        # Raw PEM bytes on purpose: not a certificate object, so it must be rejected.
        cert = self.any_x509_cert_pem_file

        with self.assertRaises(TypeError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=cert)
        self.assertEqual(
            cm.exception.args,
            ("'trusted_x509_cert' must be a 'crypto_utils.X509Cert' instance, or None.", ))

    def test_fail_xml_doc_type_error(self) -> None:
        # The type of 'xml_doc' is checked first, thus the (also invalid) 'cert' is not reached.
        cert = self.any_x509_cert_pem_file

        with self.assertRaises(TypeError) as cm:
            verify_xml_signature(xml_doc=object(), trusted_x509_cert=cert)
        self.assertEqual(
            cm.exception.args,
            ("'xml_doc' must be an XML document/element.", ))

    def test_bad_cert_included(self) -> None:
        # If the included certificate is bad, it does not matter, as long as it does not break XML.
        xml_doc_with_bad_cert = parse_untrusted_xml(self.with_bad_cert)
        xml_doc_with_bad_cert_no_base64 = parse_untrusted_xml(self.with_bad_cert_no_base64)

        cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)

        verify_xml_signature(xml_doc_with_bad_cert, trusted_x509_cert=cert)

        with self.assertRaises(XmlSchemaDocValidationError) as cm:
            verify_xml_signature(xml_doc_with_bad_cert_no_base64, trusted_x509_cert=cert)
        self.assertEqual(
            cm.exception.args,
            ("Element '{http://www.w3.org/2000/09/xmldsig#}X509Certificate': '\nabc\n"
             "' is not a valid value of the atomic type 'xs:base64Binary'., line 30", ))

    def test_fail_included_cert_not_from_a_known_ca(self) -> None:
        xml_doc = parse_untrusted_xml(self.with_valid_signature)

        # Without cert: fails because the issuer of the cert in the signature is not a known CA.
        with self.assertRaises(XmlSignatureInvalidCertificate) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=None)
        self.assertEqual(
            cm.exception.args,
            ("[20, 0, 'unable to get local issuer certificate']", ))

    def test_fail_signed_data_modified(self) -> None:
        xml_doc = parse_untrusted_xml(self.with_signature_and_modified)
        cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)

        with self.assertRaises(XmlSignatureUnverified) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=cert)
        self.assertEqual(cm.exception.args, ("Digest mismatch for reference 0", ))

    def test_xml_doc_without_signature_1(self) -> None:
        xml_doc = parse_untrusted_xml(self.without_signature)

        expected_exc_args = (
            'Invalid input.',
            'Expected to find XML element Signature in {http://www.sii.cl/SiiDte}DTE')

        # Without cert:
        with self.assertRaises(ValueError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=None)
        self.assertEqual(cm.exception.args, expected_exc_args)

        # With cert:
        cert = load_pem_x509_cert(self.any_x509_cert_pem_file)
        with self.assertRaises(ValueError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=cert)
        self.assertEqual(cm.exception.args, expected_exc_args)

    def test_fail_xml_doc_without_signature_2(self) -> None:
        xml_doc = parse_untrusted_xml(self.trivial_without_signature)

        expected_exc_args = (
            'Invalid input.', 'Expected to find XML element Signature in data')

        # Without cert:
        with self.assertRaises(ValueError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=None)
        self.assertEqual(cm.exception.args, expected_exc_args)

        # With cert:
        cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)
        with self.assertRaises(ValueError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=cert)
        self.assertEqual(cm.exception.args, expected_exc_args)

    def test_fail_xml_doc_with_too_many_signatures(self) -> None:
        xml_doc = parse_untrusted_xml(self.with_too_many_signatures)

        expected_exc_args = ("XML document with more than one signature is not supported.", )

        # Without cert:
        with self.assertRaises(NotImplementedError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=None)
        self.assertEqual(cm.exception.args, expected_exc_args)

        # With cert:
        cert = load_pem_x509_cert(self.xml_doc_cert_pem_bytes)
        with self.assertRaises(NotImplementedError) as cm:
            verify_xml_signature(xml_doc, trusted_x509_cert=cert)
        self.assertEqual(cm.exception.args, expected_exc_args)