In [1]:
from decimal import Decimal
from sqlalchemy import text
from app.database.database import get_session_origen
from app.database.querys import query_bien_consumo
import pandas as pd

async with get_session_origen() as session:
    result = await session.execute(text(query_bien_consumo))
    data = result.all()
    df = pd.DataFrame(data)
    df = df.astype({
        'fecha': 'datetime64[ns]',
    })
    
    cols_decimal = [
        'entrada_cant',
        'entrada_costo_uni',
        'entrada_costo_tot',
        'salida_cant',
        'salida_costo_uni',
        'salida_costo_tot'
    ]

    for col in cols_decimal:
        df[col] = df[col].apply(lambda x: Decimal(str(x)) if pd.notnull(x) else None) # type: ignore
    
df

Unnamed: 0,movimiento_uuid,movimiento_ref_uuid,movimiento_tipo,fecha,documento_fuente_cod_serie,documento_fuente_cod_numero,concepto,entrada_cant,entrada_costo_uni,entrada_costo_tot,salida_cant,salida_costo_uni,salida_costo_tot
0,25082f63-2e5f-40fa-b63d-28959050c5eb,,EntradaBienConsumoValorNuevo,2025-05-18 18:32:37,MOV2025,1,inventario inicial,3.0,45.5,136.5,,,
1,16366708-522d-413e-b715-aa60c2ed87ee,,EntradaBienConsumoValorNuevo,2025-05-18 18:32:37,MOV2025,1,inventario inicial,3.0,45.5,136.5,,,
2,df9b451f-68b3-40ad-b136-ac7f36029403,,SalidaBienConsumoValorNuevo,2025-05-18 19:25:58,MOV2025,2,venta,,,,1.0,0.0,0.0
3,9ec6697e-fc83-4a08-9a02-dc21dbaf2a20,,SalidaBienConsumoValorNuevo,2025-05-18 19:25:58,MOV2025,2,venta,,,,2.0,0.0,0.0


In [2]:
df.dtypes

movimiento_uuid                        object
movimiento_ref_uuid                    object
movimiento_tipo                        object
fecha                          datetime64[ns]
documento_fuente_cod_serie             object
documento_fuente_cod_numero             int64
concepto                               object
entrada_cant                           object
entrada_costo_uni                      object
entrada_costo_tot                      object
salida_cant                            object
salida_costo_uni                       object
salida_costo_tot                       object
dtype: object

In [None]:
from typing import Any, Hashable
from pandas import DataFrame, Series
from app.models.MovimientoTipoBienConsumo import MovimientoTipoBienConsumo


class ProcesadorMovimientos:
    
    def __init__(self, df: DataFrame):
        self.df = df
        self.saldo_cant = Decimal('0.0')
        self.saldo_valor_uni = Decimal('0.0')
        self.saldo_valor_tot = Decimal('0.0')
        
    def set_df(self, df: DataFrame):
        self.df = df
        
    def procesar(self):
        for index, row in self.df.iterrows():
            row: Series[Any]
            
            match row["movimiento_tipo"]:
        
                case MovimientoTipoBienConsumo.ENTRADA_VALOR_NUEVO.value:
                    self.procesar_entrada_valor_nuevo(index)
                    
                case MovimientoTipoBienConsumo.ENTRADA_VALOR_SALIDA.value:
                    self.procesar_entrada_valor_salida(index)
                        
                case MovimientoTipoBienConsumo.SALIDA_VALOR_NUEVO.value:
                    self.procesar_salida_valor_nuevo(index)
                    
                case MovimientoTipoBienConsumo.SALIDA_VALOR_ENTRADA.value:
                    self.procesar_salida_valor_entrada(index)
                    
                case MovimientoTipoBienConsumo.SALIDA_NOTA_VENTA.value:
                    self.procesar_salida_nota_venta(index)
                    
                case MovimientoTipoBienConsumo.SALIDA_NOTA_VENTA_SERVICIO_REPARACION_RECURSO.value:
                    self.procesar_salida_nota_venta_servicio_reparacion_recurso(index)
                    
                case _:
                    continue


    def procesar_entrada_valor_nuevo(self, index: Hashable):
        self.saldo_cant += self.df.at[index, 'entrada_cant']
        self.saldo_valor_tot += self.df.at[index, 'entrada_costo_tot']
        self.establecer_saldo_valor_uni(index)


    def procesar_entrada_valor_salida(self, index: Hashable):
        pass


    def procesar_salida_valor_nuevo(self, index: Hashable):
        self.df.at[index, 'salida_costo_uni'] = self.saldo_valor_uni
        self.df.at[index, 'salida_costo_tot'] = self.saldo_valor_uni * self.df.at[index, 'salida_cant']
        
        self.saldo_cant -= self.df.at[index,'salida_cant']
        self.saldo_valor_tot -= self.df.at[index, 'salida_costo_tot']
        self.saldo_valor_tot -= self.df.at[index, 'salida_costo_tot']
        self.establecer_saldo_valor_uni(index)


    def procesar_salida_valor_entrada(self, index: Hashable):
        pass


    def procesar_salida_nota_venta(self, index: Hashable):
        pass


    def procesar_salida_nota_venta_servicio_reparacion_recurso(self, index: Hashable):
        pass

    
    def establecer_saldo_valor_uni(self, index: Hashable):
        try:
            self.saldo_valor_uni = self.saldo_valor_tot / self.saldo_cant
        except:
            self.saldo_valor_uni = Decimal('0.0')
            
        self.df.at[index, 'saldo_cant'] = self.saldo_cant
        self.df.at[index, 'saldo_valor_uni'] = self.saldo_valor_uni
        self.df.at[index, 'saldo_valor_tot'] = self.saldo_valor_tot
    
    

In [6]:
procesador = ProcesadorMovimientos(df)
procesador.procesar()
procesador.df

Unnamed: 0,movimiento_uuid,movimiento_ref_uuid,movimiento_tipo,fecha,documento_fuente_cod_serie,documento_fuente_cod_numero,concepto,entrada_cant,entrada_costo_uni,entrada_costo_tot,salida_cant,salida_costo_uni,salida_costo_tot,saldo_cant,saldo_valor_uni,saldo_valor_tot
0,25082f63-2e5f-40fa-b63d-28959050c5eb,,EntradaBienConsumoValorNuevo,2025-05-18 18:32:37,MOV2025,1,inventario inicial,3.0,45.5,136.5,,,,3.0,45.5,136.5
1,16366708-522d-413e-b715-aa60c2ed87ee,,EntradaBienConsumoValorNuevo,2025-05-18 18:32:37,MOV2025,1,inventario inicial,3.0,45.5,136.5,,,,6.0,45.5,273.0
2,df9b451f-68b3-40ad-b136-ac7f36029403,,SalidaBienConsumoValorNuevo,2025-05-18 19:25:58,MOV2025,2,venta,,,,1.0,45.5,45.5,5.0,45.5,227.5
3,9ec6697e-fc83-4a08-9a02-dc21dbaf2a20,,SalidaBienConsumoValorNuevo,2025-05-18 19:25:58,MOV2025,2,venta,,,,2.0,45.5,91.0,3.0,45.5,136.5


In [None]:
# transaccion en las tablas utilizadas debe ser única y bloqueante de escritura ( no de lectura ), en tablas que no se utilizan no se bloquean
# transaccion de lectura siempre habilitado

# REPOSITORIOS KARDEX DETALLE
from typing import Any
from sqlmodel import col, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.entities.KardexMovimientoBienConsumoEntity import KardexMovimientoBienConsumoEntity
from app.services.procesador_movimientos import ProcesadorMovimientos


async def crear_movimientos(session: AsyncSession, kardex_id: int, movimientos: list[dict[str,Any]]):
    
    # 1. Insertar movimientos
    movimientos_ordenados = sorted(movimientos, key=lambda mov: mov["fecha"])
    session.add_all(KardexMovimientoBienConsumoEntity(
        kardex_bien_consumo_id=kardex_id,
        movimiento_uuid=mov["movimientosUuid"],
        movimiento_ref_uuid=mov["movimientoRefUuid"],
        movimiento_tipo=mov["movimientoTipo"],
        fecha=mov["fecha"],
        documento_fuente_cod_serie=mov["documentoFuenteCodigoSerie"],
        documento_fuente_cod_numero=mov["documentoFuenteCodigoNumero"],
        concepto=mov["concepto"],
        entrada_cant=mov["entradaCantidad"],
        entrada_costo_uni=mov["entradaCostoUnitario"],
        entrada_costo_tot=mov["entradaCostoTotal"],
        salida_cant=mov["salidaCantidad"],
        salida_costo_uni=mov["salidaCostoUnitario"],
        salida_costo_tot=mov["salidaCostoTotal"],
    ) for mov in movimientos_ordenados )
    
    
    # 2. Obtener fecha minima
    fechas = [mov["fecha"] for mov in movimientos if mov.get("fecha")]
    if not fechas:
        return 
    fecha_minima = min(fechas)
    
    
    # 3. Obtener saldos anteriores a `fecha_minima`
    result = await session.execute(
        select(KardexMovimientoBienConsumoEntity)
        .where(KardexMovimientoBienConsumoEntity.kardex_bien_consumo_id == kardex_id)
        .where(KardexMovimientoBienConsumoEntity.fecha < fecha_minima)
        .order_by(col(KardexMovimientoBienConsumoEntity.fecha).desc(), col(KardexMovimientoBienConsumoEntity.id).desc())
        .limit(1)
    )
    ultimo_movimiento_anterior = result.scalar_one_or_none()

    saldo_cant = ultimo_movimiento_anterior.saldo_cant if ultimo_movimiento_anterior is not None else Decimal('0.0')
    saldo_valor_uni = ultimo_movimiento_anterior.saldo_valor_uni if ultimo_movimiento_anterior is not None else Decimal('0.0')
    saldo_valor_tot = ultimo_movimiento_anterior.saldo_valor_tot if ultimo_movimiento_anterior is not None else Decimal('0.0')
    

    # 4. Procesar movimientos desde la fecha minima y saldos anteriores en lotes de 100
    offset = 0
    limit = 100
    
    while True:
        
        # obtener lote de movimientos
        result = await session.execute(
            select(KardexMovimientoBienConsumoEntity)
            .where(KardexMovimientoBienConsumoEntity.kardex_bien_consumo_id == kardex_id)
            .where(KardexMovimientoBienConsumoEntity.fecha >= fecha_minima)
            .order_by(col(KardexMovimientoBienConsumoEntity.fecha).asc(), col(KardexMovimientoBienConsumoEntity.id).asc())
            .offset(offset)
            .limit(limit)
        )
        data = result.scalars().all()
        if not data:
            break
        df_movimientos = pd.DataFrame([x.model_dump() for x in data])

        if not df_movimientos.empty:
            # procesar movimientos
            procesador = ProcesadorMovimientos(session, df_movimientos, saldo_cant, saldo_valor_uni, saldo_valor_tot)
            procesador.procesar()
            procesador.actualizar_tabla_movimientos()
            
            # actualizar saldos
            saldo_cant = procesador.saldo_cant
            saldo_valor_uni = procesador.saldo_valor_uni
            saldo_valor_tot = procesador.saldo_valor_tot
            
            offset += limit
        else:
            break


def actualizar_movimientos(movimientos_eliminar: list[dict[str,Any]], movimientos_crear: list[dict[str,Any]]):
    # eliminar_inventario()
    # iniciar_inventario()
    pass

def eliminar_movimientos(movimientos: list[dict[str,Any]]):
    pass