<a href="https://colab.research.google.com/github/PETEROA/AutoML/blob/main/Utils_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
import torch
import torch.nn as nn
import json
import numpy as np
from pathlib import Path
from typing import Dict, Any, Tuple, Optional
import time

In [23]:
# ============================================================================
# Directory Configuration
# ============================================================================

def get_project_paths(base_dir: Optional[Path] = None) -> Dict[str, Path]:
    """
    Get all project directory paths, creating any that do not exist yet

    Args:
        base_dir: Root output directory. When omitted, the default
            '/mnt/user-data/outputs' location is used.

    Returns:
        Dictionary mapping 'output', 'profiles', 'models', 'configs',
        'results' and 'checkpoints' to their directory paths
    """
    root = Path('/mnt/user-data/outputs') if base_dir is None else Path(base_dir)

    # 'output' is the root itself; every other entry is a direct child of it.
    paths = {'output': root}
    for name in ('profiles', 'models', 'configs', 'results', 'checkpoints'):
        paths[name] = root / name

    for path in paths.values():
        path.mkdir(parents=True, exist_ok=True)

    return paths

In [24]:
# ============================================================================
# Model Utilities
# ============================================================================

def count_parameters(model: nn.Module) -> Tuple[int, int]:
    """
    Tally the parameter elements of a model

    Returns:
        (total_params, trainable_params), where the second value only counts
        parameters whose requires_grad flag is set
    """
    total_count = 0
    trainable_count = 0
    for param in model.parameters():
        size = param.numel()
        total_count += size
        if param.requires_grad:
            trainable_count += size
    return total_count, trainable_count


def get_model_size_mb(model: nn.Module) -> float:
    """Return the combined byte size of a model's parameters and buffers in MB"""
    total_bytes = 0
    # Parameters and buffers are sized the same way: element count x bytes each.
    for tensors in (model.parameters(), model.buffers()):
        for tensor in tensors:
            total_bytes += tensor.nelement() * tensor.element_size()
    return total_bytes / 1024**2


def measure_inference_time(
    model: nn.Module,
    input_data: torch.Tensor,
    num_runs: int = 100,
    warmup_runs: int = 10,
    device: str = 'cpu'
) -> Dict[str, float]:
    """
    Measure model inference time

    The model is moved to `device` and switched to eval mode; both changes
    remain in effect after the call.

    Args:
        model: PyTorch model
        input_data: Input tensor
        num_runs: Number of timed inference runs (must be at least 1)
        warmup_runs: Number of untimed warmup runs
        device: Device to run on (a device string such as 'cpu', 'cuda' or
            'cuda:0', or a torch.device)

    Returns:
        Dictionary with timing statistics in milliseconds
        ('mean_ms', 'std_ms', 'min_ms', 'max_ms', 'median_ms')

    Raises:
        ValueError: If num_runs is less than 1
    """
    if num_runs < 1:
        raise ValueError(f"num_runs must be at least 1, got {num_runs}")

    # Normalising through torch.device lets 'cuda', 'cuda:0' and torch.device
    # objects all take the synchronisation path below.
    target = torch.device(device)
    needs_sync = target.type == 'cuda'

    model = model.to(target)
    model.eval()
    input_data = input_data.to(target)

    # Warmup
    with torch.no_grad():
        for _ in range(warmup_runs):
            _ = model(input_data)

    # Make sure queued warmup kernels are finished before timing starts
    if needs_sync:
        torch.cuda.synchronize(target)

    # Measure time; perf_counter is monotonic and high resolution, unlike
    # the wall clock returned by time.time()
    times = []
    with torch.no_grad():
        for _ in range(num_runs):
            start = time.perf_counter()
            _ = model(input_data)
            if needs_sync:
                # GPU kernels run asynchronously; wait so the interval
                # covers the actual computation.
                torch.cuda.synchronize(target)
            times.append(time.perf_counter() - start)

    times = np.array(times) * 1000  # Convert to milliseconds

    return {
        'mean_ms': float(np.mean(times)),
        'std_ms': float(np.std(times)),
        'min_ms': float(np.min(times)),
        'max_ms': float(np.max(times)),
        'median_ms': float(np.median(times))
    }


In [25]:
# ============================================================================
# Data Loading Utilities
# ============================================================================

def load_profile_data(profile_path: Path) -> Dict[str, Any]:
    """
    Load model profile data from JSON

    Args:
        profile_path: Path of the JSON file to read

    Returns:
        The decoded JSON content
    """
    # Read as UTF-8 explicitly so non-ASCII content decodes the same way
    # regardless of the platform's locale encoding.
    with open(profile_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def save_profile_data(profile_data: Dict[str, Any], save_path: Path):
    """
    Save model profile data to JSON

    Args:
        profile_data: JSON-serializable profile content
        save_path: Destination file; overwritten if it already exists

    Raises:
        TypeError: If profile_data contains values json cannot serialize.
            The destination file is left untouched in that case.
    """
    # Serialize before opening the file: opening in 'w' mode truncates it, so
    # a serialization failure would otherwise destroy an existing profile.
    serialized = json.dumps(profile_data, indent=2)
    with open(save_path, 'w', encoding='utf-8') as f:
        f.write(serialized)


def load_all_profiles() -> Dict[str, Dict]:
    """
    Load the combined model profile file from the project output directory

    Returns:
        The content of 'all_model_profiles.json' when that file exists,
        otherwise a dictionary with empty 'vision_models' and
        'language_models' entries
    """
    profile_file = get_project_paths()['output'] / 'all_model_profiles.json'

    if not profile_file.exists():
        return {'vision_models': {}, 'language_models': {}}

    return load_profile_data(profile_file)


In [26]:
# ============================================================================
# Checkpoint Management
# ============================================================================

def save_checkpoint(
    model: nn.Module,
    optimizer: Optional[torch.optim.Optimizer],
    epoch: int,
    loss: float,
    metrics: Dict[str, float],
    save_path: Path
):
    """
    Write a training checkpoint to disk with torch.save

    The checkpoint stores 'epoch', 'model_state_dict', 'loss' and 'metrics';
    'optimizer_state_dict' is included only when an optimizer is given.
    """
    state = dict(
        epoch=epoch,
        model_state_dict=model.state_dict(),
        loss=loss,
        metrics=metrics,
    )

    if optimizer is not None:
        state.update(optimizer_state_dict=optimizer.state_dict())

    torch.save(state, save_path)


def load_checkpoint(
    model: nn.Module,
    checkpoint_path: Path,
    optimizer: Optional[torch.optim.Optimizer] = None,
    map_location: Any = 'cpu'
) -> Tuple[nn.Module, int, float]:
    """
    Load training checkpoint

    Args:
        model: Model whose weights are restored in place
        checkpoint_path: File to read; must contain 'model_state_dict',
            'epoch' and 'loss' entries
        optimizer: Optional optimizer to restore; only touched when the
            checkpoint contains 'optimizer_state_dict'
        map_location: Forwarded to torch.load. The 'cpu' default lets a
            checkpoint saved from GPU tensors be opened on a machine
            without that GPU; load_state_dict then copies the values onto
            whatever device the model already lives on.

    Returns:
        (model, epoch, loss)
    """
    # NOTE: torch.load unpickles the file, so only load checkpoints that
    # come from a trusted source.
    checkpoint = torch.load(checkpoint_path, map_location=map_location)
    model.load_state_dict(checkpoint['model_state_dict'])

    if optimizer is not None and 'optimizer_state_dict' in checkpoint:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    return model, checkpoint['epoch'], checkpoint['loss']

In [27]:
# ============================================================================
# Distillation Utilities
# ============================================================================

def compute_kl_divergence_loss(
    student_logits: torch.Tensor,
    teacher_logits: torch.Tensor,
    temperature: float = 3.0
) -> torch.Tensor:
    """
    Knowledge-distillation loss: KL divergence between softened distributions

    Both sets of logits are divided by the temperature before the softmax
    over the last dimension, and the 'batchmean' KL divergence is multiplied
    by temperature squared.

    Args:
        student_logits: Student model logits
        teacher_logits: Teacher model logits
        temperature: Softmax temperature
    """
    functional = torch.nn.functional

    scaled_student = student_logits / temperature
    scaled_teacher = teacher_logits / temperature

    # kl_div expects log-probabilities as input and probabilities as target.
    log_probs_student = functional.log_softmax(scaled_student, dim=-1)
    probs_teacher = functional.softmax(scaled_teacher, dim=-1)

    divergence = functional.kl_div(
        log_probs_student, probs_teacher, reduction='batchmean'
    )
    return divergence * (temperature ** 2)


def compute_feature_matching_loss(
    student_features: torch.Tensor,
    teacher_features: torch.Tensor,
    normalize: bool = True
) -> torch.Tensor:
    """
    Mean squared error between student and teacher features

    Args:
        student_features: Student intermediate features
        teacher_features: Teacher intermediate features
        normalize: When True, both inputs are normalized along the last
            dimension before the error is computed
    """
    functional = torch.nn.functional

    if not normalize:
        return functional.mse_loss(student_features, teacher_features)

    unit_student = functional.normalize(student_features, dim=-1)
    unit_teacher = functional.normalize(teacher_features, dim=-1)
    return functional.mse_loss(unit_student, unit_teacher)

In [28]:
# ============================================================================
# Compression Ratio Calculations
# ============================================================================

def calculate_compression_metrics(
    teacher_params: int,
    student_params: int,
    teacher_size_mb: float,
    student_size_mb: float,
    teacher_time_ms: float,
    student_time_ms: float
) -> Dict[str, float]:
    """
    Compare a student model against its teacher

    Returns:
        Dictionary with teacher/student ratios for parameters, size and
        inference time, plus the matching percentage reductions
    """
    # Teacher-over-student ratios: larger means a smaller or faster student.
    param_ratio = teacher_params / student_params
    size_ratio = teacher_size_mb / student_size_mb
    time_ratio = teacher_time_ms / student_time_ms

    # Percentage reductions relative to the teacher.
    param_reduction = (1 - student_params / teacher_params) * 100
    size_reduction = (1 - student_size_mb / teacher_size_mb) * 100
    time_reduction = (student_time_ms / teacher_time_ms - 1) * -100

    metrics = {}
    metrics['param_compression_ratio'] = param_ratio
    metrics['size_compression_ratio'] = size_ratio
    metrics['speedup'] = time_ratio
    metrics['param_reduction_pct'] = param_reduction
    metrics['size_reduction_pct'] = size_reduction
    metrics['speedup_pct'] = time_reduction
    return metrics


In [29]:
# ============================================================================
# Logging and Visualization Utilities
# ============================================================================

def format_number(num: float, precision: int = 2) -> str:
    """
    Format large numbers with suffixes (K, M, B)

    Args:
        num: Value to format; negative values keep their sign
        precision: Number of digits after the decimal point

    Returns:
        The value scaled by 1e3 / 1e6 / 1e9 with the matching suffix, or the
        plain value when its magnitude is below 1000
    """
    # Pick the suffix from the magnitude so negative values are scaled too
    # (comparing the signed value left e.g. -5e6 without a suffix).
    magnitude = abs(num)
    if magnitude >= 1e9:
        return f"{num / 1e9:.{precision}f}B"
    elif magnitude >= 1e6:
        return f"{num / 1e6:.{precision}f}M"
    elif magnitude >= 1e3:
        return f"{num / 1e3:.{precision}f}K"
    else:
        return f"{num:.{precision}f}"


def print_model_summary(model_name: str, profile_data: Dict[str, Any]):
    """
    Print a framed summary of a model profile

    Reads 'total_params', 'model_size_mb' and 'inference_time_ms' from the
    profile (each falling back to 0 when missing) and adds a throughput line
    only when 'throughput_fps' is present.
    """
    rule = '=' * 60
    total_params = profile_data.get('total_params', 0)
    size_mb = profile_data.get('model_size_mb', 0)
    latency_ms = profile_data.get('inference_time_ms', 0)

    print(f"\n{rule}")
    print(f"Model: {model_name}")
    print(f"{rule}")
    print(f"  Parameters: {format_number(total_params)}")
    print(f"  Model Size: {size_mb:.2f} MB")
    print(f"  Inference Time: {latency_ms:.2f} ms")
    if 'throughput_fps' in profile_data:
        throughput = profile_data['throughput_fps']
        print(f"  Throughput: {throughput:.2f} FPS")
    print(f"{rule}")


# ============================================================================
# Device Management
# ============================================================================

def get_device(prefer_cuda: bool = True) -> torch.device:
    """
    Pick the computation device and print which one was chosen

    Returns the CUDA device when prefer_cuda is set and CUDA is available,
    otherwise the CPU device.
    """
    use_gpu = prefer_cuda and torch.cuda.is_available()

    if not use_gpu:
        print("Using CPU")
        return torch.device('cpu')

    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
    return torch.device('cuda')


def clear_gpu_memory():
    """Empty the CUDA memory cache; does nothing when CUDA is unavailable"""
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()