diff --git a/cl_sii/dte/parse.py b/cl_sii/dte/parse.py index 00b6077d..39b42f1e 100644 --- a/cl_sii/dte/parse.py +++ b/cl_sii/dte/parse.py @@ -131,7 +131,7 @@ def parse_dte_xml(xml_doc: XmlElement) -> data_models.DteDataL2: # TODO: change response type to a dataclass like 'DteXmlData'. # TODO: separate the XML parsing stage from the deserialization stage, which could be # performed by XML-agnostic code (perhaps using Marshmallow or data clacases?). - # See :class:`cl_sii.rcv.parse.RcvCsvRowSchema`. + # See :class:`cl_sii.rcv.parse_csv.RcvVentaCsvRowSchema`. if not isinstance(xml_doc, (XmlElement, XmlElementTree)): raise TypeError("'xml_doc' must be an 'XmlElement'.") diff --git a/cl_sii/rcv/__init__.py b/cl_sii/rcv/__init__.py index 03ca583b..9661404c 100644 --- a/cl_sii/rcv/__init__.py +++ b/cl_sii/rcv/__init__.py @@ -9,64 +9,3 @@ http://www.sii.cl/preguntas_frecuentes/catastro/001_012_6971.htm """ -import csv -import io -from typing import Callable - -from . import parse - - -def process_rcv_csv_file( - text_stream: io.TextIOBase, - rcv_owner_rut: str, - row_data_handler: Callable, - max_data_rows: int = None, -) -> int: - """ - Process a RCV CSV file. - - Processing steps: - - Create a CSV reader, with auto-detection of header names (first row). - - Instantiate an schema to parse and deserialize each row. - - For each data row: - - Using an appropriate schema, deserialize the raw data. - - Apply ``row_data_handler`` to the deserialization output. - - :param text_stream: a file-like object, not necessarily a real file - :param rcv_owner_rut: RCV file owner's RUT - :param row_data_handler: function be called with parsed row data - :param max_data_rows: max number of data rows to process (raise exception if exceeded); - ``None`` means no limit - :return: number of data rows processed - - """ - # TODO: convert to iterator. That way we do not need the 'row_data_handler' and we can also use - # the same function to retrieve the collection of deserialized rows. - - csv_reader = parse.create_rcv_csv_reader(text_stream, expected_fields_strict=True) - schema = parse.RcvCsvRowSchema(context=dict(receptor_rut=rcv_owner_rut)) - - try: - for row_ix, row_data in enumerate(csv_reader, start=1): - if max_data_rows is not None and row_ix > max_data_rows: - # TODO: custom exception - raise Exception("Exceeded 'max_data_rows' value: {}.".format(max_data_rows)) - - try: - deserialized_row_data = schema.deserialize_csv_row(row_data) - except Exception as exc: - exc_msg = "Error deserializing row {} of CSV file: {}".format(row_ix, exc) - raise Exception(exc_msg) from exc - try: - row_data_handler(row_ix, deserialized_row_data) - except Exception as exc: - exc_msg = "Error in row_data_handler for row {} of CSV file: {}".format(row_ix, exc) - raise Exception(exc_msg) from exc - - # The first row in the CSV file is not a data row; it is the headers row. - rows_processed = csv_reader.line_num - 1 - except csv.Error as exc: - exc_msg = "CSV error for line {} of CSV file: {}".format(csv_reader.line_num, exc) - raise Exception(exc_msg) from exc - - return rows_processed diff --git a/cl_sii/rcv/parse.py b/cl_sii/rcv/parse.py deleted file mode 100644 index 8f22786a..00000000 --- a/cl_sii/rcv/parse.py +++ /dev/null @@ -1,195 +0,0 @@ -import csv -import io -from collections import OrderedDict - -import marshmallow -import marshmallow.fields -import marshmallow.validate - -from cl_sii.extras import mm_fields -from cl_sii.libs import mm_utils -from cl_sii.libs import tz_utils - - -_CSV_ROW_DICT_EXTRA_FIELDS_KEY = None -"""CSV row dict key under which the extra data in the row will be saved.""" - -_RCV_CSV_EXPECTED_FIELD_NAMES = ( - 'Nro', - 'Tipo Doc', - 'Tipo Compra', - 'RUT Proveedor', - 'Razon Social', - 'Folio', - 'Fecha Docto', - 'Fecha Recepcion', - 'Fecha Acuse', - 'Monto Exento', - 'Monto Neto', - 'Monto IVA Recuperable', - 'Monto Iva No Recuperable', - 'Codigo IVA No Rec.', - 'Monto Total', - 'Monto Neto Activo Fijo', - 'IVA Activo Fijo', - 'IVA uso Comun', - 'Impto. Sin Derecho a Credito', - 'IVA No Retenido', - 'Tabacos Puros', - 'Tabacos Cigarrillos', - 'Tabacos Elaborados', - 'NCE o NDE sobre Fact. de Compra', - 'Codigo Otro Impuesto', - 'Valor Otro Impuesto', - 'Tasa Otro Impuesto', -) -_RCV_CSV_DIALECT_KEY = 'sii_rcv' - - -class _RcvCsvDialect(csv.Dialect): - - """ - CSV dialect of RCV CSV files. - - The properties of this dialect were determined with the help of - :class:`csv.Sniffer`. - - >>> import gzip - >>> filename = 'SII-download-RCV-file-http-body-response.csv.gz' - >>> with gzip.open(filename, 'rt', encoding='utf-8') as f: - ... dialect = csv.Sniffer().sniff(f.read(50 * 1024)) - - """ - - delimiter = ';' - quotechar = '"' - escapechar = None - doublequote = False - skipinitialspace = False - lineterminator = '\r\n' - quoting = csv.QUOTE_MINIMAL - - -csv.register_dialect(_RCV_CSV_DIALECT_KEY, _RcvCsvDialect) - - -class RcvCsvRowSchema(marshmallow.Schema): - - EXPECTED_INPUT_FIELDS = tuple(_RCV_CSV_EXPECTED_FIELD_NAMES) + (_CSV_ROW_DICT_EXTRA_FIELDS_KEY, ) # type: ignore # noqa: E501 - FIELD_FECHA_RECEPCION_DATETIME_TZ = tz_utils.TZ_CL_SANTIAGO - - class Meta: - strict = True - - emisor_rut = mm_fields.RutField( - required=True, - load_from='RUT Proveedor', - ) - tipo_dte = marshmallow.fields.Integer( - required=True, - load_from='Tipo Doc', - ) - folio = marshmallow.fields.Integer( - required=True, - load_from='Folio', - ) - fecha_emision_date = mm_utils.CustomMarshmallowDateField( - format='%d/%m/%Y', # e.g. '22/10/2018' - required=True, - load_from='Fecha Docto', - ) - fecha_recepcion_datetime = marshmallow.fields.DateTime( - format='%d/%m/%Y %H:%M:%S', # e.g. '23/10/2018 01:54:13' - required=True, - load_from='Fecha Recepcion', - ) - # note: this field value is set using data passed in the schema context. - receptor_rut = mm_fields.RutField( - required=True, - ) - monto_total = marshmallow.fields.Integer( - required=True, - load_from='Monto Total', - ) - - @marshmallow.pre_load - def preprocess(self, in_data: dict) -> dict: - # note: required fields checks are run later on automatically thus we may not assume that - # values of required fields (`required=True`) exist. - - # Set field value only if it was not in the input data. - in_data.setdefault('receptor_rut', self.context['receptor_rut']) - - return in_data - - @marshmallow.post_load - def postprocess(self, data: dict) -> dict: - # >>> data['fecha_recepcion_datetime'].isoformat() - # '2018-10-23T01:54:13' - data['fecha_recepcion_datetime'] = tz_utils.convert_naive_dt_to_tz_aware( - dt=data['fecha_recepcion_datetime'], tz=self.FIELD_FECHA_RECEPCION_DATETIME_TZ) - # >>> data['fecha_recepcion_datetime'].isoformat() - # '2018-10-23T01:54:13-03:00' - # >>> data['fecha_recepcion_datetime'].astimezone(pytz.UTC).isoformat() - # '2018-10-23T04:54:13+00:00' - - # note: to express this value in another timezone (but the value does not change), do - # `datetime_obj.astimezone(pytz.timezone('some timezone'))` - - return data - - @marshmallow.validates_schema(pass_original=True) - def validate_schema(self, data: dict, original_data: dict) -> None: - # Fail validation if there was an unexpected input field. - unexpected_input_fields = ( - set(original_data) - - set(self.fields) - - set(self.EXPECTED_INPUT_FIELDS) - ) - if unexpected_input_fields: - raise marshmallow.ValidationError( - 'Unexpected input field', field_names=list(unexpected_input_fields)) - - # @marshmallow.validates('field_x') - # def validate_field_x(self, value): - # pass - - ########################################################################### - # non-marshmallow-related methods - ########################################################################### - - def deserialize_csv_row(self, row: OrderedDict) -> dict: - try: - result = self.load(row) # type: marshmallow.UnmarshalResult - except marshmallow.ValidationError as exc: - exc_msg = "Validation errors during deserialization." - validation_error_msgs = dict(exc.normalized_messages()) - raise ValueError(exc_msg, validation_error_msgs) from exc - - result_data = result.data # type: dict - result_errors = result.errors # type: dict - if result_errors: - raise Exception("Deserialization errors: %s", result_errors) - return result_data - - -def create_rcv_csv_reader( - text_stream: io.TextIOBase, - expected_fields_strict: bool = True, -) -> csv.DictReader: - # note: mypy wrongly complains: it does not accept 'fieldnames' to be None but that value - # is completely acceptable, and it even is the default! - # > error: Argument "fieldnames" to "DictReader" has incompatible type "None"; expected - # > "Sequence[str]" - csv_reader = csv.DictReader( # type: ignore - text_stream, - fieldnames=None, # the values of the first row will be used as the fieldnames - restkey=_CSV_ROW_DICT_EXTRA_FIELDS_KEY, - dialect=_RCV_CSV_DIALECT_KEY, - ) - if expected_fields_strict and tuple(csv_reader.fieldnames) != _RCV_CSV_EXPECTED_FIELD_NAMES: - raise Exception( - "CSV file field names do not match those expected, or their order.", - csv_reader.fieldnames) - - return csv_reader diff --git a/cl_sii/rcv/parse_csv.py b/cl_sii/rcv/parse_csv.py index 392c4e7a..12abf4c7 100644 --- a/cl_sii/rcv/parse_csv.py +++ b/cl_sii/rcv/parse_csv.py @@ -139,6 +139,303 @@ def parse_rcv_venta_csv_file( ) +def parse_rcv_compra_registro_csv_file( + receptor_rut: Rut, + receptor_razon_social: str, + input_file_path: str, + n_rows_offset: int = 0, + max_n_rows: int = None, +) -> Iterable[Tuple[Optional[DteDataL2], int, Dict[str, object], Dict[str, object]]]: + """ + Parse DTE data objects from a RCV "Compra/Registro" file (CSV). + + """ + schema_context = dict( + receptor_rut=receptor_rut, + receptor_razon_social=receptor_razon_social, + ) + input_csv_row_schema = RcvCompraRegistroCsvRowSchema(context=schema_context) + + expected_input_field_names = ( + 'Nro', + 'Tipo Doc', # 'tipo_dte' + 'Tipo Compra', + 'RUT Proveedor', # 'emisor_rut' + 'Razon Social', # 'emisor_razon_social' + 'Folio', # 'folio' + 'Fecha Docto', # 'fecha_emision_date' + 'Fecha Recepcion', # 'fecha_recepcion_dt' + 'Fecha Acuse', # 'fecha_acuse_dt' + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Total', # 'monto_total' + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'Tabacos Puros', + 'Tabacos Cigarrillos', + 'Tabacos Elaborados', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + fields_to_remove_names = ( + 'Nro', + 'Tipo Compra', + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'Tabacos Puros', + 'Tabacos Cigarrillos', + 'Tabacos Elaborados', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + yield from _parse_rcv_csv_file( + input_csv_row_schema, + expected_input_field_names, + fields_to_remove_names, + input_file_path, + n_rows_offset, + max_n_rows, + ) + + +def parse_rcv_compra_no_incluir_csv_file( + receptor_rut: Rut, + receptor_razon_social: str, + input_file_path: str, + n_rows_offset: int = 0, + max_n_rows: int = None, +) -> Iterable[Tuple[Optional[DteDataL2], int, Dict[str, object], Dict[str, object]]]: + """ + Parse DTE data objects from a RCV "Compra/no incluir" file (CSV). + + """ + schema_context = dict( + receptor_rut=receptor_rut, + receptor_razon_social=receptor_razon_social, + ) + input_csv_row_schema = RcvCompraNoIncluirCsvRowSchema(context=schema_context) + + expected_input_field_names = ( + 'Nro', + 'Tipo Doc', # 'tipo_dte' + 'Tipo Compra', + 'RUT Proveedor', # 'emisor_rut' + 'Razon Social', # 'emisor_razon_social' + 'Folio', # 'folio' + 'Fecha Docto', # 'fecha_emision_date' + 'Fecha Recepcion', # 'fecha_recepcion_dt' + 'Fecha Acuse', # 'fecha_acuse_dt' + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Total', # 'monto_total' + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + fields_to_remove_names = ( + 'Nro', + 'Tipo Compra', + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + yield from _parse_rcv_csv_file( + input_csv_row_schema, + expected_input_field_names, + fields_to_remove_names, + input_file_path, + n_rows_offset, + max_n_rows, + ) + + +def parse_rcv_compra_reclamado_csv_file( + receptor_rut: Rut, + receptor_razon_social: str, + input_file_path: str, + n_rows_offset: int = 0, + max_n_rows: int = None, +) -> Iterable[Tuple[Optional[DteDataL2], int, Dict[str, object], Dict[str, object]]]: + """ + Parse DTE data objects from a RCV "Compra/reclamado" file (CSV). + + """ + schema_context = dict( + receptor_rut=receptor_rut, + receptor_razon_social=receptor_razon_social, + ) + input_csv_row_schema = RcvCompraReclamadoCsvRowSchema(context=schema_context) + + expected_input_field_names = ( + 'Nro', + 'Tipo Doc', # 'tipo_dte' + 'Tipo Compra', + 'RUT Proveedor', # 'emisor_rut' + 'Razon Social', # 'emisor_razon_social' + 'Folio', # 'folio' + 'Fecha Docto', # 'fecha_emision_date' + 'Fecha Recepcion', # 'fecha_recepcion_dt' + 'Fecha Reclamo', # 'fecha_reclamo_dt' + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Total', # 'monto_total' + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + fields_to_remove_names = ( + 'Nro', + 'Tipo Compra', + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + yield from _parse_rcv_csv_file( + input_csv_row_schema, + expected_input_field_names, + fields_to_remove_names, + input_file_path, + n_rows_offset, + max_n_rows, + ) + + +def parse_rcv_compra_pendiente_csv_file( + receptor_rut: Rut, + receptor_razon_social: str, + input_file_path: str, + n_rows_offset: int = 0, + max_n_rows: int = None, +) -> Iterable[Tuple[Optional[DteDataL2], int, Dict[str, object], Dict[str, object]]]: + """ + Parse DTE data objects from a RCV "Compra/pendiente" file (CSV). + + """ + schema_context = dict( + receptor_rut=receptor_rut, + receptor_razon_social=receptor_razon_social, + ) + input_csv_row_schema = RcvCompraPendienteCsvRowSchema(context=schema_context) + + expected_input_field_names = ( + 'Nro', + 'Tipo Doc', # 'tipo_dte' + 'Tipo Compra', + 'RUT Proveedor', # 'emisor_rut' + 'Razon Social', # 'emisor_razon_social' + 'Folio', # 'folio' + 'Fecha Docto', # 'fecha_emision_date' + 'Fecha Recepcion', # 'fecha_recepcion_dt' + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Total', # 'monto_total' + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + fields_to_remove_names = ( + 'Nro', + 'Tipo Compra', + 'Monto Exento', + 'Monto Neto', + 'Monto IVA Recuperable', + 'Monto Iva No Recuperable', + 'Codigo IVA No Rec.', + 'Monto Neto Activo Fijo', + 'IVA Activo Fijo', + 'IVA uso Comun', + 'Impto. Sin Derecho a Credito', + 'IVA No Retenido', + 'NCE o NDE sobre Fact. de Compra', + 'Codigo Otro Impuesto', + 'Valor Otro Impuesto', + 'Tasa Otro Impuesto', + ) + + yield from _parse_rcv_csv_file( + input_csv_row_schema, + expected_input_field_names, + fields_to_remove_names, + input_file_path, + n_rows_offset, + max_n_rows, + ) + + ############################################################################### # schemas ############################################################################### @@ -307,6 +604,309 @@ def postprocess(self, data: dict) -> dict: return data +class RcvCompraRegistroCsvRowSchema(_RcvCsvRowSchemaBase): + + FIELD_FECHA_RECEPCION_DT_TZ = DteDataL2.DATETIME_FIELDS_TZ + FIELD_FECHA_ACUSE_DT_TZ = DteDataL2.DATETIME_FIELDS_TZ + + class Meta: + strict = True + + ########################################################################### + # basic fields + ########################################################################### + + emisor_rut = mm_fields.RutField( + required=True, + load_from='RUT Proveedor', + ) + tipo_dte = mm_fields.TipoDteField( + required=True, + load_from='Tipo Doc', + ) + folio = marshmallow.fields.Integer( + required=True, + load_from='Folio', + ) + fecha_emision_date = mm_utils.CustomMarshmallowDateField( + format='%d/%m/%Y', # e.g. '22/10/2018' + required=True, + load_from='Fecha Docto', + ) + monto_total = marshmallow.fields.Integer( + required=True, + load_from='Monto Total', + ) + emisor_razon_social = marshmallow.fields.String( + required=True, + load_from='Razon Social', + ) + + ########################################################################### + # fields whose value is set using data passed in the schema context + ########################################################################### + + receptor_rut = mm_fields.RutField( + required=True, + ) + receptor_razon_social = marshmallow.fields.String( + required=True, + ) + + ########################################################################### + # extra fields: not included in the returned struct + ########################################################################### + + fecha_recepcion_dt = marshmallow.fields.DateTime( + format='%d/%m/%Y %H:%M:%S', # e.g. '23/10/2018 01:54:13' + required=True, + load_from='Fecha Recepcion', + ) + fecha_acuse_dt = marshmallow.fields.DateTime( + format='%d/%m/%Y %H:%M:%S', # e.g. '23/10/2018 01:54:13' + required=True, + allow_none=True, + load_from='Fecha Acuse', + ) + + @marshmallow.pre_load + def preprocess(self, in_data: dict) -> dict: + # note: required fields checks are run later on automatically thus we may not assume that + # values of required fields (`required=True`) exist. + + # Set field value only if it was not in the input data. + in_data.setdefault('receptor_rut', self.context['receptor_rut']) + in_data.setdefault('receptor_razon_social', self.context['receptor_razon_social']) + + # Fix missing/default values. + if 'Fecha Acuse' in in_data: + if in_data['Fecha Acuse'] == '': + in_data['Fecha Acuse'] = None + + return in_data + + @marshmallow.post_load + def postprocess(self, data: dict) -> dict: + # >>> data['fecha_recepcion_dt'].isoformat() + # '2018-10-23T01:54:13' + data['fecha_recepcion_dt'] = tz_utils.convert_naive_dt_to_tz_aware( + dt=data['fecha_recepcion_dt'], tz=self.FIELD_FECHA_RECEPCION_DT_TZ) + # >>> data['fecha_recepcion_dt'].isoformat() + # '2018-10-23T01:54:13-03:00' + # >>> data['fecha_recepcion_dt'].astimezone(pytz.UTC).isoformat() + # '2018-10-23T04:54:13+00:00' + + if data['fecha_acuse_dt']: + data['fecha_acuse_dt'] = tz_utils.convert_naive_dt_to_tz_aware( + dt=data['fecha_acuse_dt'], tz=self.FIELD_FECHA_ACUSE_DT_TZ) + + # note: to express this value in another timezone (but the value does not change), do + # `dt_obj.astimezone(pytz.timezone('some timezone'))` + + return data + + +RcvCompraNoIncluirCsvRowSchema = RcvCompraRegistroCsvRowSchema + + +class RcvCompraReclamadoCsvRowSchema(_RcvCsvRowSchemaBase): + + FIELD_FECHA_RECEPCION_DT_TZ = DteDataL2.DATETIME_FIELDS_TZ + FIELD_FECHA_RECLAMO_DT_TZ = DteDataL2.DATETIME_FIELDS_TZ + + class Meta: + strict = True + + ########################################################################### + # basic fields + ########################################################################### + + emisor_rut = mm_fields.RutField( + required=True, + load_from='RUT Proveedor', + ) + tipo_dte = mm_fields.TipoDteField( + required=True, + load_from='Tipo Doc', + ) + folio = marshmallow.fields.Integer( + required=True, + load_from='Folio', + ) + fecha_emision_date = mm_utils.CustomMarshmallowDateField( + format='%d/%m/%Y', # e.g. '22/10/2018' + required=True, + load_from='Fecha Docto', + ) + monto_total = marshmallow.fields.Integer( + required=True, + load_from='Monto Total', + ) + emisor_razon_social = marshmallow.fields.String( + required=True, + load_from='Razon Social', + ) + + ########################################################################### + # fields whose value is set using data passed in the schema context + ########################################################################### + + receptor_rut = mm_fields.RutField( + required=True, + ) + receptor_razon_social = marshmallow.fields.String( + required=True, + ) + + ########################################################################### + # extra fields: not included in the returned struct + ########################################################################### + + fecha_recepcion_dt = marshmallow.fields.DateTime( + format='%d/%m/%Y %H:%M:%S', # e.g. '23/10/2018 01:54:13' + required=True, + load_from='Fecha Recepcion', + ) + fecha_reclamo_dt = marshmallow.fields.DateTime( + # note: for some reason the DTEs with `tipo_dte=` + # (and maybe others as well) do not have this field set (always? we do not know). + format='%d/%m/%Y %H:%M:%S', # e.g. '23/10/2018 01:54:13' + required=False, + allow_none=True, + load_from='Fecha Reclamo', + ) + + @marshmallow.pre_load + def preprocess(self, in_data: dict) -> dict: + # note: required fields checks are run later on automatically thus we may not assume that + # values of required fields (`required=True`) exist. + + # Set field value only if it was not in the input data. + in_data.setdefault('receptor_rut', self.context['receptor_rut']) + in_data.setdefault('receptor_razon_social', self.context['receptor_razon_social']) + + # Fix missing/default values. + # note: for some reason the DTEs with `tipo_dte=` + # (and maybe others as well) do not have this field set (always? we do not know). + if 'Fecha Reclamo' in in_data: + if in_data['Fecha Reclamo'] == '' or 'null' in in_data['Fecha Reclamo']: + in_data['Fecha Reclamo'] = None + + return in_data + + @marshmallow.post_load + def postprocess(self, data: dict) -> dict: + # >>> data['fecha_recepcion_dt'].isoformat() + # '2018-10-23T01:54:13' + data['fecha_recepcion_dt'] = tz_utils.convert_naive_dt_to_tz_aware( + dt=data['fecha_recepcion_dt'], tz=self.FIELD_FECHA_RECEPCION_DT_TZ) + # >>> data['fecha_recepcion_dt'].isoformat() + # '2018-10-23T01:54:13-03:00' + # >>> data['fecha_recepcion_dt'].astimezone(pytz.UTC).isoformat() + # '2018-10-23T04:54:13+00:00' + + if data['fecha_reclamo_dt']: + data['fecha_reclamo_dt'] = tz_utils.convert_naive_dt_to_tz_aware( + dt=data['fecha_reclamo_dt'], tz=self.FIELD_FECHA_RECLAMO_DT_TZ) + + # note: to express this value in another timezone (but the value does not change), do + # `dt_obj.astimezone(pytz.timezone('some timezone'))` + + return data + + +class RcvCompraPendienteCsvRowSchema(_RcvCsvRowSchemaBase): + + FIELD_FECHA_RECEPCION_DT_TZ = DteDataL2.DATETIME_FIELDS_TZ + FIELD_FECHA_ACUSE_DT_TZ = DteDataL2.DATETIME_FIELDS_TZ + + class Meta: + strict = True + + ########################################################################### + # basic fields + ########################################################################### + + emisor_rut = mm_fields.RutField( + required=True, + load_from='RUT Proveedor', + ) + tipo_dte = mm_fields.TipoDteField( + required=True, + load_from='Tipo Doc', + ) + folio = marshmallow.fields.Integer( + required=True, + load_from='Folio', + ) + fecha_emision_date = mm_utils.CustomMarshmallowDateField( + format='%d/%m/%Y', # e.g. '22/10/2018' + required=True, + load_from='Fecha Docto', + ) + monto_total = marshmallow.fields.Integer( + required=True, + load_from='Monto Total', + ) + emisor_razon_social = marshmallow.fields.String( + required=True, + load_from='Razon Social', + ) + + ########################################################################### + # fields whose value is set using data passed in the schema context + ########################################################################### + + receptor_rut = mm_fields.RutField( + required=True, + ) + receptor_razon_social = marshmallow.fields.String( + required=True, + ) + + ########################################################################### + # extra fields: not included in the returned struct + ########################################################################### + + fecha_recepcion_dt = marshmallow.fields.DateTime( + format='%d/%m/%Y %H:%M:%S', # e.g. '23/10/2018 01:54:13' + required=True, + load_from='Fecha Recepcion', + ) + + @marshmallow.pre_load + def preprocess(self, in_data: dict) -> dict: + # note: required fields checks are run later on automatically thus we may not assume that + # values of required fields (`required=True`) exist. + + # Set field value only if it was not in the input data. + in_data.setdefault('receptor_rut', self.context['receptor_rut']) + in_data.setdefault('receptor_razon_social', self.context['receptor_razon_social']) + + # Fix missing/default values. + if 'Fecha Acuse' in in_data: + if in_data['Fecha Acuse'] == '': + in_data['Fecha Acuse'] = None + + return in_data + + @marshmallow.post_load + def postprocess(self, data: dict) -> dict: + # >>> data['fecha_recepcion_dt'].isoformat() + # '2018-10-23T01:54:13' + data['fecha_recepcion_dt'] = tz_utils.convert_naive_dt_to_tz_aware( + dt=data['fecha_recepcion_dt'], tz=self.FIELD_FECHA_RECEPCION_DT_TZ) + # >>> data['fecha_recepcion_dt'].isoformat() + # '2018-10-23T01:54:13-03:00' + # >>> data['fecha_recepcion_dt'].astimezone(pytz.UTC).isoformat() + # '2018-10-23T04:54:13+00:00' + + # note: to express this value in another timezone (but the value does not change), do + # `dt_obj.astimezone(pytz.timezone('some timezone'))` + + return data + + ############################################################################### # helpers ############################################################################### diff --git a/tests/test_rcv.py b/tests/test_rcv.py deleted file mode 100644 index 81f0d3f0..00000000 --- a/tests/test_rcv.py +++ /dev/null @@ -1,10 +0,0 @@ -import unittest - -from cl_sii.rcv import process_rcv_csv_file # noqa: F401 - - -class FunctionsTest(unittest.TestCase): - - def test_process_rcv_csv_file(self) -> None: - # TODO: implement! - pass diff --git a/tests/test_rcv_parse.py b/tests/test_rcv_parse.py deleted file mode 100644 index 8cf1812b..00000000 --- a/tests/test_rcv_parse.py +++ /dev/null @@ -1,16 +0,0 @@ -import unittest - -from cl_sii.rcv.parse import RcvCsvRowSchema, create_rcv_csv_reader # noqa: F401 - - -class RcvCsvRowSchemaTest(unittest.TestCase): - - # TODO: implement! - pass - - -class FunctionsTest(unittest.TestCase): - - def test_create_rcv_csv_reader(self) -> None: - # TODO: implement! - pass diff --git a/tests/test_rcv_parse_csv.py b/tests/test_rcv_parse_csv.py index 6dc7741c..a03110df 100644 --- a/tests/test_rcv_parse_csv.py +++ b/tests/test_rcv_parse_csv.py @@ -1,7 +1,13 @@ import unittest from cl_sii.rcv.parse_csv import ( # noqa: F401 - RcvVentaCsvRowSchema, parse_rcv_venta_csv_file, _parse_rcv_csv_file, + RcvCompraNoIncluirCsvRowSchema, RcvCompraPendienteCsvRowSchema, + RcvCompraReclamadoCsvRowSchema, RcvCompraRegistroCsvRowSchema, + RcvVentaCsvRowSchema, + parse_rcv_compra_no_incluir_csv_file, parse_rcv_compra_pendiente_csv_file, + parse_rcv_compra_reclamado_csv_file, parse_rcv_compra_registro_csv_file, + parse_rcv_venta_csv_file, + _parse_rcv_csv_file, ) @@ -11,12 +17,52 @@ class RcvVentaCsvRowSchemaTest(unittest.TestCase): pass +class RcvCompraRegistroCsvRowSchemaTest(unittest.TestCase): + + # TODO: implement for 'RcvCompraRegistroCsvRowSchema'. + pass + + +class RcvCompraNoIncluirCsvRowSchemaTest(unittest.TestCase): + + # TODO: implement for 'RcvCompraNoIncluirCsvRowSchema'. + pass + + +class RcvCompraReclamadoCsvRowSchemaTest(unittest.TestCase): + + # TODO: implement for 'RcvCompraReclamadoCsvRowSchema'. + pass + + +class RcvCompraPendienteCsvRowSchemaTest(unittest.TestCase): + + # TODO: implement for 'RcvCompraPendienteCsvRowSchema'. + pass + + class FunctionsTest(unittest.TestCase): def test_parse_rcv_venta_csv_file(self) -> None: # TODO: implement for 'parse_rcv_venta_csv_file'. pass + def test_parse_rcv_compra_registro_csv_file(self) -> None: + # TODO: implement for 'parse_rcv_compra_registro_csv_file'. + pass + + def test_parse_rcv_compra_no_incluir_csv_file(self) -> None: + # TODO: implement for 'parse_rcv_compra_no_incluir_csv_file'. + pass + + def test_parse_rcv_compra_reclamado_csv_file(self) -> None: + # TODO: implement for 'parse_rcv_compra_reclamado_csv_file'. + pass + + def test_parse_rcv_compra_pendiente_csv_file(self) -> None: + # TODO: implement for 'parse_rcv_compra_pendiente_csv_file'. + pass + def test__parse_rcv_csv_file(self) -> None: # TODO: implement for '_parse_rcv_csv_file'. pass