# Importaciones

In [1]:
# Librer√≠as est√°ndar de Python
import os
import re
import time
import json
import datetime
from io import BytesIO
from functools import lru_cache
from collections import defaultdict, OrderedDict, Counter
from typing import List, Union

# Librer√≠as de terceros
import requests
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
import seaborn as sns
import numpy as np
import boto3
from matplotlib.colors import LinearSegmentedColormap, SymLogNorm
import textwrap

# Librer√≠as para manejo de documentos Word
from docx import Document
from docx.shared import Inches, Pt, RGBColor
from docx.text.paragraph import Paragraph
from docx.oxml import parse_xml, OxmlElement
from docx.oxml.ns import qn
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT, WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_TABLE_ALIGNMENT, WD_ALIGN_VERTICAL
from docx.enum.style import WD_STYLE_TYPE

# Librerias propias
import openIA_analisis_conclusiones as OA

In [2]:
%matplotlib inline
plt.rcParams['font.family'] = 'Segoe UI Emoji'

# Conexion y descarga de Query desde Athena

In [3]:
def lista_para_analizar(proyecto=None, instituciones=None, grade=None, career=None, educacion=None, grade_section=None, genero=None, etario=None):
    '''	Genera una lista que contiene los elementos del dataframe que se van a analizar y la cuales deberan ser ingresadas por el usuario '''
    lista = []
    if proyecto is True:
        lista.append('project_id')
    if instituciones is True:
        lista.append('educative_institution')
    if grade is True:
        lista.append('grade')
    if career is True:
        lista.append('career')
    if educacion is True:
        lista.append('educational_level')
    if grade_section is True:
        lista.append('grade_section')
    if genero is True:
        lista.append('genero')
    if etario is True:
        lista.append('rango_etario')
    return lista

# Diccionario de mapeo
mapeo_variables = {
    'project_id': 'Proyecto ID',
    'educative_institution': 'Instituci√≥n Educativa',
    'grade': 'Grado',
    'career': 'Carrera',
    'educational_level': 'Nivel Educativo',
    'grade_section': 'Secci√≥n',
    'genero': 'G√©nero',
    'rango_etario': 'Rango Etario',
}

# Variables a cambiar

In [4]:
# Filtros hacia la query
project_id = "74" # 72,73,74 (en el front se deberia mostrar un lista de los proyectos)
tipo_test = 'evs' # (En el front se deberia mostrar una lista de los tipos de test)
IA = True # si esto se pone en true el informe demora unos 20min

lista_graficos=lista_para_analizar(
    proyecto=None,
    instituciones=True,
    grade=True,
    career=None,
    educacion=None,
    grade_section=None,
    genero=True,
    etario=None)

# Codigo

In [5]:


# Definir el filtro de tipo_test seg√∫n el valor de la variable
test = tipo_test.lower()  # Aseguramos consistencia en min√∫sculas
if test == "evs":
    filtro_tipo_test = "'cuestionario de entrada', 'cuestionario de salida'"
elif test == "evm":
    filtro_tipo_test = "'cuestionario de entrada', 'cuestionario medio'"
elif test == "mvs":
    filtro_tipo_test = "'cuestionario medio', 'cuestionario de salida'"
else:
    filtro_tipo_test = f"'{tipo_test}'"

In [6]:
# Capturar Tiempo
start_time_consulta= time.time()

In [7]:
# Crear los clientes de Athena y S3
athena = boto3.client('athena', region_name='us-east-1')
s3 = boto3.client('s3', region_name='us-east-1')
bucket_output = 'aws-athena-query-results-us-east-1-158862062418'

# Generamos un path √∫nico para la salida en Parquet, usando el timestamp
timestamp = int(time.time())
parquet_output_path = f's3://{bucket_output}/python_ale/{timestamp}/'

query = f''' 
WITH 
activos_por_proyecto AS (
  select
    ee.b2b_project_id,
    count(distinct ee.student_id) as inscriptos_activos
  from
    enrollment_enrolment ee
    left join projects p on (p.id = ee.b2b_project_id)
  where p.id in ({project_id}) and ee.state <> 'cancel' and ee.state <> 'inactive'
  group by 1
),

activos_por_institucion AS (
  select
    ee.b2b_project_id,
    ee.institution,
    count(distinct ee.student_id) as inscriptos_activos
  from
    enrollment_enrolment ee
    
  where ee.b2b_project_id in ({project_id}) and ee.state <> 'cancel' and ee.state <> 'inactive'
  group by 1,2
),

activos_por_grado AS (
  select
    ee.b2b_project_id,
    ee.grade,
    count(distinct ee.student_id) as inscriptos_activos
  from
    enrollment_enrolment ee
    
  where ee.b2b_project_id in ({project_id}) and ee.state <> 'cancel' and ee.state <> 'inactive'
  group by 1,2
),

BASE AS (
   SELECT DISTINCT
     me.moodle_id moodle_user_id
   --, 'No aplica' tipo_estandarizacion
   , 'Moodle' origen
   --, CONCAT(CAST(ss.id AS varchar), '-', CAST(p.id AS varchar)) identificador_unico
   --, me.role moodle_user_role
   , ss.id student_id
   --, CONCAT(ss.first_name, ' ', ss.last_name) student_name
   , ee.institution educative_institution
   , ee.grade grade
   , concat(ee.grade,'+', ee.group_section) grade_section
   , ee.career career
   , ee.educational_level educational_level
   , DATE_DIFF('year', ss.birthdate, p.operative_start_date) age 
   , ss.gender genero
   , ipp.inscriptos_activos activos_por_proyecto
   , ipi.inscriptos_activos activos_por_educative_institution
   , ipg.inscriptos_activos activos_por_grade
   , p.id project_id
   --, p.type project_type
   , p.name project_name
   --, 'Grupo CTC' tipo_grupo
   , ce.course_id moodle_course_id
   , rr.id room_id
   --, rr.name room_name
   , ce.unique_id evaluation_unique_id
   , ce.name evaluation_name
   --, cer.attempt_time_finish response_time_finished
   --, cer.attempt_state attempt_state
   --, cer.attempt_id
   , ceq.name question_name
   , ceq.tag tag_question
   , ceq.question_id question_id
   , ceq.question_name question
   , cer.answer answer
   , cer.right_answer right_answer
   , ce.tag AS tipo_test

   FROM
   moodle_enrollment me
   LEFT JOIN moodle_course_evaluations ce ON (me.course_id = ce.course_id)
   LEFT JOIN moodle_course_evaluation_questions ceq ON (ce.unique_id = ceq.unique_id) AND ((ceq.question_name <> 'label') OR (ceq.question_name IS NULL))
   LEFT JOIN moodle_course_evaluation_responses cer ON ((cer.unique_id = ceq.unique_id) AND (ceq.question_id = cer.question_id) AND (me.moodle_id = cer.moodle_id) AND (ce.type <> 'assign')  AND (cer.attempt_time_finish IS NOT NULL))
   INNER JOIN room_room rr ON (rr.course_mdl_id = me.course_id)
   LEFT JOIN student_student ss ON (ss.user_mdl_id = me.moodle_id)
   LEFT JOIN room_room_students rrs ON ((rrs.student_id = ss.id) AND (rrs.room_id = rr.id))
   LEFT JOIN enrollment_enrolment ee ON (((ee.group_id = rr.group_id) OR (ee.room_id = rr.id)) AND (ee.student_id = ss.id) AND (ee.state <> 'cancel') AND (ee.state <> 'inactive'))
   LEFT JOIN projects p ON (p.id = ee.b2b_project_id)
   left JOIN activos_por_proyecto ipp ON (ipp.b2b_project_id = p.id)
   left join activos_por_institucion ipi ON (ipi.institution = ee.institution and ipi.b2b_project_id=ee.b2b_project_id)
   left join activos_por_grado ipg ON (ipg.grade= ee.grade and ipg.b2b_project_id=ee.b2b_project_id)

   WHERE (p.id in ({project_id}) and (me.role = 'student'))

   ) 
SELECT
  *
FROM
  BASE b
WHERE 
(
  (b.answer IS NOT NULL) AND (trim(BOTH FROM b.answer) <> '') AND 
  b.project_id in ({project_id}) AND b.tipo_test IN ({filtro_tipo_test})
  
)

'''

# Query para guardar el resultado en un archivo Parquet
ctas_query = f"""
CREATE TABLE python_table_{timestamp}
WITH (
  format = 'PARQUET',
  external_location = '{parquet_output_path}',
  write_compression = 'SNAPPY'
) AS
{query}
"""

# Ejecutar la consulta CTAS
response = athena.start_query_execution(
    QueryString=ctas_query,
    QueryExecutionContext={'Database': 'datalake'},
    ResultConfiguration={'OutputLocation': f's3://{bucket_output}/'}
)

# Obtener el ID de ejecuci√≥n y esperar a que termine
query_execution_id = response['QueryExecutionId']
while True:
    result = athena.get_query_execution(QueryExecutionId=query_execution_id)
    state = result['QueryExecution']['Status']['State']
    if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        break
    time.sleep(2)

if state == 'FAILED':
    print(result['QueryExecution']['Status'].get('StateChangeReason'))

if state == 'SUCCEEDED':
    print("‚úÖ Consulta CTAS completada.")
    print(f"üîó Resultados guardados en: {parquet_output_path}")
    df = pd.read_parquet(parquet_output_path, engine='pyarrow')
    print("‚úÖ Datos cargados en el DataFrame.")
    
    # Eliminar los archivos Parquet de S3 despu√©s de cargarlos en el DataFrame
    try:
        # Extraer el prefijo del path S3
        prefix = f'python_ale/{timestamp}er/'
        
        # Listar todos los objetos en el prefijo
        objects = s3.list_objects_v2(Bucket=bucket_output, Prefix=prefix)
        
        if 'Contents' in objects:
            # Crear lista de objetos a borrar
            delete_keys = [{'Key': obj['Key']} for obj in objects['Contents']]
            
            # Borrar los objetos
            s3.delete_objects(
                Bucket=bucket_output,
                Delete={'Objects': delete_keys}
            )
            print(f"üóëÔ∏è Se eliminaron {len(delete_keys)} archivos Parquet de S3")
    except Exception as e:
        print(f"‚ö†Ô∏è Error al eliminar archivos de S3: {str(e)}")
    

    # 3Ô∏è‚É£ Eliminar la tabla en Athena
    try:
        drop_query = f"DROP TABLE IF EXISTS datalake.python_table_{timestamp};"
        drop_resp = athena.start_query_execution(
            QueryString=drop_query,
            QueryExecutionContext={'Database': 'datalake'},
            ResultConfiguration={'OutputLocation': f's3://{bucket_output}/'}  # Athena exige un OutputLocation aunque no genere archivos
        )
        drop_qid = drop_resp['QueryExecutionId']
        # Esperar a que se complete el DROP
        while True:
            drop_status = athena.get_query_execution(QueryExecutionId=drop_qid)['QueryExecution']['Status']['State']
            if drop_status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(1)
        if drop_status == 'SUCCEEDED':
            print(f"üóëÔ∏è Tabla python_table_{timestamp} eliminada de Athena.")
        else:
            print(f"‚ö†Ô∏è Fall√≥ el DROP TABLE con estado: {drop_status}")
    except Exception as e:
        print(f"‚ö†Ô∏è Error al eliminar la tabla en Athena: {e}")

else:
    raise Exception(result['QueryExecution']['Status'].get('StateChangeReason'))

‚úÖ Consulta CTAS completada.
üîó Resultados guardados en: s3://aws-athena-query-results-us-east-1-158862062418/python_ale/1764280244/
‚úÖ Datos cargados en el DataFrame.
üóëÔ∏è Tabla python_table_1764280244 eliminada de Athena.


In [8]:
end_time_consulta= time.time()
time_consulta= end_time_consulta - start_time_consulta

# Funciones

## Para normalizar el df

In [9]:
def limpiar_texto(texto):

    if pd.isna(texto):
        return ''
    # Reemplaza tabuladores, retornos de carro y saltos de l√≠nea por un espacio
    texto = re.sub(r'[\t\r\n]', ' ', str(texto))
    # Reemplaza m√∫ltiples espacios por uno solo
    texto = re.sub(r'\s+', ' ', texto)
    return texto.strip()

def ajustar_titulo(titulo, largo_primera_linea=44, largo_otras_lineas=40, max_lineas=2):

    # Si solo se permite una l√≠nea, truncar y rellenar
    if max_lineas == 1:
        if len(titulo) > largo_primera_linea:
            return titulo[:largo_primera_linea - 3] + "..."
        return titulo.center(largo_primera_linea)

    # Separar la primera l√≠nea con el largo espec√≠fico
    if len(titulo) <= largo_primera_linea:
        linea1 = titulo.center(largo_primera_linea)
        return linea1
    else:
        linea1 = titulo[:largo_primera_linea]
        resto = titulo[largo_primera_linea:]

    # Envolver el resto en l√≠neas m√°s cortas
    lineas_extra = textwrap.wrap(resto.strip(), width=largo_otras_lineas)

    # Limitar la cantidad de l√≠neas totales
    lineas_extra = lineas_extra[:max_lineas - 1]
    
    # Si hay m√°s texto que lo permitido, truncar la √∫ltima l√≠nea
    if len(lineas_extra) == (max_lineas - 1) and len(titulo) > len(linea1) + sum(len(l) for l in lineas_extra):
        if len(lineas_extra[-1]) > largo_otras_lineas - 3:
            lineas_extra[-1] = lineas_extra[-1][:largo_otras_lineas - 3] + "..."

    # Rellenar las l√≠neas
    linea1 = linea1.center(largo_primera_linea)
    lineas_extra = [l.center(largo_otras_lineas) for l in lineas_extra]

    return "\n".join([linea1] + lineas_extra)

def ajustar_etiquetas(texto, max_length=30):
    """Ajusta etiquetas largas dividi√©ndolas en l√≠neas y truncando si es necesario."""
    if len(texto) <= max_length:
        return texto
    # Dividir en l√≠neas de m√°ximo 30 caracteres
    lineas = textwrap.fill(texto, width=max_length, max_lines=2, placeholder="...").split('\n')
    return '\n'.join(lineas)

def extract_number_text(answer: str):
    '''Esta funcion separa la parte numerica y la string'''
    _pattern = re.compile(r'^\s*(\d+)(?:[\.\)\-\:\s]+)(.*)$')
    answer = str(answer).strip()
    m = _pattern.match(answer)
    if m:
        numero = m.group(1)
        texto  = m.group(2).strip()
        return numero, texto
    # si no coincide, devolvemos None y el texto completo
    return None, answer

def normalize_answer(row):
    if pd.notnull(row['Number']):
        question = row['question']
        number = row['Number']
        standard_text = mapping.get((question, number), row['Text'])
        return f"{number}. {standard_text}"
    return row['answer']

def normalize_question(row):
    question = row['question']
    tag = row['tag_question']
    standard_text = mapping.get(tag, question)
    return standard_text

def clasificar_tipo_pregunta(respuesta):
    if pd.isna(respuesta) or not isinstance(respuesta, str):
        return "Abierta"
    
    # 1) Limpiar espacios
    respuesta_limpia = respuesta.strip()
    baja = respuesta_limpia.lower()
    
    # 2) Verdadero/Falso como categ√≥ricas
    if baja in {"verdadero", "falso"}:
        return "Categorica"
    
    # 3) Si no hay ';', es abierta o categ√≥rica simple seg√∫n prefijo
    if ';' not in respuesta_limpia:
        if re.match(r'^(\d+\.\s*|[A-Z][\-\)])', respuesta_limpia):
            return "Categorica"
        else:
            return "Abierta"
    
    # 4) Con ';', buscamos MULTISELECCION ESTRUCTURADA:
    patrones_multiseleccion = [
        r'\b\d+\.\s+[^\;]+(?:;\s*\d+\.\s+[^\;]+)+',     # 1. ...; 2. ...
        r'\b[A-Z]\)\s+[^\;]+(?:;\s*[A-Z]\)\s+[^\;]+)+', # A) ...; B) ...
        r'\b[A-Z]-\s+[^\;]+(?:;\s*[A-Z]-\s+[^\;]+)+'    # A- ...; B- ...
    ]
    for patron in patrones_multiseleccion:
        if re.search(patron, respuesta_limpia):
            return "Categorica Multiseleccion"
    
    # 5) Si arranca como categ√≥rica pero no multiselecci√≥n:
    if re.match(r'^(\d+\.\s*|[A-Z][\-\)])', respuesta_limpia):
        return "Categorica"
    
    # 6) Lo que quede: abierta
    return "Abierta"

def consolidar_tipo_pregunta(series_tipos):
    """
    Serie de valores: ["Abierta", "Categorica", "Categorica Multiseleccion"].
    Devuelve:
      - "Categorica Multiseleccion" si al menos una respuesta es multiselecci√≥n.
      - "Categorica" si todas las respuestas son categ√≥ricas.
      - "Abierta" en cualquier otro caso (incluye mixtas o que contengan abiertas).
    """
    tipos = set(series_tipos)
    if "Categorica Multiseleccion" in tipos:
        return "Categorica Multiseleccion"
    elif tipos == {"Categorica"}:
        return "Categorica"
    else:
        return "Abierta"

def extraer_numero(texto):
    """
    Intenta extraer un n√∫mero entero al inicio del texto, justo antes de un punto.
    Retorna el n√∫mero si se encuentra o None si no hay coincidencia.
    """
    # Se utiliza una expresi√≥n regular que busca d√≠gitos seguidos de un punto al inicio del string
    m = re.match(r'\s*(\d+)\.', str(texto))
    if m:
        return int(m.group(1))
    return None

def un_tag_una_pregunta(dataframe: pd.DataFrame):
    '''Crear un diccionario con la primera pregunta por cada tag_question '''

    primera_pregunta_por_tag = (
        dataframe.groupby('tag_question')['question']
        .first()
        .to_dict()
    )

    # Reemplazar todas las preguntas con la seleccionada para ese tag_question
    dataframe['question'] = dataframe['tag_question'].map(primera_pregunta_por_tag)

    return dataframe

def renombrar_tipo_test(tipo_test):
    renombrar_test = {
        'cuestionario de entrada': 'Cuestionario de entrada',
        'cuestionario medio': 'Cuestionario medio',
        'cuestionario de salida': 'Cuestionario de salida',
        'examen de casos inicial': 'Ex. casos inicial',
        'examen de casos final': 'Ex. casos final',
        'examen final': 'Ex. final',
        'cuestionario de satisfacci√≥n modular': 'Satisfacci√≥n Modular',
        'cuestionario de satisfacci√≥n final': 'Satisfacci√≥n Final',
    }
    return renombrar_test.get(tipo_test, 'Otro')  # Devuelve Otro si no est√° en el diccionario

def ordenar_tipo_test(tipo_test):
    orden_test = {
        'cuestionario de entrada': 1,
        'cuestionario medio': 2,
        'cuestionario de salida': 3,
        'examen de casos inicial': 4,
        'examen de casos final': 5,
        'examen final': 6,
        'cuestionario de satisfacci√≥n modular': 7,
        'cuestionario de satisfacci√≥n final': 8,
    }
    return orden_test.get(tipo_test, 6)  # Devuelve 6 si no est√° en el diccionario

def ordenar_tag(tag_question):
    orden_tag = {
        "nombre": 1,
        "genero": 2,
        "correo_personal": 3,
        "celular": 4,
        "celular_de": 5,
        "tipo_documento": 6,
        "documento": 7,
        "nacimiento": 8,
        "etnia": 9,
        "nacionalidad": 10,
        "estrato_socioeconomico": 11,
        "nivel_educativo_familia": 12,
        "trabajar_ayuda_casa": 13,
        "cuidar_ayuda_casa": 14,
        "interes_tecnologia": 15,
        "dispositivos": 16,
        "forma_conectividad": 17,
        "uso_tecnologia_dia_a_dia": 18,
        "uso_tecnologia_a_futuro": 19,
        "planes_futuro": 20,
        "abandonar_estudios": 21,
        "motivo_abandonar_estudios": 22,
        "programas_educativos": 23,
        "prioridad_programas_educativos": 24,
        "trabajo": 25,
        "intereses_futuro": 26,
        "apoyo_financiero": 27,
        "financiamiento": 28,
        "cv": 29,
        "cv_experiencia_laboral": 30,
        "empresas": 31,
        "sitios_busqueda_laboral": 32,
        "retos_mercado_laboral": 33,
        "actividades_ultimo_anio": 34,
        "fuentes_informacion": 35,
        "fuente_informacion_otros": 36,
        "motivacion_familia": 37,
        "motivacion_familia_quienes": 38,
        "apoyo_econ_familia_estudio": 39,
        "actividades_profesores": 40,
        "apoyo_metas_prof_familia": 41,
        "ayuda_familiar_trabajos": 42,
        "motivacion_familiar_trabajo": 43,
        "motivacion_familiar_emprender": 44,
        "motivacion_profesores_metas": 45,
        # Competencias
        'innovacion':46, 
        'analisis':47,
        'critico':48,
        'comunicacion':49,
        'autogestion':50,
        'equipo':51,
    }
    return orden_tag.get(tag_question, 52)  # Devuelve 52 si no est√° en el diccionario


In [10]:
def resumen_por_cluster(df, cluster): 
    '''grade | educative_institution'''
    df_cluster = (
        df.groupby(['project_id', 'project_name', cluster, 'tipo_test'])
        .agg(
            Activos=(f'activos_por_{cluster}', 'max'),  # M√°ximo de inscritos por proyecto
            Respuestas=('student_id', 'nunique')
        )
        .reset_index()
    )

    # Calcular el porcentaje de respuestas por tipo de test
    df_cluster['% Respuestas del'] = round((df_cluster['Respuestas'] / df_cluster['Activos']) * 100, 0)

    for p in df.project_id.unique(): 
        
        tabla_cluster_project=df_cluster[df_cluster['project_id']==p]
        pname=tabla_cluster_project['project_name'].unique()[0]
        # Crear una tabla pivotada para mostrar los datos por tipo de test
        df_cluster_pivot = tabla_cluster_project.pivot_table(
            index=[cluster, 'Activos'],
            columns='tipo_test',
            values=['% Respuestas del'],
            aggfunc='first'
        ).sort_values('Activos', ascending=False).reset_index()

        # Aplanar los nombres de las columnas
        df_cluster_pivot.columns = [' '.join(col).strip() if isinstance(col, tuple) else col.lower() for col in df_cluster_pivot.columns]

        # Reemplazar NaN con 0 antes de convertir a entero
        
        df_cluster_pivot = df_cluster_pivot.fillna(0)

        for c in df_cluster_pivot.columns:
            if df_cluster_pivot[c].dtype == 'float64':  # Verificar si la columna es de tipo float
                # Primero redondear (para porcentajes) y luego convertir a entero
                df_cluster_pivot[c] = df_cluster_pivot[c].round().astype(int)

            if '%' in c: 
                df_cluster_pivot[c] = df_cluster_pivot[c].astype(str)
                df_cluster_pivot[c] = df_cluster_pivot[c] + '%'
                
    return df_cluster_pivot

In [11]:
def no_aplico2():
    from difflib import get_close_matches

    # Desde aca en adelante es el c√≥digo para hacer hacer el match de tag y pregunta
    mapping_df = pd.read_excel(r'C:\Users\boatt\Downloads\Lista de tag.xlsx')

    # 1) Cargar el DataFrame y la lista de preguntas
    df_question=df[['question']].drop_duplicates()
    df_question.reset_index(drop=True, inplace=True)
    mapping_df = pd.read_excel(r'C:\Users\boatt\Downloads\Lista de tag.xlsx')

    # 2) Funci√≥n de fuzzy matching
    def match_question(q, choices, cutoff=0.6):
        """
        Devuelve la mejor coincidencia de 'q' dentro de la lista 'choices',
        si la similitud es >= cutoff; si no, devuelve None.
        """
        matches = get_close_matches(q, choices, n=1, cutoff=cutoff)
        return matches[0] if matches else None

    # 3) Aplicar el matching y hacer merge
    df_mapping = mapping_df[['# final', 'seccion', 'tag', 'dimension', 'Pregunta final']].rename(columns={'Pregunta final':'question_match'})

    # Para cada pregunta buscamos la ‚ÄúPregunta final‚Äù m√°s parecida
    df_question['question_match'] = df_question['question'].apply(lambda x: match_question(x, mapping_df['Pregunta final']))

    # Hacemos el merge por la columna auxiliar question_match
    df_question = df_question.merge(df_mapping, on='question_match', how='left')
    df_question.rename(columns={'# final':'orden'}, inplace=True)

    df=df.merge(df_question, on='question', how='left')

## Para trabajar el Word

In [12]:
# Crear un nuevo documento
doc = Document()
# Configurar p√°gina en tama√±o A4 (21 x 29.7 cm = 8.27 x 11.69 inches)
section = doc.sections[0]
section.page_height = Inches(11.69)
section.page_width = Inches(8.27)

# M√°rgenes (por ejemplo, 1 pulgada a cada lado)
section.top_margin = Inches(1)
section.bottom_margin = Inches(1)
section.left_margin = Inches(1)
section.right_margin = Inches(1)


def agregar_titulo(doc, texto, nivel):
    # Paleta de colores corporativos sobrios
    COLOR_TITULO = RGBColor(0x2E, 0x3F, 0x5F)  # Azul marino oscuro
    COLOR_SUBTITULO = RGBColor(0x4F, 0x4F, 0x4F)  # Gris oscuro

    if nivel == 1:
        # T√≠tulo principal - Nivel 1
        titulo = doc.add_heading(level=1)
        run = titulo.add_run(texto.upper())
        run.font.name = 'Lora'
        run.font.size = Pt(14)
        run.font.bold = True
        run.font.color.rgb = COLOR_TITULO
        titulo.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
        titulo.paragraph_format.space_before = Pt(18)
        titulo.paragraph_format.space_after = Pt(12)
        
        # Agregar l√≠nea decorativa inferior
        p = titulo._element
        pPr = p.get_or_add_pPr()
        pBdr = OxmlElement('w:pBdr')
        pPr.append(pBdr)
        bottom = OxmlElement('w:bottom')
        bottom.set(qn('w:val'), 'single')
        bottom.set(qn('w:sz'), '8')
        bottom.set(qn('w:space'), '1')
        bottom.set(qn('w:color'), '2E3F5F')
        pBdr.append(bottom)

    elif nivel == 2:
        # Subt√≠tulo importante - Nivel 2
        titulo = doc.add_heading(level=2)
        run = titulo.add_run(texto)
        run.font.name = 'Lora'
        run.font.size = Pt(12)
        run.font.bold = True
        run.font.color.rgb = COLOR_SUBTITULO
        titulo.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
        titulo.paragraph_format.space_before = Pt(14)
        titulo.paragraph_format.space_after = Pt(8)
        
        # Subrayado decorativo
        p = titulo._element
        pPr = p.get_or_add_pPr()
        pBdr = OxmlElement('w:pBdr')
        pPr.append(pBdr)
        bottom = OxmlElement('w:bottom')
        bottom.set(qn('w:val'), 'single')
        bottom.set(qn('w:sz'), '6')
        bottom.set(qn('w:space'), '1')
        bottom.set(qn('w:color'), 'D3D3D3')
        pBdr.append(bottom)

    elif nivel == 3:
        # Subt√≠tulo secundario - Nivel 3
        titulo = doc.add_heading(level=3)
        run = titulo.add_run(texto)
        run.font.name = 'Lora'
        run.font.size = Pt(11)
        run.font.color.rgb = COLOR_SUBTITULO
        run.font.italic = True
        titulo.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
        titulo.paragraph_format.space_before = Pt(10)
        titulo.paragraph_format.space_after = Pt(4)

    else:
        # Para niveles inferiores
        parrafo = doc.add_paragraph(texto)
        parrafo.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
        run = parrafo.runs[0]
        run.font.name = 'Segoe UI Light'
        run.font.size = Pt(8)
        run.font.underline = True
        run.font.bold = True
        '''
        titulo = doc.add_heading(level=nivel)
        run = titulo.add_run(texto)
        run.font.name = 'Lora'
        run.font.size = Pt(9)
        run.font.color.rgb = COLOR_SUBTITULO
        titulo.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
        titulo.paragraph_format.space_before = Pt(6)
        titulo.paragraph_format.space_after = Pt(2)
        '''

def agregar_parrafo(doc, texto):
    parrafo = doc.add_paragraph(texto)
    parrafo.alignment = WD_PARAGRAPH_ALIGNMENT.JUSTIFY
    run = parrafo.runs[0]
    run.font.name = 'Segoe UI Light'
    run.font.size = Pt(8)

def insertar_figura(doc, figura, titulo=None, pie=None):
    if titulo:  # Solo agrega t√≠tulo si se proporciona
        agregar_titulo(doc, titulo, 3)
    imagen_stream = BytesIO()
    figura.savefig(imagen_stream, format='png', bbox_inches='tight')
    imagen_stream.seek(0)
    # Insertar imagen centrada
    p = doc.add_paragraph()
    run = p.add_run()
    run.add_picture(imagen_stream, width=Inches(5.5))
    p.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
    imagen_stream.close()
    # Insertar pie de gr√°fico si se proporciona
    if pie:
        pie_p = doc.add_paragraph(pie)
        pie_p.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
        run = pie_p.runs[0]
        run.font.name = 'Segoe UI Light'
        run.font.size = Pt(6)
        run.font.bold = True
        run.font.italic = True

def set_cell_width(cell, width_inches):
    """
    Establece el ancho de una celda en pulgadas.
    """
    width_twips = int(width_inches * 1440)
    cell.width = Inches(width_inches)
    tc = cell._tc
    tcPr = tc.get_or_add_tcPr()
    
    # Eliminar cualquier w:tcW anterior
    for child in tcPr.findall(qn('w:tcW')):
        tcPr.remove(child)

    # Crear nuevo elemento de ancho
    tcW = OxmlElement('w:tcW')
    tcW.set(qn('w:w'), str(width_twips))
    tcW.set(qn('w:type'), 'dxa')
    tcPr.append(tcW)

def insertar_tabla(doc, df, titulo=None):
    if titulo:
        agregar_titulo(doc, titulo, 3)

    tabla = doc.add_table(rows=1, cols=len(df.columns))
    tabla.style = 'Table Grid'
    tabla.alignment = WD_TABLE_ALIGNMENT.CENTER

    ancho_total = 6.0
    ancho_columna = ancho_total / len(df.columns)

    # Encabezados
    hdr_cells = tabla.rows[0].cells
    for i, col_name in enumerate(df.columns):
        cell = hdr_cells[i]
        cell.text = str(col_name)
        run = cell.paragraphs[0].runs[0]
        run.font.bold = True
        run.font.size = Pt(6.5)
        run.font.name = 'Segoe UI Light'
        set_cell_width(cell, ancho_columna)
        # centrar
        cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell.vertical_alignment      = WD_ALIGN_VERTICAL.CENTER

    # Filas de datos
    for _, row in df.iterrows():
        row_cells = tabla.add_row().cells
        for i, value in enumerate(row):
            cell = row_cells[i]
            cell.text = str(value)
            run = cell.paragraphs[0].runs[0]
            run.font.size = Pt(7)
            set_cell_width(cell, ancho_columna)
            # centrar
            cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
            cell.vertical_alignment      = WD_ALIGN_VERTICAL.CENTER

def insertar_tabla_con_merge(doc, df, titulo=None, group_cols=None):
    if titulo:
        agregar_titulo(doc, titulo, 3)

    tabla = doc.add_table(rows=1, cols=len(df.columns))
    tabla.style = 'Table Grid'
    tabla.alignment = WD_TABLE_ALIGNMENT.CENTER

    ancho_total = 6.0
    ancho_columna = ancho_total / len(df.columns)

    # Encabezados
    hdr_cells = tabla.rows[0].cells
    for i, col in enumerate(df.columns):
        cell = hdr_cells[i]
        cell.text = str(col)
        run = cell.paragraphs[0].runs[0]
        run.font.bold = True
        run.font.size = Pt(6.5)
        run.font.name = 'Segoe UI Light'
        # Anchos
        set_cell_width(cell, ancho_columna)
        # Centrado horizontal y vertical
        cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
        cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER

    # Filas de datos
    for _, row in df.iterrows():
        row_cells = tabla.add_row().cells
        for i, val in enumerate(row):
            cell = row_cells[i]
            cell.text = str(val)
            run = cell.paragraphs[0].runs[0]
            run.font.size = Pt(7)
            set_cell_width(cell, ancho_columna)
            # Centrado horizontal y vertical
            cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
            cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER

    # Merge de grupos (igual que antes)‚Ä¶
    if group_cols:
        col2idx = {col: idx for idx, col in enumerate(df.columns)}
        sizes = OrderedDict()
        prev_key = None
        for key_vals in df[group_cols].itertuples(index=False, name=None):
            if key_vals == prev_key:
                sizes[key_vals] += 1
            else:
                sizes[key_vals] = 1
                prev_key = key_vals

        current_row = 1
        for key_vals, size in sizes.items():
            if size > 1:
                for col in group_cols:
                    c_idx = col2idx[col]
                    start = tabla.cell(current_row, c_idx)
                    end   = tabla.cell(current_row + size - 1, c_idx)
                    # Vaciar intermedias y merge
                    for r in range(current_row + 1, current_row + size):
                        tabla.cell(r, c_idx).text = ''
                    start.merge(end)
                    # Aseguramos que la celda fusionada mantenga el centrado
                    start.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
                    start.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
            current_row += size

    return tabla

def insertar_salto_pagina(doc):
    doc.add_page_break()

def agregar_vi√±etas(doc, items, nivel=1, espacio_antes=Pt(4), espacio_despues=Pt(4)):
    """
    Inserta una lista usando guiones '-' como vi√±etas.

    Par√°metros
    ----------
    doc : Document
        Objeto python-docx Document.
    items : list de str
        Cada cadena ser√° un √≠tem de la lista.
    nivel : int, opcional (por defecto=1)
        Nivel de sangr√≠a (1 = vi√±etas principales, 2 = sub-vi√±etas, etc.).
    espacio_antes : Pt, opcional
        Espacio antes de cada √≠tem.
    espacio_despues : Pt, opcional
        Espacio despu√©s de cada √≠tem.
    """
    indent_por_nivel = Pt(12)  # 12pt de sangr√≠a por nivel

    for texto in items:
        # Preparo el p√°rrafo con indentaci√≥n
        p = doc.add_paragraph()
        p.paragraph_format.space_before = espacio_antes
        p.paragraph_format.space_after = espacio_despues
        # Sangrar seg√∫n nivel, a la izquierda
        p.paragraph_format.left_indent = indent_por_nivel * (nivel - 1)

        # Agregar el run con gui√≥n + texto
        run = p.add_run(f"- {texto}")
        run.font.name = 'Segoe UI Light'
        run.font.size = Pt(8)
        # Alineaci√≥n por defecto (izquierda)
        p.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT

def insertar_en_posicion(doc, funcion_contenido, *args, posicion='final', **kwargs):
    """
    Inserta contenido generado por una funci√≥n en una posici√≥n espec√≠fica del documento.

    Par√°metros
    ----------
    doc : Document
        Documento principal.
    funcion_contenido : function
        Funci√≥n que recibe un doc y otros par√°metros, y agrega contenido (p√°rrafo, t√≠tulo, etc.).
    *args, **kwargs :
        Argumentos para pasar a la funci√≥n.
    posicion : str
        'inicio', 'final' o 'index:<n>' para insertar en una posici√≥n concreta.
    """
    # Crear documento temporal con el contenido a insertar
    doc_temp = Document()
    funcion_contenido(doc_temp, *args, **kwargs)

    # Extraer elementos del cuerpo
    elementos_temp = list(doc_temp.element.body)

    # Insertar al inicio, final o √≠ndice
    body = doc.element.body

    if posicion == 'inicio':
        for elem in reversed(elementos_temp):
            body.insert(0, elem)
    elif posicion == 'final':
        for elem in elementos_temp:
            body.append(elem)
    elif posicion.startswith('index:'):
        idx = int(posicion.split(':')[1])
        for i, elem in enumerate(elementos_temp):
            body.insert(idx + i, elem)
    else:
        raise ValueError("La posici√≥n debe ser 'inicio', 'final' o 'index:<n>'")

def insertar_indice(doc, titulo="√çndice"):
    # T√≠tulo del √≠ndice
    agregar_titulo(doc, titulo, 1)

    # P√°rrafo donde ir√° la tabla de contenido
    p = doc.add_paragraph()
    run = p.add_run()

    # Agregar campo TOC
    fldChar1 = OxmlElement('w:fldChar')
    fldChar1.set(qn('w:fldCharType'), 'begin')

    instrText = OxmlElement('w:instrText')
    instrText.set(qn('xml:space'), 'preserve')
    instrText.text = r'TOC \o "1-3" \h \z \u'

    fldChar2 = OxmlElement('w:fldChar')
    fldChar2.set(qn('w:fldCharType'), 'separate')

    fldChar3 = OxmlElement('w:fldChar')
    fldChar3.set(qn('w:fldCharType'), 'end')

    run._r.append(fldChar1)
    run._r.append(instrText)
    run._r.append(fldChar2)
    run._r.append(fldChar3)

    # Estilo
    p.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
    p.paragraph_format.space_after = Pt(6)

def agregar_advertencia_actualizacion(doc):
    p = doc.add_paragraph()
    run = p.add_run("‚ö†Ô∏è Al abrir este documento, recuerde actualizar los campos (√≠ndice, referencias cruzadas, etc.).")
    run.font.italic = True
    run.font.color.rgb = RGBColor(0x80, 0x00, 0x00)
    p.paragraph_format.space_before = Pt(12)

def mostrar_contenido(doc):
    print("√çndice | Tipo   | Contenido resumido")
    print("--------------------------------------")

    idx_parrafo = 0
    idx_tabla = 0

    for i, elem in enumerate(doc.element.body):
        tag = elem.tag.split('}')[-1]

        if tag == 'p':
            parrafo = doc.paragraphs[idx_parrafo]
            texto = parrafo.text.strip().replace('\n', ' ')
            print(f"{i:<6} | P√°rrafo | '{texto[:60]}'")
            idx_parrafo += 1

        elif tag == 'tbl':
            print(f"{i:<6} | Tabla   | [Tabla con {len(doc.tables[idx_tabla].rows)} filas]")
            idx_tabla += 1

        else:
            print(f"{i:<6} | Otro    | Etiqueta: {tag}")
    
def mostrar_contenido_posicional(doc, buscar=None):
    """
    Si se pasa un texto en `buscar`, tambi√©n devuelve las posiciones donde aparece.
    """
    idx_parrafo = 0
    posiciones_encontradas = []

    for i, elem in enumerate(doc.element.body):
        tag = elem.tag.split('}')[-1]

        if tag == 'p':
            parrafo = doc.paragraphs[idx_parrafo]
            texto = parrafo.text.strip().replace('\n', ' ')
            # print(f"{i:<6} | P√°rrafo | '{texto[:60]}'")

            if buscar and buscar.lower() in texto.lower():
                posiciones_encontradas.append(i)

            idx_parrafo += 1

    return posiciones_encontradas

def reemplazar_parrafo(original: Paragraph, nuevo: Paragraph):
    # Reemplaza el elemento XML del p√°rrafo original por el del nuevo
    original._element.getparent().replace(original._element, nuevo._element)

def numerar_titulos_existentes(doc):
    contador = {1: 0, 2: 0, 3: 0}
    # Guardar los p√°rrafos a reemplazar (para evitar modificar la lista mientras iteras)
    reemplazos = []

    for i, parrafo in enumerate(doc.paragraphs):
        estilo = parrafo.style.name.strip()
        if estilo.startswith("Heading"):
            try:
                nivel = int(estilo.split()[-1])
            except (ValueError, IndexError):
                continue

            if nivel in contador:
                contador[nivel] += 1
                for deeper in range(nivel + 1, 4):
                    contador[deeper] = 0

                if nivel == 1:
                    numeracion = f"{contador[1]}."
                elif nivel == 2:
                    numeracion = f"{contador[1]}.{contador[2]}"
                elif nivel == 3:
                    numeracion = f"{contador[1]}.{contador[2]}.{contador[3]}"

                texto = parrafo.text.strip()
                if not texto.startswith(numeracion):
                    texto_sin_num = texto
                    # Crear un doc temporal para el nuevo t√≠tulo
                    doc_temp = Document()
                    agregar_titulo(doc_temp, f"{numeracion} {texto_sin_num}", nivel)
                    nuevo_parrafo = doc_temp.paragraphs[0]
                    reemplazos.append((parrafo, nuevo_parrafo))

    # Hacer los reemplazos al final para evitar problemas de √≠ndice
    for original, nuevo in reemplazos:
        reemplazar_parrafo(original, nuevo)



In [13]:
def procesar_resumen_en_doc(doc, resumen: dict):
    # 1. Contexto General del Diagn√≥stico
    contexto = resumen.get("Contexto General del Diagn√≥stico")
    if contexto:
        agregar_titulo(doc, "Contexto General del Diagn√≥stico", nivel=2)
        agregar_vi√±etas(doc, contexto, nivel=1)

    # 2. Hallazgos Clave y Correlaciones Relevantes
    hallazgos = resumen.get("Hallazgos Clave y Correlaciones Relevantes")
    if hallazgos:
        agregar_titulo(doc, "Hallazgos Clave y Correlaciones Relevantes", nivel=2)
        for categoria, insights in hallazgos.items():
            agregar_titulo(doc, categoria, nivel=3)
            agregar_vi√±etas(doc, insights, nivel=1)

    # 3. Retos Priorizados Identificados
    retos = resumen.get("Retos Priorizados Identificados")
    if retos:
        # convertir lista de dicts a DataFrame
        df_retos = pd.DataFrame(retos)
        insertar_tabla(doc, df_retos, titulo="Retos Priorizados Identificados")

    # 4. Otras Secciones Relevantes (opcional)
    otras = resumen.get("Otras Secciones Relevantes")
    if otras:
        agregar_titulo(doc, "Otras Secciones Relevantes", nivel=2)
        for seccion, items in otras.items():
            agregar_titulo(doc, seccion, nivel=3)
            agregar_vi√±etas(doc, items, nivel=1)

    # 5. Relevancia del Programa
    relevancia = resumen.get("Relevancia del Programa") or resumen.get("Relevancia del Programa +Educaci√≥n +Innovaci√≥n")
    if relevancia:
        agregar_titulo(doc, "Relevancia del Programa", nivel=2)
        agregar_vi√±etas(doc, relevancia, nivel=1)
    insertar_salto_pagina(doc)

# Normalizacion

In [14]:
start_time_norm= time.time()

In [15]:
# Intentar convertir los grados a n√∫meros enteros, si falla mantener como string
try:
    df['grade'] = df['grade'].astype(int)
except:
    pass

In [16]:
# ordernar el DataFrame por tipo de test
df['tipo_test_orden'] = df['tipo_test'].apply(ordenar_tipo_test)

df['tag_question_orden'] = df['tag_question'].apply(ordenar_tag)

df.sort_values(by=['project_id', 'tipo_test_orden','tag_question_orden'], inplace=True)

df.drop(columns=['tipo_test_orden', 'tag_question_orden'], inplace=True)

df['tipo_test'] = df['tipo_test'].apply(renombrar_tipo_test)

In [17]:
for col in ['answer', 'right_answer']: # Limpiar saltos de l√≠nea, retornos de carro y dobles comas en 'answer' y 'right_answer'
    df[col] = df[col].str.replace('\n', '', regex=False)
    df[col] = df[col].str.replace('\r', '', regex=False)
    df[col] = df[col].str.replace(',,', ',', regex=False)
    df[col] = df[col].str.strip()

#df['answer'] = df['answer'].str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8') # Elimino los acentos y caracteres especiales de las respuestas

df['age'] = df['age'].fillna(0).astype(int)  # Convierto edad a numero entero

df['question']=df['question'].str.replace('&nbsp;',' ', regex=False) # Elimino los &nbsp;

In [18]:
df['genero'] = df['genero'].map({ 'male': 'Masculino', 'female': 'Femenino', 'unspecified': 'Indefinido' }) # mapeo de genero a espa√±ol

In [19]:
# Pasos para clasificar el tipo de pregunta

    ## Paso 1: Aplicar la funci√≥n a la columna "Answer" para crear una nueva columna "Tipo de Pregunta"
df['Tipo de Pregunta (por respuesta)'] = df['answer'].apply(clasificar_tipo_pregunta)

    ## Paso 2: Crear nuevo dataframe con el tipo por pregunta
tipo_por_pregunta = df.groupby('question')['Tipo de Pregunta (por respuesta)'].apply(consolidar_tipo_pregunta).reset_index()
tipo_por_pregunta.rename(columns={'Tipo de Pregunta (por respuesta)': 'Tipo de Pregunta'}, inplace=True)

    ## Paso 3: unir de vuelta al df original
df = df.merge(tipo_por_pregunta, on='question', how='left')

In [20]:
## Agrupar por tag y question, y elegir el texto m√°s frecuente
standard_question = df.groupby(['tag_question'])['question'].agg(
    lambda x: Counter(x).most_common(1)[0][0]
).reset_index()

    ## Crear un mapeo autom√°tico
mapping = {
    row['tag_question']: row['question']
    for _, row in standard_question.iterrows()
}

    ## Aplicar normalizaci√≥n
df['question'] = df.apply(normalize_question, axis=1)


In [21]:
# Pasos para normalizar las respuestas categ√≥ricas, por si la misma pregunta tiene diferentes respuestas que significan lo mismo

df_categorico = df[df['Tipo de Pregunta']=='Categorica'].copy()
df_no_categoria = df[df['Tipo de Pregunta']!='Categorica'].copy()

print(f"DataFrame categ√≥rico tiene {len(df_categorico)} filas")
print(f"DataFrame no categ√≥rico tiene {len(df_no_categoria)} filas")

if len(df_categorico) > 0:
    print("Procesando respuestas categ√≥ricas...")
    
    ## Separar n√∫mero y texto
    df_categorico['Number'], df_categorico['Text'] = zip(*df_categorico['answer'].apply(extract_number_text))

    ## Agrupar por pregunta y n√∫mero, y elegir el texto m√°s frecuente
    standard_texts = df_categorico.groupby(['question', 'Number'])['Text'].agg(
        lambda x: Counter(x).most_common(1)[0][0]
    ).reset_index()

    ## Crear un mapeo autom√°tico
    mapping = {
        (row['question'], row['Number']): row['Text']
        for _, row in standard_texts.iterrows()
    }

    ## Aplicar normalizaci√≥n
    df_categorico['answer'] = df_categorico.apply(normalize_answer, axis=1)

    ## borro las columnas number y text
    df_categorico.drop(columns=['Number', 'Text'], inplace=True)

    ## uno los dataframes
    df = pd.concat([df_no_categoria, df_categorico])
    print("Normalizaci√≥n de respuestas categ√≥ricas completada.")
    
else:
    print("No hay datos categ√≥ricos para procesar. Usando solo datos no categ√≥ricos.")
    df = df_no_categoria

DataFrame categ√≥rico tiene 96 filas
DataFrame no categ√≥rico tiene 216 filas
Procesando respuestas categ√≥ricas...
Normalizaci√≥n de respuestas categ√≥ricas completada.


In [22]:
df['answer_numeric'] = df['answer'].apply(lambda x: extract_number_text(x)[0])

In [23]:
# rango etario
df['rango_etario'] = pd.cut(
	df['age'],
	bins=[0, 5, 11, 17, 24, 34, 54, np.inf],
	labels=['0-5', '6-11', '12-17', '18-24', '25-34', '35-54', '55+'],
	right=False
)

In [24]:
df_completo=df.copy() # Guardo el dataframe original, lo hago asi porque era un bajon modificar todo el codigo buscando a df


df_filtrado = pd.DataFrame() # Crear un nuevo DataFrame a filtrar que luego usare con el nombre df

# Recorremos proyecto por proyecto
for (pid, pname), grupo in df.groupby(['project_id', 'project_name']):
    grupo['respond'] = True
    
    pivot = grupo.pivot_table(index='student_id',
                               columns='tipo_test',
                               values='respond',
                               fill_value=0)

    # Filtrar solo los alumnos que respondieron todos los tests de su proyecto
    columnas_test = pivot.columns
    alumnos_completos = pivot[pivot[columnas_test].eq(1).all(axis=1)].index

    # Filtrar el grupo original
    grupo_filtrado = grupo[grupo['student_id'].isin(alumnos_completos)]
    
    # Agregar al nuevo DataFrame
    df_filtrado = pd.concat([df_filtrado, grupo_filtrado], ignore_index=True)

df=df_filtrado

del df_filtrado # 	borrar df_filtrado para liberar memoria

In [25]:
end_time_norm= time.time()
time_norm= end_time_norm - start_time_norm

# Introduccion

In [26]:
# Agrupar por proyecto, tipo de test y calcular los inscritos y estudiantes con respuesta
df_proyecto = (
    df_completo.groupby(['project_id', 'project_name', 'tipo_test'])
    .agg(
        Activos=('activos_por_proyecto', 'max'),  # M√°ximo de inscritos por proyecto
        Evaluados=('student_id', 'nunique'),
        Instituciones=('educative_institution', 'nunique'),
        age_min=('age', 'min'),
        age_max=('age', 'max'),
        Salones=('room_id', 'nunique'),
        Mujeres=('student_id', lambda x: df_completo.loc[x.index][df_completo.loc[x.index, 'genero'] == 'Femenino']['student_id'].nunique()),
        Hombres=('student_id', lambda x: df_completo.loc[x.index][df_completo.loc[x.index, 'genero'] == 'Masculino']['student_id'].nunique()),
    )
    .reset_index()
)

# Calcular el porcentaje de respuestas por tipo de test
df_proyecto['% Respuestas'] = round((df_proyecto['Evaluados'] / df_proyecto['Activos']) * 100, 1)
df_proyecto['% Mujeres']=round(df_proyecto['Mujeres'] / df_proyecto['Evaluados'] * 100, 1)
df_proyecto['% Hombres']=round(df_proyecto['Hombres'] / df_proyecto['Evaluados'] * 100, 1)

In [27]:
agregar_advertencia_actualizacion(doc)
insertar_indice(doc)
insertar_salto_pagina(doc)


# Agregar t√≠tulo al reporte
agregar_titulo(doc, "Reporte de Respuestas", 1)
## insertar_salto_pagina(doc)


# --- Secci√≥n de Introducci√≥n ---
agregar_titulo(doc, "Introducci√≥n", 2)

In [28]:
# 1) Datos generales
n_proyectos = df_proyecto['project_id'].nunique()
tipos_test = sorted(df_proyecto['tipo_test'].unique())
n_tipos = len(tipos_test)
lista_tests = ', '.join(tipos_test)

# P√°rrafo introductorio general
intro = (
    f"Este documento presenta un an√°lisis de las respuestas obtenidas en "
    f"{n_proyectos} proyecto{'s' if n_proyectos > 1 else ''}, considerando "
    f"{n_tipos} tipo{'s' if n_tipos > 1 else ''} de actividad: {lista_tests}. "
    f"Se incluyen indicadores de participaci√≥n por proyecto, como el n√∫mero de personas activas, "
    f"instituciones participantes, g√©nero, rango etario y tasas de respuesta por tipo de prueba."
)


agregar_parrafo(doc, intro)


# 2) Detalle por proyecto, consolidando tipos de test
for (pid, pname), grupo in df_proyecto.groupby(['project_id', 'project_name']):
    activos = grupo['Activos'].max()
    instituciones = grupo['Instituciones'].max()
    edad_min = grupo['age_min'].min()
    edad_max = grupo['age_max'].max()
    alumnos_cruzados=df[df['project_id']==pid]['student_id'].nunique()
    porcentaje_cruzados=round(alumnos_cruzados*100/activos, 0).astype(int)
    salones = grupo['Salones'].max()
    
    # Resumen por tipo de test en una sola frase
    resumen_tests = []
    for _, row in grupo.iterrows():
        ttest = row['tipo_test']
        evaluados = row['Evaluados']
        pct_res = row['% Respuestas']
        resumen_tests.append(f"{ttest}: {evaluados} personas ({pct_res:.0f}%)")
        h=row['% Hombres']
        m=row['% Mujeres']

    resumen_tests_str = ' y en el '.join(resumen_tests)

    p√°rrafo= (
        f"Proyecto ¬´{pname}¬ª: cont√≥ con {activos} personas activas en "
        f"{instituciones} instituci{'ones' if instituciones > 1 else '√≥n'} y {salones} salones. De las personas activas el {h}% son hombres y el {m}% son mujeres. Y ambos generos poseen edades entre "
        f"{edad_min} y {edad_max} a√±os. Logrando as√≠ alcanzar en el {resumen_tests_str}"
    )

    if n_tipos>1:
        agregar_parrafo(
            doc,
            f"Las respuestas consideradas en el an√°lisis comparativo ser√°n las de aquellas personas que respondieron ambas actividades, siendo un total de {alumnos_cruzados} que representan el {porcentaje_cruzados}% del total de personas activas" )



In [29]:
texto_introduccion = intro + '\n\n' + p√°rrafo

In [30]:
df_proyecto['Proyecto'] = df_proyecto['project_name'].astype(str) + ' (' + df_proyecto['project_id'].astype(str) + ')'
tabla_proyecto=df_proyecto[['Proyecto', 'tipo_test', 'Activos', 'Evaluados',  '% Hombres' , '% Mujeres', 'Instituciones','Salones', '% Respuestas']].copy()

for c in tabla_proyecto.columns:
    if '%' in c:
        tabla_proyecto[c]=tabla_proyecto[c].astype(int).astype(str) + '%'

# Insertar la tabla en el documento
insertar_tabla_con_merge(
    doc,
    tabla_proyecto.rename(
        columns={
            'tipo_test':'Actividad',
            'Activos':'Colaboradores activos',
            '% Hombres':'% Evaluados Hombres',
            '% Mujeres':'% Evaluadas Mujeres'}),
    'Resumen por proyecto y actividad',
    group_cols=['Proyecto'])

<docx.table.Table at 0x1657d086e40>

In [31]:
if df_completo['educative_institution'].nunique()>0:
    resumen_instituciones = resumen_por_cluster(df_completo,'educative_institution')
    if not resumen_instituciones.empty:
        # Insertar la tabla en el documento
        insertar_tabla(
            doc,
            resumen_instituciones.rename(
                columns={
                    'educative_institution': 'Instituci√≥n',
                    'Activos': 'Colaboradores activos'}),
            f"Instituciones del proyecto <{pname}>")

if df_completo['grade'].nunique()>0:
    resumen_grados = resumen_por_cluster(df_completo,'grade')
    
    # Ordenar de forma ascendente
    resumen_grados  = resumen_grados.sort_values('grade', ascending=True)

    if not resumen_grados.empty:
        # Insertar la tabla en el documento
        insertar_tabla(
            doc,
            resumen_grados.rename(
                columns={
                    'grade': 'Grados',
                    'Activos': 'Colaboradores activos'}),
            f"Grados del proyecto <{pname}>")

# Graficos por Preguntas

In [32]:
start_time_graficos = time.time()

## Funciones para...

In [33]:
def tabla_answer(df_funcion):
    '''Esta funcion me sirve para devolver la tabla de respuestas y porcentaje por ansnwer: 
    Espera un df con la pregunta ya filtrada. Agrupa por tipo de test, answer y realiza los conteos'''

    df_base = (
                df_funcion
                .groupby(['tipo_test', 'answer'])
                .size()
                .reset_index(name='Conteo')
            )
            
    ### Calcular el total por tipo_test
    total_por_test = df_base.groupby('tipo_test', observed=True)['Conteo'].transform('sum')

    ### Calcular porcentaje por tipo_test
    df_base['porcentaje'] = (df_base['Conteo'] / total_por_test)*100

    # Crear tabla resumen
    pivot = df_base.pivot_table(
        index='answer',
        columns='tipo_test',
        values='porcentaje'
    ).reset_index().fillna(0)

    pivot.columns = ['Respuesta'] + [f"% {col}" for col in pivot.columns[1:]]

    tipo_tests = df_base['tipo_test'].unique()

    if len(tipo_tests) == 1:
        # Solo un tipo de test: mostrar Conteo y porcentaje
        pivot = df_base.groupby(['answer'], observed=True).agg(
            Conteo=('Conteo', 'sum'),
            Porcentaje=('porcentaje', 'sum')
        ).reset_index()
        
        df_return = pivot.rename(
            columns={
                'answer':'Opci√≥n',
                'Porcentaje':'% Porcentaje sobre el total',
                'Conteo':'Cantidad'}).fillna(0)

        for c in df_return.columns:
            if df_return[c].dtype == 'float64':  # Verificar si la columna es de tipo float
                # Primero redondear (para porcentajes) y luego convertir a entero
                df_return[c] = df_return[c].round(1)
            if '%' in c:
                df_return[c] = df_return[c].astype(str) + '%'

    else:
        # Dos tipos de test: mostrar porcentaje y diferencia
        columna_test_1 = f"% {tipo_tests[0]}"
        columna_test_2 = f"% {tipo_tests[1]}"
        df_return=pivot.fillna(0)

        for c in df_return.columns:
            if df_return[c].dtype == 'float64':  # Verificar si la columna es de tipo float
                # Primero redondear (para porcentajes) y luego convertir a entero
                df_return[c] = df_return[c].apply(lambda x: float(f"{x:.1f}"))


        df_return['Diferencia (pp)'] = df_return[columna_test_2]- df_return[columna_test_1]

        df_return['Diferencia (pp)'] = df_return['Diferencia (pp)'].round(1).astype(str) + ' pp'    
        df_return[columna_test_1] = df_return[columna_test_1].astype(str) + ' %'    
        df_return[columna_test_2] = df_return[columna_test_2].astype(str) + ' %'    


    return df_return

In [34]:
def tabla_agrupada(df_funcion, indice):
    ''' Esta funcion me sirve para devolver la tabla pivotea de instituciones y answer ''' 


    # Agrupar y contar respuestas por instituci√≥n, tipo de test y respuesta
    df_educative = (
        df_funcion
        .groupby([indice, 'tipo_test', 'answer'], observed=True)
        .size()
        .reset_index(name='conteo')
    )

    # Calcular total por tipo_test dentro de cada instituci√≥n
    total_por_test = df_educative.groupby([indice, 'tipo_test'], observed=True)['conteo'].transform('sum')
    df_educative['porcentaje'] = (df_educative['conteo'] / total_por_test)

    # Detectar cu√°ntos tipos de test hay
    tipos = df_educative['tipo_test'].unique()

    if len(tipos) == 1:
        # Solo un tipo de test: devolver porcentajes por respuesta
        pivot = df_educative.pivot_table(
            index=indice,
            columns='answer',
            values='porcentaje',
            fill_value=0,
            observed=True
        ).reset_index()
    elif len(tipos) == 2:
        # Dos tipos de test: calcular diferencia (variaci√≥n)
        t1, t2 = tipos
        pivot_pct = df_educative.pivot_table(
            index=indice,
            columns=['tipo_test', 'answer'],
            values='porcentaje',
            fill_value=0,
            observed=True
        )

        # Calcular la diferencia entre los dos tests para cada respuesta
        variacion = {
            answer: (pivot_pct[(t2, answer)] - pivot_pct[(t1, answer)])
            for answer in df_educative['answer'].unique()
            if (t2, answer) in pivot_pct.columns and (t1, answer) in pivot_pct.columns # Solo calcula la diferencia si ambos tienen datos
        }

        # Armar DataFrame final con las variaciones
        pivot = pd.DataFrame(variacion, index=pivot_pct.index).reset_index()
        pivot = pivot.rename(columns={col: f"{col}" for col in variacion})

    else:
        raise ValueError("La funci√≥n solo soporta 1 o 2 tipos de test.")
    

    return pivot

In [35]:
def mapa_calor(data, ind, title=None, use_negative_scale=False):
    import matplotlib.ticker as mtick
    from matplotlib.colors import LinearSegmentedColormap

    # Colormaps
    neg_colors = ["#1e88e5", "#fdfefe","#f39c12"]  # rojo ‚Üí blanco ‚Üí verde
    pos_colors = ["#ffffff", "#809bce"]            # blanco ‚Üí celeste

    
    if use_negative_scale:
        cmap = LinearSegmentedColormap.from_list("custom_cmap_neg", neg_colors)
        vmin, vmax = -1, 1
        norm = SymLogNorm(linthresh=0.01, vmin=vmin, vmax=vmax)
        suffix = "pp"
    else:
        cmap = LinearSegmentedColormap.from_list("custom_cmap_pos", pos_colors)
        vmin, vmax = 0, 1
        norm = Normalize(vmin=vmin, vmax=vmax)
        suffix = "%"
    
    # Pivot original
    data_pivot = tabla_agrupada(data, indice=ind).set_index(ind)
    
    # Crear DF de anotaciones: multiplicar por 100, redondear 1 dec, y a√±adir sufijo
    annot_df = (data_pivot * 100).round(1).astype(str) + suffix

    # Reemplazar los ceros por '-'
    annot_df = annot_df.where(data_pivot != 0, "-")
    
    altura=len(data_pivot)
    
    # Plot
    fig, ax = plt.subplots(figsize=(10, altura*0.40))
    sns.heatmap(
        data_pivot,
        annot=annot_df,
        fmt="",
        cmap=cmap,
        norm=norm,
        cbar_kws={'label': ''},
        ax=ax
    )
    fig.subplots_adjust(right=0.85)
    
    
    # Obtener respuesta correcta
    right_answer_actual = df_pregunta['right_answer'].dropna().unique()
    right_answer_actual = right_answer_actual[0] if len(right_answer_actual) > 0 else None

    # Obtener etiquetas originales
    x_labels = [label.get_text() for label in ax.get_xticklabels()]
    new_labels = [ajustar_etiquetas(label) for label in x_labels]

    # Asignar nuevas etiquetas
    ax.set_xticklabels(new_labels)

    # Aplicar formato y color
    for tick_label, original_label in zip(ax.get_xticklabels(), x_labels):
        tick_label.set_rotation(45)
        tick_label.set_horizontalalignment('right')
        tick_label.set_fontsize(9)

        if original_label == right_answer_actual:
            tick_label.set_color('#ff8562')  # Naranja
            tick_label.set_weight('bold')  # Set text to bold
        else:
            tick_label.set_color('black')    # Default

    
    plt.title(title or "", fontsize=12, pad=20)
    plt.xlabel("")
    plt.ylabel("")

    # Colorbar en %
    cbar = ax.collections[0].colorbar
    cbar.ax.yaxis.set_major_formatter(mtick.PercentFormatter(xmax=1.0))
    cbar.set_ticks([vmin, 0, vmax])

    # Insertar en el documento
    insertar_figura(doc, plt)
    plt.close()

    return None


In [36]:
def generar_analisis_categorico(df_grouped):
    grupos = df_grouped['tipo_test'].unique()
    n_grupos = len(grupos)
    analisis = []
    
    # Introducci√≥n adaptable
    if n_grupos == 1:
        analisis.append("El gr√°fico muestra la distribuci√≥n porcentual y nominal de respuestas. ")
    else:
        analisis.append("El gr√°fico compara la distribuci√≥n porcentual y nominal de respuestas entre grupos. ")
    
    # An√°lisis por grupo (si hay m√°s de 1)
    if n_grupos > 0:
        for grupo in grupos:
            df_grupo = df_grouped[df_grouped['tipo_test'] == grupo]
            if not df_grupo.empty:
                max_row = df_grupo.loc[df_grupo['%'].idxmax()]
                analisis.append(
                    f"En la actividad {grupo}, la opci√≥n m√°s frecuente fue '{max_row['Respuestas']}' "
                    f"({max_row['%']:.1f}%). "
                )
    
    # Comparativa solo si hay 2 grupos
    if n_grupos == 2:
        diferencias = []
        for Respuestas in df_grouped['Respuestas'].unique():
            vals = df_grouped[df_grouped['Respuestas'] == Respuestas]['%'].values
            if len(vals) == 2:
                diferencia = abs(vals[0] - vals[1])
                diferencias.append((diferencia, Respuestas))
        
        if diferencias:
            max_diff = max(diferencias, key=lambda x: x[0])
            analisis.append(
                f"La mayor diferencia entre grupos ocurre en '{max_diff[1]}' "
                f"({max_diff[0]:.1f} pp). "
            )
    
    # Menci√≥n de categor√≠a menos seleccionada (solo si aplica)
    if not df_grouped.empty:
        min_global = df_grouped.loc[df_grouped['%'].idxmin()]
        analisis.append(
            f"La opci√≥n menos seleccionada fue '{min_global['Respuestas']}' "
            f"({min_global['%']:.1f}%). "
        )
    
    # Tama√±o muestral siempre
    # total_respuestas = df_grouped['Conteo'].sum()
    # analisis.append(f"Base: {total_respuestas} respuestas.")
    
    return " ".join(analisis).replace("  ", " ")  # Limpiar dobles espacios

## Graficos

In [37]:
conclusion = []

In [38]:
##
agregar_titulo(doc, "An√°lisis realizado por pregunta", 2)

agregar_parrafo(doc, "Para poder entender c√≥mo se distribuyeron las respuestas primero ver√°s gr√°ficos de barras, que son columnas que muestran cu√°ntas personas respondieron cada opci√≥n. Luego debajo de cada gr√°fico encontrar√°s una tabla con n√∫meros que muestran los mismos datos, pero con cifras exactas. Es como un resumen r√°pido de lo que ves en el gr√°fico de arriba.")

if df.tipo_test.nunique()>1 and len(lista_graficos)>0:

    agregar_parrafo(doc, f"Siguiendo por los mapas de calor, que parecen cuadros de colores. Imag√≠nate un sem√°foro pero con m√°s tonos: muestra c√≥mo vari√≥ el porcentaje de respuestas entre la primera y la segunda actividad. Los colores indican el tipo de cambio. Por ejemplo, si separamos las respuestas por edad, puedes ver al instante si los j√≥venes respondieron diferente que los adultos mayores.")
    agregar_vi√±etas(doc, ["Los tonos calientes muestran un aumento en la proporci√≥n de respuestas.",
    "Los tonos frios indican una disminuci√≥n.",
    "Los colores m√°s claros representan cambios peque√±os, y los m√°s oscuros, cambios m√°s grandes."])

else:

    agregar_parrafo(doc, f"Siguiendo por los mapas de calor, que parecen cuadros de colores. Los colores claros significan pocas respuestas y los colores oscuros significan muchas respuestas. Estos mapas te ayudan a ver patrones r√°pidamente.")
    agregar_parrafo(doc, f"Por ejemplo, si separamos las respuestas por edad, puedes ver al instante si los j√≥venes respondieron diferente que los adultos mayores.")


df['tipo_test_orden'] = df['tipo_test'].apply(ordenar_tipo_test)

df['tag_question_orden'] = df['tag_question'].apply(ordenar_tag)

lista_preguntas=df[['tipo_test_orden','tag_question_orden', 'question', 'Tipo de Pregunta']].drop_duplicates().reset_index(drop=True)

lista_preguntas.sort_values(by=['tipo_test_orden','tag_question_orden'], inplace=True)

lista_preguntas.drop(columns=['tipo_test_orden', 'tag_question_orden'], inplace=True)


# 2) Explode del array de respuestas solo para preguntas de tipo "Categorica Multiseleccion"
mask_multi = df['Tipo de Pregunta'] == 'Categorica Multiseleccion'
df_multi = df[mask_multi].copy()
df_otros = df[~mask_multi].copy()

# Explode y split solo para multiselecci√≥n
df_multi = df_multi.explode('answer')
df_multi = (
    df_multi
    .assign(
        answer=df_multi['answer'].str.split(r'[;](?=\s*\d+\.)|[;](?!\s*\d+\.)')
    )
    .explode('answer')
)
df_multi['answer'] = df_multi['answer'].str.strip()

# Unir de nuevo
df = pd.concat([df_multi, df_otros], ignore_index=True)

df = df.drop_duplicates(subset=['student_id', 'question', 'tipo_test', 'answer'])
# recorrer question de la lista de preguntas, si el tipo de pregunta es categorica hacer un grafico si no hacer otro
h=0
for i in range(len(lista_preguntas)):
    conclusion_pregunta=[]

    h=h+1
    pregunta = lista_preguntas['question'].iloc[i]
    tipo_pregunta = lista_preguntas['Tipo de Pregunta'].iloc[i]

    ## Filtrar el DataFrame por la pregunta actual
    df_pregunta = df[df['question'] == pregunta].copy()

    ninstituciones=df_pregunta['educative_institution'].nunique()
    proyectos = df_pregunta[['project_name', 'project_id']].drop_duplicates()
    proyectos_str = ', '.join([f"{row['project_name']} ({row['project_id']})" for _, row in proyectos.iterrows()])
    pie_texto = f"El grafico incluye respuestas de: {proyectos_str}"

    if tipo_pregunta != 'Abierta':
        ### Aqu√≠ puedes agregar la l√≥gica para crear gr√°ficos de barras

        ### Agrupar por respuesta y tipo_test
        df_base = (
            df_pregunta
            .groupby(['tipo_test', 'answer'])
            .size()
            .reset_index(name='Conteo')
        )

        ### Calcular el total por tipo_test
        total_por_test = df_base.groupby('tipo_test')['Conteo'].transform('sum')

        ### Calcular porcentaje por tipo_test
        df_base['%'] = df_base['Conteo'] *100 / total_por_test 
        
        ### Orden l√≥gico de las categor√≠as
        categorias = df_base['answer'].dropna().unique()
        try:
            categorias_ordenadas = sorted(
                categorias,
                key=lambda x: (extraer_numero(x) if extraer_numero(x) is not None else float('inf'),
                            str(x).lower())
            )
        except Exception:
            categorias_ordenadas = sorted(categorias)
        
        # Paleta de colores
        if df_base['tipo_test'].nunique() == 1:
            paleta= ['#9FDEF1']  # Celeste
        else:
            paleta = ['#9FDEF1', '#FFB500']  # Celeste y Naranja

        ## Plot
        plt.figure(figsize=(8, 5))
        ax = sns.barplot(
            data=df_base,
            x='answer',
            y='%',
            hue='tipo_test',
            order=categorias_ordenadas,
            palette=paleta # Celeste and Naranja colors
        )

        ### Etiquetas X
        etiquetas_ajustadas = [ajustar_etiquetas(str(cat)) for cat in categorias_ordenadas]

        # Asegurar que el n√∫mero de categor√≠as coincide con los ticks
        ax.set_xticks(range(len(categorias_ordenadas)))
        ax.set_xticklabels(etiquetas_ajustadas)

        # Aplicar formato y color a cada etiqueta
        right_answer_actual = df_pregunta['right_answer'].dropna().unique()
        right_answer_actual = right_answer_actual[0] if len(right_answer_actual) > 0 else None

        for tick_label, categoria in zip(ax.get_xticklabels(), categorias_ordenadas):
            tick_label.set_rotation(45)
            tick_label.set_horizontalalignment('right')
            tick_label.set_fontsize(10)
            
            if categoria == right_answer_actual:
                tick_label.set_color('#ff8562')  # Naranja
                tick_label.set_weight('bold')  # Set text to bold
            else:
                tick_label.set_color('black')    # Default

        
        ax.set_ylabel("")
        ax.set_xlabel("")
        ### leyenda como test
        handles, labels = ax.get_legend_handles_labels()

        ### 1) Leyenda centrada abajo, fuera del √°rea de datos
        ax.legend(
            loc='best',
            ncol=len(df_base['tipo_test'].unique()),
            frameon=False,
            fontsize=8
        )

        ### Agregar Conteo y porcentaje sobre cada barra
        for bar, (_, fila) in zip(ax.patches, df_base.iterrows()):
            altura = bar.get_height()
            if altura > 0:
                ax.text(
                    bar.get_x() + bar.get_width() / 2,
                    altura + 1,
                    f"{int(fila['Conteo'])}\n({altura:.1f}%)",
                    ha='center',
                    va='bottom',
                    fontsize=8,
                    rotation=0
                )

        ### T√≠tulo
        titulo = f"Distribuci√≥n de respuestas para la pregunta: {pregunta}"
        plt.title(ajustar_titulo(titulo, 44, 120, 3), fontsize=10, pad=30)
        plt.yticks([])
        plt.tight_layout()
        plt.subplots_adjust(top=1)
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        ax.spines['left'].set_visible(False)
        
        #Word
        df_base=df_base.rename(columns={'answer':'Respuestas'})

        agregar_titulo(doc, f"{pregunta}", 3)

        # Generar an√°lisis autom√°tico
        if IA is True:
            texto_analisis = OA.analyze_dataframe(df_base,pregunta)
            conclusion_pregunta.append(texto_analisis)
        else:
            texto_analisis = generar_analisis_categorico(df_base)

        agregar_parrafo(doc, texto_analisis)

        # Insertar grafico
        insertar_figura(doc, plt, pie=pie_texto)
        

        agregar_parrafo(doc, "En la siguiente tabla dispone del resumen del grafico en formato tabular")
        insertar_tabla(doc, tabla_answer(df_pregunta))
        
        
        if df.tipo_test.nunique()>1 and len(lista_graficos)>0:
            for c in lista_graficos:
                if c in df_pregunta.columns and df_pregunta[c].notna().any():
                    variable=mapeo_variables.get(c, c)

                    texto_base=f"Como vario la entrada y la salida en puntos porcentuales (pp) las respuestas por {variable.lower()} en la pregunta:"
                    texto_mas_pregunta=f"{texto_base} {pregunta}"

                    agregar_titulo(doc, f"Observamos por {variable}:", 4)
                    
                    if IA is True:
                        df_analisis_mapa=tabla_agrupada(df_pregunta, c)
                        texto_analisis_mapa=OA.analyze_dataframe(df_analisis_mapa, texto_mas_pregunta, matriz=True)
                        conclusion_pregunta.append(texto_analisis_mapa)
                        agregar_parrafo(doc, texto_analisis_mapa)
                    
                    mapa_calor(df_pregunta, c, ajustar_titulo(texto_mas_pregunta, len(texto_base), 120, 3), True)

        else:

            for c in lista_graficos:
                if c in df_pregunta.columns and df_pregunta[c].notna().any():
                    variable=mapeo_variables.get(c, c)

                    texto_base=f"¬øC√≥mo se concentraron las respuestas por {variable.lower()}? en la pregunta:"
                    texto_mas_pregunta=f"{texto_base} {pregunta}"
                    
                    agregar_titulo(doc, f"Observamos por {variable}:", 4)

                    if IA is True:
                        df_analisis_mapa=tabla_agrupada(df_pregunta, c)
                        texto_analisis_mapa=OA.analyze_dataframe(df_analisis_mapa, texto_mas_pregunta, matriz=True)
                        conclusion_pregunta.append(texto_analisis_mapa)
                        agregar_parrafo(doc, texto_analisis_mapa)
                        
                    mapa_calor(df_pregunta, c,  ajustar_titulo(texto_mas_pregunta, len(texto_base), 120, 3))
                    
        if (IA is True) and (len(conclusion_pregunta)>0): 
            texto_ai=OA.insight_parcial(conclusion_pregunta, texto_mas_pregunta)
            conclusion.append(texto_ai)    
        
        plt.close()

    else:
        # Si no es categ√≥rica, puedes agregar otra l√≥gica o simplemente pasar
        h=h-1
        # print(f"Pregunta Abierta: {pregunta}")


In [39]:
end_time_graficos = time.time()
tiempo_graficos = end_time_graficos - start_time_graficos

# Final

In [40]:
if IA is True:
    # Paso 1: Insertar t√≠tulo "Resumen ejecutivo" antes de la Introducci√≥n
    pos_intro = mostrar_contenido_posicional(doc, 'Introducci√≥n')[0]
    insertar_en_posicion(doc, agregar_titulo, "Resumen ejecutivo", 2, posicion=f'index:{pos_intro}')

    # Paso 2: Generar texto resumen con modelo OA y agregarlo antes de la Introducci√≥n
    texto_resumen = OA.analyze_list(conclusion, tabla_proyecto, texto_introduccion)
    pos_intro = mostrar_contenido_posicional(doc, 'Introducci√≥n')[0]
    insertar_en_posicion(doc, agregar_parrafo, texto_resumen, posicion=f'index:{pos_intro}')

    # Paso 3: Insertar salto de p√°gina antes de la Introducci√≥n
    pos_intro = mostrar_contenido_posicional(doc, 'Introducci√≥n')[0]
    insertar_en_posicion(doc, insertar_salto_pagina, posicion=f'index:{pos_intro}')

    # Paso 4: Generar JSON estructurado con insights del modelo OA
    OA_insight = OA.insight_list(conclusion, tabla_proyecto, texto_introduccion)
    match = re.search(r"\{.*\}", OA_insight, re.DOTALL)

    if match:
        json_str = match.group(0)
        resumen = json.loads(json_str)

        # Paso 5: Insertar contenido del resumen en la secci√≥n "Resumen ejecutivo"
        idx_resumen = mostrar_contenido_posicional(doc, buscar="REPORTE DE RESPUESTAS")
        if idx_resumen:
            insertar_en_posicion(doc, procesar_resumen_en_doc, resumen, posicion=f'index:{idx_resumen[0] + 1}')
    
            
    uso_modelo=pd.DataFrame(OA.registro_tokens)
    # Convertir la columna de fecha/hora a tipo datetime
    uso_modelo['fecha_hora'] = pd.to_datetime(uso_modelo['fecha_hora'])

    # Agrupar el DataFrame actual
    resumen_nuevo = uso_modelo.groupby('modelo').agg({
        'fecha_hora': 'min',
        'input_tokens': 'sum',
        'output_tokens': 'sum',
        'costo_usd': 'sum'
    }).reset_index()

    archivo = "uso_modelo.csv"

    if os.path.exists(archivo):
        # Leer el archivo existente
        resumen_existente = pd.read_csv(archivo, parse_dates=['fecha_hora'])
        # Unir ambos DataFrames
        resumen_total = pd.concat([resumen_existente, resumen_nuevo])
        # Volver a agrupar para sumar correctamente y conservar la primera fecha
        
    else:
        resumen_total = resumen_nuevo

    # Guardar el archivo actualizado
    resumen_total.to_csv(archivo, index=False)

In [41]:
numerar_titulos_existentes(doc)
# guardar el documento
doc.save(f'{tipo_test} ({project_id}).docx')

In [42]:
# Crear tabla de tiempos de ejecuci√≥n
tiempos = [
    {"Etapa": "Consulta Athena", "Tiempo (segundos)": round(time_consulta, 2)},
    {"Etapa": "Normalizaci√≥n", "Tiempo (segundos)": round(time_norm, 2)},
    {"Etapa": "Gr√°ficos y Word", "Tiempo (segundos)": round(tiempo_graficos, 2)},
    {"Etapa": "Total", "Tiempo (segundos)": round(time_consulta + time_norm + tiempo_graficos, 2)},
]
df_tiempos = pd.DataFrame(tiempos)
display(df_tiempos)

Unnamed: 0,Etapa,Tiempo (segundos)
0,Consulta Athena,15.03
1,Normalizaci√≥n,0.28
2,Gr√°ficos y Word,33.83
3,Total,49.14
