<h1> Reto 1: Extracción de datos de Facturas

In [4]:
!pip install python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Using cached python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [9]:
!pip install pillow pandas openpyxl openai

Collecting pillow
  Using cached pillow-11.1.0-cp312-cp312-win_amd64.whl.metadata (9.3 kB)
Collecting pandas
  Using cached pandas-2.2.3-cp312-cp312-win_amd64.whl.metadata (19 kB)
Collecting openpyxl
  Using cached openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting openai
  Downloading openai-1.61.1-py3-none-any.whl.metadata (27 kB)
Collecting numpy>=1.26.0 (from pandas)
  Using cached numpy-2.2.2-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2025.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Collecting anyio<5,>=3.5.0 (from openai)
  Using cached anyio-4.8.0-py3-none-any.whl.metadata (4.6 kB)
Collecting distro<2,>=1.7.0 (from openai)
  Using cached distro-1.9.0-py3-none-any.whl.metadata (6.8 kB)
Collecting httpx<1,>


[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [26]:
import os
import json
import base64
import re
from PIL import Image
import pandas as pd
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

INPUT_FOLDER = r"D:\PruebatecnicaFerreycorp\GenAI\Facturas"
OUTPUT_FOLDER = r"D:\PruebatecnicaFerreycorp\GenAI\Facturas_Procesadas"


In [27]:
def clean_json_response(response: str) -> str:
   """
   Limpia la respuesta del modelo para obtener solo el JSON válido.

   Args:
       response (str): Respuesta completa del modelo

   Returns:
       str: JSON limpio
   """
   # Buscar el contenido JSON entre llaves
   json_match = re.search(r'\{.*\}', response, re.DOTALL)
   if json_match:
       return json_match.group(0)
   return response

In [28]:
def convert_to_jpg(input_path: str, output_path: str) -> str | None:
   """
   Convierte una imagen a formato JPG.

   Args:
       input_path (str): Ruta de la imagen de entrada
       output_path (str): Ruta donde se guardará la imagen convertida

   Returns:
       str | None: Ruta de la imagen convertida o None si hay error
   """
   try:
       with Image.open(input_path) as img:
           if img.mode in ('RGBA', 'P'):
               img = img.convert('RGB')
           img.save(output_path, 'JPEG')
       return output_path
   except Exception as e:
       print(f"Error al convertir la imagen: {e}")
       return None

In [37]:
def analyze_image(image_path: str) -> str:
    """
    Analiza una imagen usando Gpt-4o-mini Vision para extraer información de la factura.
    Utiliza Chain of Thought (CoT) prompting para mejorar la extracción.

    Args:
        image_path (str): Ruta de la imagen a analizar

    Returns:
        str: Respuesta del modelo en formato JSON
    """
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')

    prompt = """Analiza la factura paso a paso:

    1. Primero, identifica el logo o nombre de la empresa en la parte superior de la factura.
    2. Busca el número de factura (puede aparecer como Order No., Reference No., o Invoice No.).
    3. Encuentra la fecha de emisión (generalmente aparece como Invoice Date o Date), si encuentras en texto ponlo en formato dia/mes/año.
    4. Localiza el monto total (busca términos como Total, Total Order, Amount).
    5. Determina la moneda utilizada (USD, PEN, etc.).
    OBSERVACION: Si no encuentras o no se puede visualizar los items pedidos escribe "None"

    Después de analizar estos elementos, devuelve SOLO un objeto JSON con este formato exacto 
    (sin markdown, sin ```json, sin comentarios adicionales):
    {
        "proveedor": "nombre de la empresa que emite la factura",
        "factura": "número de factura",
        "fecha_de_factura": "fecha de emisión",
        "monto": "monto total",
        "moneda": "moneda"
    }
    """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}",
                        },
                    },
                ],
            }
        ],
        max_tokens=500,
    )

    content = response.choices[0].message.content
    print("\nJSON reconocido:")
    print(content)
    return content

In [38]:
def process_images() -> list:
   """
   Procesa todas las imágenes en el directorio de entrada.

   Returns:
       list: Lista de diccionarios con la información extraída de cada factura
   """
   results = []
   
   if not os.path.exists(OUTPUT_FOLDER):
       os.makedirs(OUTPUT_FOLDER)

   for filename in os.listdir(INPUT_FOLDER):
       if filename.lower().endswith(('.jpg', '.jpeg', '.jfif')):
           input_path = os.path.join(INPUT_FOLDER, filename)
           output_path = os.path.join(OUTPUT_FOLDER, f"{os.path.splitext(filename)[0]}.jpg")
           
           final_path = convert_to_jpg(input_path, output_path)
           if final_path:
               print(f"Procesando: {filename}")
               try:
                   # Obtener y limpiar la respuesta
                   raw_analysis = analyze_image(final_path)
                   cleaned_json = clean_json_response(raw_analysis)
                   
                   # Parsear el JSON limpio
                   json_data = json.loads(cleaned_json)
                   results.append(json_data)
                   print(f"Análisis completado para: {filename}")
               except json.JSONDecodeError as e:
                   print(f"Error al procesar JSON para {filename}: {e}")
                   print("Respuesta recibida:", raw_analysis)
                   print("JSON limpio intentado:", cleaned_json)
               print("-" * 50)

   return results

In [39]:
def main():
    """
    Función principal que ejecuta el proceso completo y muestra resultados detallados.
    """
    results = process_images()
    
    df = pd.DataFrame(results)
    
    print("\nDetalles del DataFrame:")
    print("----------------------")
    print("Dimensiones:", df.shape)
    print("\nColumnas:", list(df.columns))
    print("\nPrimeras filas:")
    print(df)
    print("\nResumen estadístico:")
    print(df.describe(include='all'))
    df.to_excel("resultados_facturas.xlsx", index=False)
    print("\nResultados guardados en 'resultados_facturas.xlsx'")

In [40]:
if __name__ == "__main__":
   main()

Procesando: factura1.jpg

JSON reconocido:
{
    "proveedor": "B&H",
    "factura": "Reference No.: 1286390",
    "fecha_de_factura": "06/09/11",
    "monto": "249.00",
    "moneda": "USD"
}
Análisis completado para: factura1.jpg
--------------------------------------------------
Procesando: factura2.jfif

JSON reconocido:
{
    "proveedor": "CONFECCIONES \"SAN JORGE\" S.A.C.",
    "factura": "0000765",
    "fecha_de_factura": "15/10/2013",
    "monto": "1,156.40",
    "moneda": "nuevos soles"
}
Análisis completado para: factura2.jfif
--------------------------------------------------

Detalles del DataFrame:
----------------------
Dimensiones: (2, 5)

Columnas: ['proveedor', 'factura', 'fecha_de_factura', 'monto', 'moneda']

Primeras filas:
                         proveedor                 factura fecha_de_factura  \
0                              B&H  Reference No.: 1286390         06/09/11   
1  CONFECCIONES "SAN JORGE" S.A.C.                 0000765       15/10/2013   

      mont

<h1>Reto 2: Evaluar procedimiento de cocina a partir de una receta y videos

In [1]:
!pip install openai opencv-python PyPDF2 pdf2image

Collecting pdf2image
  Using cached pdf2image-1.17.0-py3-none-any.whl.metadata (6.2 kB)
Using cached pdf2image-1.17.0-py3-none-any.whl (11 kB)
Installing collected packages: pdf2image
Successfully installed pdf2image-1.17.0



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
from pdf2image import convert_from_path
import os
import cv2
import numpy as np

def convertir_y_guardar_pdf(ruta_pdf: str, directorio_salida: str) -> list:
    """
    Convierte un PDF a imágenes JPG y las guarda en un directorio.
    
    Args:
        ruta_pdf: Ruta al archivo PDF
        directorio_salida: Directorio donde se guardarán las imágenes
        
    Returns:
        Lista con las rutas de las imágenes guardadas
    """
    # Crear directorio si no existe
    if not os.path.exists(directorio_salida):
        os.makedirs(directorio_salida)
    
    # Convertir PDF a imágenes
    imagenes = convert_from_path(ruta_pdf)
    rutas_guardadas = []
    
    # Guardar cada imagen
    for i, imagen in enumerate(imagenes):
        # Convertir imagen PIL a array numpy
        img_array = np.array(imagen)
        # Convertir de RGB a BGR (formato OpenCV)
        img_cv2 = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
        
        # Generar nombre de archivo
        nombre_archivo = f'receta_pagina_{i+1}.jpg'
        ruta_completa = os.path.join(directorio_salida, nombre_archivo)
        
        # Guardar imagen
        cv2.imwrite(ruta_completa, img_cv2)
        rutas_guardadas.append(ruta_completa)
        print(f"Imagen guardada: {ruta_completa}")
    
    return rutas_guardadas

if __name__ == "__main__":
    # Configurar rutas
    directorio_base = "D:\\PruebatecnicaFerreycorp\\GenAI\\Videos y Receta"
    ruta_pdf = os.path.join(directorio_base, "receta-panqueques.pdf")
    directorio_imagenes = os.path.join(directorio_base, "imagenes_receta")
    
    try:
        # Convertir PDF y guardar imágenes
        rutas_imagenes = convertir_y_guardar_pdf(ruta_pdf, directorio_imagenes)
        print(f"\nSe generaron {len(rutas_imagenes)} imágenes de la receta")
    except:
        print("Error")

Imagen guardada: D:\PruebatecnicaFerreycorp\GenAI\Videos y Receta\imagenes_receta\receta_pagina_1.jpg
Imagen guardada: D:\PruebatecnicaFerreycorp\GenAI\Videos y Receta\imagenes_receta\receta_pagina_2.jpg
Imagen guardada: D:\PruebatecnicaFerreycorp\GenAI\Videos y Receta\imagenes_receta\receta_pagina_3.jpg

Se generaron 3 imágenes de la receta


In [19]:
!pip install google-generativeai

Collecting google-generativeai
  Downloading google_generativeai-0.8.4-py3-none-any.whl.metadata (4.2 kB)
Collecting google-ai-generativelanguage==0.6.15 (from google-generativeai)
  Downloading google_ai_generativelanguage-0.6.15-py3-none-any.whl.metadata (5.7 kB)
Collecting google-api-core (from google-generativeai)
  Downloading google_api_core-2.24.1-py3-none-any.whl.metadata (3.0 kB)
Collecting google-api-python-client (from google-generativeai)
  Downloading google_api_python_client-2.160.0-py2.py3-none-any.whl.metadata (6.7 kB)
Collecting google-auth>=2.15.0 (from google-generativeai)
  Using cached google_auth-2.38.0-py2.py3-none-any.whl.metadata (4.8 kB)
Collecting protobuf (from google-generativeai)
  Using cached protobuf-5.29.3-cp310-abi3-win_amd64.whl.metadata (592 bytes)
Collecting proto-plus<2.0.0dev,>=1.22.3 (from google-ai-generativelanguage==0.6.15->google-generativeai)
  Downloading proto_plus-1.26.0-py3-none-any.whl.metadata (2.2 kB)
Collecting googleapis-common-pro


[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [8]:
from openai import OpenAI
import cv2
import base64
import os
import json
from typing import Dict
from pathlib import Path

class RecetaAgent:
    def __init__(self):
        api_key = os.getenv('OPENAI_API_KEY')
        if not api_key:
            raise ValueError("No se encontró la API key de OpenAI")
        self.client = OpenAI(api_key=api_key)
        self.info_receta = {
            'ingredientes': [],
            'utensilios': [],
            'tiempo_preparacion': '',
            'porciones': '',
            'dificultad': '',
            'pasos': {}
        }
        self.resultados_json = {
            'analisis_general': {},
            'pasos': [],
            'metadatos': {
                'total_pasos': 0,
                'tiempo_total': '',
                'nivel_dificultad': ''
            }
        }

    def analizar_imagen(self, ruta_imagen: str, es_primera_pagina: bool = False) -> Dict:
        """Analiza una imagen de la receta usando GPT-4 Vision."""
        imagen = cv2.imread(ruta_imagen)
        _, buffer = cv2.imencode('.jpg', imagen)
        imagen_base64 = base64.b64encode(buffer).decode('utf-8')

        if es_primera_pagina:
            prompt = """
            Extrae la siguiente información de la receta y responde EXACTAMENTE en este formato:
            {
                "ingredientes": ["ingrediente1", "ingrediente2", ...],
                "utensilios": ["utensilio1", "utensilio2", ...],
                "tiempo_preparacion": "X minutos",
                "porciones": "X porciones",
                "dificultad": "nivel"
            }
            """
        else:
            prompt = """
            Extrae los pasos numerados de la receta en este formato:
            {
                "pasos": {
                    "1": "descripción completa del paso 1",
                    "2": "descripción completa del paso 2"
                }
            }
            """

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{imagen_base64}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=1000
        )

        respuesta_texto = response.choices[0].message.content
        print(f"\nRespuesta de GPT para {os.path.basename(ruta_imagen)}:")
        print(respuesta_texto)
        
        try:
            inicio_json = respuesta_texto.find('{')
            fin_json = respuesta_texto.rfind('}') + 1
            if inicio_json != -1 and fin_json != 0:
                json_str = respuesta_texto[inicio_json:fin_json]
                return json.loads(json_str)
            return {}
        except Exception as e:
            print(f"Error al procesar JSON: {e}")
            return {}

    def procesar_receta(self, directorio_imagenes: str) -> Dict:
        """Procesa todas las imágenes de la receta y guarda los resultados."""
        archivos = sorted([f for f in os.listdir(directorio_imagenes) if f.endswith(('.jpg', '.jpeg', '.png'))])
        
        # Procesar primera página
        if archivos:
            print("\nAnalizando información general de la receta...")
            info_primera_pagina = self.analizar_imagen(
                os.path.join(directorio_imagenes, archivos[0]), 
                es_primera_pagina=True
            )
            
            # Actualizar información general
            self.info_receta.update({
                'ingredientes': info_primera_pagina.get('ingredientes', []),
                'utensilios': info_primera_pagina.get('utensilios', []),
                'tiempo_preparacion': info_primera_pagina.get('tiempo_preparacion', ''),
                'porciones': info_primera_pagina.get('porciones', ''),
                'dificultad': info_primera_pagina.get('dificultad', '')
            })
            
            # Actualizar JSON de resultados
            self.resultados_json['analisis_general'] = info_primera_pagina
            self.resultados_json['metadatos']['nivel_dificultad'] = info_primera_pagina.get('dificultad', '')
            self.resultados_json['metadatos']['tiempo_total'] = info_primera_pagina.get('tiempo_preparacion', '')

        # Procesar resto de páginas buscando pasos
        for archivo in archivos:
            print(f"\nBuscando pasos en {archivo}...")
            resultado = self.analizar_imagen(os.path.join(directorio_imagenes, archivo))
            if 'pasos' in resultado:
                self.info_receta['pasos'].update(resultado['pasos'])
                for num_paso, descripcion in resultado['pasos'].items():
                    self.resultados_json['pasos'].append({
                        'numero': num_paso,
                        'descripcion': descripcion
                    })

        self.resultados_json['metadatos']['total_pasos'] = len(self.resultados_json['pasos'])
        return self.resultados_json

    def guardar_resultados(self, ruta_salida: str = "analisis_receta.json"):
        """Guarda los resultados en un archivo JSON."""
        with open(ruta_salida, 'w', encoding='utf-8') as f:
            json.dump(self.resultados_json, f, ensure_ascii=False, indent=2)
        print(f"\nResultados guardados en: {ruta_salida}")

def main():
    # Configurar directorio de imágenes
    directorio_base = "D:\\PruebatecnicaFerreycorp\\GenAI\\Videos y Receta"
    directorio_imagenes = os.path.join(directorio_base, "imagenes_receta")
    
    if not os.path.exists(directorio_imagenes):
        raise ValueError(f"No se encontró el directorio de imágenes: {directorio_imagenes}")
    
    # Crear y ejecutar el agente
    agente = RecetaAgent()
    resultados = agente.procesar_receta(directorio_imagenes)
    
    # Guardar resultados
    agente.guardar_resultados("analisis_receta_detallado.json")
    
    # Imprimir resumen
    print("\nResumen del análisis:")
    print(f"Total de pasos: {resultados['metadatos']['total_pasos']}")
    print(f"Tiempo de preparación: {resultados['metadatos']['tiempo_total']}")
    print(f"Dificultad: {resultados['metadatos']['nivel_dificultad']}")

if __name__ == "__main__":
    main()


Analizando información general de la receta...

Respuesta de GPT para receta_pagina_1.jpg:
{
    "ingredientes": ["1 taza de harina leudante o harina de repostería", "1 huevo", "¾ taza de leche", "1 chorro de pequeño de aceite de oliva", "1 cucharadita de mantequilla", "1 chorro de miel"],
    "utensilios": ["Moldes para panqueques", "Espátula de silicona", "Sartén de panqueques (opcional)", "Bol"],
    "tiempo_preparacion": "15 minutos",
    "porciones": "2 porciones",
    "dificultad": "baja"
}

Buscando pasos en receta_pagina_1.jpg...

Respuesta de GPT para receta_pagina_1.jpg:
Lo siento, pero no puedo extraer los pasos numerados de la receta de la imagen que has proporcionado.

Buscando pasos en receta_pagina_2.jpg...

Respuesta de GPT para receta_pagina_2.jpg:
```json
{
    "pasos": {
        "1": "Hacer la receta de panqueques es súper fácil. Primero, reúne todos los ingredientes. Si no tienes harina leudante o preparada, que es harina de trigo que ya incluye levadura (también s

In [None]:
import google.generativeai as genai
import cv2
import json
import os
import time
from datetime import timedelta
from typing import Dict, List

class VideoAnalysisAgent:
    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash-exp")
        self.receta_info = self._cargar_receta()
        
    def _cargar_receta(self, ruta_json: str = "D:\\PruebatecnicaFerreycorp\\analisis_receta_detallado.json") -> Dict:
        """Carga la información de la receta desde el JSON."""
        with open(ruta_json, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def _formatear_tiempo(self, segundos: float) -> str:
        """Convierte segundos a formato MM:SS."""
        tiempo = timedelta(seconds=int(segundos))
        return str(tiempo).split('.')[0][-5:]  # Obtiene solo MM:SS
    
    def analizar_frame(self, frame, timestamp: float, paso_actual: Dict) -> Dict:
        """Analiza un frame específico del video."""
        try:
            # Convertir frame a formato para Gemini
            _, buffer = cv2.imencode('.jpg', frame)
            frame_data = buffer.tobytes()
            
            prompt = f"""
            Analiza este frame del video de cocina y compáralo con el siguiente paso de la receta:
            
            Paso {paso_actual['numero']}: {paso_actual['descripcion']}
            
            Ingredientes necesarios: {self.receta_info['analisis_general']['ingredientes']}
            
            IMPORTANTE: Tu respuesta debe ser EXACTAMENTE en este formato JSON, sin texto adicional:
            {{
                "timestamp": "{self._formatear_tiempo(timestamp)}",
                "accion_detectada": "descripción detallada de la acción que se observa",
                "coincide_con_paso": true/false,
                "error": "descripción del error si existe, o 'None' si no hay error"
            }}
            """
            
            time.sleep(2)  # Pausa para evitar límites de API
            
            response = self.model.generate_content([
                {"mime_type": "image/jpeg", "data": frame_data},
                prompt
            ])
            
            # Intentar encontrar y extraer el JSON de la respuesta
            try:
                texto_respuesta = response.text
                inicio_json = texto_respuesta.find('{')
                fin_json = texto_respuesta.rfind('}') + 1
                
                if inicio_json != -1 and fin_json > inicio_json:
                    json_str = texto_respuesta[inicio_json:fin_json]
                    return json.loads(json_str)
                else:
                    print(f"No se encontró JSON válido en la respuesta: {texto_respuesta[:100]}...")
                    return None
                    
            except json.JSONDecodeError as e:
                print(f"Error decodificando JSON en timestamp {timestamp}: {str(e)}")
                print(f"Respuesta recibida: {texto_respuesta[:100]}...")
                return None
                
        except Exception as e:
            if "429" in str(e):
                print("Límite de API alcanzado, esperando 60 segundos...")
                time.sleep(60)
                return self.analizar_frame(frame, timestamp, paso_actual)
            print(f"Error analizando frame: {e}")
            return None

    def analizar_video(self, ruta_video: str, nombre_salida: str):
        """Analiza un video completo y genera el reporte."""
        print(f"\nAnalizando video: {os.path.basename(ruta_video)}")
        
        cap = cv2.VideoCapture(ruta_video)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total_duration = total_frames / fps
        
        resultados = []
        paso_actual_idx = 0
        frame_interval = int(fps * 10)  # Aumentado a 10 segundos para reducir llamadas a la API
        inicio_paso_actual = 0
        
        print(f"Duración total del video: {self._formatear_tiempo(total_duration)}")
        print(f"Total de frames: {total_frames}")
        print(f"FPS: {fps}")
        
        for frame_idx in range(0, total_frames, frame_interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            
            if not ret:
                break
                
            timestamp = frame_idx / fps
            
            # Si estamos al final de un paso o si detectamos un cambio significativo
            if paso_actual_idx < len(self.receta_info['pasos']):
                paso_actual = self.receta_info['pasos'][paso_actual_idx]
                
                print(f"\nAnalizando paso {paso_actual['numero']} en tiempo {self._formatear_tiempo(timestamp)}")
                analisis = self.analizar_frame(frame, timestamp, paso_actual)
                
                if analisis:
                    # Si detectamos una acción significativa
                    resultados.append({
                        "paso": f"**Paso {paso_actual['numero']}**",
                        "start": self._formatear_tiempo(inicio_paso_actual),
                        "end": self._formatear_tiempo(timestamp),
                        "action": analisis['accion_detectada'],
                        "label": "CORRECT" if analisis.get('coincide_con_paso', False) and (not analisis.get('error') or analisis['error'] == 'None') else "WRONG",
                        "error": analisis.get('error', 'None')
                    })
                    
                    if analisis.get('coincide_con_paso', False):
                        inicio_paso_actual = timestamp
                        paso_actual_idx += 1
            
            print(f"Progreso: {frame_idx}/{total_frames} frames ({(frame_idx/total_frames*100):.1f}%)")
        
        cap.release()
        
        # Guardar resultados
        with open(nombre_salida, 'w', encoding='utf-8') as f:
            for resultado in resultados:
                f.write(f"{resultado['paso']}:\n")
                f.write(f"* Start: {resultado['start']}\n")
                f.write(f"* End: {resultado['end']}\n")
                f.write(f"* Action: {resultado['action']}\n")
                f.write(f"* Label: {resultado['label']}\n")
                f.write(f"* Error: {resultado['error']}\n\n")
        
        print(f"\nAnálisis guardado en: {nombre_salida}")

def main():
    # Configuración
    api_key = "-------------------------"
    directorio_base = "D:\\PruebatecnicaFerreycorp\\GenAI\\Videos y Receta"
    
    # Crear agente
    agente = VideoAnalysisAgent(api_key)
    
    # Analizar videos
    videos = ["preparacion1.mp4", "preparacion2.mp4"]
    
    for i, video in enumerate(videos, 1):
        ruta_video = os.path.join(directorio_base, video)
        nombre_salida = f"analisis_video_{i}.txt"
        agente.analizar_video(ruta_video, nombre_salida)

if __name__ == "__main__":
    main()

  from .autonotebook import tqdm as notebook_tqdm



Analizando video: preparacion1.mp4
Duración total del video: 06:37
Total de frames: 11923
FPS: 29.98881571188469

Analizando paso 1 en tiempo 00:00
Progreso: 0/11923 frames (0.0%)

Analizando paso 2 en tiempo 00:09
Progreso: 299/11923 frames (2.5%)

Analizando paso 3 en tiempo 00:19
Progreso: 598/11923 frames (5.0%)

Analizando paso 4 en tiempo 00:29
Progreso: 897/11923 frames (7.5%)

Analizando paso 4 en tiempo 00:39
Progreso: 1196/11923 frames (10.0%)

Analizando paso 5 en tiempo 00:49
Progreso: 1495/11923 frames (12.5%)

Analizando paso 5 en tiempo 00:59
Progreso: 1794/11923 frames (15.0%)

Analizando paso 5 en tiempo 01:09
Progreso: 2093/11923 frames (17.6%)

Analizando paso 5 en tiempo 01:19
Progreso: 2392/11923 frames (20.1%)
Progreso: 2691/11923 frames (22.6%)
Progreso: 2990/11923 frames (25.1%)
Progreso: 3289/11923 frames (27.6%)
Progreso: 3588/11923 frames (30.1%)
Progreso: 3887/11923 frames (32.6%)
Progreso: 4186/11923 frames (35.1%)
Progreso: 4485/11923 frames (37.6%)
Progr

In [3]:
from openai import OpenAI
import os
import json
import re

class TitleAnalysisAgent:
    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
        
    def extraer_pasos(self, contenido: str) -> list:
        """Extrae los pasos del texto usando expresiones regulares."""
        patron = r'\*\*Paso \d+\*\*:\n\* Start: (.*?)\n\* End: (.*?)\n\* Action: (.*?)\n\* Label: (.*?)\n\* Error: (.*?)(?=\n\n|\Z)'
        pasos = re.findall(patron, contenido, re.DOTALL)
        return pasos
    
    def generar_titulo(self, accion: str) -> str:
        """Genera un título descriptivo corto usando GPT-4."""
        prompt = f"""
        Analiza la siguiente acción de una receta y genera un título descriptivo corto (máximo 4 palabras):

        Acción: {accion}

        Responde SOLO con el título, sin puntos, comillas ni explicaciones adicionales.
        Ejemplo: "Mezcla de ingredientes" o "Preparación de masa"
        """
        
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=20,
                temperature=0.3
            )
            return response.choices[0].message.content.strip().strip('"')
        except Exception as e:
            print(f"Error generando título: {e}")
            return "Paso sin título"

    def procesar_archivo(self, ruta_entrada: str, ruta_salida: str):
        """Procesa el archivo de análisis y genera una versión con títulos."""
        try:
            # Leer archivo original
            with open(ruta_entrada, 'r', encoding='utf-8') as f:
                contenido = f.read()
            
            # Extraer pasos
            pasos = self.extraer_pasos(contenido)
            
            # Generar nuevo contenido con títulos
            nuevo_contenido = ""
            for paso in pasos:
                start, end, action, label, error = paso
                titulo = self.generar_titulo(action)
                
                nuevo_contenido += f"""# {titulo}
**Paso**:
* Start: {start}
* End: {end}
* Action: {action}
* Label: {label}
* Error: {error}\n\n"""
            
            # Guardar nuevo archivo
            with open(ruta_salida, 'w', encoding='utf-8') as f:
                f.write(nuevo_contenido)
                
            print(f"Archivo procesado y guardado en: {ruta_salida}")
            
        except Exception as e:
            print(f"Error procesando archivo: {e}")

def main():
    # Configuración
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        raise ValueError("No se encontró la API key de OpenAI")
    
    # Crear agente
    agente = TitleAnalysisAgent(api_key)
    
    # Procesar archivos de análisis
    for i in range(1, 3):
        archivo_entrada = f"analisis_video_{i}.txt"
        archivo_salida = f"analisis_video_{i}_FINAL.txt"
        
        if os.path.exists(archivo_entrada):
            print(f"\nProcesando archivo: {archivo_entrada}")
            agente.procesar_archivo(archivo_entrada, archivo_salida)
        else:
            print(f"No se encontró el archivo: {archivo_entrada}")

if __name__ == "__main__":
    main()


Procesando archivo: analisis_video_1.txt
Archivo procesado y guardado en: analisis_video_1_FINAL.txt

Procesando archivo: analisis_video_2.txt
Archivo procesado y guardado en: analisis_video_2_FINAL.txt


In [6]:
!pip install git+https://github.com/openai/swarm.git

Collecting git+https://github.com/openai/swarm.git
  Cloning https://github.com/openai/swarm.git to c:\users\diego\appdata\local\temp\pip-req-build-eoyzj657
  Resolved https://github.com/openai/swarm.git to commit 9db581cecaacea0d46a933d6453c312b034dbf47
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting pytest (from swarm==0.1.0)
  Using cached pytest-8.3.4-py3-none-any.whl.metadata (7.5 kB)
Collecting pre-commit (from swarm==0.1.0)
  Downloading pre_commit-4.1.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting instructor (from swarm==0.1.0)
  Downloading instructor-1.7.2-py3-none-any.whl.metadata (18 kB)
Collecting docstring-parser<1.0,>=0.16 (from instructor->swarm==0.1.0)
  Downloading docstring_

  Running command git clone --filter=blob:none --quiet https://github.com/openai/swarm.git 'C:\Users\diego\AppData\Local\Temp\pip-req-build-eoyzj657'

[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


<h2> Enrutador de Orquestacion de Agentes

In [None]:
from swarm import Swarm, Agent
import os
from pathlib import Path

# Function definitions for agents
def process_recipe_images():
    """Function for RecetaAgent to process recipe images"""
    agent = RecetaAgent()
    directorio_base = "D:\\PruebatecnicaFerreycorp\\GenAI\\Videos y Receta"
    directorio_imagenes = os.path.join(directorio_base, "imagenes_receta")
    
    if not os.path.exists(directorio_imagenes):
        return {"status": "error", "message": f"Directory not found: {directorio_imagenes}"}
    
    resultados = agent.procesar_receta(directorio_imagenes)
    agent.guardar_resultados("analisis_receta_detallado.json")
    return {"status": "success", "data": resultados}

def analyze_videos():
    """Function for VideoAnalysisAgent to analyze videos"""
    api_key = "AIzaSyB_wQJWAwL9VzrYtF-P3Ipnz4rhp5KbalY"
    directorio_base = "D:\\PruebatecnicaFerreycorp\\GenAI\\Videos y Receta"
    
    # Crear carpeta para análisis sin títulos
    carpeta_sin_titulos = os.path.join(directorio_base, "Analisis de videos sin titulos")
    os.makedirs(carpeta_sin_titulos, exist_ok=True)
    
    agent = VideoAnalysisAgent(api_key)
    videos = ["preparacion1.mp4", "preparacion2.mp4"]
    results = []
    
    for i, video in enumerate(videos, 1):
        ruta_video = os.path.join(directorio_base, video)
        nombre_salida = os.path.join(carpeta_sin_titulos, f"analisis_video_{i}.txt")
        agent.analizar_video(ruta_video, nombre_salida)
        results.append({"video": video, "output": nombre_salida})
    
    return {"status": "success", "data": results}

def generate_titles():
    """Function for TitleAnalysisAgent to generate titles"""
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        return {"status": "error", "message": "OpenAI API key not found"}
    
    directorio_base = "D:\\PruebatecnicaFerreycorp\\GenAI\\Videos y Receta"
    carpeta_sin_titulos = os.path.join(directorio_base, "Analisis de videos sin titulos")
    carpeta_con_titulos = os.path.join(directorio_base, "Analisis de videos con especificaciones - FINAL")
    
    # Crear carpeta para análisis con títulos
    os.makedirs(carpeta_con_titulos, exist_ok=True)
    
    agent = TitleAnalysisAgent(api_key)
    results = []
    
    for i in range(1, 3):
        archivo_entrada = os.path.join(carpeta_sin_titulos, f"analisis_video_{i}.txt")
        archivo_salida = os.path.join(carpeta_con_titulos, f"analisis_video_{i}_FINAL.txt")
        
        if os.path.exists(archivo_entrada):
            agent.procesar_archivo(archivo_entrada, archivo_salida)
            results.append({"input": archivo_entrada, "output": archivo_salida})
    
    return {"status": "success", "data": results}

def transfer_to_video_agent():
    return video_agent

def transfer_to_title_agent():
    return title_agent

# Create Swarm client
client = Swarm()

# Define agents
recipe_agent = Agent(
    name="Recipe Agent",
    instructions="""
    You are responsible for processing recipe images and extracting structured information.
    Your main tasks are:
    1. Process recipe images
    2. Extract ingredients, tools, and steps
    3. Generate a structured JSON output
    """,
    functions=[process_recipe_images, transfer_to_video_agent]
)

video_agent = Agent(
    name="Video Agent",
    instructions="""
    You are responsible for analyzing cooking videos and comparing them with recipe steps.
    Your main tasks are:
    1. Process video frames
    2. Compare actions with recipe steps
    3. Generate analysis reports
    """,
    functions=[analyze_videos, transfer_to_title_agent]
)

title_agent = Agent(
    name="Title Agent",
    instructions="""
    You are responsible for generating descriptive titles for recipe steps.
    Your main tasks are:
    1. Process analysis reports
    2. Generate concise titles for each step
    3. Create final formatted output
    """,
    functions=[generate_titles]
)

# Example usage
def run_recipe_analysis():
    # Start with recipe agent
    response = client.run(
        agent=recipe_agent,
        messages=[{"role": "user", "content": "Process the recipe images and extract information"}]
    )
    
    # If recipe processing is successful, move to video analysis
    if "success" in response.messages[-1]["content"]:
        response = client.run(
            agent=video_agent,
            messages=[{"role": "user", "content": "Analyze the cooking videos and compare with recipe steps"}]
        )
        
        # If video analysis is successful, move to title generation
        if "success" in response.messages[-1]["content"]:
            response = client.run(
                agent=title_agent,
                messages=[{"role": "user", "content": "Generate titles for the analyzed steps"}]
            )
    
    return response.messages[-1]["content"]

if __name__ == "__main__":
    result = run_recipe_analysis()
    print("Analysis complete:", result)


Analizando información general de la receta...

Respuesta de GPT para receta_pagina_1.jpg:
{
    "ingredientes": ["1 taza de harina leudante o harina de repostería", "1 huevo", "¾ taza de leche (180 mililitros)", "1 chorro de pequeño de aceite de oliva", "1 cucharadita de mantequilla", "1 chorro de miel"],
    "utensilios": ["Moldes para panqueques", "Espátula de silicona", "Sartén de panqueques (opcional)", "Bol"],
    "tiempo_preparacion": "15 minutos",
    "porciones": "2 porciones",
    "dificultad": "baja"
}

Buscando pasos en receta_pagina_1.jpg...

Respuesta de GPT para receta_pagina_1.jpg:
Lo siento, pero no puedo extraer texto de imágenes. Sin embargo, si tienes la receta en formato de texto, estaré encantado de ayudarte.

Buscando pasos en receta_pagina_2.jpg...

Respuesta de GPT para receta_pagina_2.jpg:
```json
{
    "pasos": {
        "1": "Hacer la receta de panqueques es súper fácil. Primero, reúne todos los ingredientes. Si no tienes harina leudante o preparada, que es

: 

<h1>Reto 3: Elaborar un Chatbot conectado a un dataset

In [None]:
import os
from dotenv import load_dotenv
from langchain_openai import OpenAI
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import PromptTemplate
import pandas as pd
from openai import OpenAI as DirectOpenAI

# Load environment variables
load_dotenv()

# Initialize OpenAI client
client = DirectOpenAI(api_key=os.getenv('OPENAI_API_KEY'))

# Define el template para el chatbot
CUSTOM_PROMPT_TEMPLATE = """Eres un asistente especializado en analizar datos de compras y comportamiento de clientes.
Utiliza la siguiente información recuperada para responder la pregunta del usuario.

Contexto recuperado: {context}

Historial de la conversación:
{chat_history}

Pregunta del usuario: {question}

Instrucciones adicionales:
- Responde en español
- Sé conciso pero informativo
- Si no tienes suficiente información para responder, indícalo
- Si la pregunta no está relacionada con los datos proporcionados, puedes responder de manera general pero apropiada

Tu respuesta:"""

class RAGChatbot:
    def __init__(self):
        # Initialize OpenAI with updated imports
        self.llm = OpenAI(
            temperature=0.7,
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        
        # Initialize memory with correct configuration
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key='answer',
            k=5
        )
        
        # Load and process documents
        self.vectorstore = self.create_vectorstore()
        
        # Create custom prompt
        prompt = PromptTemplate(
            input_variables=["context", "chat_history", "question"],
            template=CUSTOM_PROMPT_TEMPLATE
        )
        
        # Create conversation chain with memory and custom prompt
        self.chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": prompt},
            return_source_documents=True,
            verbose=True
        )

    def create_vectorstore(self):
        try:
            # Load CSV data
            df = pd.read_csv(r'D:\PruebatecnicaFerreycorp\GenAI\Dataset\compras_data.csv')
            
            # Convert important columns to string with descriptions
            csv_text = ""
            for column in df.columns:
                csv_text += f"Columna {column}:\n"
                if pd.api.types.is_numeric_dtype(df[column]):
                    csv_text += f"Estadísticas: Min={df[column].min()}, Max={df[column].max()}, Media={df[column].mean():.2f}\n"
                else:
                    csv_text += f"Valores únicos: {', '.join(map(str, df[column].unique()[:5]))}\n"
                csv_text += "\n"

            # Load dictionary text
            with open(r'D:\PruebatecnicaFerreycorp\GenAI\Dataset\Diccionario de Datos.txt', 'r', encoding='utf-8') as file:
                dictionary_text = file.read()

            # Combine texts with better structure
            combined_text = f"""
            INFORMACIÓN DEL DATASET DE COMPRAS
            
            DICCIONARIO DE DATOS:
            {dictionary_text}
            
            ANÁLISIS DE COLUMNAS:
            {csv_text}
            """

            # Split text with smaller chunks
            text_splitter = CharacterTextSplitter(
                separator="\n",
                chunk_size=500,
                chunk_overlap=100,
                length_function=len
            )
            texts = text_splitter.split_text(combined_text)

            # Create vectorstore
            return FAISS.from_texts(texts, self.embeddings)
        except Exception as e:
            print(f"Error al crear el vectorstore: {str(e)}")
            raise

    def process_question(self, question: str) -> str:
        try:
            # Use the new invoke method instead of calling directly
            result = self.chain.invoke({
                "question": question
            })
            
            # Extract and return just the answer
            return result.get("answer", "Lo siento, no pude generar una respuesta.")
            
        except Exception as e:
            return f"Lo siento, ocurrió un error al procesar tu pregunta: {str(e)}"

def main():
    print("Inicializando el chatbot RAG...")
    try:
        chatbot = RAGChatbot()
        print("""¡Chatbot listo! 
        
Puedo ayudarte a analizar datos de compras y clientes. Algunas preguntas que puedes hacer:
- ¿Cuál es la edad promedio de los clientes?
- ¿Cuántos ítems de la marca X se compraron el día Y?
- ¿Qué significan las diferentes columnas del dataset?
- ¿Quiénes compran más, los hombres o las mujeres?

Escribe 'salir' para terminar.
        """)
        
        while True:
            user_input = input("\nTu pregunta: ").strip()
            
            if not user_input:
                print("Por favor, ingresa una pregunta.")
                continue
                
            if user_input.lower() in ['salir', 'exit', 'quit']:
                print("¡Hasta luego!")
                break
                
            response = chatbot.process_question(user_input)
            print(f"\nRespuesta: {response}")
            
    except Exception as e:
        print(f"Error al inicializar el chatbot: {str(e)}")

if __name__ == "__main__":
    main()

Inicializando el chatbot RAG...


  self.memory = ConversationBufferWindowMemory(


¡Chatbot listo! 
        
Puedo ayudarte a analizar datos de compras y clientes. Algunas preguntas que puedes hacer:
- ¿Cuál es la edad promedio de los clientes?
- ¿Cuántos ítems de la marca X se compraron el día Y?
- ¿Qué significan las diferentes columnas del dataset?
- ¿Quiénes compran más, los hombres o las mujeres?

Escribe 'salir' para terminar.
        
Por favor, ingresa una pregunta.


[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mEres un asistente especializado en analizar datos de compras y comportamiento de clientes.
Utiliza la siguiente información recuperada para responder la pregunta del usuario.

Contexto recuperado: ANÁLISIS DE COLUMNAS:
            Columna id:
Estadísticas: Min=200000001, Max=200000500, Media=200000252.90
Columna dia_visita:
Estadísticas: Min=1, Max=730, Media=349.43
Columna incidencia_compra:
Estadísticas: Min=0, Max=1, Media=0.25
Columna id_marca:
Estadísticas: Mi

In [4]:
!pip install pandasql

Collecting pandasql
  Downloading pandasql-0.7.3.tar.gz (26 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Building wheels for collected packages: pandasql
  Building wheel for pandasql (pyproject.toml): started
  Building wheel for pandasql (pyproject.toml): finished with status 'done'
  Created wheel for pandasql: filename=pandasql-0.7.3-py3-none-any.whl size=26845 sha256=caa39a65273509ee78cd62df41a832aed931ab1dcd0cd8d5d6c5084f31d41b0f
  Stored in directory: c:\users\diego\appdata\local\pip\cache\wheels\15\a1\e7\6f92f295b5272ae5c02365e6b8fa19cb93f16a537090a1cf27
Successfully built pandasql
Installing collected packages: pandasql
Successfully installed pandasql-0.7.3



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
!pip install ipywidgets

Collecting ipywidgets
  Using cached ipywidgets-8.1.5-py3-none-any.whl.metadata (2.3 kB)
Collecting widgetsnbextension~=4.0.12 (from ipywidgets)
  Using cached widgetsnbextension-4.0.13-py3-none-any.whl.metadata (1.6 kB)
Collecting jupyterlab-widgets~=3.0.12 (from ipywidgets)
  Using cached jupyterlab_widgets-3.0.13-py3-none-any.whl.metadata (4.1 kB)
Using cached ipywidgets-8.1.5-py3-none-any.whl (139 kB)
Using cached jupyterlab_widgets-3.0.13-py3-none-any.whl (214 kB)
Using cached widgetsnbextension-4.0.13-py3-none-any.whl (2.3 MB)
Installing collected packages: widgetsnbextension, jupyterlab-widgets, ipywidgets
Successfully installed ipywidgets-8.1.5 jupyterlab-widgets-3.0.13 widgetsnbextension-4.0.13



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


<h3>Chatbot Mediante Queries

In [None]:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import PromptTemplate
import pandas as pd
import numpy as np
from pandasql import sqldf

load_dotenv()

class AnalyticsRAG:
    def __init__(self, csv_path, dict_path):
        # Inicialización del modelo LLM
        self.llm = ChatOpenAI(
            temperature=0.3,
            model="gpt-4",
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        
        # Inicialización de embeddings
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        
        # Cargar datos
        self.df = pd.read_csv(csv_path)
        with open(dict_path, 'r', encoding='utf-8') as file:
            self.dictionary_text = file.read()
        
        # Configurar memoria del chat
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            return_messages=True,
            k=5
        )
        
        # Template para queries SQL
        self.query_template = """
        Basándote en el siguiente contexto y pregunta, genera una query SQL precisa.

        CONTEXTO DEL DICCIONARIO:
        {context}

        PREGUNTA: {question}

        La tabla se llama 'df' y debes:
        1. Usar CASE WHEN para transformar variables categóricas a sus etiquetas descriptivas
        2. Usar agregaciones apropiadas (COUNT, SUM, AVG, etc.)
        3. Incluir filtros WHERE relevantes
        4. Usar GROUP BY cuando sea necesario
        5. Para porcentajes, multiplicar por 100 y redondear a 2 decimales

        Genera SOLO la query SQL, sin explicaciones adicionales.
        """
        
        # Template para análisis de resultados
        self.analysis_template = """
        Analiza los siguientes resultados basándote en el contexto del diccionario de datos y la pregunta original.

        CONTEXTO DEL DICCIONARIO:
        {context}

        PREGUNTA ORIGINAL: {question}

        RESULTADOS DE LA CONSULTA:
        {results}

        INSTRUCCIONES:
        1. Explica los resultados de manera clara y profesional
        2. Relaciona los hallazgos con las definiciones del diccionario de datos
        3. Identifica patrones o tendencias importantes
        4. Proporciona insights relevantes para el negocio
        5. Si hay anomalías o puntos interesantes, destácalos

        Proporciona un análisis detallado y fundamentado.
        """
        
        # Inicializar vectorstore y chain
        self.vectorstore = self._create_vectorstore()
        
    def _create_vectorstore(self):
        """Crea y retorna el vectorstore con la información del dataset y diccionario"""
        dataset_info = self._generate_dataset_info()
        combined_text = f"""
        DICCIONARIO DE DATOS:
        {self.dictionary_text}

        INFORMACIÓN DEL DATASET:
        {dataset_info}

        MAPEOS DE VARIABLES:
        - Género: 0=Mujer, 1=Hombre
        - Estado Civil: 0=Soltero, 1=Casado, 2=Divorciado
        - Nivel Educación: 0=Básica, 1=Media, 2=Superior, 3=Postgrado
        - Ocupación: 0=Desempleado, 1=Empleado, 2=Independiente, 3=Jubilado
        """
        
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100,
            separators=["\n\n", "\n", ". ", ", "]
        )
        texts = text_splitter.split_text(combined_text)
        
        return FAISS.from_texts(texts, self.embeddings)
    
    def _generate_dataset_info(self):
        """Genera información estadística básica del dataset"""
        info = []
        info.append(f"Total registros: {len(self.df)}")
        info.append(f"Período analizado: Días {self.df['dia_visita'].min()} a {self.df['dia_visita'].max()}")
        info.append(f"Total clientes únicos: {self.df['id'].nunique()}")
        compras = self.df[self.df['incidencia_compra'] == 1]
        info.append(f"Total compras realizadas: {len(compras)}")
        info.append(f"Tasa de conversión global: {(len(compras)/len(self.df))*100:.2f}%")
        
        # Estadísticas por marca
        info.append("\nEstadísticas por marca:")
        for marca in range(1, 6):
            compras_marca = compras[compras['id_marca'] == marca]
            info.append(f"Marca {marca}: {len(compras_marca)} compras")
        
        return "\n".join(info)
    
    def _get_context_for_question(self, question):
        """Obtiene el contexto relevante del vectorstore para una pregunta"""
        docs = self.vectorstore.similarity_search(question, k=3)
        return "\n".join(doc.page_content for doc in docs)
    
    def _generate_sql_query(self, question, context):
        """Genera una query SQL basada en la pregunta y el contexto"""
        try:
            query_response = self.llm.invoke(
                self.query_template.format(
                    context=context,
                    question=question
                )
            )
            query = query_response.content.strip()
            
            # Limpiar la query de marcadores markdown
            query = query.replace('```sql', '').replace('```', '').strip()
            
            if "SELECT" not in query.upper():
                return None
                
            # Verificar y corregir nombres de columnas
            query = query.replace('precio_marca_x', 'precio_marca_1')
            query = query.replace('promo_marca_x', 'promo_marca_1')
            
            print(f"\nQuery generada: {query}")  # Para debugging
            return query
        except Exception as e:
            print(f"Error en generación de query SQL: {str(e)}")
            return None
    
    def _execute_sql_query(self, query):
        """Ejecuta una query SQL y retorna los resultados"""
        try:
            if query:
                local_env = {'df': self.df}
                result = sqldf(query, local_env)
                if len(result) > 0:
                    return result
            return None
        except Exception as e:
            print(f"Error en ejecución de query SQL: {str(e)}")
            return None
    
    def _analyze_results(self, question, results, context):
        """Analiza los resultados usando el contexto del diccionario"""
        try:
            analysis_response = self.llm.invoke(
                self.analysis_template.format(
                    context=context,
                    question=question,
                    results=results.to_string()
                )
            )
            return analysis_response.content.strip()
        except Exception as e:
            print(f"Error en análisis de resultados: {str(e)}")
            return None
    
    def process_question(self, question):
        """Procesa una pregunta y retorna una respuesta analítica"""
        try:
            # Primero, determinar si la pregunta requiere análisis de datos
            sql_keywords = [
                'cuántos', 'promedio', 'total', 'porcentaje', 'comparar', 
                'máximo', 'mínimo', 'distribución', 'tendencia', 'cantidad',
                'ventas', 'compras', 'precio', 'mayor', 'menor', 'más', 'menos',
                'análisis', 'estadísticas', 'comportamiento', 'impacto'
            ]
            
            needs_analysis = any(keyword in question.lower() for keyword in sql_keywords)
            
            if not needs_analysis:
                # Si es una pregunta conversacional, usar el LLM directamente
                context = self._get_context_for_question(question)
                response = self.llm.invoke(
                    f"""Actúa como un asistente experto en análisis de datos de ventas.
                    Contexto del sistema: {context}
                    
                    Pregunta del usuario: {question}
                    
                    Proporciona una respuesta profesional y ayuda al usuario a formular preguntas 
                    más específicas si es necesario."""
                )
                return response.content
            
            # Si requiere análisis, proceder con la generación de SQL
            print("\nObteniendo contexto relevante...")
            context = self._get_context_for_question(question)
            
            print("\nGenerando consulta SQL...")
            query = self._generate_sql_query(question, context)
            if not query:
                return "No se pudo generar una consulta SQL válida para tu pregunta."
            
            print("\nEjecutando consulta...")
            sql_result = self._execute_sql_query(query)
            if sql_result is None or sql_result.empty:
                return "No se encontraron resultados para tu consulta."
            
            # 3. Analizar resultados
            print("\nAnalizando resultados...")
            analysis = self._analyze_results(question, sql_result, context)
            if not analysis:
                return "Error al analizar los resultados."
            
            # 4. Formatear respuesta final
            response = f"Resultados numéricos:\n"
            response += f"{sql_result.to_string()}\n\n"
            response += f"Análisis:\n{analysis}"
            
            return response
            
        except Exception as e:
            return f"Error en el procesamiento: {str(e)}"
    
    def start(self):
        """Inicia la interfaz de conversación"""
        print("Sistema de análisis de datos de compras iniciado.")
        print("Escriba 'salir' para terminar la sesión.")
        print("\nEjemplos de consultas:")
        print("- ¿Cuál es la distribución de compras por nivel educativo?")
        print("- ¿Qué impacto tienen las promociones en las ventas por marca?")
        print("- ¿Hay alguna relación entre el ingreso anual y la frecuencia de compra?")
        print("- ¿Cuál es el comportamiento de compra según género y estado civil?")
        print("-" * 50)
        
        while True:
            question = input("\nConsulta: ").strip()
            
            if question.lower() in ['salir', 'exit', 'quit']:
                print("\nSesión finalizada.")
                break
                
            if not question:
                continue
                
            response = self.process_question(question)
            print("\nRespuesta:", response)

def initialize_analytics():
    """Función principal para inicializar el sistema"""
    csv_path = r"D:\PruebatecnicaFerreycorp\GenAI\Dataset\compras_data.csv"
    dict_path = r"D:\PruebatecnicaFerreycorp\GenAI\Dataset\Diccionario de Datos.txt"
    
    print("Inicializando sistema de análisis...")
    analytics = AnalyticsRAG(csv_path, dict_path)
    print("Sistema inicializado.")
    
    analytics.start()

if __name__ == "__main__":
    initialize_analytics()

Inicializando sistema de análisis...


  self.memory = ConversationBufferWindowMemory(


Sistema inicializado.
Sistema de análisis de datos de compras iniciado.
Escriba 'salir' para terminar la sesión.

Ejemplos de consultas:
- ¿Cuál es la distribución de compras por nivel educativo?
- ¿Qué impacto tienen las promociones en las ventas por marca?
- ¿Hay alguna relación entre el ingreso anual y la frecuencia de compra?
- ¿Cuál es el comportamiento de compra según género y estado civil?
--------------------------------------------------

Respuesta: ¡Hola! ¿Cómo puedo asistirte hoy con el análisis de datos de ventas? Tengo información detallada sobre las compras realizadas por marca, así como datos demográficos de los clientes como género, estado civil, nivel de educación, ingreso anual y ocupación. ¿Hay algún aspecto específico en el que estés interesado, como el rendimiento de ventas de una marca en particular, o tal vez cómo las ventas se distribuyen entre diferentes grupos demográficos?

Respuesta: ¡Hola! ¿Cómo puedo ayudarte hoy con tu análisis de datos de ventas? Tienes 