diff --git a/cl_sii/dte/parse.py b/cl_sii/dte/parse.py index dee45c42..aac22134 100644 --- a/cl_sii/dte/parse.py +++ b/cl_sii/dte/parse.py @@ -7,7 +7,8 @@ >>> from cl_sii.dte import parse >>> from cl_sii.libs import xml_utils ->>> with open('/dir/my_file.xml', mode='rb') as f: +>>> xml_file_path = '/dir/my_file.xml' +>>> with open(xml_file_path, mode='rb') as f: ... xml_doc = xml_utils.parse_untrusted_xml(f.read()) >>> parse.clean_dte_xml(xml_doc) @@ -16,11 +17,12 @@ >>> dte_struct = parse.parse_dte_xml(xml_doc) """ +import io import logging import os from dataclasses import MISSING, _MISSING_TYPE from datetime import date -from typing import Optional, Union +from typing import Optional, Tuple, Union import lxml.etree @@ -33,8 +35,18 @@ logger = logging.getLogger(__name__) +DTE_XMLNS = 'http://www.sii.cl/SiiDte' +""" +XML namespace for DTE element in DTE XML schema. + +Ref: target namespace in 'DTE_v10.xsd' and 'EnvioDTE_v10.xsd'. + +* cl_sii/data/ref/factura_electronica/schemas-xml/DTE_v10.xsd#L19 (f57a326) +* cl_sii/data/ref/factura_electronica/schemas-xml/EnvioDTE_v10.xsd#L14 (f57a326) +""" + DTE_XMLNS_MAP = { - 'sii-dte': 'http://www.sii.cl/SiiDte', + 'sii-dte': DTE_XMLNS, } """ Mapping from XML namespace prefix to full name, for DTE processing. @@ -59,29 +71,36 @@ # main functions ############################################################################### -def clean_dte_xml(xml_doc: lxml.etree.ElementBase) -> bool: +def clean_dte_xml( + xml_doc: lxml.etree.ElementBase, + set_missing_xmlns: bool = False, + remove_doc_personalizado: bool = True, +) -> Tuple[lxml.etree.ElementBase, bool]: """ - Remove some non-compliant (DTE XML schema) data from ``xml_doc``. + Apply changes to ``xml_doc`` towards compliance to DTE XML schema. + + .. seealso:: :data:`DTE_XML_SCHEMA_OBJ` - Not all non-compliant data is removed; only some corresponding to popular - modifications but non-compliant nonetheless. + There is a kwarg to enable/disable each kind of change. 
- The object is modified in-place. + .. warning:: + Do not assume the ``xml_doc`` object is modified in-place because in + some cases it will be replaced (i.e. an entirely different object). - :returns: whether ``xml_doc`` was modified or not + :returns: new ``xml_doc`` and whether it was modified or not """ modified = False - xml_etree = xml_doc.getroottree() + if set_missing_xmlns: + xml_doc, _modified = _set_dte_xml_missing_xmlns(xml_doc) + modified = modified or _modified - # Remove non-standard but popular element 'DocPersonalizado'. - xml_em = xml_etree.find('sii-dte:DocPersonalizado', namespaces=DTE_XMLNS_MAP) - if xml_em is not None: - modified = True - xml_doc.remove(xml_em) + if remove_doc_personalizado: + xml_doc, _modified = _remove_dte_xml_doc_personalizado(xml_doc) + modified = modified or _modified - return modified + return xml_doc, modified def validate_dte_xml(xml_doc: lxml.etree.ElementBase) -> None: @@ -125,6 +144,58 @@ def parse_dte_xml(xml_doc: lxml.etree.ElementBase) -> data_models.DteDataL2: # helpers ############################################################################### +def _set_dte_xml_missing_xmlns( + xml_doc: lxml.etree.ElementBase, +) -> Tuple[lxml.etree.ElementBase, bool]: + + # source: name of the XML element without namespace. + # cl_sii/data/ref/factura_electronica/schemas-xml/DTE_v10.xsd#L22 (f57a326) + # cl_sii/data/ref/factura_electronica/schemas-xml/EnvioDTE_v10.xsd#L92 (f57a326) + em_tag_simple = 'DTE' + + em_namespace = DTE_XMLNS + em_tag_namespaced = '{%s}%s' % (em_namespace, em_tag_simple) + + # Tag of 'DTE' should be ... 
+ assert em_tag_namespaced == '{http://www.sii.cl/SiiDte}DTE' + + modified = False + + root_em = xml_doc.getroottree().getroot() + root_em_tag = root_em.tag + + if root_em_tag == em_tag_namespaced: + pass + elif root_em_tag == em_tag_simple: + modified = True + root_em.set('xmlns', em_namespace) + f = io.BytesIO() + xml_utils.write_xml_doc(xml_doc, f) + new_xml_doc_bytes = f.getvalue() + xml_doc = xml_utils.parse_untrusted_xml(new_xml_doc_bytes) + else: + exc_msg = "XML root element tag does not match the expected simple or namespaced name." + raise Exception(exc_msg, em_tag_simple, em_tag_namespaced, root_em_tag) + + return xml_doc, modified + + +def _remove_dte_xml_doc_personalizado( + xml_doc: lxml.etree.ElementBase, +) -> Tuple[lxml.etree.ElementBase, bool]: + # Remove non-standard but popular element 'DocPersonalizado', if it exists. + + modified = False + em_path = 'sii-dte:DocPersonalizado' + + xml_em = xml_doc.getroottree().find(em_path, namespaces=DTE_XMLNS_MAP) + if xml_em is not None: + modified = True + xml_doc.remove(xml_em) + + return xml_doc, modified + + +def _get_tipo_dte(xml_etree: lxml.etree.ElementTree) -> constants.TipoDteEnum: em_path = 'sii-dte:Documento/sii-dte:Encabezado/sii-dte:IdDoc/sii-dte:TipoDTE' diff --git a/cl_sii/libs/xml_utils.py b/cl_sii/libs/xml_utils.py index 1a9fd0a3..83a9d69e 100644 --- a/cl_sii/libs/xml_utils.py +++ b/cl_sii/libs/xml_utils.py @@ -1,5 +1,6 @@ import logging import os +from typing import IO import defusedxml import defusedxml.lxml @@ -237,3 +238,45 @@ def validate_xml_doc(xml_schema: lxml.etree.XMLSchema, xml_doc: lxml.etree.Eleme validation_error_msg = str(exc) raise XmlSchemaDocValidationError(validation_error_msg) from exc + + +def write_xml_doc(xml_doc: lxml.etree.ElementBase, output: IO[bytes]) -> None: + """ + Write ``xml_doc`` to bytes stream ``output``. + + In this context, "write" means "serialize", so there are a number of + observations in that regard: + + * Encoding will be preserved. 
+ * XML declaration (``<?xml ...?>``) will be included. + * Quoting of each XML declaration attribute's value may change + i.e. from ``"`` to ``'`` or vice versa. + * In self-closing tags, the whitespace between the last attribute + and the closing (``/>``) may be removed e.g. + ``<em attr="value" />`` to + ``<em attr="value"/>`` + * No pretty-print. + + For a temporary bytes stream in memory you may create a + :class:`io.BytesIO` object. + + """ + # note: use `IO[X]` for arguments and `TextIO`/`BinaryIO` for return types (says GVR). + # https://github.com/python/typing/issues/518#issuecomment-350903120 + + xml_etree: lxml.etree.ElementTree = xml_doc.getroottree() + + # See: + # https://lxml.de/api/lxml.etree._ElementTree-class.html#write + xml_etree.write( + file=output, + encoding=xml_etree.docinfo.encoding, + # alternatives: 'xml', 'html', 'text' or 'c14n' + method='xml', + # note: include XML declaration (`<?xml ...?>`). + xml_declaration=True, + pretty_print=False, + # note: we are not sure what this does. + # default: True. + with_tail=True, + ) diff --git a/scripts/clean_dte_xml_file.py b/scripts/clean_dte_xml_file.py new file mode 100755 index 00000000..c6880d45 --- /dev/null +++ b/scripts/clean_dte_xml_file.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +""" +Clean DTE XML files. + + +Example for a single file:: + + ./scripts/clean_dte_xml_file.py file \ + 'tests/test_data/sii-dte/DTE--76354771-K--33--170.xml' \ + 'tests/test_data/sii-dte/DTE--76354771-K--33--170-clean.xml' + + +Example for all files in a directory:: + + ./scripts/clean_dte_xml_file.py dir 'tests/test_data/sii-dte/' + + +""" +import difflib +import os +import pathlib +import sys +from typing import Iterable + +try: + import cl_sii # noqa: F401 +except ImportError: + # If package 'cl-sii' is not installed, try appending the project repo directory to the + # Python path, assuming that we are in the project repo. If not, it will fail nonetheless. 
+ sys.path.append(os.path.dirname(os.path.abspath(__name__))) + import cl_sii # noqa: F401 + +import cl_sii.dte.parse +from cl_sii.libs import xml_utils + + +# TODO: log messages instead of print. + + +def clean_dte_xml_file(input_file_path: str, output_file_path: str) -> Iterable[bytes]: + with open(input_file_path, mode='rb') as f: + file_bytes = f.read() + + xml_doc = xml_utils.parse_untrusted_xml(file_bytes) + + xml_doc_cleaned, modified = cl_sii.dte.parse.clean_dte_xml( + xml_doc, + set_missing_xmlns=True, + remove_doc_personalizado=True, + ) + + cl_sii.dte.parse.validate_dte_xml(xml_doc_cleaned) + + with open(output_file_path, 'w+b') as f: + xml_utils.write_xml_doc(xml_doc_cleaned, f) + + with open(output_file_path, mode='rb') as f: + file_bytes_rewritten = f.read() + + # note: another way to compute the difference in a similar format is + # `diff -Naur $input_file_path $output_file_path` + file_bytes_diff_gen = difflib.diff_bytes( + dfunc=difflib.unified_diff, + a=file_bytes.splitlines(), + b=file_bytes_rewritten.splitlines()) + + return file_bytes_diff_gen + + +def main_single_file(input_file_path: str, output_file_path: str) -> None: + file_bytes_diff_gen = clean_dte_xml_file( + input_file_path=input_file_path, + output_file_path=output_file_path) + + for diff_line in file_bytes_diff_gen: + print(diff_line) + + +def main_dir_files(input_files_dir_path: str) -> None: + for p in pathlib.Path(input_files_dir_path).iterdir(): + if not p.is_file(): + continue + + # e.g. 
'an example.xml' -> 'an example.clean.xml' + input_file_path = str(p) + output_file_path = str(p.with_suffix(f'.clean{p.suffix}')) + + print(f"\n\nWill clean file '{input_file_path}' and save it to '{output_file_path}'.") + file_bytes_diff_gen = clean_dte_xml_file( + input_file_path=input_file_path, + output_file_path=output_file_path) + + print(f"Difference between input and output files:") + diff_line = None + for diff_line in file_bytes_diff_gen: + print(diff_line) + if diff_line is None: + print(f"No difference.") + + +if __name__ == '__main__': + if sys.argv[1] == 'file': + main_single_file( + input_file_path=sys.argv[2], + output_file_path=sys.argv[3]) + elif sys.argv[1] == 'dir': + main_dir_files( + input_files_dir_path=sys.argv[2]) + else: + raise ValueError(f"Invalid option: '{sys.argv[1]}'") diff --git a/tests/test_dte_parse.py b/tests/test_dte_parse.py index 1e416018..fedb651b 100644 --- a/tests/test_dte_parse.py +++ b/tests/test_dte_parse.py @@ -1,12 +1,22 @@ +import difflib +import io import unittest +from datetime import date + +import cl_sii.dte.constants +from cl_sii.libs import xml_utils +from cl_sii.rut import Rut from cl_sii.dte.parse import ( # noqa: F401 clean_dte_xml, parse_dte_xml, validate_dte_xml, - DTE_XML_SCHEMA_OBJ, DTE_XMLNS_MAP, + _remove_dte_xml_doc_personalizado, _set_dte_xml_missing_xmlns, + DTE_XML_SCHEMA_OBJ, DTE_XMLNS, DTE_XMLNS_MAP ) +from .utils import read_test_file_bytes + -# TODO: add a real DTE XML file in 'tests/test_data/dte/'. +_TEST_DTE_NEEDS_CLEAN_FILE_PATH = 'test_data/sii-dte/DTE--76354771-K--33--170.xml' class OthersTest(unittest.TestCase): @@ -15,11 +25,131 @@ def test_DTE_XML_SCHEMA_OBJ(self) -> None: # TODO: implement pass + def test_integration_ok(self) -> None: + # TODO: split in separate tests, with more coverage. 
+ + dte_bad_xml_file_path = _TEST_DTE_NEEDS_CLEAN_FILE_PATH + + file_bytes = read_test_file_bytes(dte_bad_xml_file_path) + xml_doc = xml_utils.parse_untrusted_xml(file_bytes) + + self.assertEqual( + xml_doc.getroottree().getroot().tag, + 'DTE') + + with self.assertRaises(xml_utils.XmlSchemaDocValidationError) as cm: + validate_dte_xml(xml_doc) + self.assertSequenceEqual( + cm.exception.args, + ("Element 'DTE': No matching global declaration available for the validation root., " + "line 2", ) + ) + # This would raise: + # parse_dte_xml(xml_doc) + + xml_doc_cleaned, modified = clean_dte_xml( + xml_doc, + set_missing_xmlns=True, + remove_doc_personalizado=True, + ) + self.assertTrue(modified) + + # This will not raise. + validate_dte_xml(xml_doc_cleaned) + + self.assertEqual( + xml_doc_cleaned.getroottree().getroot().tag, + '{%s}DTE' % DTE_XMLNS) + + f = io.BytesIO() + xml_utils.write_xml_doc(xml_doc_cleaned, f) + file_bytes_rewritten = f.getvalue() + del f + + xml_doc_rewritten = xml_utils.parse_untrusted_xml(file_bytes_rewritten) + validate_dte_xml(xml_doc_rewritten) + parsed_dte_rewritten = parse_dte_xml(xml_doc_cleaned) + + self.assertDictEqual( + dict(parsed_dte_rewritten.as_dict()), + dict( + emisor_rut=Rut('76354771-K'), + tipo_dte=cl_sii.dte.constants.TipoDteEnum.FACTURA_ELECTRONICA, + folio=170, + fecha_emision_date=date(2019, 4, 1), + receptor_rut=Rut('96790240-3'), + monto_total=2996301, + emisor_razon_social='INGENIERIA ENACON SPA', + receptor_razon_social='MINERA LOS PELAMBRES', + fecha_vencimiento_date=None, + )) + + expected_file_bytes_diff = ( + b'--- \n', + b'+++ \n', + b'@@ -1,5 +1,5 @@\n', + b'-', + b'-', + b"+", + b'+', + b' ', + b' ', + b' ', + b'@@ -59,13 +59,13 @@\n', + b' ', + b' ', + b' ', + b'-', # noqa: E501 + b'-', + b'+', # noqa: E501 + b'+', + b' ', + b' ', + b'-', + b'+', + b' ', + b'-', + b'+', + b' ij2Qn6xOc2eRx3hwyO/GrzptoBk=', + b' ', + b' ', + ) + + file_bytes_diff_gen = difflib.diff_bytes( + dfunc=difflib.unified_diff, + 
a=file_bytes.splitlines(), + b=file_bytes_rewritten.splitlines()) + self.assertSequenceEqual( + [diff_line for diff_line in file_bytes_diff_gen], + expected_file_bytes_diff + ) + class FunctionCleanDteXmlTest(unittest.TestCase): - # TODO: implement - pass + def test_clean_dte_xml_ok(self) -> None: + # TODO: implement + pass + + def test_clean_dte_xml_fail(self) -> None: + # TODO: implement + pass + + def test__set_dte_xml_missing_xmlns_ok(self) -> None: + # TODO: implement + pass + + def test__set_dte_xml_missing_xmlns_fail(self) -> None: + # TODO: implement + pass + + def test__remove_dte_xml_doc_personalizado_ok(self) -> None: + # TODO: implement + pass + + def test__remove_dte_xml_doc_personalizado_fail(self) -> None: + # TODO: implement + pass class FunctionParseDteXmlTest(unittest.TestCase): diff --git a/tests/test_libs_xml_utils.py b/tests/test_libs_xml_utils.py index bb04316e..a8a15723 100644 --- a/tests/test_libs_xml_utils.py +++ b/tests/test_libs_xml_utils.py @@ -4,7 +4,7 @@ from cl_sii.libs.xml_utils import ( # noqa: F401 XmlSyntaxError, XmlFeatureForbidden, - parse_untrusted_xml, read_xml_schema, validate_xml_doc, + parse_untrusted_xml, read_xml_schema, validate_xml_doc, write_xml_doc, ) from .utils import read_test_file_bytes @@ -99,3 +99,9 @@ class FunctionValidateXmlDocTest(unittest.TestCase): # TODO: implement pass + + +class FunctionWriteXmlDocTest(unittest.TestCase): + + # TODO: implement for function 'write_xml_doc'. Consider each of the "observations". + pass diff --git a/tests/test_scripts_clean_dte_xml_file.py b/tests/test_scripts_clean_dte_xml_file.py new file mode 100644 index 00000000..05e89749 --- /dev/null +++ b/tests/test_scripts_clean_dte_xml_file.py @@ -0,0 +1,2 @@ + +# TODO: implement tests for script 'clean_dte_xml_file.py'