In [5]:
import pandas as pd
import json
import re
from datetime import datetime
import ast # Para ast.literal_eval, más seguro que eval()
import os

In [6]:
# %%
# --- Bloque para definir rutas dinámicamente y ejecutar la conversión ---

# El notebook está en: .../QLAB_CHATBOT_CORRUPCION/procesamiento/src_json_optimizado/
notebook_dir = os.getcwd()

# Subimos dos niveles para llegar a la raíz del proyecto: QLAB_CHATBOT_CORRUPCION/
project_root = os.path.abspath(os.path.join(notebook_dir, '..', '..'))

# Nombres de los archivos
# ¡ASEGÚRATE QUE ESTE ES EL NOMBRE CORRECTO DE TU ARCHIVO CSV DE ENTRADA!
csv_input_filename = "AC_total.xlsx - Sheet1.csv" 
jsonl_output_filename = "salida_informes_consolidados_desde_csv.jsonl"

# Construir rutas completas
# Archivo CSV de entrada desde la carpeta 'data' del proyecto
csv_input_path = os.path.join(project_root, 'data', csv_input_filename)

# Archivo JSONL de salida a la carpeta 'output' del proyecto
jsonl_output_path = os.path.join(project_root, 'output', jsonl_output_filename)

# (Opcional) Imprime las rutas para verificar:
print(f"Ruta del archivo CSV de entrada: {csv_input_path}")
print(f"Ruta del archivo JSONL de salida: {jsonl_output_path}")

# Llamada a la función principal
convert_csv_reports_to_jsonl(csv_input_path, jsonl_output_path)
# --- Fin del bloque de ejecución ---

Ruta del archivo CSV de entrada: c:\Users\LENOVO\Documents\GitHub\qlab_chatbot_corrupcion\data\AC_total.xlsx - Sheet1.csv
Ruta del archivo JSONL de salida: c:\Users\LENOVO\Documents\GitHub\qlab_chatbot_corrupcion\output\salida_informes_consolidados_desde_csv.jsonl
Archivo JSONL de informes consolidados 'c:\Users\LENOVO\Documents\GitHub\qlab_chatbot_corrupcion\output\salida_informes_consolidados_desde_csv.jsonl' generado exitosamente.


In [7]:

def format_date_from_csv(date_val_str):
    """
    Formatea fechas desde strings de CSV a YYYY-MM-DD.
    """
    if pd.isna(date_val_str) or date_val_str == '':
        return None
    
    # Intentar varios formatos comunes
    formats_to_try = [
        '%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y',
        '%Y/%m/%d', '%d-%m-%Y', '%m-%d-%Y',
        '%Y%m%d' # AAAA MM DD
    ]
    
    for fmt in formats_to_try:
        try:
            return datetime.strptime(str(date_val_str).strip(), fmt).strftime('%Y-%m-%d')
        except ValueError:
            continue
    
    # Si ninguno de los formatos funciona, intentar con pd.to_datetime como último recurso
    try:
        dt_obj = pd.to_datetime(str(date_val_str).strip(), errors='coerce')
        if pd.notna(dt_obj):
            return dt_obj.strftime('%Y-%m-%d')
    except Exception:
        pass
        
    # print(f"Advertencia: No se pudo parsear la fecha '{date_val_str}'. Se devuelve como string.")
    return str(date_val_str).strip() # Devuelve el string original si todo falla

def clean_numero_informe(num_informe_str):
    if pd.isna(num_informe_str) or num_informe_str == '':
        return None
    num_informe_str = str(num_informe_str).strip()
    num_informe_str = re.sub(r"^(N\*|N'|N°|INFORME DE AUDITORÍA N\*\s*)", "", num_informe_str, flags=re.IGNORECASE).strip()
    num_informe_str = re.sub(r'\s*-\s*', '-', num_informe_str)
    num_informe_str = num_informe_str.replace("%20", " ")
    num_informe_str = re.sub(r'\s+', ' ', num_informe_str)
    num_informe_str = re.sub(r'\s*-\s*', '-', num_informe_str)
    
    match = re.search(r'(\d+-\d{4}(?:-\d+-\d+)?(?:-CG)?\/[A-Z\s]+-AC|\d+-\d{4}-\d-\d+)', num_informe_str, flags=re.IGNORECASE)
    if match:
        return match.group(1).strip().upper()
    
    match_simple = re.search(r'(\d+-\d{4})', num_informe_str)
    if match_simple:
        cleaned_simple_fallback = re.sub(r'[^\w\d\s\/-]', '', num_informe_str).strip()
        if re.match(r'\d+-\d{4}.*', cleaned_simple_fallback):
            return cleaned_simple_fallback.upper()
        return match_simple.group(1).strip().upper()
        
    return re.sub(r'[^\w\d\s\/-]', '', num_informe_str).strip().upper()

def create_report_id(numero_informe_limpio):
    if not numero_informe_limpio:
        return "ID_REPORTE_DESCONOCIDO"
    report_id = numero_informe_limpio.replace("/", "_")
    report_id = report_id.replace("-CG_", "_") 
    report_id = re.sub(r'_CG(?=_|$)', '', report_id)
    report_id = report_id.replace("-", "_")
    return report_id.upper()

def parse_responsabilidades_dict(resp_str):
    parsed_resp = {'civil': False, 'penal': False, 'admin_ent': False, 'admin_pas': False}
    if pd.isna(resp_str) or not isinstance(resp_str, str) or resp_str == '':
        return parsed_resp
    try:
        resp_dict_raw = ast.literal_eval(resp_str)
        if not isinstance(resp_dict_raw, dict): # Asegurarse que sea un diccionario
            return parsed_resp
            
        parsed_resp['civil'] = bool(resp_dict_raw.get('Civil') and 'X' in resp_dict_raw.get('Civil',[]))
        parsed_resp['penal'] = bool(resp_dict_raw.get('Penal') and 'X' in resp_dict_raw.get('Penal',[]))
        parsed_resp['admin_ent'] = bool(resp_dict_raw.get('Adm. ENT') and 'X' in resp_dict_raw.get('Adm. ENT',[]))
        
        pas_marked = bool(resp_dict_raw.get('Adm. PAS') and 'X' in resp_dict_raw.get('Adm. PAS',[]))
        admin_general_marked = bool(resp_dict_raw.get('Admin.') and 'X' in resp_dict_raw.get('Admin.',[])) # Para la columna 'Admin.'
        parsed_resp['admin_pas'] = pas_marked or admin_general_marked
        return parsed_resp
    except (ValueError, SyntaxError) as e:
        # print(f"Error parseando responsabilidades con ast.literal_eval '{resp_str}': {e}")
        return parsed_resp
    
def clean_and_convert_monto(monto_str, default_if_empty='0'):
    """Limpia un string de monto y lo convierte a float."""
    if pd.isna(monto_str) or monto_str == '':
        monto_str = default_if_empty
    
    cleaned_str = str(monto_str)
    # Eliminar "S/", "S.", "$", etc. y espacios al principio/final
    cleaned_str = re.sub(r'^(S\/|S\.|\$)\s*', '', cleaned_str).strip()
    # Eliminar comas usadas como separadores de miles
    cleaned_str = cleaned_str.replace(',', '')
    # Eliminar espacios internos (ej. "1 234.56")
    cleaned_str = cleaned_str.replace(' ', '')
    
    try:
        return float(cleaned_str)
    except ValueError:
        # print(f"Advertencia: No se pudo convertir el monto '{monto_str}' a float. Se devuelve None.")
        return None # O podrías devolver 0.0 si prefieres


def convert_csv_reports_to_jsonl(csv_path, jsonl_path):
    try:
        # Leer el CSV, tratando todas las columnas como string para manejo inicial
        # y usando na_filter=False para que las celdas vacías sean '' en lugar de NaN
        df = pd.read_csv(csv_path, dtype=str, keep_default_na=False, na_filter=False)
    except FileNotFoundError:
        print(f"Error: El archivo CSV '{csv_path}' no fue encontrado.")
        return
    except Exception as e:
        print(f"Error al leer el archivo CSV: {e}")
        return

    # --- Mapeo de Nombres de Columnas (AJUSTAR SEGÚN TU CSV) ---
    col_map = {
        'numero_informe_col': 'numero_informe',
        'titulo_asunto_col': 'titulo_informe/asunto',
        'objetivo_col': 'objetivo',
        'observaciones_col': 'observaciones',
        'recomendaciones_col': 'recomendaciones',
        'entidad_auditada_col': 'entidad_auditada',
        'fecha_emision_col': 'fecha_emision_informe',
        'unidad_emite_col': 'unidad_emite_informe',
        'distrito_col': 'distrito',
        'provincia_col': 'provincia',
        'region_col': 'region',
        'modalidad_col': 'modalidad',
        'periodo_inicio_col': 'inicio',
        'periodo_fin_col': 'final',
        'monto_auditado_col': 'monto_auditado', 
        'monto_examinado_col': 'monto_examinado',
        'responsabilidades_dict_str_col': 'columnas', # String del dict de resp.
        'dni_col': 'dni',
        'nombres_persona_col': 'personas',
        'resp_civil_col_individual': 'civil',
        'resp_penal_col_individual': 'penal',
        'resp_admin_col_individual': 'admin',
        'resp_admin_ent_col_individual': 'adm_ent',
        'resp_admin_pas_col_individual': 'adm_pas'
    }

    # Asegurarse de que las columnas existan, si no, no se podrá agrupar
    if col_map['numero_informe_col'] not in df.columns:
        print(f"Error: La columna '{col_map['numero_informe_col']}' (para numero_informe) no se encuentra en el CSV.")
        return

    df['numero_informe_limpio'] = df[col_map['numero_informe_col']].apply(clean_numero_informe)
    df['report_id_temp'] = df['numero_informe_limpio'].apply(create_report_id)

    df_valid_reports = df[df['numero_informe_limpio'].notna() & (df['numero_informe_limpio'] != '')].copy()
    
    if df_valid_reports.empty:
        print("No se encontraron informes válidos para procesar.")
        return

    # Definir get_col_val aquí una vez, antes del bucle de agrupación
    def get_col_val(row, abstract_col_key, default=''):
        actual_col_name = col_map.get(abstract_col_key)
        if actual_col_name:
            return str(row.get(actual_col_name, default)).strip()
        # Fallback si abstract_col_key no está en col_map (podría ser un nombre de columna directo)
        return str(row.get(abstract_col_key, default)).strip()

    grouped = df_valid_reports.groupby('report_id_temp')

    with open(jsonl_path, 'w', encoding='utf-8') as outfile:
        for report_id, group_df in grouped:
            if not report_id or report_id == "ID_REPORTE_DESCONOCIDO":
                continue
            
            first_row = group_df.iloc[0]

            # --- INICIO DE LÓGICA MODIFICADA PARA EXTRACCIÓN DE 'year' ---
            year = None
            # Usar el valor de 'numero_informe_limpio' de la primera fila del grupo.
            # Si 'numero_informe_limpio' es None o no es string, get tratará '' como no-string y el if no entrará.
            numero_informe_para_year = first_row.get('numero_informe_limpio', '')

            if numero_informe_para_year and isinstance(numero_informe_para_year, str):
                # Encontrar todas las secuencias de 4 dígitos delimitadas por no-dígitos o inicio/fin de cadena.
                potential_year_strings = re.findall(r'\b(\d{4})\b', numero_informe_para_year)
                
                valid_years_in_range = []
                for y_str in potential_year_strings:
                    try:
                        candidate_y = int(y_str)
                        if 2016 <= candidate_y <= 2022:
                            valid_years_in_range.append(candidate_y)
                    except ValueError:
                        continue # Ignorar si no es un número
                
                if len(valid_years_in_range) == 1:
                    # Exactamente un año válido encontrado en el rango
                    year = valid_years_in_range[0]
                elif len(valid_years_in_range) > 1:
                    # Múltiples años válidos encontrados. Esto es ambiguo según tu requisito.
                    print(f"Advertencia [ID: {report_id}]: Múltiples años válidos (2016-2022) encontrados en "
                          f"numero_informe_limpio '{numero_informe_para_year}': {valid_years_in_range}. "
                          f"No se ha asignado un año debido a la ambigüedad.")
                    # 'year' permanece None
                # else: len(valid_years_in_range) == 0
                    # No se encontraron años en el rango válido. 'year' permanece None.
            
            # Solo imprimir advertencia si no se pudo asignar un año Y hubo un intento (numero_informe no vacío)
            if year is None and numero_informe_para_year:
                # La advertencia de múltiples años ya se imprimió arriba, así que nos enfocamos en 0 años válidos.
                # (Se asume 'potential_year_strings' está definida si entramos al bloque if anterior)
                num_valid_years = len(valid_years_in_range) if 'valid_years_in_range' in locals() else 0
                if num_valid_years == 0:
                    all_candidates = potential_year_strings if 'potential_year_strings' in locals() else "ninguno (o no se procesó)"
                    print(f"Advertencia [ID: {report_id}]: No se pudo determinar un año único y válido (2016-2022) "
                          f"desde numero_informe_limpio: '{numero_informe_para_year}'. "
                          f"Candidatos de 4 dígitos encontrados: {all_candidates}.")
            # --- FIN DE LÓGICA MODIFICADA PARA EXTRACCIÓN DE 'year' ---
            
            personas_implicadas_list = []
            seen_dnis = set()
            for _, p_row in group_df.iterrows():
                dni = get_col_val(p_row, 'dni_col', '') 
                nombre = get_col_val(p_row, 'nombres_persona_col', '')
                if dni and dni not in seen_dnis: 
                    personas_implicadas_list.append({"dni": dni, "nombre": nombre})
                    seen_dnis.add(dni)

            informe_resp = {'civil': False, 'penal': False, 'admin_ent': False, 'admin_pas': False}
            consolidated_from_dict = False
            for _, r_row in group_df.iterrows():
                resp_dict_str = get_col_val(r_row, 'responsabilidades_dict_str_col', '')
                if resp_dict_str:
                    resp_fila_dict = parse_responsabilidades_dict(resp_dict_str)
                    for resp_type_key in informe_resp.keys(): 
                        informe_resp[resp_type_key] = informe_resp[resp_type_key] or resp_fila_dict.get(resp_type_key, False)
                    if any(resp_fila_dict.values()): 
                        consolidated_from_dict = True
            
            if not consolidated_from_dict: 
                for _, r_row in group_df.iterrows():
                    informe_resp['civil'] = informe_resp['civil'] or (get_col_val(r_row,'resp_civil_col_individual','0') == '1')
                    informe_resp['penal'] = informe_resp['penal'] or (get_col_val(r_row,'resp_penal_col_individual','0') == '1')
                    informe_resp['admin_ent'] = informe_resp['admin_ent'] or (get_col_val(r_row, 'resp_admin_ent_col_individual','0') == '1')
                    admin_general = (get_col_val(r_row,'resp_admin_col_individual','0') == '1')
                    admin_pas_individual = (get_col_val(r_row, 'resp_admin_pas_col_individual','0') == '1')
                    informe_resp['admin_pas'] = informe_resp['admin_pas'] or admin_pas_individual or admin_general

            monto_auditado_str = get_col_val(first_row, 'monto_auditado_col')
            monto_examinado_str = get_col_val(first_row, 'monto_examinado_col')

            monto_auditado_float = clean_and_convert_monto(monto_auditado_str)
            monto_examinado_float = clean_and_convert_monto(monto_examinado_str)

           
                       # --- PROCESAMIENTO ESPECÍFICO PARA REGIÓN Y PROVINCIA ---
            
            # REGIÓN
            region_val_raw = get_col_val(first_row, 'region_col')
            region_final = None # Valor por defecto

            if region_val_raw and isinstance(region_val_raw, str):
                # Primero, limpieza general: reemplazar \n y normalizar espacios
                cleaned_region = region_val_raw.replace("\n", " ").strip()
                cleaned_region = ' '.join(cleaned_region.split()) # Normaliza múltiples espacios a uno

                # Ahora, la lógica específica
                # Usamos re.IGNORECASE por si hay variaciones de mayúsculas/minúsculas
                if re.search(r"P\s*\.\s*C\s*\.\s*DEL\s+CALLAO", cleaned_region, re.IGNORECASE) or \
                   re.search(r"Prov(incia)?\s*(Constitucional)?\s*del\s+Callao", cleaned_region, re.IGNORECASE): # Añadir más patrones si es necesario
                    region_final = "CALLAO"
                else:
                    region_final = cleaned_region # Si no coincide con el patrón especial, usar el valor limpiado
            elif region_val_raw: # Si no es string pero tiene valor (ej. numérico), convertir a string
                region_final = str(region_val_raw)


            # PROVINCIA
            provincia_val_raw = get_col_val(first_row, 'provincia_col')
            provincia_final = None # Valor por defecto

            if provincia_val_raw and isinstance(provincia_val_raw, str):
                cleaned_provincia = provincia_val_raw.replace("\n", " ").strip()
                cleaned_provincia = ' '.join(cleaned_provincia.split())

                # Lógica similar para provincia si es necesario, o simplemente limpieza general
                # Ejemplo de regla específica para provincia también:
                if re.search(r"PROV\s*\.\s*CONST\s*\.\s*DEL\s+CALLAO", cleaned_provincia, re.IGNORECASE) or \
                   re.search(r"CALLAO", cleaned_provincia, re.IGNORECASE): # Si ya dice "CALLAO" o similar
                    provincia_final = "CALLAO"
                # Podrías tener otras transformaciones específicas para provincia aquí
                # Por ejemplo, si ves "LIMA METROPOLITANA" y quieres solo "LIMA"
                # elif "LIMA METROPOLITANA" in cleaned_provincia.upper():
                #     provincia_final = "LIMA"
                else:
                    provincia_final = cleaned_provincia
            elif provincia_val_raw:
                provincia_final = str(provincia_val_raw)
                
            # --- FIN DE PROCESAMIENTO ESPECÍFICO ---




            
            json_report_object = {
                "report_id": report_id,
                "numero_informe": first_row.get('numero_informe_limpio', None),
                "titulo_informe": get_col_val(first_row, 'titulo_asunto_col').replace("\n", " "),
                "entidad_auditada": get_col_val(first_row, 'entidad_auditada_col'),
                "region": region_final,
                "provincia": provincia_final,
                "distrito": get_col_val(first_row, 'distrito_col') or None,
                "year": year, 
                "periodo_inicio": format_date_from_csv(get_col_val(first_row, 'periodo_inicio_col')),
                "periodo_fin": format_date_from_csv(get_col_val(first_row, 'periodo_fin_col')),
                "monto_auditado": monto_auditado_float,
                "monto_examinado": monto_examinado_float,
                "modalidad": get_col_val(first_row, 'modalidad_col') or None,
                "fecha_emision": format_date_from_csv(get_col_val(first_row, 'fecha_emision_col')),
                "unidad_emite": get_col_val(first_row, 'unidad_emite_col') or None,
                "personas_implicadas_consolidado": personas_implicadas_list,
                "responsabilidades_consolidadas": informe_resp,
                "texto_objetivos_completo": get_col_val(first_row, 'objetivo_col'),
                "texto_observaciones_completo": get_col_val(first_row, 'observaciones_col'),
                "texto_recomendaciones_completo": get_col_val(first_row, 'recomendaciones_col')
            }
    
            outfile.write(json.dumps(json_report_object, ensure_ascii=False) + '\n')

    print(f"Archivo JSONL de informes consolidados '{jsonl_path}' generado exitosamente.")

In [8]:
convert_csv_reports_to_jsonl(csv_input_path, jsonl_output_path)

Archivo JSONL de informes consolidados 'c:\Users\LENOVO\Documents\GitHub\qlab_chatbot_corrupcion\output\salida_informes_consolidados_desde_csv.jsonl' generado exitosamente.
