# Asignaciones financieras


## Datos de proyecto en análisis


In [1]:
proyecto = "sibayo"
mes = 5
anio = 2025

## Librerias necesarias


In [2]:
import pandas as pd
import os
import re
import pprint
import pickle
import json
import xlsxwriter
from xlsxwriter.utility import xl_range, xl_rowcol_to_cell
import re
from typing import Dict, List, Any
import itertools
from pathlib import Path
from dotenv import load_dotenv, find_dotenv
from google.oauth2 import service_account
from google.cloud import firestore
import excel2img
from dotenv import load_dotenv, find_dotenv
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
import win32com.client as win32

## Funciones utiles


### Calculadora de costo total


In [3]:
from typing import TypedDict


class CostoIngenieriaResult(TypedDict):
    costo_directo: float
    gastos_generales: float
    utilidad: float
    subtotal: float
    igv: float
    total: float


def calculadora_costo_total(costo_directo: float) -> CostoIngenieriaResult:
    """
    Calcula el costo total de un proyecto de ingeniería civil en soles peruanos.

    Args:
        costo_directo (float): Costo directo del proyecto

    Returns:
        CostoIngenieriaResult: Diccionario con todos los valores calculados
    """
    # Calcular gastos generales (10% del costo directo)
    gastos_generales = round(costo_directo * 0.10, 2)

    # Calcular utilidad (5% del costo directo)
    utilidad = round(costo_directo * 0.05, 2)

    # Calcular subtotal
    subtotal = round(costo_directo + gastos_generales + utilidad, 2)

    # Calcular IGV (18% del subtotal)
    igv = round(subtotal * 0.18, 2)

    # Calcular total
    total = round(subtotal + igv, 2)

    return {
        "costo_directo": round(costo_directo, 2),
        "gastos_generales": gastos_generales,
        "utilidad": utilidad,
        "subtotal": subtotal,
        "igv": igv,
        "total": total,
    }

### Multiplicar el precio unitario actualizado por la carga trabajo


In [4]:
def calcular_costos_unitarios(
    dict_precios_unitarios_actualizados, cargas_trabajo_contratista
):
    """
    Multiplica cada carga de trabajo del contratista por su precio unitario
    correspondiente.

    Args:
        dict_precios_unitarios_actualizados (dict): mapea código → precio unitario.
        cargas_trabajo_contratista (dict): mapea código → carga de trabajo.

    Returns:
        dict: mapea código → precio total (precio unitario * carga).

    Raises:
        KeyError: si alguna clave de cargas_trabajo_contratista no existe en
                  dict_precios_unitarios_actualizados.
    """
    # Comprobar que no falte ninguna clave
    faltantes = set(cargas_trabajo_contratista) - set(
        dict_precios_unitarios_actualizados
    )
    if faltantes:
        raise KeyError(f"Faltan precios unitarios para las claves: {faltantes}")

    # Generar el diccionario resultado
    resultado = {
        clave: dict_precios_unitarios_actualizados[clave]
        * cargas_trabajo_contratista[clave]
        for clave in cargas_trabajo_contratista
    }
    return resultado

### Fusionar diccionarios

In [5]:
def fusionar_diccionarios(diccionario_de_diccionarios):
    """
    Fusiona diccionarios separados en un diccionario unificado.
    Solo incluye claves donde ambos valores sean diferentes de cero.
    Usa dinámicamente las claves del diccionario de entrada.
    
    Args:
        diccionario_de_diccionarios (dict): Diccionario con estructura:
            {
                "nombre_campo1": {clave: valor, ...},
                "nombre_campo2": {clave: valor, ...}
            }
    
    Returns:
        dict: Diccionario fusionado con estructura {clave: {nombre_campo1: valor, nombre_campo2: valor}}
    """
    # Obtener las claves (nombres de los campos) del diccionario principal
    nombres_campos = list(diccionario_de_diccionarios.keys())
    
    if len(nombres_campos) != 2:
        raise ValueError("El diccionario debe contener exactamente 2 campos")
    
    campo1_nombre = nombres_campos[0]
    campo2_nombre = nombres_campos[1]
    
    campo1_datos = diccionario_de_diccionarios[campo1_nombre]
    campo2_datos = diccionario_de_diccionarios[campo2_nombre]
    
    fusionado = {}
    
    # Obtener todas las claves únicas de ambos diccionarios
    todas_las_claves = set(campo1_datos.keys()) | set(campo2_datos.keys())
    
    for clave in todas_las_claves:
        valor1 = campo1_datos.get(clave, 0)
        valor2 = campo2_datos.get(clave, 0)
        
        # Solo agregar si ambos valores son diferentes de cero
        if valor1 != 0 and valor2 != 0:
            fusionado[clave] = {
                campo1_nombre: valor1,
                campo2_nombre: valor2
            }
    
    return fusionado

### Formatear progresiva

In [6]:
def formatear_progresiva(distancia, decimales=0):
    """
    Convierte una distancia en metros a notación de progresiva.
    
    Parámetros:
    - distancia: int o float, la distancia en metros.
    - decimales: int, número de decimales a mostrar en la parte de los metros.
    
    Retorna:
    - str: progresiva en formato 'K+XXX' con los decimales indicados.
    """
    if not isinstance(distancia, (int, float)):
        raise ValueError("La distancia debe ser un número (int o float).")
    if not isinstance(decimales, int) or decimales < 0:
        raise ValueError("Los decimales deben ser un entero no negativo.")
    
    km = int(distancia) // 1000
    metros = distancia - (km * 1000)
    
    formato_metros = f"{metros:0.{decimales}f}".zfill(3 + (1 if decimales > 0 else 0) + decimales)
    return f"{km}+{formato_metros}"

### Calcular avance

In [7]:
def calcular_avance(programado, ejecutado):
    """
    Calcula el porcentaje de avance basado en lo programado y lo ejecutado.

    Si lo programado es 0 y lo ejecutado es mayor a 0, devuelve 'ejecución adelantada'.
    Si ambos son 0, devuelve 0.0.
    En cualquier otro caso, devuelve el porcentaje (0-1) como float redondeado a 2 decimales.
    """
    if programado == 0:
        if ejecutado > 0:
            return "Ejec. adelantada"
        else:
            return 0.0
    else:
        porcentaje = (ejecutado / programado) 
        return porcentaje

### Ordenar por centena

In [8]:
# def ordenar_por_centena(data: Any) -> List[Dict[str, Any]]:
#     """
#     Toma un dict (o un JSON en formato str) cuyas claves acaban en número
#     y devuelve una lista de dicts {'key':…, 'value':…} ordenada por ese número.
#     Compatible con Firestore (to_dict()) y JSON.
#     """
#     # Si viene como cadena JSON, lo convertimos
#     if isinstance(data, str):
#         data = json.loads(data)
    
#     # Aseguramos que sea dict
#     if not isinstance(data, dict):
#         raise ValueError("Se esperaba un diccionario o un string JSON que represente un diccionario.")

#     pattern = re.compile(r'(\d+)$')

#     def obtener_clave_numerica(item):
#         clave = str(item[0])  # Convertimos clave a string en caso no lo sea
#         match = pattern.search(clave)
#         if match:
#             return int(match.group(1))
#         else:
#             return float('inf')  # Opcional: claves sin número al final se van al final

#     sorted_items = sorted(data.items(), key=obtener_clave_numerica)

#     return [{"key": k, "value": v} for k, v in sorted_items]

In [9]:
def ordenar_por_centena(data: Any) -> List[Dict[str, Any]]:
    """
    Toma un dict (o un JSON en formato str) cuyas claves tienen formato letras+número
    y devuelve una lista de dicts {'key':…, 'value':…} ordenada primero por letras y luego por número.
    Compatible con Firestore (to_dict()) y JSON.
    """
    # Si viene como cadena JSON, lo convertimos
    if isinstance(data, str):
        data = json.loads(data)
    
    # Aseguramos que sea dict
    if not isinstance(data, dict):
        raise ValueError("Se esperaba un diccionario o un string JSON que represente un diccionario.")

    pattern = re.compile(r'^([A-Z]+)(\d+)$')

    def obtener_clave_ordenada(item):
        clave = str(item[0])  # Convertimos clave a string en caso no lo sea
        match = pattern.search(clave)
        if match:
            letras = match.group(1)  # MR, VP, etc.
            numero = int(match.group(2))  # 101, 102, etc.
            return (letras, numero)  # Tupla para ordenar primero por letras, luego por número
        else:
            # Para claves que no siguen el patrón, las ponemos al final
            return (clave, float('inf'))

    sorted_items = sorted(data.items(), key=obtener_clave_ordenada)

    return [{"key": k, "value": v} for k, v in sorted_items]

### Clave grupo

In [10]:
def clave_grupo(item):
    """
    Extrae el grupo de una clave de actividad.
    
    Para claves con formato letras+número (MR101, VP101, etc.): 
    - Extrae las letras para identificar el tipo de actividad
    - Extrae el número y divide por 100 para agrupar por centenas
    
    Args:
        item: Diccionario con clave 'key' que contiene el código de actividad
        
    Returns:
        tuple: (prefijo_letras, grupo_numero) para ordenamiento jerárquico
    """
    key = item['key']
    
    # Patrón para extraer letras y números
    pattern = re.compile(r'^([A-Z]+)(\d+)$')
    match = pattern.search(key)
    
    if match:
        letras = match.group(1)  # MR, VP, etc.
        numero = int(match.group(2))  # 101, 102, etc.
        grupo = numero // 100  # Agrupar por centenas
        
        # Retornar tupla para ordenamiento jerárquico
        # Las letras se ordenan alfabéticamente, luego por grupo numérico
        return (letras, grupo)
    else:
        # Para claves que no siguen el patrón (PS, PP, etc.)
        # Asignar un grupo especial que aparezca al final
        return (key, 999)

## Carga de datos


### Firebase


In [11]:
# 1. Busca el .env en el directorio actual o en cualquiera de los padres
dotenv_path = find_dotenv()
if not dotenv_path:
    raise FileNotFoundError(
        "No se encontró ningún archivo .env en este directorio ni en sus padres."
    )
load_dotenv(dotenv_path)

# 2. Define el root del proyecto como la carpeta que contiene el .env
project_root = Path(dotenv_path).parent

# 3. Obtén la ruta relativa de las credenciales desde la variable de entorno
rel_cred_path = os.getenv("FIRESTORE_CREDENTIALS")
if not rel_cred_path:
    raise RuntimeError("No existe la variable FIRESTORE_CREDENTIALS en el .env")

# 4. Construye la ruta absoluta al JSON
cred_path = Path(rel_cred_path)
if not cred_path.is_absolute():
    cred_path = (project_root / cred_path).resolve()

if not cred_path.exists():
    raise FileNotFoundError(f"No existe el archivo de credenciales en: {cred_path}")

# 5. Carga las credenciales y crea el cliente de Firestore
credentials = service_account.Credentials.from_service_account_file(str(cred_path))
client = firestore.Client(credentials=credentials, project=credentials.project_id)

# 6. Prueba que funcione
print("Colecciones disponibles:", [c.id for c in client.collections()])

Colecciones disponibles: ['rutinarios']


In [12]:
db = firestore.Client(credentials=credentials, project=credentials.project_id)

#### Documento del proyecto firebase

In [13]:
doc_proyecto_firebase = db.collection("rutinarios").document(proyecto).get().to_dict()

pprint.pprint(doc_proyecto_firebase)

{'contrato': {'contratista': {'razon_social': 'PLATERS MANAGEMENT S.A.C.',
                              'ruc': ''},
              'denominacion_tramo_convenio': 'EMP. AR-111 (NUEVO SIBAYO) - '
                                             'TUTI EMP. AR-681 (DV. CHIVAY) '
                                             '(KM 32+252)',
              'fecha_inicio': {'anio': 2025, 'dia': 15, 'mes': 4},
              'id_contrato': '004-2025',
              'jefe_mantenimiento': {'apellido': 'Tinta Cáceres',
                                     'dni': 0,
                                     'nombre': 'Genaro',
                                     'titulo': 'Ingeniero'},
              'monto_contrato': 165130.4,
              'numero_cuadrillas': 1,
              'numero_trabajadores': 3,
              'tiempo_ejecucion_dias': 240,
              'tipo_servicio': 'mantenimiento rutinario'},
 'datos_generales': {'distritos': ['Chivay', 'Tuti', 'Sibayo'],
                     'provincia': 'Cayllom

#### Contrato


In [14]:
contrato = doc_proyecto_firebase["contrato"]
pprint.pprint(contrato)

{'contratista': {'razon_social': 'PLATERS MANAGEMENT S.A.C.', 'ruc': ''},
 'denominacion_tramo_convenio': 'EMP. AR-111 (NUEVO SIBAYO) - TUTI EMP. AR-681 '
                                '(DV. CHIVAY) (KM 32+252)',
 'fecha_inicio': {'anio': 2025, 'dia': 15, 'mes': 4},
 'id_contrato': '004-2025',
 'jefe_mantenimiento': {'apellido': 'Tinta Cáceres',
                        'dni': 0,
                        'nombre': 'Genaro',
                        'titulo': 'Ingeniero'},
 'monto_contrato': 165130.4,
 'numero_cuadrillas': 1,
 'numero_trabajadores': 3,
 'tiempo_ejecucion_dias': 240,
 'tipo_servicio': 'mantenimiento rutinario'}


In [15]:
monto_contrato = contrato["monto_contrato"]
print(monto_contrato)

165130.4


#### Expediente técnico firebase

In [16]:
expediente_tecnico_firebase = doc_proyecto_firebase["expediente"]

pprint.pprint(expediente_tecnico_firebase)

{'cargas_trabajo': {'MR101': 10.28,
                    'MR103': 53.33,
                    'MR104': 85.33,
                    'MR201': 32350,
                    'MR202': 34.67,
                    'MR301': 15999,
                    'MR401': 64.67,
                    'MR601': 258.02,
                    'VP101': 1512.58,
                    'VP102': 700},
 'codigo_ruta': 'AR-683',
 'coordenadas': {'fin': {'altitud': 3629,
                         'datum': 'WGS84',
                         'hemisferio': 'S',
                         'progresiva': 32252.0,
                         'x': 220383.97,
                         'y': 8270075.15,
                         'zona': None,
                         'zona_letra': None},
                 'inicio': {'altitud': 3820,
                            'datum': 'WGS84',
                            'hemisferio': 'S',
                            'progresiva': 0,
                            'x': 226316.6,
                            'y': 8281146.

#### Progresiva de inicio y fin del expediente

In [17]:
progresiva_inicio=expediente_tecnico_firebase["coordenadas"]["inicio"]["progresiva"]
progresiva_fin=expediente_tecnico_firebase["coordenadas"]["fin"]["progresiva"]

print(progresiva_inicio)
print(progresiva_fin)

0
32252.0


#### Valorización programada mensual


In [18]:
desembolsos =  db.collection("rutinarios").document(proyecto).collection('presupuestos').document('desembolsos').get().to_dict()
cronogramas_desembolsos = desembolsos['cronograma_desembolsos']
pprint.pprint(cronogramas_desembolsos)

{'10': {'gastos_operativos': 704.82,
        'igv': 1075.15,
        'mantenimiento_con_go': 7753.01,
        'mantenimiento_con_igv': 7048.19,
        'mantenimiento_sin_igv': 5973.04},
 '11': {'gastos_operativos': 534.7,
        'igv': 815.65,
        'mantenimiento_con_go': 5881.72,
        'mantenimiento_con_igv': 5347.01,
        'mantenimiento_sin_igv': 4531.37},
 '12': {'gastos_operativos': 262.18,
        'igv': 399.93,
        'mantenimiento_con_go': 2883.94,
        'mantenimiento_con_igv': 2621.76,
        'mantenimiento_sin_igv': 2221.83},
 '4': {'gastos_operativos': 222.55,
       'igv': 339.48,
       'mantenimiento_con_go': 2448.06,
       'mantenimiento_con_igv': 2225.51,
       'mantenimiento_sin_igv': 1886.02},
 '5': {'gastos_operativos': 2553.4,
       'igv': 3895.02,
       'mantenimiento_con_go': 28087.43,
       'mantenimiento_con_igv': 25534.03,
       'mantenimiento_sin_igv': 21639.01},
 '6': {'gastos_operativos': 2988.84,
       'igv': 4559.25,
       'mantenim

In [19]:
desembolso_current_month = cronogramas_desembolsos[str(mes)]
print(desembolso_current_month["mantenimiento_con_igv"])

25534.03


#### Cargas de trabajo mensual presentadas por el contratista


In [20]:
valorizaciones = db.collection("rutinarios").document(proyecto).collection('valorizaciones').document(str(mes)).get().to_dict()
cargas_trabajo_contratista = valorizaciones['cargas_trabajo_contratista_corregido']
print(cargas_trabajo_contratista)

{'MR301': 1999.88, 'MR103': 13.33, 'MR201': 5146.67, 'MR202': 5.1, 'VP101': 242.474, 'MR101': 0.86, 'VP102': 112}


#### Cargas de trabajo del expediente técnico

In [21]:
cargas_trabajo_expediente_tecnico=doc_proyecto_firebase['expediente']['cargas_trabajo']

pprint.pprint(cargas_trabajo_expediente_tecnico)

{'MR101': 10.28,
 'MR103': 53.33,
 'MR104': 85.33,
 'MR201': 32350,
 'MR202': 34.67,
 'MR301': 15999,
 'MR401': 64.67,
 'MR601': 258.02,
 'VP101': 1512.58,
 'VP102': 700}


#### Precios unitarios del expediente técnico

In [22]:
precios_unitarios_expediente_tecnico = doc_proyecto_firebase['expediente']['precios_unitarios']

pprint.pprint(precios_unitarios_expediente_tecnico)

{'MR101': 317.8,
 'MR103': 19.07,
 'MR104': 21.18,
 'MR201': 0.53,
 'MR202': 95.34,
 'MR301': 0.17,
 'MR401': 13.28,
 'MR601': 2.44,
 'VP101': 47.01,
 'VP102': 71.79}


### Pickle

#### Cargas de trabajo programadas

In [23]:
# Concatenar la ruta completa al archivo .pkl
ruta_archivo = os.path.join("data",proyecto, f"{proyecto}_cargas_trabajo.pkl")

# Leer el archivo pickle
with open(ruta_archivo, "rb") as f:
    cargas_trabajo_programadas_anualmente = pickle.load(f)

cargas_trabajo_programadas_anualmente.tail(15)

Unnamed: 0_level_0,2025-04,2025-05,2025-06,2025-07,2025-08,2025-09,2025-10,2025-11,2025-12,TOTAL
codigo,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
MR201,862.344,5146.590909,1470.454545,2940.909091,2940.909091,1470.454545,8822.727273,5019.4728,3676.136364,32350.0
MR202,4.078824,5.098529,0.0,2.039412,3.059118,4.078824,6.118235,8.157647,2.039412,34.67
MR203,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
MR204,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
MR205,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
MR206,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
MR301,1999.875,1999.875,1999.875,1999.875,1999.875,1999.875,1999.875,1999.875,0.0,15999.0
MR401,16.1675,0.0,0.0,0.0,0.0,16.1675,16.1675,16.1675,0.0,64.67
MR501,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
MR601,43.003333,0.0,43.003333,0.0,43.003333,43.003333,43.003333,43.003333,0.0,258.02


In [24]:
#eliminando el total
cargas_trabajo_programadas_anualmente = cargas_trabajo_programadas_anualmente.iloc[:-1]

In [25]:
cargas_trabajo_programadas_current_month = cargas_trabajo_programadas_anualmente[f"2025-{mes:02d}"].to_dict()
pprint.pprint(cargas_trabajo_programadas_current_month)

{'MR101': 0.856666667,
 'MR102': 0.0,
 'MR103': 13.3325,
 'MR104': 0.0,
 'MR111': 0.0,
 'MR112': 0.0,
 'MR201': 5146.590909,
 'MR202': 5.098529412,
 'MR203': 0.0,
 'MR204': 0.0,
 'MR205': 0.0,
 'MR206': 0.0,
 'MR301': 1999.875,
 'MR401': 0.0,
 'MR501': 0.0,
 'MR601': 0.0,
 'MR701': 0.0,
 'MR702': 0.0,
 'VP101': 242.5,
 'VP102': 112.0}


In [26]:
cargas_trabajo_programadas_anualmente.index.to_list()

['MR101',
 'MR102',
 'MR103',
 'MR104',
 'MR111',
 'MR112',
 'MR201',
 'MR202',
 'MR203',
 'MR204',
 'MR205',
 'MR206',
 'MR301',
 'MR401',
 'MR501',
 'MR601',
 'MR701',
 'MR702',
 'VP101',
 'VP102']

#### Cronograma anual

In [27]:
# Concatenar la ruta completa al archivo .pkl
ruta_archivo = os.path.join("data",proyecto, f"{proyecto}_cronograma_anual.pkl")

# Leer el archivo pickle
with open(ruta_archivo, "rb") as f:
    df_cronograma_anual = pickle.load(f)

df_cronograma_anual.head(15)

Unnamed: 0,2025-04,2025-05,2025-06,2025-07,2025-08,2025-09,2025-10,2025-11,2025-12,TOTAL
MR101,1,2,1,2,2,1,1,1,1,12
MR102,0,0,0,0,0,0,0,0,0,0
MR103,0,1,1,0,0,1,0,1,0,4
MR104,0,0,1,1,1,1,1,1,0,6
MR111,5,11,10,11,11,10,11,10,3,82
MR112,3,5,5,5,5,5,5,5,2,40
MR201,3,6,5,6,6,5,6,5,2,44
MR202,3,5,4,4,4,4,4,4,2,34
MR203,0,0,0,0,0,0,0,0,0,0
MR204,0,0,0,0,0,0,0,0,0,0


### JSON

#### Actividades

In [28]:
ruta_actividades= os.path.join("data", "general_data", "actividades.json")
with open(ruta_actividades, 'r', encoding='utf-8') as archivo:
    actividades = json.load(archivo)
# Ahora 'datos' es un diccionario de Python
pprint.pprint(actividades)

[{'key': 'MR100',
  'value': {'label': 'Conservación de calzada',
            'value': [{'key': 'MR101',
                       'value': {'carga_trabajo': 0,
                                 'label': 'Limpieza de calzada',
                                 'unidad': 'Km'}},
                      {'key': 'MR102',
                       'value': {'carga_trabajo': 0,
                                 'label': 'Bacheo',
                                 'unidad': 'm2'}},
                      {'key': 'MR103',
                       'value': {'carga_trabajo': 0,
                                 'label': 'Desquinche',
                                 'unidad': 'm3'}},
                      {'key': 'MR104',
                       'value': {'carga_trabajo': 0,
                                 'label': 'Remoción de derrumbes',
                                 'unidad': 'm3'}}]}},
 {'key': 'MR200',
  'value': {'label': 'Limpieza de obras de arte',
            'value': [{'key': 'MR201',
            

## Cálculos


### Cargas de trabajo y pu de expediente tecnico

In [29]:
datos_cargas_trabajo_expediente_tecnico=fusionar_diccionarios(
    {
        'precio_unitario':precios_unitarios_expediente_tecnico,
        'carga_trabajo':cargas_trabajo_expediente_tecnico
    }
)

pprint.pprint(datos_cargas_trabajo_expediente_tecnico)

{'MR101': {'carga_trabajo': 10.28, 'precio_unitario': 317.8},
 'MR103': {'carga_trabajo': 53.33, 'precio_unitario': 19.07},
 'MR104': {'carga_trabajo': 85.33, 'precio_unitario': 21.18},
 'MR201': {'carga_trabajo': 32350, 'precio_unitario': 0.53},
 'MR202': {'carga_trabajo': 34.67, 'precio_unitario': 95.34},
 'MR301': {'carga_trabajo': 15999, 'precio_unitario': 0.17},
 'MR401': {'carga_trabajo': 64.67, 'precio_unitario': 13.28},
 'MR601': {'carga_trabajo': 258.02, 'precio_unitario': 2.44},
 'VP101': {'carga_trabajo': 1512.58, 'precio_unitario': 47.01},
 'VP102': {'carga_trabajo': 700, 'precio_unitario': 71.79}}


In [30]:
# Crear DataFrame usando pd.DataFrame.from_dict() con orient='index'
df_cargas_trabajo_expediente = pd.DataFrame.from_dict(
    datos_cargas_trabajo_expediente_tecnico, orient="index"
)

# Resetear el índice para convertir las claves MR en una columna
df_cargas_trabajo_expediente = df_cargas_trabajo_expediente.reset_index().rename(
    columns={"index": "codigo_MR"}
)

In [31]:
# Agregar columna parcial
df_cargas_trabajo_expediente["parcial"] = (
    df_cargas_trabajo_expediente["precio_unitario"]
    * df_cargas_trabajo_expediente["carga_trabajo"]
)

In [32]:
df_cargas_trabajo_expediente.head()

Unnamed: 0,codigo_MR,precio_unitario,carga_trabajo,parcial
0,MR101,317.8,10.28,3266.984
1,MR601,2.44,258.02,629.5688
2,MR401,13.28,64.67,858.8176
3,MR103,19.07,53.33,1017.0031
4,VP102,71.79,700.0,50253.0


In [33]:
# Calcular el total
costo_directo = df_cargas_trabajo_expediente["parcial"].sum()
print("costo_directo", costo_directo)

costo_directo 152109.8165


In [34]:
costo_total_expediente = calculadora_costo_total(costo_directo)
print(costo_total_expediente["total"])

206413.02


In [35]:
# Agregar columna parcial
df_cargas_trabajo_expediente["precio_unitario_actualizado"] = (
    df_cargas_trabajo_expediente["precio_unitario"]
    * (monto_contrato / costo_total_expediente["total"])
)

df_cargas_trabajo_expediente["parcial_actualizado"] = (
    df_cargas_trabajo_expediente["precio_unitario_actualizado"]
    * df_cargas_trabajo_expediente["carga_trabajo"]
)

# Calcular el total
costo_directo_actualizado = df_cargas_trabajo_expediente["parcial_actualizado"].sum()

print(f"El costo directo es: {costo_directo_actualizado}")

El costo directo es: 121687.84140928513


In [36]:
df_cargas_trabajo_expediente.head(10)

Unnamed: 0,codigo_MR,precio_unitario,carga_trabajo,parcial,precio_unitario_actualizado,parcial_actualizado
0,MR101,317.8,10.28,3266.984,254.239975,2613.586947
1,MR601,2.44,258.02,629.5688,1.952,503.654991
2,MR401,13.28,64.67,858.8176,10.623999,687.054013
3,MR103,19.07,53.33,1017.0031,15.255999,813.602401
4,VP102,71.79,700.0,50253.0,57.431994,40202.396105
5,MR202,95.34,34.67,3305.4378,76.271993,2644.349984
6,MR201,0.53,32350.0,17145.5,0.424,13716.398671
7,MR104,21.18,85.33,1807.2894,16.943998,1445.83138
8,VP101,47.01,1512.58,71106.3858,37.607996,56885.103128
9,MR301,0.17,15999.0,2719.83,0.136,2175.863789


In [37]:
pago_costo_total_contratista = calculadora_costo_total(costo_directo_actualizado)
print(pago_costo_total_contratista["total"])

165130.39


In [38]:
dict_precios_unitarios_actualizados = dict(
    zip(
        df_cargas_trabajo_expediente["codigo_MR"],
        df_cargas_trabajo_expediente["precio_unitario_actualizado"],
    )
)

pprint.pprint(dict_precios_unitarios_actualizados)

{'MR101': 254.23997536589505,
 'MR103': 15.255998521798674,
 'MR104': 16.9439983582431,
 'MR201': 0.4239999589173203,
 'MR202': 76.27199260976852,
 'MR301': 0.1359999868225367,
 'MR401': 10.623998970607571,
 'MR601': 1.9519998108646441,
 'VP101': 37.60799635604382,
 'VP102': 57.431994435234756}


### Cálculo de pago de acuerdo a cargas de trabajo


In [39]:
pago_costo_directo_parciales_contratista = calcular_costos_unitarios(
    dict_precios_unitarios_actualizados, cargas_trabajo_contratista
)
pprint.pprint(pago_costo_directo_parciales_contratista)

{'MR101': 218.64637881466973,
 'MR103': 203.3624602955763,
 'MR201': 2182.187868561005,
 'MR202': 388.98716230981944,
 'MR301': 271.9836536466547,
 'VP101': 9118.96130843537,
 'VP102': 6432.383376746293}


##### Visualizacion en dataframe


In [40]:
df_pago_costo_directo_parciales_contratista = pd.DataFrame.from_dict(
    pago_costo_directo_parciales_contratista, orient="index"
)

df_pago_costo_directo_parciales_contratista = (
    df_pago_costo_directo_parciales_contratista.reset_index().rename(
        columns={"index": "codigo_MR"}
    )
)

df_pago_costo_directo_parciales_contratista = (
    df_pago_costo_directo_parciales_contratista.rename(columns={0: "monto_pago"})
)

df_pago_costo_directo_parciales_contratista

Unnamed: 0,codigo_MR,monto_pago
0,MR301,271.983654
1,MR103,203.36246
2,MR201,2182.187869
3,MR202,388.987162
4,VP101,9118.961308
5,MR101,218.646379
6,VP102,6432.383377


In [41]:
# Ordenar el DataFrame por la columna 'monto_pago'
# Por defecto, el orden es ascendente (de menor a mayor)
df_ordenado = df_pago_costo_directo_parciales_contratista.sort_values(
    by="monto_pago", ascending=False
)

print("\nDataFrame Ordenado por 'monto_pago' (ascendente):")
print(df_ordenado)


DataFrame Ordenado por 'monto_pago' (ascendente):
  codigo_MR   monto_pago
4     VP101  9118.961308
6     VP102  6432.383377
2     MR201  2182.187869
3     MR202   388.987162
0     MR301   271.983654
5     MR101   218.646379
1     MR103   203.362460


In [42]:
# Lambda que suma todos los valores de un diccionario
sumar_valores = lambda d: sum(d.values())

In [43]:
pago_costo_directo_contratista = sumar_valores(pago_costo_directo_parciales_contratista)
print(pago_costo_directo_contratista)

18816.51220880939


In [44]:
pago_costo_total_contratista = calculadora_costo_total(pago_costo_directo_contratista)
print(pago_costo_total_contratista["total"])

25534.01


##### Validación


In [45]:
diferencia_costos = abs(
    pago_costo_total_contratista["total"]
    - desembolso_current_month["mantenimiento_con_igv"]
)

pprint.pprint(
    {
        "diferencia_costos": diferencia_costos,
        "ejecutado": pago_costo_total_contratista["total"],
        "programado": desembolso_current_month["mantenimiento_con_igv"],
    }
)

if diferencia_costos > 1:
    raise ValueError(
        "La valorizacion de las actividades presentadas por el contratista no es coherente con lo programado"
    )

{'diferencia_costos': 0.020000000000436557,
 'ejecutado': 25534.01,
 'programado': 25534.03}


### Ordenando cargas de trabajo

In [46]:
cargas_trabajo_ordenadas=ordenar_por_centena(cargas_trabajo_programadas_current_month)
pprint.pprint(cargas_trabajo_ordenadas)

[{'key': 'MR101', 'value': 0.856666667},
 {'key': 'MR102', 'value': 0.0},
 {'key': 'MR103', 'value': 13.3325},
 {'key': 'MR104', 'value': 0.0},
 {'key': 'MR111', 'value': 0.0},
 {'key': 'MR112', 'value': 0.0},
 {'key': 'MR201', 'value': 5146.590909},
 {'key': 'MR202', 'value': 5.098529412},
 {'key': 'MR203', 'value': 0.0},
 {'key': 'MR204', 'value': 0.0},
 {'key': 'MR205', 'value': 0.0},
 {'key': 'MR206', 'value': 0.0},
 {'key': 'MR301', 'value': 1999.875},
 {'key': 'MR401', 'value': 0.0},
 {'key': 'MR501', 'value': 0.0},
 {'key': 'MR601', 'value': 0.0},
 {'key': 'MR701', 'value': 0.0},
 {'key': 'MR702', 'value': 0.0},
 {'key': 'VP101', 'value': 242.5},
 {'key': 'VP102', 'value': 112.0}]


## Escribiendo el excel

### Path

In [47]:
# Ruta dinámica
ruta_directorio = os.path.join("output", proyecto, str(mes))
ruta_archivo = os.path.join(ruta_directorio, "cargas_trabajo.xlsx")
# Crear carpetas si no existen
os.makedirs(ruta_directorio, exist_ok=True)

wb = xlsxwriter.Workbook(ruta_archivo)
ws=wb.add_worksheet("cargas_trabajo")

### Formatos

In [48]:
# 1. Propiedades base
BASE_CELL = {
    "valign": "vcenter",
    "border": 1,
}

BASE_HEADER = {
    **BASE_CELL,
    "bold": True,
}

BG_HEADER = "#D9E1F2"
BG_TABLE = "#C5D9F1"


# 2. Función de helper para crear formatos
def fmt(wb, **props):
    cfg = {}
    # parte genérica
    cfg.update(props)
    return wb.add_format(cfg)


# 3. Diccionario de formatos
formats = {
    # encabezados
    "header": {"align": "center", "bg_color": BG_HEADER, **BASE_HEADER},
    "header2": {"align": "left", **BASE_HEADER},
    "header_label": {"align": "center", **BASE_HEADER},
    # formato vertical header
    "header_vertical": {
        **BASE_HEADER,
        "bold": True,
        "align": "center",
        "valign": "vcenter",
        "bg_color": BG_HEADER,
        "rotation": 90,  # <— ¡aquí está el giro!
    },
    # encabezados de tabla
    "table_header": {
        "align": "center",
        "text_wrap": True,
        "shrink": True,
        "bg_color": BG_TABLE,
        **BASE_CELL,
    },
    "table_header_int": {"align": "center", "bg_color": BG_TABLE, **BASE_CELL},
    # celdas numéricas y de texto
    "cell": {"font_size": 8, "num_format": "#,##0.00", **BASE_CELL},
    # "cell": {"font_size": 8, "num_format": "#,##0.00", **BASE_CELL},
    "cell_int": {"num_format": "#,##0", **BASE_CELL},
    "cell_number": {"num_format": "#,##0.00", **BASE_CELL},
    "cell_number_percent": {
        "num_format": "#,##0.00%",
        "align": "right",
        **BASE_CELL,
    },
    "cell_text": {"align": "left", "text_wrap": True, "shrink": True, **BASE_CELL},
    "cell_right": {"align": "right", **BASE_CELL},
    # sumas
    "suma": {"num_format": "#,##0.00", **BASE_CELL},
    # soles
    "soles_color": {
        "bold": True,
        "bg_color": BG_HEADER,
        "num_format": '"S/." #,##0.00',
        **BASE_CELL,
    },
    "soles": {"num_format": '"S/." #,##0.00', **BASE_CELL},
    # formato de ajuste general
    "ajustar": {"align": "center", "text_wrap": True, "shrink": True, **BASE_CELL},
    "only_border": {
        "border": 1,
    },
}

# 4. Creación dinámica de los objetos Format
wb_formats = {name: fmt(wb, **props) for name, props in formats.items()}

# Ahora sólo usa wb_formats["header"], wb_formats["cell_text"], etc.

In [49]:
# Punto de inserción (cero-indexed)
table_start_row = 1   # si quieres que empiece en la fila 7
table_start_col = 1   # si quieres que empiece en la columna B

### Escritores de excel

In [50]:
def merge(ws, r1, c1, r2, c2, valor, fmt):
    """Merge relative a table_start_* + offsets."""
    ws.merge_range(
        xl_range(table_start_row + r1,
                 table_start_col + c1,
                 table_start_row + r2,
                 table_start_col + c2),
        valor, fmt
    )

In [51]:
def write_rel(ws, row_offset, col_offset, value, fmt=None):
    """
    Escribe en la celda (table_start_row + row_offset, table_start_col + col_offset)
    """
    abs_row = table_start_row + row_offset
    abs_col = table_start_col + col_offset
    ws.write(abs_row, abs_col, value, fmt)

In [52]:
def merge_xy(ws, start: tuple[int,int], end: tuple[int,int], value, fmt):
    """
    Merge de celdas desde start=(x1, y1) hasta end=(x2, y2),
    relativas a table_start_col/row.
    """
    (x1, y1), (x2, y2) = start, end
    ws.merge_range(
        xl_range(
            table_start_row + y1,  # row1 absoluto
            table_start_col + x1,  # col1 absoluto
            table_start_row + y2,  # row2 absoluto
            table_start_col + x2,  # col2 absoluto
        ),
        value, fmt
    )

def write_xy(ws, pos: tuple[int,int], value, fmt=None):
    """
    Escribe un valor en pos=(x, y) relativa a table_start_col/row.
    """
    x, y = pos
    ws.write(
        table_start_row + y,
        table_start_col + x,
        value, fmt
    )

In [53]:
def set_columns_rel(ws, rel_col_widths):
    """
    Aplica ws.set_column a rangos de columnas definidos
    en coordenadas relativas (x) a table_start_col.
    rel_col_widths: lista de tuplas (first_rel, last_rel, width)
    """
    for first_rel, last_rel, width in rel_col_widths:
        abs_first = table_start_col + first_rel
        abs_last  = table_start_col + last_rel
        ws.set_column(abs_first, abs_last, width)

### Anchos de columnas

In [54]:
col_widths_rel = [
    (0, 0, 8),  # columna B en adelante:  B–B ancho=8
    (1, 1, 12),  # C–C ancho=30
    (2, 2, 30),  # D–H ancho=8
    (3, 3, 12),  # I–T ancho=8
    (4, 4, 10),  # U–U ancho=10
    (5, 5, 10),  # U–U ancho=10
    (6, 6, 14),  # U–U ancho=10
    (7, 7, 14),
]

set_columns_rel(ws, col_widths_rel)

### Comenzando a escribir el excel

#### Encabezados de la tabla

In [55]:
merge_xy(ws, (0, 0), (0, 1), "N.º", wb_formats["header"])
merge_xy(ws, (1, 0), (1, 1), "Código", wb_formats["header"])
merge_xy(ws, (2, 0), (2, 1), "Actividades", wb_formats["header"])

merge_xy(ws, (3, 0), (3, 1), "Unidad", wb_formats["header"])

merge_xy(ws, (4, 0), (5, 0), "Cargas de trabajo", wb_formats["header"])

write_xy(ws, (4, 1), "Programación", wb_formats["header_vertical"])
write_xy(ws, (5, 1), "Ejecución", wb_formats["header_vertical"])

merge_xy(ws, (6, 0), (6, 1), "Avance", wb_formats["header"])
merge_xy(ws, (7, 0), (7, 1), "Progresivas", wb_formats["header"])

#### Contenido de la tabla:

In [56]:
current_col = 0
current_row = 2

In [57]:
columnas_insertadas = 0
impresos = set()
contador_items = 1

for grupo, elementos in itertools.groupby(cargas_trabajo_ordenadas, clave_grupo):
    current_col = 0
    
    # Desempaquetar la tupla del grupo
    letras, numero_grupo = grupo
    
    # Para grupos especiales (PS, PP, etc.)
    if letras not in ['MR', 'VP'] and numero_grupo == 999:
        # Procesar elementos especiales sin buscar en actividades MR
        for it in elementos:
            current_col = 0
            
            write_xy(
                ws, (current_col, current_row), contador_items, wb_formats["cell_int"]
            )  # numero
            current_col += 1
            write_xy(
                ws, (current_col, current_row), it["key"], wb_formats["cell_text"]
            )  # codigo
            current_col += 1
            write_xy(
                ws,
                (current_col, current_row),
                f"Actividad {it['key']}",  # Etiqueta genérica
                wb_formats["cell_text"],
            )  # etiqueta
            current_col += 1
            write_xy(
                ws,
                (current_col, current_row),
                "unidad",  # Unidad genérica
                wb_formats["cell_text"],
            )  # unidad
            current_col += 1
            write_xy(
                ws,
                (current_col, current_row),
                cargas_trabajo_programadas_anualmente.loc[it["key"], f"{anio}-{mes:02d}"],
                wb_formats["cell_number"],
            )  # programado
            current_col += 1
            write_xy(
                ws,
                (current_col, current_row),
                cargas_trabajo_contratista.get(it["key"], 0),
                wb_formats["cell_number"],
            )  # ejecutado
            current_col += 1

            avance = calcular_avance(
                programado=cargas_trabajo_programadas_anualmente.loc[it["key"], f"{anio}-{mes:02d}"],
                ejecutado=cargas_trabajo_contratista.get(it["key"], 0),
            )
            write_xy(
                ws,
                (current_col, current_row),
                avance,
                wb_formats["cell_number_percent"],
            )  # avance
            current_col += 1

            progresivas = (
                formatear_progresiva(progresiva_inicio)
                + " - "
                + formatear_progresiva(progresiva_fin)
            )
            write_xy(
                ws,
                (current_col, current_row),
                progresivas,
                wb_formats["cell_text"],
            )  # progresivas

            contador_items += 1
            current_row += 1
    else:
        # Procesar elementos normales (MR, VP, etc.)
        inicio = numero_grupo * 100
        fin = inicio + 99

        print(f"Valores entre {inicio} al {fin} para {letras}!")
        
        # Buscar la actividad correspondiente según el prefijo
        resultado = next(
            (item for item in actividades if item["key"] == f"{letras}{inicio}"), None
        )
        
        if resultado:
            write_xy(ws, (current_col, current_row), "", wb_formats["cell_text"])  # numero
            current_col += 1
            write_xy(
                ws, (current_col, current_row), resultado["key"], wb_formats["cell_text"]
            )  # codigo
            current_col += 1
            write_xy(
                ws,
                (current_col, current_row),
                resultado["value"]["label"],
                wb_formats["cell_text"],
            )  # etiqueta

            current_row += 1


            for it in elementos:
                print(it)
                print(resultado["value"]["value"])
                current_col = 0
                resultado2 = next(
                    (item for item in resultado["value"]["value"] if item["key"] == it["key"]),
                    None,
                )

                if resultado2:
                    write_xy(
                        ws, (current_col, current_row), contador_items, wb_formats["cell_int"]
                    )  # numero
                    current_col += 1
                    write_xy(
                        ws, (current_col, current_row), resultado2["key"], wb_formats["cell_text"]
                    )  # codigo
                    current_col += 1
                    write_xy(
                        ws,
                        (current_col, current_row),
                        resultado2["value"]["label"],
                        wb_formats["cell_text"],
                    )  # etiqueta
                    current_col += 1
                    write_xy(
                        ws,
                        (current_col, current_row),
                        resultado2["value"]["unidad"],
                        wb_formats["cell_text"],
                    )  # unidad
                    current_col += 1
                    write_xy(
                        ws,
                        (current_col, current_row),
                        cargas_trabajo_programadas_anualmente.loc[it["key"], f"{anio}-{mes:02d}"],
                        wb_formats["cell_number"],
                    )  # programado
                    current_col += 1
                    write_xy(
                        ws,
                        (current_col, current_row),
                        cargas_trabajo_contratista.get(it["key"], 0),
                        wb_formats["cell_number"],
                    )  # ejecutado
                    current_col += 1

                    avance = calcular_avance(
                        programado=cargas_trabajo_programadas_anualmente.loc[it["key"], f"{anio}-{mes:02d}"],
                        ejecutado=cargas_trabajo_contratista.get(it["key"], 0),
                    )
                    write_xy(
                        ws,
                        (current_col, current_row),
                        avance,
                        wb_formats["cell_number_percent"],
                    )  # avance
                    current_col += 1

                    progresivas = (
                        formatear_progresiva(progresiva_inicio)
                        + " - "
                        + formatear_progresiva(progresiva_fin)
                    )
                    write_xy(
                        ws,
                        (current_col, current_row),
                        progresivas,
                        wb_formats["cell_text"],
                    )  # progresivas

                    contador_items += 1
                    current_row += 1

Valores entre 100 al 199 para MR!
{'key': 'MR101', 'value': 0.856666667}
[{'key': 'MR101', 'value': {'label': 'Limpieza de calzada', 'carga_trabajo': 0, 'unidad': 'Km'}}, {'key': 'MR102', 'value': {'label': 'Bacheo', 'carga_trabajo': 0, 'unidad': 'm2'}}, {'key': 'MR103', 'value': {'label': 'Desquinche', 'carga_trabajo': 0, 'unidad': 'm3'}}, {'key': 'MR104', 'value': {'label': 'Remoción de derrumbes', 'carga_trabajo': 0, 'unidad': 'm3'}}]
{'key': 'MR102', 'value': 0.0}
[{'key': 'MR101', 'value': {'label': 'Limpieza de calzada', 'carga_trabajo': 0, 'unidad': 'Km'}}, {'key': 'MR102', 'value': {'label': 'Bacheo', 'carga_trabajo': 0, 'unidad': 'm2'}}, {'key': 'MR103', 'value': {'label': 'Desquinche', 'carga_trabajo': 0, 'unidad': 'm3'}}, {'key': 'MR104', 'value': {'label': 'Remoción de derrumbes', 'carga_trabajo': 0, 'unidad': 'm3'}}]
{'key': 'MR103', 'value': 13.3325}
[{'key': 'MR101', 'value': {'label': 'Limpieza de calzada', 'carga_trabajo': 0, 'unidad': 'Km'}}, {'key': 'MR102', 'value':

In [58]:
print(current_row, current_col)

28 7


In [59]:
print(
    xl_range(
        table_start_row,
        table_start_col,
        table_start_row + current_row-1,
        table_start_col+current_col-1,
    ),
)

B2:H29


In [60]:
# Guardar archivo
wb.close()

In [61]:
ruta_archivo = Path(ruta_archivo).resolve()

In [62]:
# Aplicar bordes con Excel
excel = win32.Dispatch("Excel.Application")
excel.Visible = False

workbook = excel.Workbooks.Open(ruta_archivo)
worksheet = workbook.ActiveSheet

# Seleccionar tu rango
range_obj = worksheet.Range(
    worksheet.Cells(table_start_row + 1, table_start_col + 1),
    worksheet.Cells(table_start_row + current_row, table_start_col + current_col)
)

# Aplicar todos los bordes
range_obj.Borders.LineStyle = 1
range_obj.Borders.Weight = 2

workbook.Save()
workbook.Close()
excel.Quit()


### Imprimiendo una imagen a partir del excel

In [63]:
# 1. Carga el libro y la hoja

wb = load_workbook(ruta_archivo, data_only=True)
ws = wb.active  # o wb["NombreHoja"]

# 2. Determina el rango usado
min_row, min_col = ws.min_row, ws.min_column
max_row, max_col = ws.max_row, ws.max_column
rango = f"{get_column_letter(min_col)}{min_row}:{get_column_letter(max_col)}{max_row}"

# 3. Exporta a PNG
salida_png = f"cargas_trabajo.png"
ruta_salida_image= os.path.join(ruta_directorio, salida_png)
excel2img.export_img(ruta_archivo, ruta_salida_image, ws.title, rango)

print(f"Imagen generada: {salida_png}")

Imagen generada: cargas_trabajo.png
