<a href="https://colab.research.google.com/github/horelvis/proteus_life_simulation/blob/main/deeplearning-physics-word.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================================================
# CLASES FALTANTES PARA EL SISTEMA H√çBRIDO DE F√çSICA
# ============================================================================

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import time
import os
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from tqdm import tqdm
import matplotlib.pyplot as plt

# ============================================================================
# 1. MODELO DE RED NEURONAL H√çBRIDA
# ============================================================================

class HybridPhysicsModel(nn.Module):
    """
    Red neuronal para aprender residuales de f√≠sica
    Arquitectura optimizada para correcciones f√≠sicas
    """

    def __init__(self, input_dim: int, hidden_dims: List[int], output_dim: int,
                 dropout_rate: float = 0.1, activation: str = 'relu'):
        super(HybridPhysicsModel, self).__init__()

        self.input_dim = input_dim
        self.output_dim = output_dim
        self.dropout_rate = dropout_rate

        # Seleccionar funci√≥n de activaci√≥n
        if activation.lower() == 'relu':
            self.activation = nn.ReLU()
        elif activation.lower() == 'leakyrelu':
            self.activation = nn.LeakyReLU(0.1)
        elif activation.lower() == 'gelu':
            self.activation = nn.GELU()
        elif activation.lower() == 'swish':
            self.activation = nn.SiLU()  # SiLU es equivalente a Swish
        else:
            self.activation = nn.ReLU()

        # Construir capas
        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                self.activation,
                nn.Dropout(dropout_rate)
            ])
            prev_dim = hidden_dim

        # Capa de salida sin activaci√≥n (regresi√≥n)
        layers.append(nn.Linear(prev_dim, output_dim))

        self.network = nn.Sequential(*layers)

        # Inicializaci√≥n de pesos
        self._initialize_weights()

    def _initialize_weights(self):
        """Inicializaci√≥n Xavier/Glorot para mejor convergencia"""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass de la red

        Args:
            x: Tensor de entrada [batch_size, input_dim]

        Returns:
            Tensor de salida [batch_size, output_dim]
        """
        return self.network(x)

    def get_model_info(self) -> Dict[str, Any]:
        """Informaci√≥n del modelo"""
        total_params = sum(p.numel() for p in self.parameters())
        trainable_params = sum(p.numel() for p in self.parameters() if p.requires_grad)

        return {
            'total_parameters': total_params,
            'trainable_parameters': trainable_params,
            'input_dim': self.input_dim,
            'output_dim': self.output_dim,
            'dropout_rate': self.dropout_rate,
            'architecture': [layer for layer in self.network if isinstance(layer, nn.Linear)]
        }

# ============================================================================
# 2. ENTRENADOR OPTIMIZADO
# ============================================================================

class PhysicsTrainer:
    """
    Entrenador especializado para modelos de f√≠sica h√≠brida
    Incluye optimizaciones para GPU, mixed precision y logging
    """

    def __init__(self, model: nn.Module, train_loader: DataLoader, val_loader: DataLoader,
                 criterion: nn.Module, optimizer: optim.Optimizer,
                 scheduler: Optional[optim.lr_scheduler._LRScheduler] = None,
                 device: torch.device = torch.device('cpu'),
                 repo_id: str = "physics-model",
                 use_mixed_precision: bool = True,
                 use_cyclic_lr: bool = False,
                 use_warmup: bool = True,
                 warmup_epochs: int = 5):

        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device
        self.repo_id = repo_id
        self.use_mixed_precision = use_mixed_precision
        self.use_cyclic_lr = use_cyclic_lr
        self.use_warmup = use_warmup
        self.warmup_epochs = warmup_epochs

        # Configurar mixed precision
        self.scaler = None
        if use_mixed_precision and torch.cuda.is_available():
            try:
                # Intentar usar la nueva API de torch.amp
                from torch.amp import GradScaler, autocast
                self.scaler = GradScaler('cuda')
                self.autocast = autocast('cuda')
                print("‚úÖ Using new API of mixed precision (torch.amp)")
            except ImportError:
                try:
                    # Fallback a la API antigua
                    from torch.cuda.amp import GradScaler, autocast
                    self.scaler = GradScaler()
                    self.autocast = autocast()
                    print("‚úÖ Using legacy API of mixed precision (torch.cuda.amp)")
                except ImportError:
                    print("‚ö†Ô∏è Mixed precision not available, using standard precision")
                    self.use_mixed_precision = False
        else:
            self.use_mixed_precision = False

        # Configurar learning rate scheduler c√≠clico
        if use_cyclic_lr:
            self.cyclic_scheduler = optim.lr_scheduler.CyclicLR(
                optimizer, base_lr=1e-5, max_lr=1e-2,
                step_size_up=len(train_loader) * 2,
                mode='triangular2'
            )

        # Variables de tracking
        self.train_losses = []
        self.val_losses = []
        self.learning_rates = []
        self.best_val_loss = float('inf')
        self.epochs_without_improvement = 0

        # Crear repositorio si es posible
        self._setup_huggingface_repo()

    def _setup_huggingface_repo(self):
        """Configurar repositorio de Hugging Face"""
        try:
            from huggingface_hub import HfApi, create_repo
            self.hf_api = HfApi()

            # Intentar crear el repositorio
            try:
                create_repo(self.repo_id, exist_ok=True, private=True)
                print(f"‚úÖ Hugging Face repository ready: {self.repo_id}")
                self.use_hf_upload = True
            except Exception as e:
                print(f"‚ö†Ô∏è Failed to create or check Hugging Face Hub repository: {e}")
                self.use_hf_upload = False

        except ImportError:
            print("‚ö†Ô∏è Hugging Face Hub not available")
            self.use_hf_upload = False

    def _warmup_lr(self, epoch: int, initial_lr: float) -> float:
        """Calcula learning rate con warmup"""
        if epoch < self.warmup_epochs:
            return initial_lr * (epoch + 1) / self.warmup_epochs
        return initial_lr

    def train_epoch(self) -> float:
        """Entrena una √©poca"""
        self.model.train()
        total_loss = 0.0
        num_batches = 0

        for batch_idx, (inputs, targets) in enumerate(self.train_loader):
            inputs, targets = inputs.to(self.device), targets.to(self.device)

            self.optimizer.zero_grad()

            if self.use_mixed_precision and self.scaler is not None:
                with self.autocast:
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, targets)

                self.scaler.scale(loss).backward()
                self.scaler.step(self.optimizer)
                self.scaler.update()
            else:
                outputs = self.model(inputs)
                loss = self.criterion(outputs, targets)
                loss.backward()
                self.optimizer.step()

            # Actualizar scheduler c√≠clico si est√° activado
            if self.use_cyclic_lr:
                self.cyclic_scheduler.step()

            total_loss += loss.item()
            num_batches += 1

        return total_loss / num_batches

    def validate(self) -> float:
        """Valida el modelo"""
        self.model.eval()
        total_loss = 0.0
        num_batches = 0

        with torch.no_grad():
            for inputs, targets in self.val_loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)

                if self.use_mixed_precision and self.scaler is not None:
                    with self.autocast:
                        outputs = self.model(inputs)
                        loss = self.criterion(outputs, targets)
                else:
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, targets)

                total_loss += loss.item()
                num_batches += 1

        return total_loss / num_batches

    def save_model(self, filename: str = "best_physics_model.pth"):
        """Guarda el modelo localmente"""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'val_loss': self.best_val_loss,
            'train_losses': self.train_losses,
            'val_losses': self.val_losses,
            'learning_rates': self.learning_rates
        }, filename)
        print(f"  ‚úÖ Model saved locally ({filename}) (val_loss: {self.best_val_loss:.6f})")

    def upload_to_hub(self, filename: str = "pytorch_model.bin"):
        """Sube el modelo a Hugging Face Hub"""
        if not self.use_hf_upload:
            return

        try:
            from huggingface_hub import upload_file

            # Guardar estado del modelo
            temp_file = "temp_model.bin"
            torch.save(self.model.state_dict(), temp_file)

            # Subir a Hub
            upload_file(
                path_or_fileobj=temp_file,
                path_in_repo=filename,
                repo_id=self.repo_id,
                repo_type="model"
            )

            # Limpiar archivo temporal
            if os.path.exists(temp_file):
                os.remove(temp_file)

            print(f"  üöÄ Model uploaded to Hub: {self.repo_id}")

        except Exception as e:
            print(f"  ‚ö†Ô∏è Failed to upload to Hub: {e}")

    def train(self, num_epochs: int, early_stopping_patience: int = 10):
        """
        Entrenamiento principal con todas las optimizaciones
        """
        print(f"üî• Starting training for {num_epochs} epochs...")
        start_time = time.time()

        # Learning rate inicial para warmup
        if self.use_warmup:
            initial_lr = self.optimizer.param_groups[0]['lr']

        for epoch in range(num_epochs):
            epoch_start = time.time()

            # Aplicar warmup si est√° habilitado
            if self.use_warmup and epoch < self.warmup_epochs:
                warmup_lr = self._warmup_lr(epoch, initial_lr)
                for param_group in self.optimizer.param_groups:
                    param_group['lr'] = warmup_lr

            # Entrenar √©poca
            train_loss = self.train_epoch()
            val_loss = self.validate()

            # Actualizar scheduler (no c√≠clico)
            if self.scheduler and not self.use_cyclic_lr:
                if isinstance(self.scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                    self.scheduler.step(val_loss)
                else:
                    self.scheduler.step()

            # Guardar m√©tricas
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            current_lr = self.optimizer.param_groups[0]['lr']
            self.learning_rates.append(current_lr)

            # Early stopping y guardado del mejor modelo
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                self.epochs_without_improvement = 0
                self.save_model("best_physics_model_default.pth")
                # self.upload_to_hub()  # Comentado para evitar errores de autenticaci√≥n
            else:
                self.epochs_without_improvement += 1

            # Informaci√≥n de √©poca
            epoch_time = time.time() - epoch_start
            gpu_memory = torch.cuda.max_memory_allocated() / 1e9 if torch.cuda.is_available() else 0

            print(f"\nüìà Epoch {epoch+1}/{num_epochs}")
            if self.use_warmup and epoch < self.warmup_epochs:
                print(f"  Warmup LR: {current_lr:.6f}")
            print(f"  Training Loss: {train_loss:.6f}")
            print(f"  Validation Loss: {val_loss:.6f}")
            print(f"  Time: {epoch_time:.2f}s")
            print(f"  LR: {current_lr:.6f}")
            print(f"  GPU Memory: {gpu_memory:.2f} GB")

            # Early stopping
            if self.epochs_without_improvement >= early_stopping_patience:
                print(f"\nüõë Early stopping triggered after {early_stopping_patience} epochs without improvement")
                break

        total_time = time.time() - start_time
        print(f"\nüéâ Training completed in {total_time:.2f}s")
        print(f"üí´ Best validation loss: {self.best_val_loss:.6f}")

    def plot_training_history(self):
        """Visualiza el historial de entrenamiento"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # P√©rdidas
        axes[0, 0].plot(self.train_losses, label='Training Loss', color='blue')
        axes[0, 0].plot(self.val_losses, label='Validation Loss', color='red')
        axes[0, 0].set_xlabel('Epoch')
        axes[0, 0].set_ylabel('Loss')
        axes[0, 0].set_title('Training and Validation Loss')
        axes[0, 0].legend()
        axes[0, 0].grid(True)

        # Learning Rate
        axes[0, 1].plot(self.learning_rates, color='green')
        axes[0, 1].set_xlabel('Epoch')
        axes[0, 1].set_ylabel('Learning Rate')
        axes[0, 1].set_title('Learning Rate Schedule')
        axes[0, 1].grid(True)
        axes[0, 1].set_yscale('log')

        # P√©rdida de validaci√≥n (zoom)
        axes[1, 0].plot(self.val_losses, color='red')
        axes[1, 0].set_xlabel('Epoch')
        axes[1, 0].set_ylabel('Validation Loss')
        axes[1, 0].set_title('Validation Loss (Detailed)')
        axes[1, 0].grid(True)

        # Diferencia entre train y val loss
        if len(self.train_losses) == len(self.val_losses):
            diff = [val - train for train, val in zip(self.train_losses, self.val_losses)]
            axes[1, 1].plot(diff, color='purple')
            axes[1, 1].set_xlabel('Epoch')
            axes[1, 1].set_ylabel('Val Loss - Train Loss')
            axes[1, 1].set_title('Overfitting Monitor')
            axes[1, 1].grid(True)
            axes[1, 1].axhline(y=0, color='black', linestyle='--', alpha=0.5)

        plt.tight_layout()
        plt.show()

# ============================================================================
# 3. FUNCI√ìN DE P√âRDIDA ESPECIALIZADA (ya estaba definida pero la incluyo)
# ============================================================================

class PhysicsLoss(nn.Module):
    """
    Funci√≥n de p√©rdida especializada para f√≠sica
    Pondera diferentes componentes seg√∫n importancia f√≠sica
    """

    def __init__(self, position_weight=1.0, velocity_weight=0.5):
        super().__init__()
        self.position_weight = position_weight
        self.velocity_weight = velocity_weight
        self.mse = nn.MSELoss()

    def forward(self, predictions, targets):
        # Separar posici√≥n y velocidad
        pred_pos = predictions[:, :3]
        pred_vel = predictions[:, 3:]
        target_pos = targets[:, :3]
        target_vel = targets[:, 3:]

        # P√©rdidas ponderadas
        pos_loss = self.mse(pred_pos, target_pos)
        vel_loss = self.mse(pred_vel, target_vel)

        return self.position_weight * pos_loss + self.velocity_weight * vel_loss

# ============================================================================
# 4. UTILIDADES ADICIONALES
# ============================================================================

def create_model_from_config(config: Dict) -> HybridPhysicsModel:
    """
    Crea un modelo a partir de una configuraci√≥n
    """
    return HybridPhysicsModel(
        input_dim=config['input_dim'],
        hidden_dims=config['hidden_dims'],
        output_dim=config['output_dim'],
        dropout_rate=config.get('dropout_rate', 0.1),
        activation=config.get('activation', 'relu')
    )

def load_trained_model(checkpoint_path: str, config: Dict, device: torch.device) -> HybridPhysicsModel:
    """
    Carga un modelo entrenado desde un checkpoint
    """
    model = create_model_from_config(config)
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()
    return model

class ModelEvaluator:
    """
    Evaluador para m√©tricas adicionales del modelo
    """

    def __init__(self, model: nn.Module, device: torch.device):
        self.model = model
        self.device = device

    def compute_metrics(self, data_loader: DataLoader) -> Dict[str, float]:
        """
        Computa m√©tricas detalladas en un dataset
        """
        self.model.eval()

        total_mse = 0.0
        total_mae = 0.0
        total_position_error = 0.0
        total_velocity_error = 0.0
        num_samples = 0

        with torch.no_grad():
            for inputs, targets in data_loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                outputs = self.model(inputs)

                # MSE y MAE generales
                mse = F.mse_loss(outputs, targets, reduction='sum')
                mae = F.l1_loss(outputs, targets, reduction='sum')

                # Errores espec√≠ficos de posici√≥n y velocidad
                pos_error = F.mse_loss(outputs[:, :3], targets[:, :3], reduction='sum')
                vel_error = F.mse_loss(outputs[:, 3:], targets[:, 3:], reduction='sum')

                total_mse += mse.item()
                total_mae += mae.item()
                total_position_error += pos_error.item()
                total_velocity_error += vel_error.item()
                num_samples += targets.size(0)

        return {
            'mse': total_mse / (num_samples * targets.size(1)),
            'mae': total_mae / (num_samples * targets.size(1)),
            'rmse': np.sqrt(total_mse / (num_samples * targets.size(1))),
            'position_mse': total_position_error / (num_samples * 3),
            'velocity_mse': total_velocity_error / (num_samples * 3),
            'position_rmse': np.sqrt(total_position_error / (num_samples * 3)),
            'velocity_rmse': np.sqrt(total_velocity_error / (num_samples * 3))
        }

# ============================================================================
# 5. CONFIGURACIONES DE EXPERIMENTOS
# ============================================================================

class ExperimentConfig:
    """
    Configuraciones para diferentes experimentos
    """

    @staticmethod
    def get_baseline_config() -> Dict:
        """Configuraci√≥n baseline"""
        return {
            'model': {
                'hidden_dims': [512, 256, 128],
                'dropout_rate': 0.1,
                'activation': 'relu'
            },
            'training': {
                'lr': 1e-3,
                'weight_decay': 1e-4,
                'batch_size': 256,
                'epochs': 50,
                'early_stopping_patience': 10
            },
            'loss': {
                'position_weight': 1.0,
                'velocity_weight': 0.5
            }
        }

    @staticmethod
    def get_advanced_config() -> Dict:
        """Configuraci√≥n avanzada"""
        return {
            'model': {
                'hidden_dims': [1024, 512, 256, 128],
                'dropout_rate': 0.2,
                'activation': 'leakyrelu'
            },
            'training': {
                'lr': 1e-3,
                'weight_decay': 1e-4,
                'batch_size': 512,
                'epochs': 100,
                'early_stopping_patience': 15,
                'use_warmup': True,
                'warmup_epochs': 5
            },
            'loss': {
                'position_weight': 2.0,
                'velocity_weight': 1.0
            }
        }

print("‚úÖ Todas las clases faltantes han sido definidas correctamente")
print("üìã Clases disponibles:")
print("  ‚Ä¢ HybridPhysicsModel - Red neuronal h√≠brida")
print("  ‚Ä¢ PhysicsTrainer - Entrenador optimizado")
print("  ‚Ä¢ PhysicsLoss - Funci√≥n de p√©rdida especializada")
print("  ‚Ä¢ ModelEvaluator - Evaluador de m√©tricas")
print("  ‚Ä¢ ExperimentConfig - Configuraciones de experimentos")
print("  ‚Ä¢ Funciones auxiliares para carga y configuraci√≥n de modelos")

In [None]:
# ============================================================================
# CLASES ADICIONALES PARA SIMULACI√ìN Y MANEJO DE DATOS
# ============================================================================

import numpy as np
import torch
from torch.utils.data import Dataset
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
import json
import pickle
import random
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# ============================================================================
# 1. DATACLASS PARA PUNTOS DE DATOS (Refinada)
# ============================================================================

@dataclass
class PhysicsDataPoint:
    """Data point extracted from real simulation - Version completa"""
    basic_state: np.ndarray  # [pos_x, pos_y, pos_z, vel_x, vel_y, vel_z]
    ground_truth_state: np.ndarray  # Real state from simulator
    residual: np.ndarray  # Difference for IA to learn
    material_properties: np.ndarray  # [obj_friction, obj_restitution, obj_damping, obj_density, floor_friction, floor_restitution, floor_damping, floor_density]
    context: Dict  # Additional info (mass, shape, etc.)

    def to_dict(self) -> Dict:
        """Convierte a diccionario para serializaci√≥n"""
        return {
            'basic_state': self.basic_state.tolist(),
            'ground_truth_state': self.ground_truth_state.tolist(),
            'residual': self.residual.tolist(),
            'material_properties': self.material_properties.tolist(),
            'context': self.context
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'PhysicsDataPoint':
        """Crea desde diccionario"""
        return cls(
            basic_state=np.array(data['basic_state']),
            ground_truth_state=np.array(data['ground_truth_state']),
            residual=np.array(data['residual']),
            material_properties=np.array(data['material_properties']),
            context=data['context']
        )

# ============================================================================
# 2. DATASET PYTORCH OPTIMIZADO (Versi√≥n Completa)
# ============================================================================

class PhysicsDataset(Dataset):
    """PyTorch Dataset optimizado para entrenamiento en GPU - Versi√≥n completa"""

    def __init__(self, data_points: List[PhysicsDataPoint],
                 device: torch.device = None, normalize: bool = True,
                 cache_in_memory: bool = True):
        self.data_points = data_points
        self.device = device or torch.device('cpu')
        self.normalize = normalize
        self.cache_in_memory = cache_in_memory

        # Precomputar tensores en GPU para m√°ximo rendimiento
        if cache_in_memory:
            self._precompute_tensors()

        if self.normalize:
            self._compute_normalization_stats()

    def _precompute_tensors(self):
        """Precompute all tensors and load them onto GPU"""
        print("üî• Precomputing tensors on GPU...")

        # Extraer datos
        basic_states = []
        residuals = []
        material_props = []

        for dp in self.data_points:
            basic_states.append(dp.basic_state)
            residuals.append(dp.residual)
            material_props.append(dp.material_properties)

        # Convertir a tensores y mover a GPU
        self.basic_states = torch.tensor(np.array(basic_states),
                                       dtype=torch.float32, device=self.device)
        self.residuals = torch.tensor(np.array(residuals),
                                    dtype=torch.float32, device=self.device)
        self.material_props = torch.tensor(np.array(material_props),
                                         dtype=torch.float32, device=self.device)

        print(f"‚úÖ Tensors loaded on {self.device}")
        print(f"  Basic states: {self.basic_states.shape}")
        print(f"  Residuals: {self.residuals.shape}")
        print(f"  Material properties: {self.material_props.shape}")

    def _compute_normalization_stats(self):
        """Compute statistics for normalization"""
        if self.cache_in_memory:
            # Normalizar basic states
            self.state_mean = self.basic_states.mean(dim=0)
            self.state_std = self.basic_states.std(dim=0) + 1e-8

            # Normalizar material properties
            self.mat_mean = self.material_props.mean(dim=0)
            self.mat_std = self.material_props.std(dim=0) + 1e-8
        else:
            # Calcular estad√≠sticas sin cargar todo en memoria
            basic_states = np.array([dp.basic_state for dp in self.data_points])
            material_props = np.array([dp.material_properties for dp in self.data_points])

            self.state_mean = torch.tensor(basic_states.mean(axis=0), dtype=torch.float32, device=self.device)
            self.state_std = torch.tensor(basic_states.std(axis=0) + 1e-8, dtype=torch.float32, device=self.device)
            self.mat_mean = torch.tensor(material_props.mean(axis=0), dtype=torch.float32, device=self.device)
            self.mat_std = torch.tensor(material_props.std(axis=0) + 1e-8, dtype=torch.float32, device=self.device)

        print("üìä Normalization statistics computed")

    def __len__(self):
        return len(self.data_points)

    def __getitem__(self, idx):
        """Get normalized sample"""
        if self.cache_in_memory:
            basic_state = self.basic_states[idx]
            residual = self.residuals[idx]
            material_prop = self.material_props[idx]
        else:
            # Cargar desde datos originales
            dp = self.data_points[idx]
            basic_state = torch.tensor(dp.basic_state, dtype=torch.float32, device=self.device)
            residual = torch.tensor(dp.residual, dtype=torch.float32, device=self.device)
            material_prop = torch.tensor(dp.material_properties, dtype=torch.float32, device=self.device)

        if self.normalize:
            basic_state = (basic_state - self.state_mean) / self.state_std
            material_prop = (material_prop - self.mat_mean) / self.mat_std

        # Concatenate state + properties as input features
        input_features = torch.cat([basic_state, material_prop])

        return input_features, residual

    def save_to_disk(self, filepath: str):
        """Guarda el dataset en disco"""
        data_to_save = {
            'data_points': [dp.to_dict() for dp in self.data_points],
            'normalization_stats': {
                'state_mean': self.state_mean.cpu().numpy() if hasattr(self, 'state_mean') else None,
                'state_std': self.state_std.cpu().numpy() if hasattr(self, 'state_std') else None,
                'mat_mean': self.mat_mean.cpu().numpy() if hasattr(self, 'mat_mean') else None,
                'mat_std': self.mat_std.cpu().numpy() if hasattr(self, 'mat_std') else None,
            },
            'normalize': self.normalize,
            'cache_in_memory': self.cache_in_memory
        }

        with open(filepath, 'wb') as f:
            pickle.dump(data_to_save, f)
        print(f"üíæ Dataset saved to {filepath}")

    @classmethod
    def load_from_disk(cls, filepath: str, device: torch.device = None) -> 'PhysicsDataset':
        """Carga el dataset desde disco"""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)

        data_points = [PhysicsDataPoint.from_dict(dp_dict) for dp_dict in data['data_points']]

        dataset = cls(
            data_points=data_points,
            device=device or torch.device('cpu'),
            normalize=data['normalize'],
            cache_in_memory=data['cache_in_memory']
        )

        # Restaurar estad√≠sticas de normalizaci√≥n si existen
        if data['normalization_stats']['state_mean'] is not None:
            dataset.state_mean = torch.tensor(data['normalization_stats']['state_mean'],
                                            dtype=torch.float32, device=dataset.device)
            dataset.state_std = torch.tensor(data['normalization_stats']['state_std'],
                                           dtype=torch.float32, device=dataset.device)
            dataset.mat_mean = torch.tensor(data['normalization_stats']['mat_mean'],
                                          dtype=torch.float32, device=dataset.device)
            dataset.mat_std = torch.tensor(data['normalization_stats']['mat_std'],
                                         dtype=torch.float32, device=dataset.device)

        print(f"üìÇ Dataset loaded from {filepath}")
        return dataset

# ============================================================================
# 3. ANALIZADOR DE DATOS
# ============================================================================

class DatasetAnalyzer:
    """
    Analizador para entender las caracter√≠sticas del dataset
    """

    def __init__(self, dataset: PhysicsDataset):
        self.dataset = dataset

    def analyze_distribution(self) -> Dict[str, Any]:
        """Analiza la distribuci√≥n de los datos"""

        # Extraer datos para an√°lisis
        basic_states = np.array([dp.basic_state for dp in self.dataset.data_points])
        residuals = np.array([dp.residual for dp in self.dataset.data_points])
        material_props = np.array([dp.material_properties for dp in self.dataset.data_points])

        # An√°lisis de estados b√°sicos
        pos_data = basic_states[:, :3]  # Posiciones
        vel_data = basic_states[:, 3:]  # Velocidades

        # An√°lisis de residuales
        pos_residuals = residuals[:, :3]
        vel_residuals = residuals[:, 3:]

        analysis = {
            'dataset_size': len(self.dataset.data_points),
            'position_stats': {
                'mean': np.mean(pos_data, axis=0),
                'std': np.std(pos_data, axis=0),
                'min': np.min(pos_data, axis=0),
                'max': np.max(pos_data, axis=0)
            },
            'velocity_stats': {
                'mean': np.mean(vel_data, axis=0),
                'std': np.std(vel_data, axis=0),
                'min': np.min(vel_data, axis=0),
                'max': np.max(vel_data, axis=0)
            },
            'position_residual_stats': {
                'mean': np.mean(pos_residuals, axis=0),
                'std': np.std(pos_residuals, axis=0),
                'min': np.min(pos_residuals, axis=0),
                'max': np.max(pos_residuals, axis=0)
            },
            'velocity_residual_stats': {
                'mean': np.mean(vel_residuals, axis=0),
                'std': np.std(vel_residuals, axis=0),
                'min': np.min(vel_residuals, axis=0),
                'max': np.max(vel_residuals, axis=0)
            },
            'material_distribution': self._analyze_materials()
        }

        return analysis

    def _analyze_materials(self) -> Dict[str, Any]:
        """Analiza la distribuci√≥n de materiales"""
        materials = {}
        floor_materials = {}
        shapes = {}

        for dp in self.dataset.data_points:
            # Material del objeto
            obj_material = dp.context.get('material', 'unknown')
            materials[obj_material] = materials.get(obj_material, 0) + 1

            # Material del suelo
            floor_material = dp.context.get('floor_material', 'unknown')
            floor_materials[floor_material] = floor_materials.get(floor_material, 0) + 1

            # Forma del objeto
            shape = dp.context.get('shape', 'unknown')
            shapes[shape] = shapes.get(shape, 0) + 1

        return {
            'object_materials': materials,
            'floor_materials': floor_materials,
            'shapes': shapes
        }

    def plot_analysis(self):
        """Visualiza el an√°lisis del dataset"""
        analysis = self.analyze_distribution()

        fig, axes = plt.subplots(2, 3, figsize=(18, 12))

        # Distribuci√≥n de posiciones
        pos_data = np.array([dp.basic_state[:3] for dp in self.dataset.data_points])
        for i, axis_name in enumerate(['X', 'Y', 'Z']):
            axes[0, i].hist(pos_data[:, i], bins=50, alpha=0.7, color=['red', 'green', 'blue'][i])
            axes[0, i].set_title(f'Position {axis_name} Distribution')
            axes[0, i].set_xlabel(f'Position {axis_name}')
            axes[0, i].set_ylabel('Frequency')
            axes[0, i].grid(True, alpha=0.3)

        # Distribuci√≥n de velocidades
        vel_data = np.array([dp.basic_state[3:] for dp in self.dataset.data_points])
        for i, axis_name in enumerate(['VX', 'VY', 'VZ']):
            axes[1, i].hist(vel_data[:, i], bins=50, alpha=0.7, color=['orange', 'purple', 'brown'][i])
            axes[1, i].set_title(f'Velocity {axis_name} Distribution')
            axes[1, i].set_xlabel(f'Velocity {axis_name}')
            axes[1, i].set_ylabel('Frequency')
            axes[1, i].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        # Gr√°fico de distribuci√≥n de materiales
        self._plot_material_distribution(analysis['material_distribution'])

    def _plot_material_distribution(self, material_dist: Dict):
        """Grafica la distribuci√≥n de materiales"""
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        # Materiales de objetos
        if material_dist['object_materials']:
            materials = list(material_dist['object_materials'].keys())
            counts = list(material_dist['object_materials'].values())
            axes[0].pie(counts, labels=materials, autopct='%1.1f%%')
            axes[0].set_title('Object Materials Distribution')

        # Materiales de suelo
        if material_dist['floor_materials']:
            floor_materials = list(material_dist['floor_materials'].keys())
            floor_counts = list(material_dist['floor_materials'].values())
            axes[1].pie(floor_counts, labels=floor_materials, autopct='%1.1f%%')
            axes[1].set_title('Floor Materials Distribution')

        # Formas
        if material_dist['shapes']:
            shapes = list(material_dist['shapes'].keys())
            shape_counts = list(material_dist['shapes'].values())
            axes[2].pie(shape_counts, labels=shapes, autopct='%1.1f%%')
            axes[2].set_title('Shapes Distribution')

        plt.tight_layout()
        plt.show()

# ============================================================================
# 4. GENERADOR DE DATOS SINT√âTICOS
# ============================================================================

class SyntheticDataGenerator:
    """
    Generador de datos sint√©ticos para aumentar el dataset
    """

    def __init__(self, base_dataset: PhysicsDataset):
        self.base_dataset = base_dataset
        self.analyzer = DatasetAnalyzer(base_dataset)
        self.stats = self.analyzer.analyze_distribution()

    def generate_synthetic_data(self, num_samples: int,
                              noise_level: float = 0.1) -> List[PhysicsDataPoint]:
        """
        Genera datos sint√©ticos basados en el dataset existente
        """
        synthetic_data = []

        for _ in range(num_samples):
            # Seleccionar un punto base aleatorio
            base_idx = random.randint(0, len(self.base_dataset.data_points) - 1)
            base_point = self.base_dataset.data_points[base_idx]

            # Agregar ruido controlado
            noisy_basic_state = self._add_noise(base_point.basic_state, noise_level)
            noisy_material_props = self._add_noise(base_point.material_properties, noise_level * 0.1)

            # Generar residual sint√©tico (con cierta correlaci√≥n)
            synthetic_residual = self._generate_correlated_residual(
                noisy_basic_state, base_point.residual, noise_level
            )

            # Crear nuevo punto de datos
            synthetic_point = PhysicsDataPoint(
                basic_state=noisy_basic_state,
                ground_truth_state=noisy_basic_state + synthetic_residual,
                residual=synthetic_residual,
                material_properties=noisy_material_props,
                context={**base_point.context, 'synthetic': True}
            )

            synthetic_data.append(synthetic_point)

        return synthetic_data

    def _add_noise(self, data: np.ndarray, noise_level: float) -> np.ndarray:
        """Agrega ruido gaussiano a los datos"""
        noise = np.random.normal(0, noise_level, data.shape)
        return data + noise * np.std(data, axis=0 if data.ndim > 1 else None)

    def _generate_correlated_residual(self, basic_state: np.ndarray,
                                    base_residual: np.ndarray,
                                    noise_level: float) -> np.ndarray:
        """Genera residual sint√©tico con correlaci√≥n f√≠sica"""
        # Mantener cierta correlaci√≥n con el residual base
        correlation_factor = 0.7
        noise_factor = 1 - correlation_factor

        noise = np.random.normal(0, noise_level, base_residual.shape)
        synthetic_residual = (correlation_factor * base_residual +
                            noise_factor * noise * np.std(base_residual))

        return synthetic_residual

# ============================================================================
# 5. UTILIDADES DE VISUALIZACI√ìN
# ============================================================================

class PhysicsVisualizer:
    """
    Herramientas de visualizaci√≥n para datos de f√≠sica
    """

    @staticmethod
    def plot_trajectory_3d(states: List[Dict], title: str = "3D Trajectory"):
        """Visualiza una trayectoria en 3D"""
        positions = np.array([state['position'] for state in states])

        fig = plt.figure(figsize=(10, 8))
        ax = fig.add_subplot(111, projection='3d')

        # Trayectoria
        ax.plot(positions[:, 0], positions[:, 1], positions[:, 2],
                'b-', linewidth=2, label='Trajectory')

        # Punto inicial
        ax.scatter(positions[0, 0], positions[0, 1], positions[0, 2],
                  color='green', s=100, label='Start')

        # Punto final
        ax.scatter(positions[-1, 0], positions[-1, 1], positions[-1, 2],
                  color='red', s=100, label='End')

        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        ax.set_title(title)
        ax.legend()

        plt.show()

    @staticmethod
    def plot_residual_analysis(dataset: PhysicsDataset):
        """Analiza y visualiza los residuales"""
        residuals = np.array([dp.residual for dp in dataset.data_points])

        fig, axes = plt.subplots(2, 3, figsize=(15, 10))

        # Residuales de posici√≥n
        pos_residuals = residuals[:, :3]
        for i, axis in enumerate(['X', 'Y', 'Z']):
            axes[0, i].hist(pos_residuals[:, i], bins=50, alpha=0.7)
            axes[0, i].set_title(f'Position Residual {axis}')
            axes[0, i].set_xlabel(f'Residual {axis}')
            axes[0, i].set_ylabel('Frequency')
            axes[0, i].grid(True, alpha=0.3)

        # Residuales de velocidad
        vel_residuals = residuals[:, 3:]
        for i, axis in enumerate(['VX', 'VY', 'VZ']):
            axes[1, i].hist(vel_residuals[:, i], bins=50, alpha=0.7)
            axes[1, i].set_title(f'Velocity Residual {axis}')
            axes[1, i].set_xlabel(f'Residual V{axis[1]}')
            axes[1, i].set_ylabel('Frequency')
            axes[1, i].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# ============================================================================
# MENSAJE DE CONFIRMACI√ìN
# ============================================================================

print("‚úÖ Clases adicionales de simulaci√≥n y datos completadas:")
print("  ‚Ä¢ PhysicsDataPoint (completa) - Punto de datos con serializaci√≥n")
print("  ‚Ä¢ PhysicsDataset (completa) - Dataset optimizado con cache y persistencia")
print("  ‚Ä¢ DatasetAnalyzer - Analizador de distribuciones de datos")
print("  ‚Ä¢ SyntheticDataGenerator - Generador de datos sint√©ticos")
print("  ‚Ä¢ PhysicsVisualizer - Herramientas de visualizaci√≥n")
print("\nüîß Funcionalidades adicionales:")
print("  ‚Ä¢ Guardado y carga de datasets")
print("  ‚Ä¢ An√°lisis estad√≠stico completo")
print("  ‚Ä¢ Generaci√≥n de datos sint√©ticos")
print("  ‚Ä¢ Visualizaciones especializadas")

In [None]:
# ============================================================================
# MAIN PIPELINE - SISTEMA H√çBRIDO DE F√çSICA CON IA
# ============================================================================
# Este script orquesta todo el flujo del sistema h√≠brido de f√≠sica

import os
import time
import argparse
import json
from pathlib import Path
from typing import Dict, Any, Optional

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import numpy as np
import matplotlib.pyplot as plt

# Importar todas las clases definidas anteriormente
# (Asumiendo que est√°n en el mismo notebook o importadas)

class HybridPhysicsExperiment:
    """
    Clase principal que orquesta todo el experimento de f√≠sica h√≠brida
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.results = {}

        # Directorios de trabajo
        self.setup_directories()

        print(f"üöÄ Iniciando Experimento de F√≠sica H√≠brida")
        print(f"üì± Dispositivo: {self.device}")
        print(f"üîß Configuraci√≥n: {config['experiment_name']}")

    def setup_directories(self):
        """Configura los directorios de trabajo"""
        self.base_dir = Path(self.config.get('output_dir', 'hybrid_physics_output'))
        self.base_dir.mkdir(exist_ok=True)

        self.models_dir = self.base_dir / 'models'
        self.data_dir = self.base_dir / 'data'
        self.plots_dir = self.base_dir / 'plots'
        self.logs_dir = self.base_dir / 'logs'

        for dir_path in [self.models_dir, self.data_dir, self.plots_dir, self.logs_dir]:
            dir_path.mkdir(exist_ok=True)

    def run_full_pipeline(self):
        """Ejecuta el pipeline completo"""
        print("\n" + "="*60)
        print("üéØ EJECUTANDO PIPELINE COMPLETO")
        print("="*60)

        try:
            # Paso 1: Generar datos si no existen
            dataset = self.step_1_generate_or_load_data()

            # Paso 2: Analizar datos
            self.step_2_analyze_data(dataset)

            # Paso 3: Preparar datos para entrenamiento
            train_loader, val_loader, test_loader = self.step_3_prepare_data_loaders(dataset)

            # Paso 4: Crear y entrenar modelo
            model, trainer = self.step_4_train_model(train_loader, val_loader)

            # Paso 5: Evaluar modelo
            self.step_5_evaluate_model(model, test_loader, dataset)

            # Paso 6: Crear sistema h√≠brido y validar
            self.step_6_validate_hybrid_system(model, dataset)

            # Paso 7: Generar reporte final
            self.step_7_generate_final_report()

            print("\nüéâ ¬°PIPELINE COMPLETADO EXITOSAMENTE!")

        except Exception as e:
            print(f"\n‚ùå Error en el pipeline: {e}")
            raise

    def step_1_generate_or_load_data(self):
        """Paso 1: Generar o cargar datos"""
        print("\nüìä PASO 1: Generaci√≥n/Carga de Datos")
        print("-" * 40)

        dataset_path = self.data_dir / 'physics_dataset.pkl'

        if dataset_path.exists() and not self.config.get('regenerate_data', False):
            print("üìÇ Cargando dataset existente...")
            dataset = PhysicsDataset.load_from_disk(str(dataset_path), self.device)
        else:
            print("üîÑ Generando nuevo dataset...")

            # Configurar generador de datos
            generator_config = self.config.get('data_generation', {})
            num_scenarios = generator_config.get('num_scenarios', 1000)
            steps_per_scenario = generator_config.get('steps_per_scenario', 100)

            # Generar datos
            data_generator = RealPhysicsDataGenerator(
                num_scenarios=num_scenarios,
                steps_per_scenario=steps_per_scenario
            )

            physics_dataset = data_generator.generate_dataset()

            # Crear dataset PyTorch
            dataset = PhysicsDataset(
                physics_dataset,
                device=self.device,
                normalize=True
            )

            # Guardar dataset
            dataset.save_to_disk(str(dataset_path))

        print(f"‚úÖ Dataset listo: {len(dataset)} muestras")
        self.results['dataset_size'] = len(dataset)

        return dataset

    def step_2_analyze_data(self, dataset):
        """Paso 2: Analizar distribuci√≥n de datos"""
        print("\nüîç PASO 2: An√°lisis de Datos")
        print("-" * 40)

        analyzer = DatasetAnalyzer(dataset)
        analysis = analyzer.analyze_distribution()

        # Guardar an√°lisis
        analysis_path = self.logs_dir / 'data_analysis.json'
        with open(analysis_path, 'w') as f:
            # Convertir numpy arrays a listas para JSON
            def convert_numpy(obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, np.integer):
                    return int(obj)
                elif isinstance(obj, np.floating):
                    return float(obj)
                elif isinstance(obj, dict):
                    return {key: convert_numpy(value) for key, value in obj.items()}
                elif isinstance(obj, list):
                    return [convert_numpy(item) for item in obj]
                return obj

            json.dump(convert_numpy(analysis), f, indent=2)

        # Generar visualizaciones
        print("üìà Generando visualizaciones...")
        analyzer.plot_analysis()
        plt.savefig(self.plots_dir / 'data_distribution.png', dpi=300, bbox_inches='tight')
        plt.close()

        PhysicsVisualizer.plot_residual_analysis(dataset)
        plt.savefig(self.plots_dir / 'residual_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()

        print("‚úÖ An√°lisis completado")
        self.results['data_analysis'] = analysis

    def step_3_prepare_data_loaders(self, dataset):
        """Paso 3: Preparar data loaders"""
        print("\nüîÑ PASO 3: Preparaci√≥n de Data Loaders")
        print("-" * 40)

        # Configuraci√≥n de divisi√≥n de datos
        data_config = self.config.get('data_split', {})
        train_ratio = data_config.get('train_ratio', 0.7)
        val_ratio = data_config.get('val_ratio', 0.2)
        test_ratio = data_config.get('test_ratio', 0.1)

        # Verificar que las proporciones suman 1
        total_ratio = train_ratio + val_ratio + test_ratio
        if abs(total_ratio - 1.0) > 1e-6:
            raise ValueError(f"Las proporciones deben sumar 1.0, pero suman {total_ratio}")

        # Calcular tama√±os
        dataset_size = len(dataset)
        train_size = int(train_ratio * dataset_size)
        val_size = int(val_ratio * dataset_size)
        test_size = dataset_size - train_size - val_size

        # Dividir dataset
        train_dataset, val_dataset, test_dataset = random_split(
            dataset, [train_size, val_size, test_size],
            generator=torch.Generator().manual_seed(42)
        )

        # Configurar data loaders
        batch_size = self.config.get('training', {}).get('batch_size', 512)

        train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True,
            num_workers=0, pin_memory=False
        )
        val_loader = DataLoader(
            val_dataset, batch_size=batch_size, shuffle=False,
            num_workers=0, pin_memory=False
        )
        test_loader = DataLoader(
            test_dataset, batch_size=batch_size, shuffle=False,
            num_workers=0, pin_memory=False
        )

        print(f"‚úÖ Data loaders creados:")
        print(f"  üìö Entrenamiento: {len(train_dataset)} muestras")
        print(f"  üîç Validaci√≥n: {len(val_dataset)} muestras")
        print(f"  üß™ Prueba: {len(test_dataset)} muestras")
        print(f"  üì¶ Batch size: {batch_size}")

        self.results['data_split'] = {
            'train_size': len(train_dataset),
            'val_size': len(val_dataset),
            'test_size': len(test_dataset),
            'batch_size': batch_size
        }

        return train_loader, val_loader, test_loader

    def step_4_train_model(self, train_loader, val_loader):
        """Paso 4: Entrenar modelo"""
        print("\nüß† PASO 4: Entrenamiento del Modelo")
        print("-" * 40)

        # Configuraci√≥n del modelo
        model_config = self.config.get('model', {})

        # Determinar dimensiones
        sample_input, sample_output = next(iter(train_loader))
        input_dim = sample_input.shape[1]
        output_dim = sample_output.shape[1]

        # Crear modelo
        model = HybridPhysicsModel(
            input_dim=input_dim,
            hidden_dims=model_config.get('hidden_dims', [1024, 512, 256, 128]),
            output_dim=output_dim,
            dropout_rate=model_config.get('dropout_rate', 0.2),
            activation=model_config.get('activation', 'leakyrelu')
        ).to(self.device)

        # Configurar entrenamiento
        training_config = self.config.get('training', {})

        # Optimizador
        optimizer = optim.AdamW(
            model.parameters(),
            lr=training_config.get('lr', 1e-3),
            weight_decay=training_config.get('weight_decay', 1e-4)
        )

        # Scheduler
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5
        )

        # Funci√≥n de p√©rdida
        loss_config = self.config.get('loss', {})
        criterion = PhysicsLoss(
            position_weight=loss_config.get('position_weight', 2.0),
            velocity_weight=loss_config.get('velocity_weight', 1.0)
        )

        # Crear entrenador
        trainer = PhysicsTrainer(
            model=model,
            train_loader=train_loader,
            val_loader=val_loader,
            criterion=criterion,
            optimizer=optimizer,
            scheduler=scheduler,
            device=self.device,
            repo_id=f"hybrid-physics-{self.config['experiment_name']}",
            use_warmup=training_config.get('use_warmup', True),
            warmup_epochs=training_config.get('warmup_epochs', 5)
        )

        # Entrenar
        num_epochs = training_config.get('epochs', 30)
        early_stopping_patience = training_config.get('early_stopping_patience', 8)

        print(f"üî• Iniciando entrenamiento por {num_epochs} √©pocas...")
        trainer.train(num_epochs=num_epochs, early_stopping_patience=early_stopping_patience)

        # Guardar modelo
        model_path = self.models_dir / 'best_hybrid_model.pth'
        torch.save({
            'model_state_dict': model.state_dict(),
            'model_config': {
                'input_dim': input_dim,
                'hidden_dims': model_config.get('hidden_dims', [1024, 512, 256, 128]),
                'output_dim': output_dim,
                'dropout_rate': model_config.get('dropout_rate', 0.2),
                'activation': model_config.get('activation', 'leakyrelu')
            },
            'training_history': {
                'train_losses': trainer.train_losses,
                'val_losses': trainer.val_losses,
                'learning_rates': trainer.learning_rates,
                'best_val_loss': trainer.best_val_loss
            }
        }, model_path)

        # Visualizar historial de entrenamiento
        trainer.plot_training_history()
        plt.savefig(self.plots_dir / 'training_history.png', dpi=300, bbox_inches='tight')
        plt.close()

        print("‚úÖ Entrenamiento completado")
        self.results['training'] = {
            'best_val_loss': trainer.best_val_loss,
            'final_train_loss': trainer.train_losses[-1] if trainer.train_losses else None,
            'total_epochs': len(trainer.train_losses),
            'model_parameters': sum(p.numel() for p in model.parameters())
        }

        return model, trainer

    def step_5_evaluate_model(self, model, test_loader, dataset):
        """Paso 5: Evaluar modelo en conjunto de prueba"""
        print("\nüìä PASO 5: Evaluaci√≥n del Modelo")
        print("-" * 40)

        evaluator = ModelEvaluator(model, self.device)
        metrics = evaluator.compute_metrics(test_loader)

        print("üìà M√©tricas en conjunto de prueba:")
        for metric_name, value in metrics.items():
            print(f"  {metric_name}: {value:.6f}")

        # Guardar m√©tricas
        metrics_path = self.logs_dir / 'test_metrics.json'
        with open(metrics_path, 'w') as f:
            json.dump(metrics, f, indent=2)

        self.results['test_metrics'] = metrics
        print("‚úÖ Evaluaci√≥n completada")

    def step_6_validate_hybrid_system(self, model, dataset):
        """Paso 6: Validar sistema h√≠brido completo"""
        print("\nüî¨ PASO 6: Validaci√≥n del Sistema H√≠brido")
        print("-" * 40)

        # Crear sistema h√≠brido
        basic_physics_core = NewtonianPhysics()

        normalization_stats = {
            'state_mean': dataset.state_mean,
            'state_std': dataset.state_std,
            'mat_mean': dataset.mat_mean,
            'mat_std': dataset.mat_std
        }

        hybrid_system = RealHybridPhysicsSystem(
            trained_model=model,
            physics_core=basic_physics_core,
            normalization_stats=normalization_stats
        )

        # Crear simulador de alta fidelidad
        high_fidelity_sim = HighFidelitySimulator(gui=False)

        # Crear validador
        validator = PhysicsValidator(
            hybrid_system=hybrid_system,
            basic_physics=basic_physics_core,
            high_fidelity_sim=high_fidelity_sim
        )

        # Crear escenarios de prueba
        validation_config = self.config.get('validation', {})
        num_test_scenarios = validation_config.get('num_scenarios', 5)
        num_steps = validation_config.get('num_steps', 50)

        print(f"üß™ Creando {num_test_scenarios} escenarios de prueba...")
        test_scenarios = create_test_scenarios(num_scenarios=num_test_scenarios)

        # Ejecutar validaci√≥n
        print(f"‚ö° Ejecutando validaci√≥n con {num_steps} pasos por escenario...")
        validation_results = validator.run_comparison_test(test_scenarios, num_steps=num_steps)

        # Analizar resultados
        stats, improvement = validator.analyze_results(validation_results)

        # Visualizar comparaci√≥n para el primer escenario
        validator.plot_comparison(validation_results, scenario_idx=0)
        plt.savefig(self.plots_dir / 'hybrid_validation.png', dpi=300, bbox_inches='tight')
        plt.close()

        # Guardar resultados de validaci√≥n
        validation_summary = {
            'hybrid_stats': stats['hybrid'],
            'basic_stats': stats['basic'],
            'improvement': improvement,
            'num_scenarios': num_test_scenarios,
            'steps_per_scenario': num_steps
        }

        validation_path = self.logs_dir / 'validation_results.json'
        with open(validation_path, 'w') as f:
            json.dump(validation_summary, f, indent=2)

        print("‚úÖ Validaci√≥n del sistema h√≠brido completada")
        self.results['validation'] = validation_summary

    def step_7_generate_final_report(self):
        """Paso 7: Generar reporte final"""
        print("\nüìÑ PASO 7: Generaci√≥n de Reporte Final")
        print("-" * 40)

        # Crear reporte completo
        report = self._create_final_report()

        # Guardar reporte
        report_path = self.base_dir / 'REPORTE_FINAL.md'
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report)

        # Guardar resultados completos en JSON
        results_path = self.logs_dir / 'experiment_results.json'
        with open(results_path, 'w') as f:
            json.dump(self.results, f, indent=2)

        print(f"üìä Reporte final guardado en: {report_path}")
        print(f"üíæ Resultados completos en: {results_path}")
        print("‚úÖ Reporte generado exitosamente")

    def _create_final_report(self) -> str:
        """Crea el reporte final en formato Markdown"""

        validation = self.results.get('validation', {})
        training = self.results.get('training', {})

        report = f"""# üöÄ REPORTE FINAL - SISTEMA H√çBRIDO DE F√çSICA CON IA

## üìã Informaci√≥n del Experimento
- **Nombre del Experimento**: {self.config['experiment_name']}
- **Fecha de Ejecuci√≥n**: {time.strftime('%Y-%m-%d %H:%M:%S')}
- **Dispositivo**: {self.device}

## üìä Resumen de Datos
- **Tama√±o del Dataset**: {self.results.get('dataset_size', 'N/A')} muestras
- **Divisi√≥n de Datos**:
  - Entrenamiento: {self.results.get('data_split', {}).get('train_size', 'N/A')} muestras
  - Validaci√≥n: {self.results.get('data_split', {}).get('val_size', 'N/A')} muestras
  - Prueba: {self.results.get('data_split', {}).get('test_size', 'N/A')} muestras

## üß† Configuraci√≥n del Modelo
- **Par√°metros del Modelo**: {training.get('model_parameters', 'N/A'):,} par√°metros
- **Arquitectura**: {self.config.get('model', {}).get('hidden_dims', 'N/A')}
- **Funci√≥n de Activaci√≥n**: {self.config.get('model', {}).get('activation', 'N/A')}

## üìà Resultados del Entrenamiento
- **Mejor P√©rdida de Validaci√≥n**: {training.get('best_val_loss', 'N/A'):.6f}
- **P√©rdida Final de Entrenamiento**: {training.get('final_train_loss', 'N/A'):.6f}
- **√âpocas Completadas**: {training.get('total_epochs', 'N/A')}

## üéØ Rendimiento del Sistema H√≠brido
- **Mejora en Precisi√≥n de Posici√≥n**: {validation.get('improvement', {}).get('position_error_reduction', 'N/A'):.1f}%
- **Mejora en Precisi√≥n de Velocidad**: {validation.get('improvement', {}).get('velocity_error_reduction', 'N/A'):.1f}%

### M√©tricas Detalladas:
#### Sistema H√≠brido:
- Error promedio posici√≥n: {validation.get('hybrid_stats', {}).get('mean_pos_error', 'N/A'):.6f}
- Error promedio velocidad: {validation.get('hybrid_stats', {}).get('mean_vel_error', 'N/A'):.6f}

#### F√≠sica B√°sica:
- Error promedio posici√≥n: {validation.get('basic_stats', {}).get('mean_pos_error', 'N/A'):.6f}
- Error promedio velocidad: {validation.get('basic_stats', {}).get('mean_vel_error', 'N/A'):.6f}

## üî¨ Innovaciones T√©cnicas
1. **F√≠sica B√°sica Mejorada**: Inclusi√≥n de fricci√≥n est√°tica/din√°mica y resistencia del aire
2. **Dataset Realista**: Generaci√≥n autom√°tica con m√∫ltiples materiales y condiciones
3. **Arquitectura H√≠brida**: Combinaci√≥n √≥ptima de f√≠sica determinista + correcci√≥n por IA
4. **Entrenamiento Optimizado**: Mixed precision, warmup, early stopping
5. **Validaci√≥n Rigurosa**: Comparaci√≥n directa con simulador de alta fidelidad PyBullet

## üìÅ Archivos Generados
- `models/best_hybrid_model.pth` - Modelo entrenado
- `data/physics_dataset.pkl` - Dataset generado
- `plots/` - Visualizaciones y gr√°ficos
- `logs/` - M√©tricas y an√°lisis detallados

## üöÄ Aplicaciones Potenciales
- Simulaciones de videojuegos en tiempo real
- Sistemas de entrenamiento rob√≥tico
- Predicci√≥n de movimiento de objetos f√≠sicos
- Simulaciones cient√≠ficas aceleradas

## üìà Pr√≥ximos Pasos
- Expansi√≥n a sistemas multi-objeto
- Inclusi√≥n de deformaciones y rotaciones complejas
- Optimizaci√≥n para inferencia en tiempo real
- Integraci√≥n con motores de f√≠sica comerciales

---
*Generado autom√°ticamente por el Sistema H√≠brido de F√≠sica con IA*
"""
        return report


def create_default_config() -> Dict[str, Any]:
    """Crea configuraci√≥n por defecto para el experimento"""
    return {
        'experiment_name': 'hybrid_physics_default',
        'output_dir': 'hybrid_physics_output',
        'regenerate_data': False,

        'data_generation': {
            'num_scenarios': 1000,
            'steps_per_scenario': 100
        },

        'data_split': {
            'train_ratio': 0.7,
            'val_ratio': 0.2,
            'test_ratio': 0.1
        },

        'model': {
            'hidden_dims': [1024, 512, 256, 128],
            'dropout_rate': 0.2,
            'activation': 'leakyrelu'
        },

        'training': {
            'batch_size': 512,
            'lr': 1e-3,
            'weight_decay': 1e-4,
            'epochs': 30,
            'early_stopping_patience': 8,
            'use_warmup': True,
            'warmup_epochs': 5
        },

        'loss': {
            'position_weight': 2.0,
            'velocity_weight': 1.0
        },

        'validation': {
            'num_scenarios': 5,
            'num_steps': 50
        }
    }


def main():
    """Funci√≥n principal"""
    print("üåü SISTEMA H√çBRIDO DE F√çSICA CON IA")
    print("=" * 50)

    # Configuraci√≥n por defecto
    config = create_default_config()

    # Crear y ejecutar experimento
    experiment = HybridPhysicsExperiment(config)
    experiment.run_full_pipeline()

    print("\nüéä ¬°EXPERIMENTO COMPLETADO EXITOSAMENTE!")
    print(f"üìÇ Resultados guardados en: {experiment.base_dir}")


def main_with_custom_config(config_path: str):
    """Funci√≥n principal con configuraci√≥n personalizada"""
    print("üåü SISTEMA H√çBRIDO DE F√çSICA CON IA (Configuraci√≥n Personalizada)")
    print("=" * 60)

    # Cargar configuraci√≥n personalizada
    with open(config_path, 'r') as f:
        config = json.load(f)

    # Crear y ejecutar experimento
    experiment = HybridPhysicsExperiment(config)
    experiment.run_full_pipeline()

    print("\nüéä ¬°EXPERIMENTO COMPLETADO EXITOSAMENTE!")
    print(f"üìÇ Resultados guardados en: {experiment.base_dir}")


def create_sample_config():
    """Crea un archivo de configuraci√≥n de ejemplo"""
    config = create_default_config()

    # Modificar para experimento de ejemplo
    config['experiment_name'] = 'physics_experiment_sample'
    config['data_generation']['num_scenarios'] = 500  # Menos datos para prueba r√°pida
    config['training']['epochs'] = 15  # Menos √©pocas para prueba r√°pida

    config_path = 'sample_config.json'
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=2)

    print(f"üìÑ Configuraci√≥n de ejemplo creada: {config_path}")
    return config_path


# ============================================================================
# PUNTO DE ENTRADA PRINCIPAL
# ============================================================================

if __name__ == "__main__":
    # Si se ejecuta directamente, usar configuraci√≥n por defecto
    main()

# Para usar en Jupyter Notebook:
def run_experiment(custom_config: Optional[Dict] = None):
    """
    Funci√≥n de conveniencia para ejecutar desde Jupyter Notebook

    Args:
        custom_config: Configuraci√≥n personalizada (opcional)
    """
    if custom_config is None:
        config = create_default_config()
    else:
        config = custom_config

    experiment = HybridPhysicsExperiment(config)
    experiment.run_full_pipeline()

    return experiment

# ============================================================================
# EJEMPLOS DE USO
# ============================================================================

def example_quick_test():
    """Ejemplo para prueba r√°pida"""
    config = create_default_config()
    config['experiment_name'] = 'quick_test'
    config['data_generation']['num_scenarios'] = 100
    config['training']['epochs'] = 10
    config['validation']['num_scenarios'] = 3

    return run_experiment(config)

def example_full_experiment():
    """Ejemplo para experimento completo"""
    config = create_default_config()
    config['experiment_name'] = 'full_physics_experiment'
    config['data_generation']['num_scenarios'] = 2000
    config['training']['epochs'] = 50

    return run_experiment(config)

print("‚úÖ Main pipeline definido correctamente")
print("\nüéØ Para ejecutar el experimento completo, usa:")
print("  ‚Ä¢ main() - Ejecuta con configuraci√≥n por defecto")
print("  ‚Ä¢ run_experiment() - Para usar en Jupyter Notebook")
print("  ‚Ä¢ example_quick_test() - Prueba r√°pida")
print("  ‚Ä¢ example_full_experiment() - Experimento completo")
print("\nüìã Todas las funciones est√°n listas para usar!")

In [None]:
main()