In [3]:
import os
import shutil
from glob import glob

# Define dataset paths
base_path = "IPAD_dataset"
output_train_dir = "memae-anomaly-detection/dataset/train"

# Create output directory
os.makedirs(output_train_dir, exist_ok=True)

# Extract training images
for dataset in ["R01", "R02", "R03", "R04", "S01", "S02", "S03", "S04", "S05", "S06", "S07", "S08", "S09", "S10", "S11", "S12"]:  # Modify as needed
    train_path = os.path.join(base_path, dataset, "training/frames/")
    for folder in glob(os.path.join(train_path, "*")):
        for img in glob(os.path.join(folder, "*.jpg")):
            shutil.copy(img, output_train_dir)  # Move images to MemAE's dataset directory

In [4]:
import os
import shutil
from glob import glob

def prepare_test_data(base_path, datasets=None, output_test_dir="memae-anomaly-detection/dataset/test"):
    """
    Extracts test images and labels from the IPAD dataset.
    
    Parameters:
    - base_path (str): The root directory of the IPAD dataset.
    - datasets (list): List of dataset folders to process. Use:
        - ["R01", "R02", "R03", "R04"] for real-world data only
        - ["S01", "S02", ..., "S12"] for synthetic data only
        - ["R01", "R02", "R03", "R04", "S01", ..., "S12"] for all data
    - output_test_dir (str): Directory where test images will be stored.
    """

    if datasets is None:
        datasets = ["R01", "R02", "R03", "R04"]  # Default: Only real-world data

    os.makedirs(output_test_dir, exist_ok=True)
    test_label_dest = "memae-anomaly-detection/dataset/test_label"

    for dataset in datasets:
        test_path = os.path.join(base_path, dataset, "testing/frames/")
        test_label_path = os.path.join(base_path, dataset, "test_label/")

        # Extract test images
        for folder in glob(os.path.join(test_path, "*")):
            for img in glob(os.path.join(folder, "*.jpg")):
                shutil.copy(img, output_test_dir)  # Move test images

        # Extract test labels (handling existing folder issue)
        if os.path.exists(test_label_dest):
            shutil.rmtree(test_label_dest)
        shutil.copytree(test_label_path, test_label_dest)

    print(f"✅ Test images saved to: {output_test_dir}")
    print(f"✅ Test labels saved to: {test_label_dest}")

# Example Usage:
# Option 1: Only real-world data
prepare_test_data(base_path="IPAD_dataset", datasets=["R01", "R02", "R03", "R04"])

# Option 2: Both real + synthetic data
# prepare_test_data(base_path="IPAD_dataset", datasets=["R01", "R02", "R03", "R04", "S01", "S02", "S03", "S04", "S05", "S06", "S07", "S08", "S09", "S10", "S11", "S12"])

✅ Test images saved to: memae-anomaly-detection/dataset/test
✅ Test labels saved to: memae-anomaly-detection/dataset/test_label


In [21]:
import os
import shutil

# Directorios base
input_base = "./IPAD_dataset"

# Directorios de salida para frames y labels
output_frames = "/Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/"
output_labels = "/Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/"

# Crear directorios de salida si no existen
os.makedirs(output_frames, exist_ok=True)
os.makedirs(output_labels, exist_ok=True)

# Lista de carpetas R01 a R04
for folder in os.listdir(input_base):
    if folder.startswith("R0"):
        # Directorios "testing/frames" y "testing/test_label"
        testing_path = os.path.join(input_base, folder, "testing")
        frames_path = os.path.join(testing_path, "frames")
        labels_path = os.path.join(testing_path, "test_label")

        # 1. Copiar imágenes .jpg de frames, añadiendo prefijo con el nombre de la carpeta (por ej. "R01_") y el número de frame
        if os.path.isdir(frames_path):
            for subfolder in os.listdir(frames_path):
                subfolder_path = os.path.join(frames_path, subfolder)
                if os.path.isdir(subfolder_path):
                    frame_number = 1
                    for file in os.listdir(subfolder_path):
                        if file.lower().endswith('.jpg'):
                            source_file = os.path.join(subfolder_path, file)
                            # Se emplea el mismo prefijo que para los labels
                            if int(subfolder) < 10:
                                prefixed_name = f"{folder}_0{subfolder}_{frame_number:04d}.jpg"  # ejemplo: "R01_01_0001.jpg"
                            else:
                                prefixed_name = f"{folder}_{subfolder}_{frame_number:04d}.jpg"  # ejemplo: "R01_10_0001.jpg"
                            dest_file = os.path.join(output_frames, prefixed_name)
                            shutil.copy2(source_file, dest_file)
                            print(f"Copiado {source_file} a {dest_file}")
                            frame_number += 1
        else:
            print(f"No se encontró la carpeta de frames en: {frames_path}")


Copiado ./IPAD_dataset/R01/testing/frames/03/189.jpg a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/R01_003_0001.jpg
Copiado ./IPAD_dataset/R01/testing/frames/03/162.jpg a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/R01_003_0002.jpg
Copiado ./IPAD_dataset/R01/testing/frames/03/176.jpg a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/R01_003_0003.jpg
Copiado ./IPAD_dataset/R01/testing/frames/03/348.jpg a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/R01_003_0004.jpg
Copiado ./IPAD_dataset/R01/testing/frames/03/360.jpg a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/R01_003_0005.jpg
Copiado ./IPAD_dataset/R01/testing/frames/03/406.jpg a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/R01_003_0006.jpg
Copiado ./

In [5]:
import os
ruta = '/Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/surveillance video examples/output.avi'

# Comprobar si existe (archivo o carpeta)
if os.path.exists(ruta):
    print("La ruta existe.")
else:
    print("La ruta no existe.")

# Comprobar si es un archivo
if os.path.isfile(ruta):
    print("Es un archivo.")

# Comprobar si es una carpeta
if os.path.isdir(ruta):
    print("Es un directorio.")

La ruta existe.
Es un archivo.


In [11]:
import cv2
import os

def create_video_from_images(input_folder, output_path, fps=24):
    images = [f for f in os.listdir(input_folder) if f.lower().endswith('.jpg')]
    images.sort()
    if not images:
        print("No .jpg files found in", input_folder)
        return

    first_frame = cv2.imread(os.path.join(input_folder, images[0]))
    height, width, _ = first_frame.shape
    # Usar el códec mp4v para generar un archivo MP4
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    for img_name in images:
        frame = cv2.imread(os.path.join(input_folder, img_name))
        video_out.write(frame)

    video_out.release()
    print("Video saved to", output_path)

if __name__ == "__main__":
    create_video_from_images(
        input_folder="/Users/dilofac/Documents/Valtron CV/IPAD_dataset/R01/testing/frames/03/",
        output_path="./models/memae-anomaly-detection/surveillance video examples/video1.mp4",
        fps=24  # Ajusta el FPS a un valor más bajo
    )

Video saved to ./models/memae-anomaly-detection/surveillance video examples/video1.mp4


In [8]:
import os
import numpy as np
import scipy.io as sio

# Ruta a la carpeta donde están los archivos .npy
npy_folder = "./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/"

# Crear la carpeta si no existe
os.makedirs(npy_folder, exist_ok=True)

# Obtener lista de archivos .npy en la carpeta
npy_files = [f for f in os.listdir(npy_folder) if f.endswith(".npy")]

if not npy_files:
    print("❌ No se encontraron archivos .npy en:", npy_folder)
else:
    print(f"🔄 Convirtiendo {len(npy_files)} archivos .npy a .mat...")

    for npy_file in npy_files:
        npy_path = os.path.join(npy_folder, npy_file)
        mat_path = os.path.join(npy_folder, npy_file.replace(".npy", ".mat"))

        # Cargar el archivo .npy
        data = np.load(npy_path)

        # Guardarlo como .mat
        sio.savemat(mat_path, {'l': data})

        print(f"✅ Convertido: {npy_file} → {mat_path}")

    print("🎉 Conversión completada.")

🔄 Convirtiendo 19 archivos .npy a .mat...
✅ Convertido: 009.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/009.mat
✅ Convertido: 008.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/008.mat
✅ Convertido: 018.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/018.mat
✅ Convertido: 019.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/019.mat
✅ Convertido: 017.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/017.mat
✅ Convertido: 003.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/003.mat
✅ Convertido: 002.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/002.mat
✅ Convertido: 016.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/016.mat
✅ Convertido: 014.npy → ./models/memae-anomaly-detection/dataset/testing/IPAD/Test_labels/014.mat
✅ Convertido: 015.npy → ./models/memae-anomaly-detection/dataset/testing/IPA

In [17]:
import os
import shutil

# El directorio base donde se encuentran R01, R02, etc.
input_base = "./IPAD_dataset"

# Directorios de salida para frames y labels
output_frames = "/Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test/"
output_labels = "/Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/"

# Crear directorios de salida si no existen
os.makedirs(output_frames, exist_ok=True)
os.makedirs(output_labels, exist_ok=True)

# Recorre las carpetas R01 a R04 dentro de input_base
for folder in os.listdir(input_base):
    if folder.startswith("R0"):
        # Directorios de "frames" y "test_label" dentro de la carpeta "testing"
        testing_path = os.path.join(input_base, folder, "testing")
        frames_path = os.path.join(testing_path, "frames")
        labels_path = os.path.join(input_base, folder, "test_label")

        # 1. Copiar imágenes .jpg, añadiendo prefijo con el nombre de la carpeta (R01, R02, etc.)
        if os.path.isdir(frames_path):
            for file in os.listdir(frames_path):
                if file.lower().endswith('.jpg'):
                    source_file = os.path.join(frames_path, file)
                    prefixed_name = f"{folder}_{file}"  # p.ej. "R01_imagen.jpg"
                    dest_file = os.path.join(output_frames, prefixed_name)
                    shutil.copy2(source_file, dest_file)
                    print(f"Copiado {source_file} a {dest_file}")
        else:
            print(f"No se encontró la carpeta de frames en: {frames_path}")

        # 2. Copiar archivos .npy, añadiendo prefijo
        if os.path.isdir(labels_path):
            for file in os.listdir(labels_path):
                if file.lower().endswith('.npy'):
                    source_file = os.path.join(labels_path, file)
                    prefixed_name = f"{folder}_{file}"  # p.ej. "R01_etiquetas.npy"
                    dest_file = os.path.join(output_labels, prefixed_name)
                    shutil.copy2(source_file, dest_file)
                    print(f"Copiado {source_file} a {dest_file}")
        else:
            print(f"No se encontró la carpeta de test_label en: {labels_path}")


Copiado ./IPAD_dataset/R01/test_label/009.npy a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R01_009.npy
Copiado ./IPAD_dataset/R01/test_label/008.npy a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R01_008.npy
Copiado ./IPAD_dataset/R01/test_label/003.npy a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R01_003.npy
Copiado ./IPAD_dataset/R01/test_label/002.npy a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R01_002.npy
Copiado ./IPAD_dataset/R01/test_label/014.npy a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R01_014.npy
Copiado ./IPAD_dataset/R01/test_label/015.npy a /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R01_015.npy
Copiado ./IPAD_dataset/R01/test_label/001.npy a /Users/dilofac/D

In [22]:
import numpy as np

def read_npy_file(file_path):
    """
    Reads a .npy file and prints its contents.

    Parameters:
    - file_path (str): Path to the .npy file.
    """
    try:
        data = np.load(file_path)
        print(f"Contents of {file_path}:")
        print(data)
    except Exception as e:
        print(f"Error reading {file_path}: {e}")

# Example usage
file_path = "/Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R03_012.npy"
read_npy_file(file_path)

Contents of /Users/dilofac/Documents/Valtron CV/models/memae-anomaly-detection/dataset/testing/IPAD/Test_gt/R03_012.npy:
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0.

In [None]:
import os
import shutil

# Directorios base
input_base = "./IPAD_dataset"

# Directorios de salida para frames y labels
output_frames = "./dataset/testing/frames/"
output_labels = "./dataset/test_label/"

# Crear directorios de salida si no existen
os.makedirs(output_frames, exist_ok=True)
os.makedirs(output_labels, exist_ok=True)

# Lista de carpetas R01 a R04
for folder in os.listdir(input_base):
    if folder.startswith("R0"):
        # Directorios "testing/frames" y "testing/test_label"
        testing_path = os.path.join(input_base, folder, "testing")
        frames_path = os.path.join(testing_path, "frames")
        labels_path = os.path.join(testing_path, "test_label")

        # 1. Copiar imágenes .jpg de frames, añadiendo prefijo con el nombre de la carpeta (por ej. "R01_")
        if os.path.isdir(frames_path):
            for file in os.listdir(frames_path):
                if file.lower().endswith('.jpg'):
                    source_file = os.path.join(frames_path, file)
                    # Se emplea el mismo prefijo que para los labels
                    prefixed_name = f"{folder}_{file}"  # ejemplo: "R01_imagen.jpg"
                    dest_file = os.path.join(output_frames, prefixed_name)
                    shutil.copy2(source_file, dest_file)
                    print(f"Copiado {source_file} a {dest_file}")
        else:
            print(f"No se encontró la carpeta de frames en: {frames_path}")

        # 2. Copiar archivos .npy de labels, añadiendo prefijo con el nombre de la carpeta (por ej. "R01_")
        if os.path.isdir(labels_path):
            for file in os.listdir(labels_path):
                if file.lower().endswith('.npy'):
                    source_file = os.path.join(labels_path, file)
                    prefixed_name = f"{folder}_{file}"  # ejemplo: "R01_etiquetas.npy"
                    dest_file = os.path.join(output_labels, prefixed_name)
                    shutil.copy2(source_file, dest_file)
                    print(f"Copiado {source_file} a {dest_file}")
        else:
            print(f"No se encontró la carpeta de test_label en: {labels_path}")

In [41]:
# report_generator_agent.py (VERSIÓN FINAL FUNCIONAL)
import os
import json
from datetime import datetime
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_JUSTIFY, TA_CENTER
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from langchain.agents import initialize_agent, Tool, AgentType
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_community.llms import HuggingFaceHub

os.environ["HUGGINGFACEHUB_API_TOKEN"] = "hf_tu_api_key_aqui"

# Configuración de fuentes (asegúrate de tener Arial.ttf en el mismo directorio)
try:
    pdfmetrics.registerFont(TTFont('Arial', 'Arial.ttf'))
    pdfmetrics.registerFont(TTFont('Arial-Bold', 'Arial-Bold.ttf'))
except:
    print("⚠️ Usando fuentes por defecto. Para mejor formato, instale las fuentes Arial")

# Configuración de estilos PDF
styles = getSampleStyleSheet()
styles.add(ParagraphStyle(
    'Titulo',
    fontName='Arial-Bold',
    fontSize=16,
    alignment=TA_CENTER,
    spaceAfter=20
))
styles.add(ParagraphStyle(
    'Cuerpo',
    fontName='Arial',
    fontSize=10,
    leading=12,
    alignment=TA_JUSTIFY
))

class NetworkReportGenerator:
    def __init__(self):
        try:
            self.llm = HuggingFaceHub(
                repo_id="Qwen/Qwen2.5-7B-Instruct",
                model_kwargs={
                    "temperature": 0.1,
                    "max_new_tokens": 2000,  
                    "top_p": 0.95
                }
            )
            self.setup_tools()
        except Exception as e:
            raise RuntimeError(f"Error inicializando el modelo: {str(e)}")

    def setup_tools(self):
        tools = [
            Tool(
                name="AnalizadorRed",
                func=self.analyze_network_data,
                description="Analiza datos JSON de red y detecta anomalías"
            ),
            Tool(
                name="GeneradorPDF",
                func=self.create_pdf_report,
                description="Crea reportes PDF profesionales"
            )
        ]

        prompt = PromptTemplate(
            template="""Eres un analista senior de seguridad. Genera un reporte detallado en markdown usando:

            {input}

            Requisitos:
            - Encabezado con título y fecha
            - 3 secciones principales
            - Tablas para datos numéricos
            - Conclusión con recomendaciones
            """,
            input_variables=["input"]
        )

        self.agent = initialize_agent(
            tools=tools,
            llm=self.llm,
            agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
            prompt=prompt,
            verbose=True,
            handle_parsing_errors=True
        )

    def analyze_network_data(self, json_path: str) -> str:
        """Analiza datos de red desde un archivo JSON"""
        try:
            with open(json_path, 'r') as f:
                data = json.load(f)
            
            analysis_prompt = """Analiza estos datos técnicos de red:
            {data}

            Genera un reporte markdown con:
            1. Dispositivos no autorizados (IP, MAC)
            2. Eventos de seguridad (tipo, frecuencia)
            3. Uso de recursos (pico, promedio)
            """
            
            chain = LLMChain(
                llm=self.llm,
                prompt=PromptTemplate.from_template(analysis_prompt))
            
            return chain.run(data=str(data))
            
        except Exception as e:
            return f"🔴 Error analizando datos: {str(e)}"

    def create_pdf_report(self, report_text: str) -> str:
        """Genera PDF compatible con todos los lectores"""
        try:
            filename = f"Reporte_Seguridad_{datetime.now().strftime('%Y%m%d_%H%M')}.pdf"
            doc = SimpleDocTemplate(
                filename,
                pagesize=letter,
                leftMargin=40,
                rightMargin=40
            )
            
            elements = []
            
            # Cabecera
            elements.append(Paragraph("INFORME DE SEGURIDAD DE RED", styles["Titulo"]))
            elements.append(Paragraph(f"Generado: {datetime.now().strftime('%d/%m/%Y %H:%M')}", styles["Cuerpo"]))
            elements.append(Spacer(1, 30))
            
            # Contenido
            sections = report_text.split("## ")[1:]
            for section in sections:
                if section.strip():
                    title, *content = section.split("\n", 1)
                    elements.append(Paragraph(title.strip(), styles["Heading2"]))
                    elements.append(Spacer(1, 15))
                    
                    if content:
                        text = content[0].replace("- ", "• ").replace("* ", "• ")
                        paragraphs = text.split("\n\n")
                        for para in paragraphs:
                            if para.strip():
                                elements.append(Paragraph(para.strip(), styles["Cuerpo"]))
                                elements.append(Spacer(1, 8))
                    
                    elements.append(PageBreak())
            
            doc.build(elements)
            return os.path.abspath(filename)
        except Exception as e:
            raise RuntimeError(f"🔴 Error generando PDF: {str(e)}")

    def generar_reporte_completo(self, json_path: str):
        """Flujo completo de generación de reportes"""
        try:
            print("🔍 Analizando datos de red...")
            analisis = self.analyze_network_data(json_path)
            
            if "Error" in analisis:
                return analisis
            
            print("📄 Generando reporte PDF...")
            return self.create_pdf_report(analisis)
        except Exception as e:
            return f"❌ Error crítico: {str(e)}"

# Ejemplo de uso
if __name__ == "__main__":
    try:
        # Configurar datos de ejemplo
        datos_muestra = {
            "red": {
                "dispositivos": [
                    {"ip": "192.168.1.100", "mac": "00:1A:79:45:CC:FF", "autorizado": True},
                    {"ip": "192.168.1.105", "mac": "00:1B:44:11:3A:EE", "autorizado": False}
                ],
                "eventos": [
                    {"tipo": "Escaneo de puertos", "origen": "210.34.58.91", "frecuencia": 15},
                    {"tipo": "Intento DDoS", "origen": "58.229.5.130", "frecuencia": 3}
                ],
                "rendimiento": {
                    "ancho_banda_pico": "85%",
                    "latencia_promedio": "45ms"
                }
            }
        }
        
        with open("datos_red.json", "w") as f:
            json.dump(datos_muestra, f, indent=2)
        
        # Generar reporte
        generador = NetworkReportGenerator()
        resultado = generador.generar_reporte_completo("datos_red.json")
        
        print(f"\n✅ {resultado}\n")
        print("⚠️ Nota: Si el PDF no se abre automáticamente, búscalo en:")
        print(os.path.abspath(resultado))
        
    except Exception as e:
        print(f"❌ Error en la ejecución: {str(e)}")

Contenido generado:
 🔴 Error: InferenceClient.text_generation() got an unexpected keyword argument 'timeout'
❌ Error: El modelo no generó formato Markdown válido


In [12]:
from huggingface_hub import model_info
print(model_info("Qwen/Qwen2.5-7B-Instruct"))

  from .autonotebook import tqdm as notebook_tqdm


ModelInfo(id='Qwen/Qwen2.5-7B-Instruct', author='Qwen', sha='a09a35458c702b33eeacc393d103063234e8bc28', created_at=datetime.datetime(2024, 9, 16, 11, 55, 40, tzinfo=datetime.timezone.utc), last_modified=datetime.datetime(2025, 1, 12, 2, 10, 10, tzinfo=datetime.timezone.utc), private=False, gated=False, disabled=False, downloads=1337458, likes=470, library_name='transformers', tags=['transformers', 'safetensors', 'qwen2', 'text-generation', 'chat', 'conversational', 'en', 'arxiv:2309.00071', 'arxiv:2407.10671', 'base_model:Qwen/Qwen2.5-7B', 'base_model:finetune:Qwen/Qwen2.5-7B', 'license:apache-2.0', 'autotrain_compatible', 'text-generation-inference', 'endpoints_compatible', 'region:us'], pipeline_tag='text-generation', mask_token=None, card_data={'language': ['en'], 'license': 'apache-2.0', 'library_name': 'transformers', 'tags': ['chat'], 'base_model': 'Qwen/Qwen2.5-7B', 'datasets': None, 'metrics': None, 'eval_results': None, 'model_name': None, 'license_link': 'https://huggingface.

In [None]:
# report_generator_agent.py (VERSIÓN FINAL FUNCIONAL)
import os
import json
from datetime import datetime
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_JUSTIFY, TA_CENTER
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from langchain.agents import initialize_agent, Tool, AgentType
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_community.llms import HuggingFaceHub
from dotenv import load_dotenv
load_dotenv() 

os.environ["HUGGINGFACEHUB_API_TOKEN"] = "hf_tu_api_key_aqui"

# Configuración de fuentes (asegúrate de tener Arial.ttf en el mismo directorio)
try:
    pdfmetrics.registerFont(TTFont('Arial', 'Arial.ttf'))
    pdfmetrics.registerFont(TTFont('Arial-Bold', 'Arial-Bold.ttf'))
except:
    print("⚠️ Usando fuentes por defecto. Para mejor formato, instale las fuentes Arial")

# Configuración de estilos PDF
styles = getSampleStyleSheet()
styles.add(ParagraphStyle(
    'Titulo',
    fontName='Arial-Bold',
    fontSize=16,
    alignment=TA_CENTER,
    spaceAfter=20
))
styles.add(ParagraphStyle(
    'Cuerpo',
    fontName='Arial',
    fontSize=10,
    leading=12,
    alignment=TA_JUSTIFY
))

class NetworkReportGenerator:
    def __init__(self):
        try:
            self.llm = HuggingFaceHub(
                repo_id="Qwen/Qwen2.5-7B-Instruct",
                model_kwargs={
                    "temperature": 0.1,
                    "max_new_tokens": 2000,  
                    "top_p": 0.95
                }
            )
            self.setup_tools()
        except Exception as e:
            raise RuntimeError(f"Error inicializando el modelo: {str(e)}")

    def setup_tools(self):
        tools = [
            Tool(
                name="AnalizadorRed",
                func=self.analyze_network_data,
                description="Analiza datos JSON de red y detecta anomalías"
            ),
            Tool(
                name="GeneradorPDF",
                func=self.create_pdf_report,
                description="Crea reportes PDF profesionales"
            )
        ]

        prompt = PromptTemplate(
            template="""Eres un analista senior de seguridad. Genera un reporte detallado en markdown usando:

            {input}

            Requisitos:
            - Encabezado con título y fecha
            - 3 secciones principales
            - Tablas para datos numéricos
            - Conclusión con recomendaciones
            """,
            input_variables=["input"]
        )

        self.agent = initialize_agent(
            tools=tools,
            llm=self.llm,
            agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
            prompt=prompt,
            verbose=True,
            handle_parsing_errors=True
        )

    def analyze_network_data(self, json_path: str) -> str:
        """Analiza datos de red desde un archivo JSON"""
        try:
            with open(json_path, 'r') as f:
                data = json.load(f)
            
            analysis_prompt = """Analiza estos datos técnicos de red:
            {data}

            Genera un reporte markdown con:
            1. Dispositivos no autorizados (IP, MAC)
            2. Eventos de seguridad (tipo, frecuencia)
            3. Uso de recursos (pico, promedio)
            """
            
            chain = LLMChain(
                llm=self.llm,
                prompt=PromptTemplate.from_template(analysis_prompt))
            
            return chain.run(data=str(data))
            
        except Exception as e:
            return f"🔴 Error analizando datos: {str(e)}"

    def create_pdf_report(self, report_text: str) -> str:
        """Genera PDF compatible con todos los lectores"""
        try:
            filename = f"Reporte_Seguridad_{datetime.now().strftime('%Y%m%d_%H%M')}.pdf"
            doc = SimpleDocTemplate(
                filename,
                pagesize=letter,
                leftMargin=40,
                rightMargin=40
            )
            
            elements = []
            
            # Cabecera
            elements.append(Paragraph("INFORME DE SEGURIDAD DE RED", styles["Titulo"]))
            elements.append(Paragraph(f"Generado: {datetime.now().strftime('%d/%m/%Y %H:%M')}", styles["Cuerpo"]))
            elements.append(Spacer(1, 30))
            
            # Contenido
            sections = report_text.split("## ")[1:]
            for section in sections:
                if section.strip():
                    title, *content = section.split("\n", 1)
                    elements.append(Paragraph(title.strip(), styles["Heading2"]))
                    elements.append(Spacer(1, 15))
                    
                    if content:
                        text = content[0].replace("- ", "• ").replace("* ", "• ")
                        paragraphs = text.split("\n\n")
                        for para in paragraphs:
                            if para.strip():
                                elements.append(Paragraph(para.strip(), styles["Cuerpo"]))
                                elements.append(Spacer(1, 8))
                    
                    elements.append(PageBreak())
            
            doc.build(elements)
            return os.path.abspath(filename)
        except Exception as e:
            raise RuntimeError(f"🔴 Error generando PDF: {str(e)}")

    def generar_reporte_completo(self, json_path: str):
        """Flujo completo de generación de reportes"""
        try:
            print("🔍 Analizando datos de red...")
            analisis = self.analyze_network_data(json_path)
            
            if "Error" in analisis:
                return analisis
            
            print("📄 Generando reporte PDF...")
            return self.create_pdf_report(analisis)
        except Exception as e:
            return f"❌ Error crítico: {str(e)}"

# En el main:
if __name__ == "__main__":
    try:
        # Verifica que la API key esté configurada
        if "HUGGINGFACEHUB_API_TOKEN" not in os.environ:
            raise ValueError("🔴 Falta la API Key de Hugging Face en las variables de entorno")
        # Configurar datos de ejemplo
        datos_muestra = {
            "red": {
                "dispositivos": [
                    {"ip": "192.168.1.100", "mac": "00:1A:79:45:CC:FF", "autorizado": True},
                    {"ip": "192.168.1.105", "mac": "00:1B:44:11:3A:EE", "autorizado": False}
                ],
                "eventos": [
                    {"tipo": "Escaneo de puertos", "origen": "210.34.58.91", "frecuencia": 15},
                    {"tipo": "Intento DDoS", "origen": "58.229.5.130", "frecuencia": 3}
                ],
                "rendimiento": {
                    "ancho_banda_pico": "85%",
                    "latencia_promedio": "45ms"
                }
            }
        }
        
        with open("datos_red.json", "w") as f:
            json.dump(datos_muestra, f, indent=2)
        
        # Generar reporte
        generador = NetworkReportGenerator()
        resultado = generador.generar_reporte_completo("datos_red.json")
        
        print(f"\n✅ {resultado}\n")
        print("⚠️ Nota: Si el PDF no se abre automáticamente, búscalo en:")
        print(os.path.abspath(resultado))
        
    except Exception as e:
        print(f"❌ Error en la ejecución: {str(e)}")

Contenido generado:
 🔴 Error: InferenceClient.text_generation() got an unexpected keyword argument 'timeout'
❌ Error: El modelo no generó formato Markdown válido
