Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion cl_sii/rcv/parse_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import csv
import logging
from datetime import date, datetime
from typing import Dict, Iterable, Optional, Sequence, Tuple
from typing import Callable, Dict, Iterable, Optional, Sequence, Tuple

import marshmallow
import marshmallow.fields
Expand All @@ -22,6 +22,7 @@
from cl_sii.libs import tz_utils
from cl_sii.rut import Rut

from .constants import RcEstadoContable, RcvKind
from .data_models import (
RcvDetalleEntry, RcNoIncluirDetalleEntry,
RcPendienteDetalleEntry, RcReclamadoDetalleEntry,
Expand All @@ -32,6 +33,60 @@
logger = logging.getLogger(__name__)


RcvCsvFileParserType = Callable[
[Rut, str, str, int, Optional[int]],
Iterable[
Tuple[
Optional[RcvDetalleEntry],
int,
Dict[str, object],
Dict[str, object],
],
],
]


def get_rcv_csv_file_parser(
rcv_kind: RcvKind,
estado_contable: Optional[RcEstadoContable],
) -> RcvCsvFileParserType:
"""
Return a function that parses a CSV file of the given :class:`RcvKind` and
:class:`RcEstadoContable`.

:raises ValueError:
:raises Exception: on unrecoverable errors
"""
parse_func: RcvCsvFileParserType

if rcv_kind == RcvKind.COMPRAS:
if estado_contable is None:
raise ValueError(
"'estado_contable' must not be None when 'rcv_kind' is 'COMPRAS'.",
)
elif estado_contable == RcEstadoContable.REGISTRO:
parse_func = parse_rcv_compra_registro_csv_file
elif estado_contable == RcEstadoContable.NO_INCLUIR:
parse_func = parse_rcv_compra_no_incluir_csv_file
elif estado_contable == RcEstadoContable.RECLAMADO:
parse_func = parse_rcv_compra_reclamado_csv_file
elif estado_contable == RcEstadoContable.PENDIENTE:
parse_func = parse_rcv_compra_pendiente_csv_file
else:
raise Exception(
"Programming error. No handler for given 'estado_contable'.",
estado_contable,
)
elif rcv_kind == RcvKind.VENTAS:
if estado_contable is not None:
raise ValueError("'estado_contable' must be None when 'rcv_kind' is 'VENTAS'.")
parse_func = parse_rcv_venta_csv_file
else:
raise Exception("Programming error. No handler for given 'rcv_kind'.", rcv_kind)

return parse_func


def parse_rcv_venta_csv_file(
rut: Rut,
razon_social: str,
Expand Down